Data Pipelines with PySpark

Using Canadian Anti-Fraud Centre Open Data

Pyspark
Python
R
CAFC
Downloads
Author

Eileen Murphy

Published

August 15, 2025

Note

Updated 2025-10-15: Refreshed the CAFC data with the Q3 data published October 2, 2025, and improved the automation for processing files.

Introduction

PySpark processes datasets in parallel across partitions, which makes it efficient for large data. Results can be written in many formats, such as CSV, TSV (CSV with a tab delimiter), JSON, Parquet, and XML. Spark writes one part file per partition, and the parts can then be merged into a single file.
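To make "parallel partitions" concrete, here is a Spark-free Python sketch of the split, process, combine pattern that Spark applies when it sums a column such as `dollar_loss`: each partition produces a partial result independently, and the partial results are then combined. The helper names are hypothetical, and a thread pool merely stands in for Spark's executors.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Deal rows round-robin into num_partitions lists (a stand-in for Spark partitions)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def partial_loss(partition):
    """Per-partition partial aggregate: total dollar loss within one partition."""
    return sum(row["dollar_loss"] for row in partition)


def total_loss(rows, num_partitions=4):
    """Sum dollar_loss by aggregating each partition separately, then combining."""
    partitions = split_into_partitions(rows, num_partitions)
    # Each partition is processed independently, like a Spark task...
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partials = list(pool.map(partial_loss, partitions))
    # ...and the partial results are combined into the final answer
    return sum(partials)
```

The final answer does not depend on how the rows were split, which is what lets Spark choose the number of partitions freely.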

Once written, a structured pipeline is reproducible and easy to maintain: every quarterly run applies the same steps to the newly published file.

The Canadian Anti-Fraud Centre (CAFC) dataset and its data dictionary are updated every quarter and published on the Open Government Portal, which runs on the CKAN platform. CKAN lets federal and municipal governments, as well as companies, maintain their catalogs of datasets in a consistent and transparent way, whether a dataset is public or private.
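Because the portal runs on CKAN, the manual download step could in principle be automated through CKAN's Action API. The sketch below is a hedged illustration, not part of the author's pipeline: it assumes the portal serves the standard `package_show` action under `https://open.canada.ca/data/api/action`, and the helper names and the dataset id you pass in are hypothetical placeholders. It picks the most recently modified resource in a given format from a `package_show` response.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumption: the Open Government Portal exposes the standard CKAN Action API here.
BASE_URL = "https://open.canada.ca/data/api/action"


def package_show_url(dataset_id):
    """Build a CKAN package_show URL for a (hypothetical) dataset id."""
    return f"{BASE_URL}/package_show?{urlencode({'id': dataset_id})}"


def fetch_package(dataset_id):
    """Download and parse the package_show response (makes a network call)."""
    with urlopen(package_show_url(dataset_id)) as resp:
        return json.load(resp)


def latest_resource_url(package_json, fmt="TSV"):
    """Return the URL of the most recently modified resource in format fmt,
    or None if the package has no such resource."""
    resources = [
        r for r in package_json["result"]["resources"]
        if (r.get("format") or "").upper() == fmt.upper()
    ]
    if not resources:
        return None
    # CKAN timestamps are ISO 8601 strings, so they sort chronologically as text
    newest = max(resources, key=lambda r: r.get("last_modified") or r.get("created") or "")
    return newest["url"]
```

Usage would be along the lines of `latest_resource_url(fetch_package("<dataset-id>"))`, after confirming the actual dataset id and resource format on the portal.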

Process

In this exercise, we are going to use the Canadian Anti-Fraud Centre Reporting Dataset on the CKAN platform and do the following:
1. Extract the English and French field names into two datasets that represent the same records in English and in French (in progress).
2. Convert the date field to a date type, and convert the fields we want to aggregate to numeric types.
3. Create monthly and yearly aggregate datasets.
4. Filter out records whose country is not specified.
5. Streamline access to the newly generated datasets with buttons that download the CSV files to a local drive.
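Steps 2 and 4 boil down to a few per-record rules. As a hedged, Spark-free sketch, the hypothetical function below applies the same rules to one record stored as a dict keyed by the short column names the pipeline assigns (`date`, `country`, `num_victims`, `dollar_loss`). One difference from the Spark version: Spark's `cast` turns an unparseable value into null, whereas `int()` and `float()` raise `ValueError` here.

```python
from datetime import date


def clean_record(raw):
    """Apply steps 2 and 4 to one record (a dict of strings).
    Returns None when the record should be filtered out."""
    country = raw.get("country")
    # Step 4: drop records with no usable country
    # (mirrors the Spark filter, which also drops nulls)
    if country is None or country == "Not Specified":
        return None
    victims = (raw.get("num_victims") or "").strip()
    loss = (raw.get("dollar_loss") or "").strip()
    return {
        **raw,
        # Step 2: ISO yyyy-MM-dd text becomes a real date
        "date": date.fromisoformat(raw["date"]),
        # Blank strings become None; other values are cast to numbers
        "num_victims": int(victims) if victims else None,
        # Strip "$" and thousands separators before converting
        "dollar_loss": float(loss.replace("$", "").replace(",", "")) if loss else None,
    }
```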

Scheduling the script to update the datasets every quarter

# Don't run this here: schedule it with cron or GitHub Actions to update the CAFC dataset every quarter
#%run cafc_quarterly_fraud_data_pipeline.py  # run by the quarterly cron job
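The schedule itself can be expressed in a few lines. This plain-Python sketch computes the next quarterly run date as the first day of the next calendar quarter plus a small offset; the function name is hypothetical, and the three-day default offset is an assumption, chosen because the Q3 data was published on October 2.

```python
from datetime import date, timedelta


def next_quarterly_run(today, offset_days=3):
    """Date of the next quarterly pipeline run: the first day of the next
    calendar quarter plus offset_days (an assumed publication lag)."""
    q = (today.month - 1) // 3  # 0-based quarter index of `today`
    if q == 3:
        start = date(today.year + 1, 1, 1)
    else:
        start = date(today.year, 3 * q + 4, 1)  # April, July or October
    return start + timedelta(days=offset_days)
```

In cron, a comparable fixed schedule would be `0 6 5 1,4,7,10 *`, which fires at 06:00 on the 5th of January, April, July and October.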

Contents of the update script (run only once every quarter)

# Remove the include chunk option (above) to print the lines of code used to extract the CAFC data
# get path to script
pathway <- here::here("posts","pyspark","cafc_quarterly_fraud_data_pipeline.py")

# generate output of lines from script

lines <- readLines(pathway, warn=FALSE)
cat(lines, sep = "\n")
import os
import sys
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date, trim, when, year, month, sum as _sum

# ------------ 1: START SPARK SESSION ------------
spark = SparkSession.builder.appName("CAFC_Quarterly_Refinery_Pipeline").getOrCreate()

# ------------ 2: READ & CLEAN DATA ------------
# This requires a manual download of the yyyy_QQ_cafc.tsv file from the CAFC open data portal.
# Place the downloaded file in the data directory.

# List all TSV files under data/, recursively
tsv_files = list(Path("data").rglob("*.tsv"))

if not tsv_files:
    # Stop early: without an input file there is nothing to clean
    print("No TSV files found.")
    spark.stop()
    sys.exit(1)

# Use the file with the latest modification time
recent_file = str(max(tsv_files, key=lambda f: os.path.getmtime(f)))
print(recent_file)
df = spark.read.option("header", "true").option("delimiter", "\t").csv(recent_file)


french_cols = [
    "Type de plainte recue", "Pays", "Province/Etat",
    "Categories thematiques sur la fraude et la cybercriminalite",
    "Methode de sollicitation", "Genre",
    "Langue de correspondance", "Type de plainte"
]
df = df.drop(*french_cols)

rename_map = {
    "Numero d'identification / Number ID": "id",
    "Date Received / Date recue": "date",
    "Complaint Received Type": "complaint_type",
    "Country": "country", "Province/State": "province",
    "Fraud and Cybercrime Thematic Categories": "fraud_cat",
    "Solicitation Method": "sol_method",
    "Gender": "gender", "Language of Correspondence": "lang_cor",
    "Victim Age Range / Tranche d'age des victimes": "age_range",
    "Complaint Type": "complaint_subtype",
    "Number of Victims / Nombre de victimes": "num_victims",
    "Dollar Loss /pertes financieres": "dollar_loss"
}
for old, new in rename_map.items():
    if old in df.columns:
        df = df.withColumnRenamed(old, new)

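# Blank strings become null; other values are cast to numbers
df = df.withColumn("num_victims",
    when(trim(col("num_victims")) == "", None).otherwise(col("num_victims").cast("integer")))
# Strip "$" and thousands separators before casting dollar_loss to double
df = df.withColumn("dollar_loss",
    when(trim(col("dollar_loss")) == "", None)
    .otherwise(regexp_replace(col("dollar_loss"), "[$,]", "").cast("double")))

# Parse ISO dates (yyyy-MM-dd) into a date column
df = df.withColumn("date", to_date("date", "yyyy-MM-dd"))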

# Drop records whose country is "Not Specified"
# (comparing with != also drops rows where country is null)
df = df.filter(col("country") != "Not Specified")

# ------------ 3: DETECT QUARTER ------------
# Label the output by the quarter of the latest complaint date, e.g. 2025_Q3
max_date = df.agg({"date": "max"}).collect()[0][0]
year_val = max_date.year
quarter_val = (max_date.month - 1) // 3 + 1
label = f"{year_val}_Q{quarter_val}"

out_dir = f"outputs_{label}"
os.makedirs(out_dir, exist_ok=True)

# ------------ 4: SAVE CLEANED DATA ------------
# Spark writes a directory of part-*.csv files, one per partition.
# Note: the CSV "locale" option is documented for parsing on read;
# it is kept here but likely has no effect on the write.
df.write.mode("overwrite") \
    .option("header", True) \
    .option("locale", "en-US") \
    .csv(f"{out_dir}/cleaned_cafc")

# ------------ 5: SUMMARIES ------------
monthly_summary = df.groupBy(year("date").alias("year"), month("date").alias("month")).agg(
    _sum("dollar_loss").alias("total_loss"),
    _sum("num_victims").alias("total_victims")
).orderBy("year", "month")
monthly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/monthly_summary")

yearly_summary = df.groupBy(year("date").alias("year")).agg(
    _sum("dollar_loss").alias("total_loss"),
    _sum("num_victims").alias("total_victims")
).orderBy("year")
yearly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/yearly_summary")


print(f"cafc_quarterly_fraud_data_pipeline.py script complete: {out_dir}")
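Steps 3 and 5 of the script can be mirrored in plain Python to check the logic on a handful of records. This is an illustrative sketch with hypothetical function names, operating on records already cleaned as in the script (a parsed `date`, numeric or `None` values). It skips missing values the way Spark's `sum()` ignores nulls, with one simplification: a group containing only missing values totals 0 here, whereas Spark would return null.

```python
from collections import defaultdict
from datetime import date


def quarter_label(d):
    """Label like '2025_Q3' for the quarter containing date d (same formula as step 3)."""
    return f"{d.year}_Q{(d.month - 1) // 3 + 1}"


def summarize(records, by_month=True):
    """Total dollar_loss and num_victims per (year, month), or per (year,).
    None values are skipped, as Spark's sum() ignores nulls."""
    totals = defaultdict(lambda: {"total_loss": 0.0, "total_victims": 0})
    for r in records:
        key = (r["date"].year, r["date"].month) if by_month else (r["date"].year,)
        if r["dollar_loss"] is not None:
            totals[key]["total_loss"] += r["dollar_loss"]
        if r["num_victims"] is not None:
            totals[key]["total_victims"] += r["num_victims"]
    # Sorted keys match orderBy("year", "month") / orderBy("year")
    return dict(sorted(totals.items()))
```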

Required Libraries

# Load necessary libraries
# install.packages("tidyverse")
suppressPackageStartupMessages(library(tidyverse))

Find the latest extracted cleaned CAFC files

# Base directory for the post
cafc_clean <- here::here("posts", "pyspark")

# List output directories named like outputs_2025_Q3
# (list.files() treats pattern as a regular expression, not a glob)
csv_dir <- list.files(path = cafc_clean, pattern = "^outputs_[0-9]{4}_Q[1-4]$", full.names = TRUE)

# Get the most recent output directory (YYYY_QN labels sort correctly as strings)
extract_dir <- max(csv_dir)

# Build the path to the "cleaned_cafc" subdirectory
cleaned_cafc_path <- file.path(extract_dir, "cleaned_cafc")

# Get all CSV part files
file_paths <- list.files(path = cleaned_cafc_path, pattern = "\\.csv$", full.names = TRUE)
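Because `list.files()` treats `pattern` as a regular expression rather than a glob, an anchored pattern is the safest way to match only the pipeline's output directories. For readers following along in Python, here is a hedged sketch of the same lookup; it compares `(year, quarter)` as integers instead of relying on string order, and it keeps only Spark's `part-*.csv` files, skipping the `_SUCCESS` marker. The function names are hypothetical.

```python
import re
from pathlib import Path

# Matches directory names written by the pipeline, e.g. outputs_2025_Q3
OUTPUT_DIR_RE = re.compile(r"^outputs_(\d{4})_Q([1-4])$")


def latest_output_dir(base):
    """Return the newest outputs_YYYY_QN directory under base, or None."""
    candidates = []
    for p in Path(base).iterdir():
        m = OUTPUT_DIR_RE.match(p.name)
        if p.is_dir() and m:
            # Compare (year, quarter) as integers rather than as strings
            candidates.append(((int(m.group(1)), int(m.group(2))), p))
    return max(candidates)[1] if candidates else None


def part_files(output_dir, subdir="cleaned_cafc"):
    """Spark part files (part-*.csv) in one output subdirectory, sorted by name."""
    return sorted((Path(output_dir) / subdir).glob("part-*.csv"))
```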

Merge the partitions into one data frame

# Read and combine all CSV files into a single data frame
merged_df <- file_paths |>
  map_dfr(read_csv, show_col_types = FALSE)
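The R step above reads every partition into a single data frame. If a single merged CSV file is wanted without R, a minimal Python sketch (hypothetical helper name) can concatenate the part files and write the shared header once. It assumes every part file was written with `header=True`, as in the pipeline, so each begins with the same header row. An alternative on the Spark side is `df.coalesce(1).write...`, which produces a single part file at the cost of routing all rows through one task.

```python
import csv
from pathlib import Path


def merge_part_files(paths, out_path):
    """Concatenate CSV part files that share a header row into one CSV,
    writing the header once. Returns the number of data rows written."""
    header = None
    n_rows = 0
    with open(out_path, "w", newline="", encoding="utf-8") as out:
        writer = csv.writer(out)
        for path in paths:
            with open(path, newline="", encoding="utf-8") as f:
                reader = csv.reader(f)
                file_header = next(reader, None)
                if file_header is None:
                    continue  # empty part file: no header and no rows
                if header is None:
                    header = file_header
                    writer.writerow(header)
                elif file_header != header:
                    raise ValueError(f"Header mismatch in {path}")
                for row in reader:
                    writer.writerow(row)
                    n_rows += 1
    return n_rows
```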

Show sample records of the merged CAFC dataset

# View the first few rows of the merged data frame
head(merged_df)
# A tibble: 6 × 13
      id date       complaint_type country province fraud_cat  sol_method gender
   <dbl> <date>     <chr>          <chr>   <chr>    <chr>      <chr>      <chr> 
1 350308 2025-09-29 Phone          Canada  Ontario  Personal … Direct ca… Female
2 350309 2025-09-29 Phone          Canada  Quebec   Identity … Other/unk… Male  
3 350310 2025-09-29 Phone          Canada  Quebec   Extortion  Direct ca… Female
4 350311 2025-09-29 Phone          Canada  Ontario  Identity … Other/unk… Female
5 350312 2025-09-29 Phone          Canada  Manitoba Identity … Other/unk… Female
6 350313 2025-09-29 Phone          Canada  Alberta  Investmen… Internet   Male  
# ℹ 5 more variables: lang_cor <chr>, age_range <chr>, complaint_subtype <chr>,
#   num_victims <dbl>, dollar_loss <dbl>

Retrieve monthly summaries

# Build the path to the "monthly_summary" subdirectory
cleaned_monthly_summary_path <- file.path(extract_dir, "monthly_summary")

file_paths <- list.files(path = cleaned_monthly_summary_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)

monthly_summary_df <- file_paths %>%
  map_dfr(read_csv, show_col_types = FALSE)

Show a sample of the monthly summary of losses and victims

head(monthly_summary_df)
# A tibble: 6 × 4
   year month total_loss total_victims
  <dbl> <dbl>      <dbl>         <dbl>
1  2021     1  12894120.          4411
2  2021     2  27042043.          5055
3  2021     3  17017938.          6111
4  2021     4  22120277.          4711
5  2021     5  19401052.          4348
6  2021     6  19796861.          4633

Retrieve yearly summaries of losses and victims

cleaned_yearly_summary_path <- file.path(extract_dir, "yearly_summary")


file_paths <- list.files(path = cleaned_yearly_summary_path, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)

yearly_summary_df <- file_paths %>%
  map_dfr(read_csv, show_col_types = FALSE)

Show the yearly summaries of losses and victims

head(yearly_summary_df)
# A tibble: 5 × 3
   year total_loss total_victims
  <dbl>      <dbl>         <dbl>
1  2021 309470943.         52832
2  2022 444355315.         47551
3  2023 497239290.         35746
4  2024 524288975.         30253
5  2025 439048068.         19264
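Since both summaries are computed from the same cleaned data, each year's total should equal the sum of its monthly totals, up to floating-point rounding. A quick consistency check along these lines can catch a stale or partially written output directory. This is a hedged Python sketch with a hypothetical function name; it assumes the rows have already been loaded as dicts with numeric, non-missing totals.

```python
import math
from collections import defaultdict


def check_yearly_against_monthly(monthly, yearly, rel_tol=1e-9):
    """Return the years whose yearly totals differ from the sum of their
    monthly totals. Rows are dicts with year, (month,) total_loss, total_victims."""
    sums = defaultdict(lambda: [0.0, 0])
    for row in monthly:
        sums[row["year"]][0] += row["total_loss"]
        sums[row["year"]][1] += row["total_victims"]
    mismatches = []
    for row in yearly:
        # .get() does not trigger the defaultdict factory; missing years count as zero
        loss, victims = sums.get(row["year"], (0.0, 0))
        if not math.isclose(loss, row["total_loss"], rel_tol=rel_tol) or victims != row["total_victims"]:
            mismatches.append(row["year"])
    return mismatches
```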

Deployment

library(stringr)

CAFC_latest_update <- str_sub(extract_dir, -7)
paste("CAFC Latest Update", CAFC_latest_update, sep= " : ")
[1] "CAFC Latest Update : 2025_Q3"

Download CSV files directly

library(downloadthis)

#Sys.setlocale("LC_NUMERIC", "C")
#options(OutDec = ".")

merged_df |>
  download_this(
    output_name = "clean_cafc_en",
    output_extension = ".csv",
    button_label = "Download clean_cafc_en",
    button_type = "default",
    self_contained = TRUE,
    has_icon = TRUE,
    icon = "fa fa-save",
    id = "cafc-btn",
    csv2 = FALSE      # Use write_csv(), dot decimal, comma separator
  )
library(downloadthis)

monthly_summary_df %>%
  download_this(
    output_name = "cafc_monthly_summary_en",
    output_extension = ".csv",
    button_label = "Download cafc_monthly_summary_en.csv",
    button_type = "default",
    self_contained = TRUE,
    has_icon = TRUE,
    icon = "fa fa-save",
    id = "cafc-btn",
    csv2 = FALSE 
  )
library(downloadthis)

yearly_summary_df %>%
  download_this(
    output_name = "cafc_yearly_summary_en",
    output_extension = ".csv",
    button_label = "Download cafc_yearly_summary_en.csv",
    button_type = "default",
    self_contained = TRUE,
    has_icon = TRUE,
    icon = "fa fa-save",
    id = "cafc-btn",
    csv2 = FALSE 
  )