# Don't run this here - put it in a cron job or GitHub Actions to update the CAFC dataset every quarter
#%run cafc_quarterly_fraud_data_pipeline.py
Using Canadian Anti-Fraud Centre Open Data
Eileen Murphy
August 15, 2025
Updated 2025-10-15: CAFC data refreshed with Q3 data published October 2, 2025.
Improved automation to process files.
PySpark is an invaluable tool for processing datasets in parallel partitions, which makes large jobs efficient. The data can be written and merged into another file in many formats, such as CSV, TSV, JSON, Parquet, and XML.
Once written, a structured pipeline is reproducible and easy to maintain.
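The read-one-format, write-another pattern can be sketched in plain Python. This stdlib-only version just illustrates the idea; the pipeline itself uses PySpark's DataFrameReader/DataFrameWriter, which applies the same conversion per partition. The sample rows are made up.

```python
import csv
import io
import json

# A few rows in the rough shape of the CAFC extract (made-up values).
raw = """id,date,country,dollar_loss
350308,2025-09-29,Canada,1200.50
350309,2025-09-29,Canada,0
"""

# Read CSV rows into dicts, then re-serialize as JSON Lines -- the same
# read-one-format / write-another step PySpark performs at scale.
rows = list(csv.DictReader(io.StringIO(raw)))
jsonl = "\n".join(json.dumps(r) for r in rows)
print(jsonl)
```

In PySpark the equivalent is a one-liner per format (`spark.read.csv(...)` followed by `df.write.parquet(...)`), with the parallelism handled by the partitioning.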
The Canadian Anti-Fraud Centre dataset and its data dictionary are published, and updated every quarter, on the CKAN platform of the Open Government Portal. CKAN lets federal and municipal governments, as well as companies, maintain their catalogues of datasets in a consistent and transparent way, whether public or private to their users.
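Because CKAN exposes a uniform action API, a dataset's metadata (including the download URLs of its CSV resources) can be fetched programmatically. A minimal sketch with the Python standard library follows; the dataset id below is a placeholder, not the real CAFC identifier, and the live request is left commented out since it needs network access.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

def package_show_url(base: str, dataset_id: str) -> str:
    """Build the CKAN package_show request URL for a dataset id."""
    return f"{base}/api/3/action/package_show?{urlencode({'id': dataset_id})}"

# Open Government Portal CKAN base URL; the dataset id is a placeholder.
url = package_show_url("https://open.canada.ca/data/en",
                       "canadian-anti-fraud-centre-reporting-data")
print(url)

# To fetch live metadata (requires network access):
# meta = json.load(urlopen(url))["result"]
# csv_urls = [r["url"] for r in meta["resources"] if r.get("format") == "CSV"]
```

A scheduled job (cron or GitHub Actions, as noted above) can call this each quarter to discover the newest resource URL before running the pipeline.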
In this exercise, we are going to use the Canadian Anti-Fraud Centre Reporting Dataset on the CKAN platform and do the following:
1. Extract the English and French field names into two distinct but replicated datasets, one per language (in progress).
2. Cast the date field to a date type, and convert the fields we want to aggregate to numeric.
3. Produce monthly and yearly aggregate datasets.
4. Filter out records with invalid country names.
5. Streamline access to the newly generated datasets by downloading the CSV files to a local drive.
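The core of steps 2-4 (casting types, filtering invalid countries, rolling up monthly aggregates) can be sketched in plain Python. The actual pipeline implements these steps in PySpark; this stdlib version only illustrates the logic, using made-up rows with field names matching the cleaned dataset (`date`, `country`, `dollar_loss`, `num_victims`).

```python
import csv
import io
from collections import defaultdict
from datetime import date

# Tiny made-up extract in the cleaned dataset's shape.
raw = """id,date,country,dollar_loss,num_victims
350308,2025-08-15,Canada,1200.50,1
350309,2025-09-29,Canada,300.00,1
350310,2025-09-29,,0,0
"""

monthly = defaultdict(lambda: {"dollar_loss": 0.0, "num_victims": 0})
for row in csv.DictReader(io.StringIO(raw)):
    # Step 4: drop records without a valid country name.
    if not row["country"]:
        continue
    # Step 2: cast the date string and the numeric fields.
    d = date.fromisoformat(row["date"])
    key = (d.year, d.month)
    # Step 3: roll up monthly totals.
    monthly[key]["dollar_loss"] += float(row["dollar_loss"])
    monthly[key]["num_victims"] += int(row["num_victims"])

print(dict(monthly))
```

In PySpark the same shape falls out of a `filter` followed by a `groupBy` on the truncated date with `sum` aggregates.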
#remove include (above) to generate lines of code used to extract cafc data
# get path to script
pathway <- here::here("posts","pyspark","cafc_quarterly_fraud_data_pipeline.py")
# generate output of lines from script
lines <- readLines(pathway, warn=FALSE)
#Uncomment next line for code source
#cat(lines, sep = "\n")
# Clean (base) directory for posts
cafc_clean <- here::here("posts", "pyspark")
# Get a list of all output directories matching the pattern
csv_dir <- list.files(path = cafc_clean, pattern = "^outputs_", all.files = TRUE, full.names = TRUE)
# Get the most recent output directory (assuming sorted filenames)
extract_dir <- max(csv_dir)
# Build the path to the "cleaned_cafc" subdirectory
cleaned_cafc_path <- file.path(extract_dir, "cleaned_cafc")
# Get all CSVs
file_paths <- list.files(path = cleaned_cafc_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
# Read and merge all cleaned CSVs into one data frame
merged_df <- file_paths %>%
  map_dfr(read_csv, show_col_types = FALSE)
# A tibble: 6 × 13
id date complaint_type country province fraud_cat sol_method gender
<dbl> <date> <chr> <chr> <chr> <chr> <chr> <chr>
1 350308 2025-09-29 Phone Canada Ontario Personal … Direct ca… Female
2 350309 2025-09-29 Phone Canada Quebec Identity … Other/unk… Male
3 350310 2025-09-29 Phone Canada Quebec Extortion Direct ca… Female
4 350311 2025-09-29 Phone Canada Ontario Identity … Other/unk… Female
5 350312 2025-09-29 Phone Canada Manitoba Identity … Other/unk… Female
6 350313 2025-09-29 Phone Canada Alberta Investmen… Internet Male
# ℹ 5 more variables: lang_cor <chr>, age_range <chr>, complaint_subtype <chr>,
# num_victims <dbl>, dollar_loss <dbl>
# Build the path to the "monthly_summary" subdirectory
cleaned_monthly_summary_path <- file.path(extract_dir, "monthly_summary")
file_paths <- list.files(path = cleaned_monthly_summary_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
monthly_summary_df <- file_paths %>%
map_dfr(read_csv, show_col_types = FALSE)
[1] "CAFC Latest Update : 2025_Q3"
library(downloadthis)
#Sys.setlocale("LC_NUMERIC", "C")
#options(OutDec = ".")
merged_df |>
download_this(
output_name = "clean_cafc_en",
output_extension = ".csv",
button_label = "Download clean_cafc_en",
button_type = "default",
self_contained = TRUE,
has_icon = TRUE,
icon = "fa fa-save",
id = "cafc-btn",
csv2 = FALSE # Use write_csv(), dot decimal, comma separator
)