How to make data pipelines idempotent

What is an idempotent function

“Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application” - Wikipedia

Defined as

f(f(x)) = f(x)

In the data engineering context, this means that running a data pipeline multiple times with the same input will always produce the same output.
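To make the definition concrete, here is a minimal sketch in plain Python. The function names and the in-memory stores are hypothetical and exist only for illustration. The first function is not idempotent because every rerun appends the same rows again; the second replaces the rows for a given run, so the state is the same no matter how many times it runs.

```python
from typing import Dict, List


def append_rows(store: List[str], rows: List[str]) -> List[str]:
    """Not idempotent: every rerun appends the same rows again."""
    store.extend(rows)
    return store


def overwrite_run(
    store: Dict[str, List[str]], run_id: str, rows: List[str]
) -> Dict[str, List[str]]:
    """Idempotent: the rows for run_id are replaced, never accumulated."""
    store[run_id] = list(rows)
    return store


rows = ["NY", "NJ"]  # made-up sample input

appended: List[str] = []
append_rows(appended, rows)
append_rows(appended, rows)  # the rerun duplicates the data

overwritten: Dict[str, List[str]] = {}
overwrite_run(overwritten, "20210519", rows)
overwrite_run(overwritten, "20210519", rows)  # the rerun changes nothing

print(len(appended))                 # 4
print(len(overwritten["20210519"]))  # 2
```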


If you’d like to code along, please install the following:

  1. docker
  2. git

Follow the steps below to clone the GitHub repo, open a shell in the container, and install pyarrow and fastparquet.

git clone
cd idempotent-data-pipeline
docker pull amancevice/pandas
docker run -it -v $(pwd):/var/lib/pandas amancevice/pandas sh
pip install pyarrow # required for pandas to write to parquet
pip install fastparquet # required for pandas to write to parquet
# all the following commands are to be executed within this docker container

Why idempotency matters

Rerunning a data pipeline can create duplicate data or fail to remove stale data. Making a data pipeline idempotent can prevent these.

Let’s use an example to illustrate this. Say we have a data pipeline that

  1. Pulls parking violation data from a file (or S3, table, etc).
  2. Performs some transformations on it.
  3. Writes the data to a directory (or S3, table, etc) named after the run date and partitioned by the vehicle’s Registration State.
#!/usr/bin/env python3

import argparse
import os

import pandas as pd

def run_parking_violations_data_pipeline(
    input_file: str, output_loc: str, run_id: str
) -> None:

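    df = pd.read_csv(input_file)
    # your transformations
    df.to_parquet(
        os.path.join(output_loc, run_id), partition_cols=["Registration State"]
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-file", type=str, help="The input file")
    parser.add_argument("--output-loc", type=str, help="The output folder")
    parser.add_argument(
        "--run-id", type=str, help="The day of run, in yyyymmdd format"
    )

    opts = parser.parse_args()
    run_parking_violations_data_pipeline(
        input_file=opts.input_file, output_loc=opts.output_loc, run_id=opts.run_id
    )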

The above script is available in the cloned GitHub repo. We can run it inside the Docker container, as shown below.

python --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls data/out/20210519/ # view contents of the output folder

[Figure: data partitioned]

Let’s assume that the next day, you realize that there are records with Registration State = 99. You decide to filter them out and reprocess the previous day’s data with the new logic (aka backfill).

Make the following changes to filter out rows with Registration State = 99.

    df = pd.read_csv(input_file)
    # your transformations
+   states_to_remove = ["99"]
+   df_fin = df[~df["Registration State"].isin(states_to_remove)]
+   df_fin.to_parquet(
        os.path.join(output_loc, run_id), partition_cols=["Registration State"]
    )

The script with this filter is available in the cloned GitHub repo. Let’s rerun this data pipeline for 20210519.

python --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ # data with registration state 99 still present; this is stale data from the previous run.

[Figure: data partitioned, with filter]

This is incorrect. The rerun only writes the partitions present in the filtered data, so the partition folder for Registration State = 99 from the previous run is left behind as stale data.

Making your data pipeline idempotent

A common way to make your data pipeline idempotent is to use the delete-write pattern. As the name implies, the pipeline will first delete the existing data before writing new data. Be very careful to only delete data that the data pipeline will re-create. This can be done as shown below.

import os
+import shutil

def run_parking_violations_data_pipeline(
    input_file: str, output_loc: str, run_id: str
) -> None:
+   output_path = os.path.join(output_loc, run_id)
+   if os.path.exists(output_path):
+       shutil.rmtree(output_path) # removes entire folder

    df = pd.read_csv(input_file)

This idempotent script is available in the cloned GitHub repo. Let’s rerun this data pipeline for 20210519.

python --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ | grep 99 # data with registration state 99 not present anymore; this is correct
exit # exit the docker container
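If you don't have the sample data at hand, the effect of the delete-write pattern can also be reproduced with the standard library alone. The sketch below is an illustration under stated assumptions, not the repo's script: `write_partitioned` is a hypothetical stand-in for a partitioned parquet write (it creates one folder per Registration State and leaves other folders untouched), `run_pipeline` is a hypothetical helper, and the rows are made up.

```python
import os
import shutil
import tempfile
from typing import Dict, List, Tuple

Row = Tuple[str, str]  # (registration state, plate id); made-up sample shape


def write_partitioned(rows: List[Row], output_path: str) -> None:
    """Hypothetical stand-in for a partitioned write: one folder per state."""
    by_state: Dict[str, List[str]] = {}
    for state, plate in rows:
        by_state.setdefault(state, []).append(plate)
    for state, plates in by_state.items():
        part_dir = os.path.join(output_path, f"Registration State={state}")
        os.makedirs(part_dir, exist_ok=True)
        with open(os.path.join(part_dir, "part-0.csv"), "w") as f:
            f.write("\n".join(plates) + "\n")


def run_pipeline(rows: List[Row], output_path: str, delete_first: bool) -> List[str]:
    """Write the rows and return the partition folders found afterwards."""
    if delete_first and os.path.exists(output_path):
        shutil.rmtree(output_path)  # delete only what this run re-creates
    write_partitioned(rows, output_path)
    return sorted(os.listdir(output_path))


rows = [("NY", "ABC123"), ("99", "XYZ999"), ("NJ", "DEF456")]
filtered = [r for r in rows if r[0] != "99"]  # the backfill logic

with tempfile.TemporaryDirectory() as base:
    # Without delete-write: the rerun leaves the old partition behind.
    naive_path = os.path.join(base, "naive", "20210519")
    run_pipeline(rows, naive_path, delete_first=False)
    naive_result = run_pipeline(filtered, naive_path, delete_first=False)

    # With delete-write: the rerun output matches a fresh run.
    idem_path = os.path.join(base, "idempotent", "20210519")
    run_pipeline(rows, idem_path, delete_first=True)
    idem_result = run_pipeline(filtered, idem_path, delete_first=True)

print(naive_result)  # still lists 'Registration State=99'
print(idem_result)   # ['Registration State=NJ', 'Registration State=NY']
```

Note that the delete is scoped to the run's own folder (`output_loc/run_id`), which is what keeps other runs' data safe.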

The stale data has been removed and the data is now correct. For SQL-based transformations, you can follow a similar pattern, as shown below.

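-- stage the transformed rows in a run-specific temporary table
CREATE TEMP TABLE TEMP_YYYY_MM_DD AS
SELECT c1, c2, c3 -- apply your transformations here
FROM stage_table
WHERE day = 'yyyy-mm-dd';

-- note the delete-write pattern
DELETE FROM final_table
WHERE day = 'yyyy-mm-dd';

INSERT INTO final_table(c1, c2, c3)
SELECT c1, c2, c3
FROM TEMP_YYYY_MM_DD;

DROP TABLE TEMP_YYYY_MM_DD;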


Note the run-specific temporary table, TEMP_YYYY_MM_DD. Naming it after the run date prevents table name collisions when multiple jobs (with different dates) are running simultaneously.
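As a runnable sketch of the SQL delete-write pattern, the snippet below uses Python's built-in sqlite3 module with an in-memory database. Several things here are assumptions made for illustration: the table schemas (including a `day` column on `final_table`), the `UPPER(c3)` transformation, the sample row, and the `load_day` helper. The delete and the insert run in one transaction, so a failed run does not leave the day half-written.

```python
import sqlite3
from datetime import date


def load_day(conn: sqlite3.Connection, day: str) -> None:
    """Delete-write load of one day from stage_table into final_table."""
    date.fromisoformat(day)  # validate before using the value in a table name
    temp_table = "temp_" + day.replace("-", "_")  # run-specific name

    conn.execute(
        f"CREATE TEMP TABLE {temp_table} (day TEXT, c1 TEXT, c2 TEXT, c3 TEXT)"
    )
    try:
        with conn:  # one transaction: commit on success, roll back on error
            conn.execute(
                f"INSERT INTO {temp_table} "
                "SELECT day, c1, c2, UPPER(c3) FROM stage_table WHERE day = ?",
                (day,),
            )
            # delete-write: remove the day's rows, then write them again
            conn.execute("DELETE FROM final_table WHERE day = ?", (day,))
            conn.execute(
                f"INSERT INTO final_table SELECT day, c1, c2, c3 FROM {temp_table}"
            )
    finally:
        conn.execute(f"DROP TABLE {temp_table}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stage_table (day TEXT, c1 TEXT, c2 TEXT, c3 TEXT)")
conn.execute("CREATE TABLE final_table (day TEXT, c1 TEXT, c2 TEXT, c3 TEXT)")
conn.execute("INSERT INTO stage_table VALUES ('2021-05-19', 'a', 'b', 'ny')")
conn.commit()

load_day(conn, "2021-05-19")
load_day(conn, "2021-05-19")  # rerun: same final state, no duplicates

final_rows = conn.execute("SELECT day, c1, c2, c3 FROM final_table").fetchall()
print(final_rows)  # [('2021-05-19', 'a', 'b', 'NY')]
```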


Hope this article gives you a good understanding of why idempotence is crucial and how to make your data pipelines idempotent. To recap, idempotent data pipelines

  1. Prevent duplicates
  2. Remove stale data
  3. Save data storage costs

When you are building your next data pipeline, make it idempotent. This will save you a lot of trouble when you have to rerun the data pipeline due to backfilling, failed runs, or other errors.

Further reading

  1. What is staging
  2. Partition late arriving events
  3. Beginner data engineering project


References

  1. Fivetran - idempotence
  2. Python 3.9 & Pandas Docker