Show the code
# Don't run this here - put this in cronjob or github actions to update cafc dataset every quarter
#%run cafc_quarterly_fraud_data_pipeline.py # Set up on a cronjob to run each quarter
Using Canadian Anti-Fraud Centre Open Data
Eileen Murphy
August 15, 2025
Updated 2025-10-15 Updated CAFC data with Q3 data published Oct 2, 2025.
Improved automation to process files.
PySpark is an invaluable tool for processing datasets in parallel partitions for efficiency. The data can be written and merged to another file in many different formats, such as CSV, TSV, JSON, Parquet, and XML.
The structured pipeline once written will always be reproducible and easy to maintain.
The Canadian Anti-Fraud Centre dataset and its data dictionary are published and updated every quarter on the CKAN platform on the Open Government Portal. The CKAN platform allows federal and municipal governments, as well as companies, to maintain their catalogs of datasets in a consistent and transparent way for all their users, whether public or private.
In this exercise, we are going to use the Canadian Anti-Fraud Centre Reporting Dataset on the CKAN platform and do the following:
1. Extract English and French field names into two datasets representing distinct but replicated English and French datasets (in progress).
2. Change date type to date and change fields to numeric that we want to aggregate.
3. Include monthly and yearly aggregate datasets.
4. Filter out invalid records that have invalid Country names.
5. Streamline access to new generated datasets by downloading csv files to local drive.
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date, trim, when, year, month, sum as _sum
# ------------ 1: START SPARK SESSION ------------
spark = SparkSession.builder.appName("CAFC_Quarterly_Refinery_Pipeline").getOrCreate()
# ------------ 2: READ & CLEAN DATA ------------
# Requires a manual download of the yyyy_QQ_cafc.tsv file from the CAFC open
# data portal; place the downloaded file in the "data" directory.
from pathlib import Path
# Find every TSV under data/ (recursively) and pick the most recently
# modified one, so re-running the pipeline always processes the newest drop.
tsv_files = list(Path("data").rglob("*.tsv"))
if not tsv_files:
    # Fail fast: downstream steps need `df`, so a missing input is fatal.
    raise FileNotFoundError("No TSV files found under 'data/'.")
recent_file = str(max(tsv_files, key=lambda f: f.stat().st_mtime))
print(recent_file)
# CAFC publishes a tab-delimited file with a header row.
df = spark.read.option("header", "true").option("delimiter", "\t").csv(recent_file)
# The CAFC feed ships bilingual headers; drop the French-named duplicates so
# only the English columns remain before renaming.
french_cols = [
    "Type de plainte recue", "Pays", "Province/Etat",
    "Categories thematiques sur la fraude et la cybercriminalite",
    "Methode de sollicitation", "Genre",
    "Langue de correspondance", "Type de plainte"
]
df = df.drop(*french_cols)
# Map the verbose bilingual headers onto short snake_case names.
rename_map = {
    "Numero d'identification / Number ID": "id",
    "Date Received / Date recue": "date",
    "Complaint Received Type": "complaint_type",
    "Country": "country", "Province/State": "province",
    "Fraud and Cybercrime Thematic Categories": "fraud_cat",
    "Solicitation Method": "sol_method",
    "Gender": "gender", "Language of Correspondence": "lang_cor",
    "Victim Age Range / Tranche d'age des victimes": "age_range",
    "Complaint Type": "complaint_subtype",
    "Number of Victims / Nombre de victimes": "num_victims",
    "Dollar Loss /pertes financieres": "dollar_loss"
}
for source_name, target_name in rename_map.items():
    if source_name in df.columns:
        df = df.withColumnRenamed(source_name, target_name)
