Data Engineering Project for Beginners - Batch edition
- 1. Introduction
- 2. Objective
- 3. Design
- 4. Setup
- 5. Code walkthrough
- 6. Design considerations
- 7. Next steps
- 8. Conclusion
- 9. Further reading
- 10. References
1. Introduction
A real data engineering project usually involves multiple components. Setting up a data engineering project while conforming to best practices can be extremely time-consuming. If you are
- A data analyst, student, scientist, or engineer looking to gain data engineering experience, but unable to find a good starter project
- Looking to work on a data engineering project that simulates a real-life project
- Looking for an end-to-end data engineering project
- Looking for a good project to get data engineering experience for job interviews
then this tutorial is for you. In this tutorial, you will
- Set up Apache Airflow, AWS EMR, AWS Redshift, AWS Spectrum, and AWS S3.
- Learn data pipeline best practices.
- Learn how to spot failure points in data pipelines and build systems resistant to failures.
- Learn how to design and build a data pipeline from business requirements.
If you are interested in a stream processing project, please check out Data Engineering Project for Beginners - Stream Edition. If you are interested in a local-only data engineering project, check out this post.
2. Objective
Let's assume that you work for a user behavior analytics company that collects user data and creates a user profile. You are tasked with building a data pipeline to populate the user_behavior_metric table. The user_behavior_metric table is an OLAP table, meant to be used by analysts, dashboard software, etc. It is built from
- user_purchase: an OLTP table with user purchase information.
- movie_review.csv: data sent every day by an external data vendor.
3. Design
We will be using Airflow to orchestrate the following tasks:
- Classifying movie reviews with Apache Spark.
- Loading the classified movie reviews into the data warehouse.
- Extracting user purchase data from an OLTP database and loading it into the data warehouse.
- Joining the classified movie review data and user purchase data to get user behavior metric data.
We will use Metabase to visualize our data.
4. Setup
4.1 Prerequisite
- Docker with at least 4GB of RAM and Docker Compose v1.27.0 or later
- psql
- AWS account
- AWS CLI installed and configured
Clone and cd into the project directory.
git clone https://github.com/josephmachado/beginner_de_project.git
cd beginner_de_project
4.2 Local run
When running locally, you can use make commands to manage infrastructure. We use the following Docker containers:
- Airflow
- Postgres DB (as Airflow metadata DB)
- Metabase for data visualization
You can start the local containers as shown below.
make up # start all containers
make ci # runs format checks, type checks, static checks, and tests
make down # stops the containers
Since we cannot replicate AWS components locally, we have not set them up here. To learn more about how to set up these components locally, read this article.
We have a DAG validity test defined here.
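A DAG validity test typically just loads every DAG file and asserts that none of them raise import errors. The snippet below is an illustrative sketch of such a test (it assumes pytest and the dags folder at the repo root; it is not the repo's exact test file).
from airflow.models import DagBag

def test_dags_load_without_errors():
    # parse every DAG under dags/ without loading Airflow's bundled example DAGs
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"DAG import errors: {dag_bag.import_errors}"
    assert len(dag_bag.dags) > 0  # at least one DAG should be discovered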
4.3 Deploy to AWS
To set up the AWS infrastructure, we have a script called setup_infra.sh.
Note: We run all of our infrastructure on AWS us-east-1. If you want to change this, please change the corresponding variables in infra_variables.txt.
Setup can be run as shown below.
make down # since our AWS infra will be port forwarded to 8080 and 3000 which are used by local Airflow and Metabase respectively
./setup_infra.sh {your-bucket-name} # e.g ./setup_infra.sh my-test-bucket
In the prompt, enter yes to authenticate the SSH connection.
This sets up the following components:
- 1 AWS EC2 instance, running Airflow and Metabase
- 1 AWS EMR cluster
- 1 AWS Redshift cluster
- 1 AWS S3 bucket
The command will also open Airflow running on an EC2 instance. You can also check out
- Airflow at www.localhost:8080 (username and password are both airflow)
- Metabase at www.localhost:3000
The first time you log in to Metabase, create a username and password. To establish a connection to your Redshift cluster, you will need the Redshift host, which you can get using the following command.
aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text
The port, username, and password are in infra_variables.txt, and the database is dev.
4.4 Data lake structure
We will use AWS S3 as our data lake. Data from external systems will be stored here for further processing. AWS S3 will be used as storage for use with AWS Redshift Spectrum.
In our project, we will use one bucket with multiple folders.
- raw: To store raw data. This is denoted as Raw Area in the design section.
- stage: This is denoted as Stage Area in the design section.
- scripts: Used to store the Spark script, for use by AWS EMR.
This pattern will let us create different buckets for different environments.
4.5 Creating tables and Airflow configurations
The setup script also creates the tables that are required by our data pipeline. We create the following tables:
- retail.user_purchase table, defined at pgsetup/create_user_purchase.sql in the repository. The data is mounted into the Postgres container's file system. This data is loaded into the table using the COPY command.
- spectrum.user_purchase_staging table, defined as having its data stored in the data lake's stage location. Note that the table also has a partition defined on insert_date.
- spectrum.classified_movie_review table, defined as having its data stored in the data lake's stage location.
- public.user_behavior_metric table, which is the table that we want to load data into.
The script also creates the following Airflow connections and variables:
- redshift connection: To connect to the AWS Redshift cluster.
- postgres_default connection: To connect to the local Postgres database.
- BUCKET variable: To indicate the bucket to be used as the data lake for this pipeline.
- EMR_ID variable: To send commands to an AWS EMR cluster.
You can see them in the Airflow UI.
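Inside the DAG code, the BUCKET and EMR_ID values are read back through Airflow's Variable API. A minimal illustrative sketch (matching the variable names above, not necessarily the repo's exact code):
from airflow.models import Variable

# the data lake bucket and EMR cluster id registered by setup_infra.sh
BUCKET_NAME = Variable.get("BUCKET")
EMR_ID = Variable.get("EMR_ID")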
4.6 AWS Infrastructure costs
We use:
- 3 m4.xlarge type nodes for our AWS EMR cluster.
- 1 dc2.large node for our AWS Redshift cluster.
- 1 IAM role to allow Redshift access to S3.
- 1 S3 bucket, storing about 150MB of data.
- 1 t2.large AWS EC2 instance.
A very rough estimate would be 358.89 USD per month for our setup, which works out to approximately 0.49 USD per hour. Use this AWS cost calculator to verify costs.
5. Code walkthrough
The data for user_behavior_metric is generated from two main datasets. We will look at how each of them is ingested, transformed, and used to get the data for the final table.
5.1 Loading user purchase data into the data warehouse
To load the user purchase data from Postgres into AWS Redshift, we run the following tasks:
- extract_user_purchase_data: Unloads data from Postgres to a local file system in the Postgres container. This filesystem is volume synced between our local Postgres and Airflow containers. This allows Airflow to access this data.
- user_purchase_to_stage_data_lake: Moves the extracted data to the data lake's staging area at stage/user_purchase/{{ ds }}/user_purchase.csv, where ds will be replaced by the run date in YYYY-MM-DD format. This ds will serve as the insert_date partition, defined at table creation.
- user_purchase_stage_data_lake_to_stage_tbl: Runs a Redshift query to make the spectrum.user_purchase_staging table aware of the new date partition.
extract_user_purchase_data = PostgresOperator(
dag=dag,
task_id="extract_user_purchase_data",
sql="./scripts/sql/unload_user_purchase.sql",
postgres_conn_id="postgres_default",
params={"user_purchase": "/temp/user_purchase.csv"},
depends_on_past=True,
wait_for_downstream=True,
)
user_purchase_to_stage_data_lake = PythonOperator(
dag=dag,
task_id="user_purchase_to_stage_data_lake",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "/temp/user_purchase.csv",
"key": "stage/user_purchase/{{ ds }}/user_purchase.csv",
"bucket_name": BUCKET_NAME,
"remove_local": "true",
},
)
user_purchase_stage_data_lake_to_stage_tbl = PythonOperator(
dag=dag,
task_id="user_purchase_stage_data_lake_to_stage_tbl",
python_callable=run_redshift_external_query,
op_kwargs={
"qry": "alter table spectrum.user_purchase_staging add if not exists partition(insert_date='{{ ds }}') \
location 's3://"
+ BUCKET_NAME
+ "/stage/user_purchase/{{ ds }}'",
},
)
extract_user_purchase_data >> user_purchase_to_stage_data_lake >> user_purchase_stage_data_lake_to_stage_tbl
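The _local_to_s3 callable used above is a small helper that uploads a local file to our data lake bucket. Its exact implementation lives in the repo; the sketch below is only illustrative, with the signature inferred from the op_kwargs above.
import os
import boto3

def _local_to_s3(file_name: str, key: str, bucket_name: str, remove_local: str = "false") -> None:
    # push the local file into the data lake at the given key
    s3 = boto3.client("s3")
    s3.upload_file(Filename=file_name, Bucket=bucket_name, Key=key)
    if str(remove_local).lower() == "true":
        os.remove(file_name)  # free up space in the shared volume once the file is in S3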
./scripts/sql/unload_user_purchase.sql
COPY (
select invoice_number,
stock_code,
detail,
quantity,
invoice_date,
unit_price,
customer_id,
country
from retail.user_purchase -- we should have a date filter here to pull only required date's data
) TO '{{ params.user_purchase }}' WITH (FORMAT CSV, HEADER);
-- user_purchase will be replaced with /temp/user_purchase.csv from the params in extract_user_purchase_data task
It's not always a good pattern to store the entire dataset in our local filesystem, since we could run out of disk space or memory if the dataset is too large. An alternative would be a streaming process that writes to our data lake in batches, as shown here.
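A rough sketch of such a batched approach is shown below. It is purely illustrative (not part of the repo) and assumes psycopg2 and boto3; a server-side cursor keeps only one chunk of rows in memory at a time.
import csv
import io
import boto3
import psycopg2

def stream_table_to_s3(conn_str: str, query: str, bucket: str, key_prefix: str, chunk_size: int = 50000) -> None:
    s3 = boto3.client("s3")
    with psycopg2.connect(conn_str) as conn:
        # a named (server-side) cursor fetches rows lazily instead of materializing the full result set
        with conn.cursor(name="user_purchase_stream") as cur:
            cur.execute(query)
            part = 0
            while True:
                rows = cur.fetchmany(chunk_size)
                if not rows:
                    break
                buf = io.StringIO()
                csv.writer(buf).writerows(rows)  # serialize this chunk as CSV in memory
                s3.put_object(Bucket=bucket, Key=f"{key_prefix}/part-{part:05d}.csv", Body=buf.getvalue().encode("utf-8"))
                part += 1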
5.2 Loading classified movie review data into the data warehouse
To get the classified movie review data into AWS Redshift, we run the following tasks:
- movie_review_to_raw_data_lake: Copies the local file data/movie_review.csv to the data lake's raw area.
- spark_script_to_s3: Copies our PySpark script to the data lake's script area. This allows AWS EMR to reference it.
- start_emr_movie_classification_script: Adds the EMR steps defined at dags/scripts/emr/clean_movie_review.json to our EMR cluster. This task adds 3 EMR steps to the cluster, which do the following (a rough sketch of the step format is shown after the code below):
  - Move raw data from S3 to HDFS: Copies data from the data lake's raw area into EMR's HDFS.
  - Classify movie reviews: Runs the review classification PySpark script.
  - Move classified data from HDFS to S3: Copies data from EMR's HDFS to the data lake's staging area.
- wait_for_movie_classification_transformation: A sensor task that waits for the final step (Move classified data from HDFS to S3) to finish.
movie_review_to_raw_data_lake = PythonOperator(
dag=dag,
task_id="movie_review_to_raw_data_lake",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "/data/movie_review.csv",
"key": "raw/movie_review/{{ ds }}/movie.csv",
"bucket_name": BUCKET_NAME,
},
)
spark_script_to_s3 = PythonOperator(
dag=dag,
task_id="spark_script_to_s3",
python_callable=_local_to_s3,
op_kwargs={
"file_name": "./dags/scripts/spark/random_text_classification.py",
"key": "scripts/random_text_classification.py",
"bucket_name": BUCKET_NAME,
},
)
start_emr_movie_classification_script = EmrAddStepsOperator(
dag=dag,
task_id="start_emr_movie_classification_script",
job_flow_id=EMR_ID,
aws_conn_id="aws_default",
steps=EMR_STEPS,
params={
"BUCKET_NAME": BUCKET_NAME,
"raw_movie_review": "raw/movie_review",
"text_classifier_script": "scripts/random_text_classifier.py",
"stage_movie_review": "stage/movie_review",
},
depends_on_past=True,
)
last_step = len(EMR_STEPS) - 1
wait_for_movie_classification_transformation = EmrStepSensor(
dag=dag,
task_id="wait_for_movie_classification_transformation",
job_flow_id=EMR_ID,
step_id='{{ task_instance.xcom_pull("start_emr_movie_classification_script", key="return_value")['
+ str(last_step)
+ "] }}",
depends_on_past=True,
)
[
movie_review_to_raw_data_lake,
spark_script_to_s3,
] >> start_emr_movie_classification_script >> wait_for_movie_classification_transformation
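The EMR steps themselves are defined in dags/scripts/emr/clean_movie_review.json and follow EMR's standard step format, with Jinja-templated params that Airflow fills in at runtime. The snippet below is only a rough illustration of what the classification step might look like; it is not the repo's exact JSON, and the script's own arguments are omitted.
EMR_STEPS = [
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",  # lets us run spark-submit on the cluster
            "Args": [
                "spark-submit",
                "s3://{{ params.BUCKET_NAME }}/{{ params.text_classifier_script }}",
                # the script's input and output paths would follow here
            ],
        },
    },
]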
5.3 Generating user behavior metric
With both the user purchase data and the classified movie review data in the data warehouse, we can get the data for the user_behavior_metric table. This is done using the generate_user_behavior_metric task, which runs a Redshift SQL script to populate the public.user_behavior_metric table.
generate_user_behavior_metric = PostgresOperator(
dag=dag,
task_id="generate_user_behavior_metric",
sql="scripts/sql/generate_user_behavior_metric.sql",
postgres_conn_id="redshift",
)
end_of_data_pipeline = DummyOperator(task_id="end_of_data_pipeline", dag=dag) # dummy operator to indicate DAG complete
[
user_purchase_stage_data_lake_to_stage_tbl,
wait_for_movie_classification_transformation,
] >> generate_user_behavior_metric >> end_of_data_pipeline
The SQL query generates customer-level aggregate metrics, using spectrum.user_purchase_staging and spectrum.classified_movie_review.
-- scripts/sql/generate_user_behavior_metric.sql
DELETE FROM public.user_behavior_metric
WHERE insert_date = '{{ ds }}';
INSERT INTO public.user_behavior_metric (
customerid,
amount_spent,
review_score,
review_count,
insert_date
)
SELECT ups.customerid,
CAST(
SUM(ups.Quantity * ups.UnitPrice) AS DECIMAL(18, 5)
) AS amount_spent,
SUM(mrcs.positive_review) AS review_score,
count(mrcs.cid) AS review_count,
'{{ ds }}'
FROM spectrum.user_purchase_staging ups
JOIN (
SELECT cid,
CASE
WHEN positive_review IS True THEN 1
ELSE 0
END AS positive_review
FROM spectrum.classified_movie_review
WHERE insert_date = '{{ ds }}'
) mrcs ON ups.customerid = mrcs.cid
WHERE ups.insert_date = '{{ ds }}'
GROUP BY ups.customerid;
Log on to www.localhost:8080 to see the Airflow UI. The username and password are both airflow. Turn on the DAG. It can take about 10 minutes to complete one run.
5.4 Checking results
You can check the public.user_behavior_metric Redshift table from your terminal as shown below.
export REDSHIFT_HOST=$(aws redshift describe-clusters --cluster-identifier sde-batch-de-project --query 'Clusters[0].Endpoint.Address' --output text)
psql postgres://sde_user:sdeP0ssword0987@$REDSHIFT_HOST:5439/dev
In the SQL prompt, use the following queries to see the generated data.
select insert_date, count(*) as cnt from spectrum.classified_movie_review group by insert_date order by cnt desc; -- 100,000 per day
select insert_date, count(*) as cnt from spectrum.user_purchase_staging group by insert_date order by cnt desc; -- 541,908 per day
select insert_date, count(*) as cnt from public.user_behavior_metric group by insert_date order by cnt desc; -- 908 per day
\q
You can also connect to Metabase running at www.localhost:3000 and visualize the data.
The counts should match. When you are done with the project, do not forget to tear down the infrastructure using the tear_down_infra.sh script.
./tear_down_infra.sh {your-bucket-name}
6. Design considerations
Now that you have the data pipeline running successfully, it’s time to review some design choices.
- Idempotent data pipeline: Check that all the tasks are idempotent. If you re-run a partially run or failed task, your output should not be any different than if the task had run successfully. Read this article to understand idempotency in detail. Hint: there is at least one non-idempotent task.
- Monitoring & Alerting: The data pipeline can be monitored from the Airflow UI, and EMR steps can be monitored via the AWS UI. We do not have any alerting in case of task failures, data quality issues, hanging tasks, etc. In real projects, there is usually a monitoring and alerting system; some common tools are CloudWatch, Datadog, or New Relic.
- Quality control: We do not check for data quality in this data pipeline. We can set up basic count, standard deviation, etc. checks before we load data into the final table (see the sketch after this list). For advanced testing requirements, consider using a data quality framework, such as great_expectations.
- Concurrent runs: If you have to re-run the pipeline for the past 3 months, it would be ideal to have those runs execute concurrently and not sequentially. We can set levels of concurrency as shown here. Even with an appropriate concurrency setting, our data pipeline has one task that is blocking. Figuring out the blocking task is left as an exercise for the reader.
- Changing DAG frequency: What changes are necessary to run this DAG every hour? How will the table schema need to change to support this? Do you have to rename the DAG?
- Backfills: Assume you have a logic change in the movie classification script and want it applied to the past 3 weeks. If you re-run the data pipeline for the past 3 weeks, will you re-run the entire DAG or only parts of it? Why would you choose one option over the other? You can use this command to run backfills; prefix the command with docker exec -d beginner_de_project_airflow-webserver_1 to run it on our Airflow docker container.
- Data size: Will the data pipeline run successfully if your data size increases by 10x, 100x, or 1000x? Why or why not?
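For the quality control point above, even a bare-bones check helps: a task that queries the freshly loaded partition and fails the run if it looks wrong. The sketch below is hypothetical (it is not in the repo) and assumes the Airflow 2.x Postgres provider; it could run between generate_user_behavior_metric and end_of_data_pipeline.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_user_behavior_metric_count(ds: str, min_rows: int = 1) -> None:
    # count the rows loaded for this run date and fail loudly if the table looks empty
    hook = PostgresHook(postgres_conn_id="redshift")
    row_count = hook.get_first(
        "SELECT COUNT(*) FROM public.user_behavior_metric WHERE insert_date = %s",
        parameters=(ds,),
    )[0]
    if row_count < min_rows:
        raise ValueError(f"Data quality check failed: only {row_count} rows for {ds}")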
7. Next steps
If you are interested in working more with this data pipeline, please consider contributing to the following.
- Unit tests, DAG run tests, and integration tests.
- Use Taskflow API for the DAG.
If you have other ideas, please feel free to open GitHub issues or pull requests.
8. Conclusion
Hope this article gives you a good idea of how to design and build an end-to-end data pipeline. To recap, we saw:
- Infrastructure setup
- Data pipeline best practices
- Design considerations
- Next steps
Please leave any questions or comments in the comment section below.
9. Further reading
- Airflow pros and cons
- What is a data warehouse
- What and why staging?
- Beginner DE project - Stream edition
- How to submit spark EMR jobs from Airflow
- DE project to impress hiring manager