Data Engineering Project for Beginners - Batch edition

1. Introduction

A real data engineering project usually involves multiple components. Setting up a data engineering project while conforming to best practices can be extremely time-consuming. If you are

A data analyst, student, scientist, or engineer looking to gain data engineering experience, but unable to find a good starter project.

Someone wanting to work on a data engineering project that simulates a real-life project.

Looking for an end-to-end data engineering project.

Looking for a good project to get data engineering experience for job interviews.

Then this tutorial is for you. In this tutorial, you will

  1. Set up Apache Airflow, AWS EMR, AWS Redshift, AWS Redshift Spectrum, and AWS S3.

  2. Learn data pipeline best practices.

  3. Learn how to spot failure points in data pipelines and build systems resistant to failures.

  4. Learn how to design and build a data pipeline from business requirements.

If you are interested in a stream processing project, please check out Data Engineering Project for Beginners - Stream Edition

2. Objective

Let’s assume that you work for a user behavior analytics company that collects user data and creates user profiles. You are tasked with building a data pipeline to populate the user_behavior_metric table. The user_behavior_metric table is an OLAP table, meant to be used by analysts, dashboard software, etc. It is built from

  1. user_purchase: OLTP table with user purchase information.
  2. movie_review.csv: Data sent every day by an external data vendor.

DE Project Objective

3. Design

We will be using Airflow to orchestrate the following tasks:

  1. Classifying movie reviews with Apache Spark.
  2. Loading the classified movie reviews into the data warehouse.
  3. Extracting user purchase data from an OLTP database and loading it into the data warehouse.
  4. Joining the classified movie review data and user purchase data to get user behavior metric data.

DE Project Design

4. Setup

4.1 Prerequisites

  1. Docker with at least 4GB of RAM and Docker Compose v1.27.0 or later
  2. psql
  3. AWS account
  4. AWS CLI installed and configured

To set up the infrastructure and base tables, we have a script called setup_infra.sh. This can be run as shown below.

git clone https://github.com/josephmachado/beginner_de_project.git
cd beginner_de_project
./setup_infra.sh {your-bucket-name}

Replace the placeholder with your bucket name, e.g. ./setup_infra.sh sde-sample-bkt. The bucket name you choose has to be globally unique across all of S3. If any of the commands fail, open setup_infra.sh and run the failed commands manually.

The setup script will take about 10 minutes to set up the required infrastructure. Setup logs are stored locally in a file called setup.log. Go to http://localhost:8080 to see the Airflow UI. The username and password are both airflow.

When you are done with the project, do not forget to tear down the infrastructure using the tear_down_infra.sh script.

./tear_down_infra.sh {your-bucket-name}

4.2 Infrastructure

For our project, we will have the following

Local components:

  1. Apache Airflow’s web server and scheduler in Docker containers.
  2. Apache Airflow’s metadata database (Postgres) in a Docker container.

DE Project Local Infra

AWS Components:

  1. AWS S3 as our data lake.
  2. AWS Redshift Spectrum as our data warehouse.
  3. AWS IAM to allow Spectrum to access data in S3 at query time.
  4. AWS EMR to run Apache Spark jobs for text classification.

DE Project AWS Infra

4.3 Data lake structure

We will use AWS S3 as our data lake. Data from external systems will be stored here for further processing. S3 also serves as the storage layer that AWS Redshift Spectrum queries directly.

In our project, we will use one bucket with multiple folders.

S3 data lake

  1. raw: To store raw data. This is denoted as Raw Area in the design section.
  2. stage: This is denoted as Stage Area in the design section.
  3. scripts: This is used to store the Spark script, for use by AWS EMR.

This pattern will let us create different buckets for different environments.
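
To make the layout concrete, here is a hypothetical helper (not part of the project’s code) that builds keys following this structure; the example keys in the docstring match the paths used later in the pipeline.

# Hypothetical helper illustrating the folder layout inside the data lake bucket.
def data_lake_key(area: str, dataset: str, ds: str, file_name: str) -> str:
    """Build keys such as:
    raw/movie_review/2021-06-10/movie.csv
    stage/user_purchase/2021-06-10/user_purchase.csv
    Spark scripts live directly under scripts/, e.g. scripts/random_text_classification.py.
    """
    if area not in ("raw", "stage"):
        raise ValueError(f"unknown data lake area: {area}")
    return f"{area}/{dataset}/{ds}/{file_name}"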

4.4 Creating tables and Airflow configurations

The setup script also creates the tables that are required by our data pipeline. We create the following tables:

  1. retail.user_purchase table, defined at pgsetup/create_user_purchase.sql in the repository. The data is mounted into the Postgres container’s file system. This data is loaded into the table using the COPY command.
  2. spectrum.user_purchase_staging table, defined as having its data stored in the data lake’s stage location. Note that the table also has a partition defined on the insert_date.
  3. spectrum.classified_movie_review table, defined as having its data stored in the data lake’s stage location.
  4. public.user_behavior_metric table is the table that we want to load data into.

In addition, the script also creates Airflow connections and variables.

  1. redshift connection: To connect to the AWS Redshift cluster.
  2. postgres_default connection: To connect to the local Postgres database.
  3. BUCKET variable: To indicate the bucket to be used as the data lake for this pipeline.
  4. EMR_ID variable: To send commands to an AWS EMR cluster.

You can see them on Airflow’s UI as shown below.

Airflow Admin Connections and Admin Variables pages
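
In the DAG code shown later, BUCKET_NAME and EMR_ID come from these variables; here is a minimal sketch of reading them with Airflow’s Variable API (the exact wiring in the repository may differ).

from airflow.models import Variable

# Created by setup_infra.sh; read once when the DAG file is parsed.
BUCKET_NAME = Variable.get("BUCKET")  # S3 bucket used as the data lake
EMR_ID = Variable.get("EMR_ID")       # id of the EMR cluster that runs the Spark steps

The two connections are referenced by id in the operators, e.g. postgres_conn_id="postgres_default" and postgres_conn_id="redshift".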

4.5 AWS Infrastructure costs

We use

  1. 3 m4.xlarge nodes for our AWS EMR cluster.
  2. 1 dc2.large node for our AWS Redshift cluster.
  3. 1 IAM role to allow Redshift access to S3.
  4. 1 S3 bucket holding about 150 MB of data.

A very rough estimate would be 313.91 USD per month for our setup, which works out to approximately 0.43 USD per hour (313.91 / 730 hours). Use this AWS cost calculator to verify costs.

5. Code walkthrough

The data for user_behavior_metric is generated from 2 main datasets. We will look at how each of them is ingested, transformed, and used to get the data for the final table.

5.1 Loading user purchase data into the data warehouse

To load the user purchase data from Postgres into AWS Redshift, we run the following tasks:

  1. extract_user_purchase_data: Unloads data from Postgres to a local file system in the Postgres container. This filesystem is volume synced between our local Postgres and Airflow containers. This allows Airflow to access this data.
  2. user_purchase_to_stage_data_lake: Moves the extracted data to the data lake’s staging area at stage/user_purchase/{{ ds }}/user_purchase.csv, where ds will be replaced by the run date in YYYY-MM-DD format. This ds serves as the insert_date partition defined at table creation.
  3. user_purchase_stage_data_lake_to_stage_tbl: Runs a Redshift query to make the spectrum.user_purchase_staging table aware of the new date partition.

User purchase

extract_user_purchase_data = PostgresOperator(
    dag=dag,
    task_id="extract_user_purchase_data",
    sql="./scripts/sql/unload_user_purchase.sql",
    postgres_conn_id="postgres_default",
    params={"user_purchase": "/temp/user_purchase.csv"},
    depends_on_past=True,
    wait_for_downstream=True,
)

user_purchase_to_stage_data_lake = PythonOperator(
    dag=dag,
    task_id="user_purchase_to_stage_data_lake",
    python_callable=_local_to_s3,
    op_kwargs={
        "file_name": "/temp/user_purchase.csv",
        "key": "stage/user_purchase/{{ ds }}/user_purchase.csv",
        "bucket_name": BUCKET_NAME,
        "remove_local": "true",
    },
)

user_purchase_stage_data_lake_to_stage_tbl = PythonOperator(
    dag=dag,
    task_id="user_purchase_stage_data_lake_to_stage_tbl",
    python_callable=run_redshift_external_query,
    op_kwargs={
        "qry": "alter table spectrum.user_purchase_staging add if not exists partition(insert_date='{{ ds }}') \
            location 's3://"
        + BUCKET_NAME
        + "/stage/user_purchase/{{ ds }}'",
    },
)

extract_user_purchase_data >> user_purchase_to_stage_data_lake >> user_purchase_stage_data_lake_to_stage_tbl

./scripts/sql/unload_user_purchase.sql

COPY (
       select invoice_number,
              stock_code,
              detail,
              quantity,
              invoice_date,
              unit_price,
              customer_id,
              country
       from retail.user_purchase -- we should have a date filter here to pull only required date's data
) TO '{{ params.user_purchase }}' WITH (FORMAT CSV, HEADER);
-- user_purchase will be replaced with /temp/user_purchase.csv from the params in extract_user_purchase_data task

It’s not always a good pattern to store the entire dataset in our local filesystem, since we could get an out-of-memory error if our dataset is too large. An alternative would be a streaming process that writes to our data lake in batches, as shown here.
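
A minimal sketch of that batched approach (not the project’s code), assuming psycopg2 and boto3 are available; the DSN, bucket, and function name are placeholders:

import csv
import io

import boto3
import psycopg2


def stream_user_purchase_to_s3(dsn: str, bucket: str, key_prefix: str, batch_size: int = 10000):
    """Stream rows from Postgres to S3 in fixed-size batches instead of one big local file."""
    s3 = boto3.client("s3")
    conn = psycopg2.connect(dsn)
    # A named (server-side) cursor avoids pulling the whole table into memory at once.
    with conn, conn.cursor(name="user_purchase_unload") as cur:
        cur.itersize = batch_size
        cur.execute("SELECT * FROM retail.user_purchase")
        part = 0
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            buf = io.StringIO()
            csv.writer(buf).writerows(rows)
            s3.put_object(
                Bucket=bucket,
                Key=f"{key_prefix}/part-{part:05d}.csv",
                Body=buf.getvalue().encode("utf-8"),
            )
            part += 1

Each batch lands as its own object under the same prefix, so Spectrum or Spark can still read the prefix as a single dataset.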

5.2 Loading classified movie review data into the data warehouse

To get the classified movie review data into AWS Redshift, we run the following tasks:

  1. movie_review_to_raw_data_lake: Copies the local file data/movie_review.csv to the data lake’s raw area.
  2. spark_script_to_s3: Copies our PySpark script to the data lake’s scripts area, so that AWS EMR can reference it.
  3. start_emr_movie_classification_script: Adds the EMR steps defined at dags/scripts/emr/clean_movie_review.json to our EMR cluster. This task adds 3 EMR steps to the cluster, which do the following:
    1. Moves raw data from S3 to HDFS: copies data from the data lake’s raw area into EMR’s HDFS.
    2. Classifies movie reviews: runs the review classification PySpark script.
    3. Moves classified data from HDFS to S3: copies data from EMR’s HDFS to the data lake’s staging area.
  4. wait_for_movie_classification_transformation: A sensor task that waits for the final step (moving classified data from HDFS to S3) to finish.

Movie review

movie_review_to_raw_data_lake = PythonOperator(
    dag=dag,
    task_id="movie_review_to_raw_data_lake",
    python_callable=_local_to_s3,
    op_kwargs={
        "file_name": "/data/movie_review.csv",
        "key": "raw/movie_review/{{ ds }}/movie.csv",
        "bucket_name": BUCKET_NAME,
    },
)

spark_script_to_s3 = PythonOperator(
    dag=dag,
    task_id="spark_script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={
        "file_name": "./dags/scripts/spark/random_text_classification.py",
        "key": "scripts/random_text_classification.py",
        "bucket_name": BUCKET_NAME,
    },
)

start_emr_movie_classification_script = EmrAddStepsOperator(
    dag=dag,
    task_id="start_emr_movie_classification_script",
    job_flow_id=EMR_ID,
    aws_conn_id="aws_default",
    steps=EMR_STEPS,
    params={
        "BUCKET_NAME": BUCKET_NAME,
        "raw_movie_review": "raw/movie_review",
        "text_classifier_script": "scripts/random_text_classifier.py",
        "stage_movie_review": "stage/movie_review",
    },
    depends_on_past=True,
)

last_step = len(EMR_STEPS) - 1

wait_for_movie_classification_transformation = EmrStepSensor(
    dag=dag,
    task_id="wait_for_movie_classification_transformation",
    job_flow_id=EMR_ID,
    step_id='{{ task_instance.xcom_pull("start_emr_movie_classification_script", key="return_value")['
    + str(last_step)
    + "] }}",
    depends_on_past=True,
)

[
    movie_review_to_raw_data_lake,
    spark_script_to_s3,
] >> start_emr_movie_classification_script >> wait_for_movie_classification_transformation

5.3 Generating user behavior metric

With both the user purchase data and the classified movie review data in the data warehouse, we can get the data for the user_behavior_metric table. This is done by the generate_user_behavior_metric task, which runs a Redshift SQL script to populate the public.user_behavior_metric table.

User behavior

generate_user_behavior_metric = PostgresOperator(
    dag=dag,
    task_id="generate_user_behavior_metric",
    sql="scripts/sql/generate_user_behavior_metric.sql",
    postgres_conn_id="redshift",
)

end_of_data_pipeline = DummyOperator(task_id="end_of_data_pipeline", dag=dag) # dummy operator to indicate DAG complete

[
    user_purchase_stage_data_lake_to_stage_tbl,
    wait_for_movie_classification_transformation,
] >> generate_user_behavior_metric >> end_of_data_pipeline

The SQL query generates customer-level aggregate metrics using spectrum.user_purchase_staging and spectrum.classified_movie_review.

-- scripts/sql/generate_user_behavior_metric.sql

DELETE FROM public.user_behavior_metric
WHERE insert_date = '{{ ds }}';
INSERT INTO public.user_behavior_metric (
        customerid,
        amount_spent,
        review_score,
        review_count,
        insert_date
    )
SELECT ups.customerid,
    CAST(
        SUM(ups.Quantity * ups.UnitPrice) AS DECIMAL(18, 5)
    ) AS amount_spent,
    SUM(mrcs.positive_review) AS review_score,
    count(mrcs.cid) AS review_count,
    '{{ ds }}'
FROM spectrum.user_purchase_staging ups
    JOIN (
        SELECT cid,
            CASE
                WHEN positive_review IS True THEN 1
                ELSE 0
            END AS positive_review
        FROM spectrum.classified_movie_review
        WHERE insert_date = '{{ ds }}'
    ) mrcs ON ups.customerid = mrcs.cid
WHERE ups.insert_date = '{{ ds }}'
GROUP BY ups.customerid;

Go to http://localhost:8080 to see the Airflow UI. The username and password are both airflow. Turn on the DAG. It can take about 10 minutes to complete one run.

DE Project Airflow

5.4 Checking results

You can check the public.user_behavior_metric Redshift table from your terminal as shown below.

export REDSHIFT_HOST=$(aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text)
psql postgres://sde_user:sdeP0ssword0987@$REDSHIFT_HOST:5439/dev

In the SQL prompt, use the following queries to see the generated data.

select insert_date, count(*) as cnt from spectrum.classified_movie_review group by insert_date order by cnt desc; -- 100,000 per day
select insert_date, count(*) as cnt from spectrum.user_purchase_staging group by insert_date order by cnt desc; -- 541,908 per day
select insert_date, count(*) as cnt from public.user_behavior_metric group by insert_date order by cnt desc; -- 908 per day
\q

The counts should match the per-day counts noted in the comments above. When you are done with the project, do not forget to tear down the infrastructure using the tear_down_infra.sh script.

./tear_down_infra.sh {your-bucket-name}

6. Design considerations

Now that you have the data pipeline running successfully, it’s time to review some design choices.

  1. Idempotent data pipeline

    Check that all the tasks are idempotent. If you re-run a partially run or failed task, the output should be no different than if the task had run successfully once. Read this article to understand idempotency in detail. Hint: there is at least one non-idempotent task.

  2. Monitoring & Alerting

    The data pipeline can be monitored from the Airflow UI. EMR steps can be monitored via the AWS console. We do not have any alerting in case of task failures, data quality issues, hanging tasks, etc. In real projects, there is usually a monitoring and alerting system built on tools such as Amazon CloudWatch, Datadog, or New Relic. A minimal sketch of wiring basic failure alerts into the DAG’s default arguments is shown below.
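
This sketch is not part of the project; it assumes SMTP is configured in airflow.cfg, and the callback function and email address are hypothetical.

from datetime import timedelta


def notify_failure(context):
    # Hypothetical callback: swap the print for a Slack, PagerDuty, or email client call.
    ti = context["task_instance"]
    print(f"Task {ti.task_id} failed for run date {context['ds']}")


default_args = {
    "email": ["data-alerts@example.com"],  # assumed address; requires SMTP settings in airflow.cfg
    "email_on_failure": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_failure,
}
# default_args would then be passed to the DAG(...) constructor.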

  3. Quality control

    We do not check for data quality in this data pipeline. We can set up basic checks (row counts, standard deviations, etc.) before we load data into the final table. For advanced testing requirements, consider using a data quality framework such as great_expectations. A sketch of a simple pre-load count check is shown below.
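
This sketch is not part of the project; the task and function names are hypothetical, and it reuses the existing redshift connection and the project’s dag object.

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def check_user_purchase_staged(ds, **kwargs):
    """Fail the run if no user purchase rows were staged for this run date."""
    hook = PostgresHook(postgres_conn_id="redshift")
    count = hook.get_first(
        "SELECT count(*) FROM spectrum.user_purchase_staging WHERE insert_date = %s",
        parameters=(ds,),
    )[0]
    if count == 0:
        raise ValueError(f"No rows staged in spectrum.user_purchase_staging for {ds}")


check_user_purchase_stage = PythonOperator(
    dag=dag,  # the project's DAG object
    task_id="check_user_purchase_stage",
    python_callable=check_user_purchase_staged,
)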

  4. Concurrent runs

    If you have to re-run the pipeline for the past 3 months, it would be ideal to have those runs execute concurrently rather than sequentially. We can set levels of concurrency as shown here; the relevant DAG-level settings are also sketched below. Even with an appropriate concurrency setting, our data pipeline has one task that is blocking. Figuring out the blocking task is left as an exercise for the reader.
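
The settings that control this live on the DAG object itself; this is a minimal sketch with illustrative values (the DAG id and numbers are assumptions, not the project’s defaults).

from datetime import datetime

from airflow import DAG

dag = DAG(
    "user_behavior",              # DAG id assumed for illustration
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=True,                 # create runs for past schedule intervals
    max_active_runs=4,            # how many DAG runs may execute at the same time
    concurrency=8,                # task instances of this DAG that may run at once (max_active_tasks in newer Airflow)
)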

  5. Changing DAG frequency

    What are the changes necessary to run this DAG every hour? How will the table schema need to change to support this? Do you have to rename the DAG?

  6. Backfills

    Assume you have a logic change in the movie classification script and you want this change applied to the past 3 weeks. If you re-run the data pipeline for the past 3 weeks, will you re-run the entire DAG or only parts of it? Why would you choose one option over the other? You can use the backfill command to run backfills; an example is shown below. Prefix the command with docker exec -d beginner_de_project_airflow-webserver_1 to run it on our Airflow Docker container.
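
For reference, a typical Airflow 2 backfill invocation looks like the following; the DAG id and dates are placeholders.

docker exec -d beginner_de_project_airflow-webserver_1 airflow dags backfill -s 2021-06-01 -e 2021-06-21 {your-dag-id}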

  7. Data size

    Will the data pipeline run successfully if your data size increases by 10x, 100x, or 1000x? Why or why not?

7. Next steps

If you are interested in working more with this data pipeline, please consider contributing to the following.

  1. Unit tests, DAG run tests, and integration tests.
  2. Use the TaskFlow API for the DAG.

If you have other ideas, please feel free to open GitHub issues or pull requests.

8. Conclusion

I hope this article gives you a good idea of how to design and build an end-to-end data pipeline. To recap, we covered:

  1. Infrastructure setup
  2. Data pipeline best practices
  3. Design considerations
  4. Next steps

Please leave any questions or comments in the comment section below.

9. Further reading

  1. Airflow pros and cons
  2. What is a data warehouse
  3. What and why staging?
  4. Beginner DE project - Stream edition
  5. How to submit Spark EMR jobs from Airflow
  6. DE project to impress hiring manager

10. References

  1. Airflow Docker setup
  2. Airflow docs
  3. AWS CLI docs
  4. Functional Data engineering
  5. Airflow common pitfalls