How to make data pipelines idempotent

What is an idempotent function

“Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application” - Wikipedia

Defined as

f(f(x)) = f(x)

In the data engineering context, this means that running a data pipeline multiple times with the same input will always produce the same output.
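
As a quick illustration of the definition, here is a minimal Python sketch. The two function names are made up for this example and are not part of the repo used later in this article.

```python
def normalize_state(code: str) -> str:
    """Idempotent: applying it again to its own output changes nothing."""
    return code.strip().upper()


def add_suffix(code: str) -> str:
    """Not idempotent: every application changes the value again."""
    return code + "_v2"


once = normalize_state(" ny ")
twice = normalize_state(once)
print(once == twice)  # True: f(f(x)) == f(x)

once = add_suffix("ny")
twice = add_suffix(once)
print(once == twice)  # False: f(f(x)) != f(x)
```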

Pre-requisites

If you’d like to code along, please install the following:

  1. docker
  2. git

Follow the steps below to clone the GitHub repo, open a shell in the container, and install pyarrow and fastparquet.

git clone https://github.com/josephmachado/idempotent-data-pipeline.git
cd idempotent-data-pipeline
docker pull amancevice/pandas
docker run -it -v $(pwd):/var/lib/pandas amancevice/pandas sh
pip install pyarrow # required for pandas to write to parquet
pip install fastparquet # required for pandas to write to parquet
# all the following commands are to be executed within this docker container

Why idempotency matters

Rerunning a data pipeline can create duplicate data or fail to remove stale data. Making a data pipeline idempotent can prevent these.
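
The duplicate-data case can be sketched in a few lines. This is a toy example under stated assumptions: the sample rows, file names, and helper functions are all hypothetical, and a CSV file stands in for the pipeline's real output.

```python
import csv
import os
import tempfile

ROWS = [("NY", 1), ("NJ", 2)]  # made-up sample rows


def append_run(path: str) -> None:
    """Non-idempotent: each run appends, so a rerun duplicates the rows."""
    with open(path, "a", newline="") as f:
        csv.writer(f).writerows(ROWS)


def overwrite_run(path: str) -> None:
    """Idempotent: each run replaces the file, so a rerun gives the same result."""
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows(ROWS)


def count_rows(path: str) -> int:
    with open(path, newline="") as f:
        return sum(1 for _ in csv.reader(f))


tmp_dir = tempfile.mkdtemp()
append_path = os.path.join(tmp_dir, "append.csv")
overwrite_path = os.path.join(tmp_dir, "overwrite.csv")

for _ in range(2):  # simulate a run followed by a rerun
    append_run(append_path)
    overwrite_run(overwrite_path)

print(count_rows(append_path))     # 4 rows: the rerun duplicated the data
print(count_rows(overwrite_path))  # 2 rows: same as a single run
```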

Let’s use an example to illustrate the stale data case. Say we have a data pipeline that

  1. Pulls parking violation data from a file (or S3, table, etc).
  2. Performs some transformations on it.
  3. Writes the data to a directory (or S3, table, etc) named after the run date and partitioned by the vehicle’s Registration State.

#!/usr/bin/env python3

import argparse
import os

import pandas as pd


def run_parking_violations_data_pipeline(
    input_file: str, output_loc: str, run_id: str
) -> None:

    df = pd.read_csv(input_file)
    # your transformations
    df.to_parquet(
        os.path.join(output_loc, run_id), partition_cols=["Registration State"]
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input-file",
        type=str,
        help="The input file",
    )
    parser.add_argument(
        "--output-loc",
        type=str,
        help="The output folder",
    )
    parser.add_argument(
        "--run-id",
        type=str,
        help="The day of run, in yyyymmdd format",
    )

    opts = parser.parse_args()
    run_parking_violations_data_pipeline(
        input_file=opts.input_file, output_loc=opts.output_loc, run_id=opts.run_id
    )

The above script is available as parking_violation_data_pipeline.py in the cloned github repo. We can run it inside the docker container, as shown below.

python parking_violation_data_pipeline.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls data/out/20210519/ # view contents of the output folder

[Figure: data partitioned]

Let’s assume that the next day, you realize that there are records with Registration State = 99. You decide to filter them out and reprocess the previous day’s data with the new logic (aka backfill).

Make the following changes to filter out rows with Registration State = 99.

    df = pd.read_csv(input_file)
    # your transformations
+   states_to_remove = ["99"]
+   df_fin = df[~df["Registration State"].isin(states_to_remove)]
+   df_fin.to_parquet(
        os.path.join(output_loc, run_id), partition_cols=["Registration State"]
    )

The script with this filter is available as parking_violation_data_pipeline_w_filter.py in the cloned github repo. Let’s rerun this data pipeline for 20210519.

python parking_violation_data_pipeline_w_filter.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ # data with registration state 99 still present; this is stale data from the previous run.

[Figure: data partitioned with filter]

This is incorrect because the stale data generated from the previous run is still present.
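
One way to spot this kind of stale data is to compare the partition folders on disk with the values the latest run should have produced. The sketch below is illustrative only: `find_stale_partitions` is a hypothetical helper, and it assumes partition folders are named `Registration State=<value>`, which is the hive-style naming pandas uses with `partition_cols`. The output folder is simulated in a temporary directory rather than read from the repo.

```python
import os
import tempfile


def find_stale_partitions(output_path: str, expected_states: set) -> list:
    """Return partition folders on disk that the latest run would not produce."""
    prefix = "Registration State="  # assumed hive-style partition folder prefix
    stale = []
    for name in sorted(os.listdir(output_path)):
        if name.startswith(prefix) and name[len(prefix):] not in expected_states:
            stale.append(name)
    return stale


# Simulate the output folder left behind by the first (unfiltered) run.
output_path = os.path.join(tempfile.mkdtemp(), "20210519")
for state in ["NY", "NJ", "99"]:
    os.makedirs(os.path.join(output_path, f"Registration State={state}"))

# The filtered rerun only produces NY and NJ, so the 99 folder is stale.
print(find_stale_partitions(output_path, {"NY", "NJ"}))
# ['Registration State=99']
```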

Making your data pipeline idempotent

A common way to make your data pipeline idempotent is to use the delete-write pattern. As the name implies, the pipeline will first delete the existing data before writing new data. Be very careful to only delete data that the data pipeline will re-create. This can be done as shown below.

import os
+import shutil


def run_parking_violations_data_pipeline(
    input_file: str, output_loc: str, run_id: str
) -> None:
+   output_path = os.path.join(output_loc, run_id)
+   if os.path.exists(output_path):
+       shutil.rmtree(output_path) # removes entire folder

    df = pd.read_csv(input_file)

This idempotent script is available as parking_violation_data_pipeline_idempotent.py in the cloned github repo. Let’s rerun this data pipeline for 20210519.

python parking_violation_data_pipeline_idempotent.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ | grep 99 # data with registration state 99 not present anymore; this is correct
exit # exit the docker container
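
Two things can go wrong with a plain delete-write on a filesystem: a malformed run id can point the delete at the wrong folder, and a failure between the delete and the write leaves no output at all. The sketch below is one possible variation, not the script from the repo. The `delete_write` and `fake_write` helpers and the `.tmp_` folder name are assumptions made for this example. It validates the run id, writes to a temporary folder, and only swaps the new data in once the write has succeeded.

```python
import os
import re
import shutil
import tempfile
from typing import Callable, Iterable


def delete_write(output_loc: str, run_id: str, write_fn: Callable[[str], None]) -> str:
    """Replace output_loc/run_id with whatever write_fn writes."""
    # Guard: an empty or malformed run_id could point rmtree at the wrong folder.
    if not re.fullmatch(r"\d{8}", run_id):
        raise ValueError(f"run_id must be in yyyymmdd format, got {run_id!r}")

    output_path = os.path.join(output_loc, run_id)
    tmp_path = os.path.join(output_loc, f".tmp_{run_id}")  # hypothetical temp name

    # Clear leftovers from an earlier failed attempt, then write to the temp folder.
    shutil.rmtree(tmp_path, ignore_errors=True)
    os.makedirs(tmp_path)
    write_fn(tmp_path)

    # Only after a successful write: delete the old output and move the new one in.
    if os.path.exists(output_path):
        shutil.rmtree(output_path)
    os.rename(tmp_path, output_path)
    return output_path


def fake_write(states: Iterable[str]) -> Callable[[str], None]:
    """Stand-in for df.to_parquet: creates one partition folder per state."""
    def write(path: str) -> None:
        for state in states:
            os.makedirs(os.path.join(path, f"Registration State={state}"))
    return write


base = tempfile.mkdtemp()
delete_write(base, "20210519", fake_write(["NY", "NJ", "99"]))  # first run
out = delete_write(base, "20210519", fake_write(["NY", "NJ"]))  # backfill
print(sorted(os.listdir(out)))
# ['Registration State=NJ', 'Registration State=NY']
```

In the pipeline above, `write_fn` would wrap the `to_parquet` call, for example `lambda path: df_fin.to_parquet(path, partition_cols=["Registration State"])`.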

After the rerun with the idempotent script, the stale data is gone and the output is correct. For SQL-based transformations, you can follow a similar pattern, as shown below.

CREATE TEMP TABLE TEMP_YYYY_MM_DD
AS
SELECT day,
    c1,
    c2,
    SOME_TRANSFORMATION_FUNCTION(c3) AS c3
FROM stage_table
WHERE day = 'yyyy-mm-dd';

-- note the delete-write pattern
DELETE FROM final_table
WHERE day = 'yyyy-mm-dd';

-- day is carried through so that a rerun's DELETE can find these rows
INSERT INTO final_table(day, c1, c2, c3)
SELECT day,
    c1,
    c2,
    c3
FROM TEMP_YYYY_MM_DD;

DROP TABLE TEMP_YYYY_MM_DD;

Note the run-specific temporary table TEMP_YYYY_MM_DD. Naming the table after the run date prevents table name collisions when multiple jobs (with different dates) are running simultaneously.
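
If you’d like to try the SQL delete-write pattern locally, here is a minimal sketch using Python’s built-in sqlite3 module. It makes a few assumptions: the table layouts and sample rows are made up, `UPPER` stands in for the transformation function, and the temporary table is skipped for brevity. The delete and the insert run in a single transaction, so a failed run does not leave the day half-written.

```python
import sqlite3


def load_day(conn: sqlite3.Connection, day: str) -> None:
    """Delete-write one day of data from stage_table into final_table."""
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute("DELETE FROM final_table WHERE day = ?", (day,))
        conn.execute(
            """
            INSERT INTO final_table (day, c1, c2, c3)
            SELECT day, c1, c2, UPPER(c3)  -- stand-in for the transformation
            FROM stage_table
            WHERE day = ?
            """,
            (day,),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stage_table (day TEXT, c1 INTEGER, c2 INTEGER, c3 TEXT)")
conn.execute("CREATE TABLE final_table (day TEXT, c1 INTEGER, c2 INTEGER, c3 TEXT)")
conn.executemany(
    "INSERT INTO stage_table VALUES (?, ?, ?, ?)",
    [("2021-05-19", 1, 2, "ny"), ("2021-05-19", 3, 4, "nj")],
)
conn.commit()

load_day(conn, "2021-05-19")
load_day(conn, "2021-05-19")  # rerun: same rows, no duplicates

print(conn.execute("SELECT COUNT(*) FROM final_table").fetchone()[0])  # 2
```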

Conclusion

I hope this article gives you a good understanding of why idempotence is crucial and how to make your data pipelines idempotent. To recap, idempotent pipelines

  1. Prevent duplicates
  2. Remove stale data
  3. Save data storage costs

When you are building your next data pipeline, make it idempotent. This will save you a lot of trouble when you have to rerun the pipeline due to backfilling, failed runs, or other errors.