# Cast the measures we aggregate: blank strings become NULL, victim counts
# become integers, and dollar amounts are stripped of "$"/"," then cast to
# double. Dates are parsed from ISO yyyy-MM-dd strings.
df = (
    df.withColumn(
        "num_victims",
        when(trim(col("num_victims")) == "", None)
        .otherwise(col("num_victims").cast("integer")),
    )
    .withColumn(
        "dollar_loss",
        when(trim(col("dollar_loss")) == "", None)
        .otherwise(regexp_replace(col("dollar_loss"), "[$,]", "").cast("double")),
    )
    .withColumn("date", to_date("date", "yyyy-MM-dd"))
)
# Drop records whose country was never specified.
df = df.filter(col("country") != "Not Specified")
# ------------ 3: DETECT QUARTER ------------
# Name the output directory after the calendar quarter of the newest record
# (e.g. "outputs_2025_Q3"), so each quarterly run lands in its own folder.
max_date = df.agg({"date": "max"}).collect()[0][0]
if max_date is None:
    # All dates failed to parse; without a max date the quarter label
    # (and the output directory name) cannot be derived.
    raise ValueError("No parseable dates in the dataset; cannot label the quarter.")
year_val = max_date.year
quarter_val = (max_date.month - 1) // 3 + 1  # months 1-3 -> Q1, 4-6 -> Q2, ...
label = f"{year_val}_Q{quarter_val}"
out_dir = f"outputs_{label}"
os.makedirs(out_dir, exist_ok=True)
# ------------ 4: SAVE CLEANED DATA ------------
df.write.mode("overwrite") \
    .option("header", True) \
    .option("locale", "en-US") \
    .csv(f"{out_dir}/cleaned_cafc")
# ------------ 5: SUMMARIES ------------
# Total dollar losses and victim counts, aggregated per month and per year.
monthly_summary = df.groupBy(year("date").alias("year"), month("date").alias("month")).agg(
    _sum("dollar_loss").alias("total_loss"),
    _sum("num_victims").alias("total_victims")
).orderBy("year", "month")
monthly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/monthly_summary")
yearly_summary = df.groupBy(year("date").alias("year")).agg(
    _sum("dollar_loss").alias("total_loss"),
    _sum("num_victims").alias("total_victims")
).orderBy("year")
yearly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/yearly_summary")
print(f"cafc_quarterly_fraud_data_pipeline.py script complete: {out_dir}")
# Clean (base) directory for posts
cafc_clean <- here::here("posts", "pyspark")
# Get a list of all output directories matching the pattern
csv_dir <- list.files(path = cafc_clean, pattern = "outputs_*", all.files = TRUE, full.names = TRUE)
# Get the most recent output directory (assuming sorted filenames)
extract_dir <- max(csv_dir)
# Concatenate the extracted directory and "cleaned_cafc" subdirectory correctly
cleaned_cafc_path <- file.path(extract_dir, "cleaned_cafc")
# Get all CSVs
file_paths <- list.files(path = cleaned_cafc_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)# A tibble: 6 × 13
id date complaint_type country province fraud_cat sol_method gender
<dbl> <date> <chr> <chr> <chr> <chr> <chr> <chr>
1 350308 2025-09-29 Phone Canada Ontario Personal … Direct ca… Female
2 350309 2025-09-29 Phone Canada Quebec Identity … Other/unk… Male
3 350310 2025-09-29 Phone Canada Quebec Extortion Direct ca… Female
4 350311 2025-09-29 Phone Canada Ontario Identity … Other/unk… Female
5 350312 2025-09-29 Phone Canada Manitoba Identity … Other/unk… Female
6 350313 2025-09-29 Phone Canada Alberta Investmen… Internet Male
# ℹ 5 more variables: lang_cor <chr>, age_range <chr>, complaint_subtype <chr>,
# num_victims <dbl>, dollar_loss <dbl>
# Path to the "monthly_summary" subdirectory of the latest run
cleaned_monthly_summary_path <- file.path(extract_dir, "monthly_summary")
file_paths <- list.files(path = cleaned_monthly_summary_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
# Read and row-bind all Spark part files into one data frame
monthly_summary_df <- file_paths %>%
  map_dfr(read_csv, show_col_types = FALSE)
[1] "CAFC Latest Update : 2025_Q3"
library(downloadthis)
# Render a download button that serves the cleaned English dataset as a CSV.
download_this(
  merged_df,
  output_name = "clean_cafc_en",
  output_extension = ".csv",
  button_label = "Download clean_cafc_en",
  button_type = "default",
  self_contained = TRUE,
  has_icon = TRUE,
  icon = "fa fa-save",
  id = "cafc-btn",
  csv2 = FALSE # Use write_csv(), dot decimal, comma separator
)