End-to-end data engineering project - batch edition

Objective

It can be difficult to know where to begin when starting a data engineering side project. If you have wondered

What data to use for your data project?

How to design your data project?

Then this post is for you. We will go over the key components and help you understand what you need to design and build your data project. We will do this using a sample end-to-end data engineering project.

Setup

Let’s assume that we work for an online store and have to get customer and order data ready for analysis using a data visualization tool. The data pipeline architecture is shown below.

Architecture

This is a standard ELT pattern. Since all of the code is in Python and SQL, you can modify it for your data project.

We will use Docker to spin up our infrastructure. To follow along, you will need the following:

  1. Docker and Docker Compose
  2. git

Clone the repo and start the containers as shown below.

git clone https://github.com/josephmachado/online_store.git
cd online_store
make up
docker ps # to see all the running containers

We use Dagster as our orchestrator and scheduler. Go to your local Dagster instance at http://localhost:3000/ and turn on the schedule button near the top of the screen.

Dagster schedule

After your data pipeline has run a few times (>= 2), log into the Metabase dashboard service at http://localhost:3001/. The login credentials for Metabase are below.

username: james.holden@rocinante.com
password: password1234

Search for and click on Online store overview using the search bar in the top left corner to get to the pre-created dashboard. The dashboard operates on the transformed data from the data pipeline.

Metabase UI

You can use make down to spin down all the containers.

Components

In this section, we will look at the main components of our data pipeline. The idea here is to use these as a starting point for your data project.

Source systems

In real data pipelines, you will use sources such as your internal database, APIs, files, cloud storage systems, etc. We use a data generator that creates fake source data and a FastAPI server that serves fake customer risk scores. The tables are created when we spin up the Docker containers, using the customer DB setup script and the warehouse tables setup script.

If you cannot find publicly available datasets for your problem of interest, generating fake data is a good alternative.
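
If you go the fake-data route, a library like Faker makes it easy to generate realistic-looking records. Below is a minimal sketch, assuming a customers table with illustrative columns (not necessarily this project's exact schema or generator code).

import csv
from faker import Faker

fake = Faker()

def generate_fake_customers(num_rows: int, out_path: str = "customers.csv") -> None:
    # Write num_rows of fake customer records to a CSV file.
    # Column names here are illustrative placeholders.
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["customer_id", "first_name", "last_name", "state", "datetime_created"])
        for customer_id in range(1, num_rows + 1):
            writer.writerow(
                [
                    customer_id,
                    fake.first_name(),
                    fake.last_name(),
                    fake.state_abbr(),
                    fake.date_time_this_year().isoformat(),
                ]
            )

if __name__ == "__main__":
    generate_fake_customers(100)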

Schedule & Orchestrate

An orchestrator is the tool/framework that ensures tasks are executed in the correct order, retries them on failure, stores metadata, and displays progress via a UI.

The scheduler is responsible for starting data pipelines at their scheduled frequency.

We use Dagster as our scheduler and orchestrator due to its ease of use and setup. A few key concepts to understand about Dagster are:

  1. ops: These are the units of code that do the computation. In our case, this includes the extract, load, and transform (via the built-in dbt op) code. Our ops are defined here.
  2. jobs: We use a job to define the order of execution of operations. Our data ingestion job is defined here.
  3. schedules: Schedules define the frequency of a job run. Our schedule is defined here.
  4. repository: Jobs and schedules are organized into repositories. Our repository is defined here.

Given below is the code that chains together ops to create our job.

from dagster import job

# The extract_* and load_* ops are defined in the op modules linked above;
# dbt_run_op and dbt_test_op come from Dagster's dbt integration. Resource
# definitions (e.g. the dbt resource) are omitted here for brevity.
@job
def online_store_data_ingestion():
    dbt_test_op(
        dbt_run_op(
            [
                load_customer_risk_score(extract_customer_risk_score()),
                load_customer_data(extract_customer_data()),
                load_orders_data(extract_orders_data()),
            ]
        )
    )

Notice how the Python job shown above creates the data pipeline shown below.

Dagster UI
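
To make the schedule and repository concepts above concrete, here is a minimal sketch (not the project's exact code) of how the job, a schedule, and a repository might be wired together; the cron string and names are illustrative.

from dagster import ScheduleDefinition, repository

# Illustrative: run the ingestion job every 2 minutes.
online_store_data_ingestion_schedule = ScheduleDefinition(
    job=online_store_data_ingestion,
    cron_schedule="*/2 * * * *",
)

@repository
def online_store_repository():
    # A repository groups jobs and schedules so Dagster can discover and run them.
    return [online_store_data_ingestion, online_store_data_ingestion_schedule]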

Extract

Extractors pull data from the source system. We extract customer data from our application’s customer database, orders data from S3, and customer risk scores from an API endpoint.
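
As an illustration, the risk score extractor can be a simple HTTP call to the FastAPI service. The sketch below makes assumptions about the endpoint path and response shape; see the project's op definitions for the real implementation.

import requests

def extract_customer_risk_score(api_url: str = "http://localhost:8000/risk_score") -> list[dict]:
    # The URL and response shape are assumptions for illustration only.
    response = requests.get(api_url, timeout=10)
    response.raise_for_status()
    return response.json()  # e.g. [{"customer_id": 1, "risk_score": 0.42}, ...]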

Load

Loaders load data into the destination system(s). We load our orders, customers, and customer risk score data into store.orders, store.customers, and store.customer_risk_score warehouse tables respectively.

We can also load data into other systems as needed, e.g. load customer data into an Elasticsearch cluster for text-based querying, orders into a graph database for graph-based querying, cache systems, etc.

The code for the extract and load components is available here as the load_* functions.
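
For illustration, a loader can be a straightforward INSERT into the warehouse table. The sketch below uses psycopg2 against a Postgres warehouse; the connection details and column names are assumptions, not the project's exact code.

import psycopg2
from psycopg2.extras import execute_values

def load_customer_risk_score(records: list[dict]) -> None:
    # Connection parameters and columns are illustrative placeholders.
    conn = psycopg2.connect(
        host="warehouse", dbname="warehouse", user="sde", password="password"
    )
    try:
        with conn, conn.cursor() as cur:
            execute_values(
                cur,
                "INSERT INTO store.customer_risk_score (customer_id, risk_score) VALUES %s",
                [(r["customer_id"], r["risk_score"]) for r in records],
            )
    finally:
        conn.close()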

Transform

Transformers clean the data, apply business logic, and model the data so it is ready to be used. This transformed data is consumed by our stakeholders.

We use dbt to execute our transformation scripts. We de-duplicate the data, cast columns to the correct data types, join with other tables (customer risk score and states), and create the fct_orders and dim_customers views as shown below (based on Kimball modeling).

dim_customers Customer dimension

fct_orders Orders fact
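
For context, the dbt_run_op and dbt_test_op used in our job come from Dagster's dbt integration and need a dbt resource pointing at the dbt project. The sketch below shows one way that wiring might look (the paths are placeholders, and the exact API depends on your dagster-dbt version); see the repository for the project's actual setup.

from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op

# Placeholder paths; point these at your dbt project and profiles directories.
dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "/path/to/dbt/project", "profiles_dir": "/path/to/dbt/profiles"}
)

@job(resource_defs={"dbt": dbt_resource})
def transform_only():
    # dbt_run_op builds the models (e.g. fct_orders, dim_customers);
    # dbt_test_op then runs the dbt tests against them.
    dbt_test_op(dbt_run_op())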

Recommended reading:

  1. Benefits of dbt
  2. Dbt tutorial

Data visualization

Data visualization tools enable business users, analysts, and engineers to generate aggregates at different levels, create shareable dashboards, write SQL queries, etc., without having to connect directly to our warehouse.

We use Metabase as our data visualization tool.

Choosing tools & frameworks

We have used open-source software to make development easy, free, and configurable. You may want to switch out tools as necessary, e.g. Fivetran instead of our custom extract and load code.

Recommended reading:

  1. How to choose your data tools

Future work & improvements

While we have a working data pipeline, there are a lot of possible improvements. Some of the crucial ones are listed below.

  1. Idempotence: In the customer load step, we insert all the data from the past 5 minutes into our warehouse. Since our pipeline runs every 2 minutes, this results in duplicate data. Making the load step idempotent will prevent duplicates (a minimal sketch of one approach follows this list). Read this article on how to make your data pipelines idempotent for more details.
  2. Backfills: Backfilling is an inevitable part of data engineering. To learn more about backfills and how to make our data pipeline backfill-able, read this article.
  3. Change data capture & Slowly changing dimensions: A key feature of a data warehouse is storing queryable historical data. This is commonly done with a modeling technique called slowly changing dimensions, which requires us to know about every create, update, and delete happening on the source table. We can use a technique called change data capture (CDC) to capture those operations. Read how to implement CDC, and how to create SCD2 tables with dbt.
  4. Testing: We have a unit test and a post-processing dbt test for our pipeline. Read how to add tests, and how to set up CI tests, to take our testing capabilities to the next level.
  5. Scale: When passing data between functions, Dagster stores it in a temporary file by default. In our load steps, we load the entire dataset into the Python process's memory and then insert it into the warehouse. While this works for small datasets, it will not work if the data is larger than the available memory. Read how to stream data in your Python process and how to scale your data pipeline for ideas on how to handle big data.
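
As an example of the idempotence point above, one common approach is a delete-then-insert scoped to the run's time window, so re-running the same window does not create duplicates. The sketch below assumes a Postgres warehouse and illustrative column names, not the project's exact schema.

from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values

def idempotent_load_customers(conn, records: list[dict], window_start: datetime, window_end: datetime) -> None:
    # Deleting the window before inserting makes re-runs (and backfills) of the
    # same window safe. Table and column names are illustrative.
    with conn, conn.cursor() as cur:
        # 1. Remove any rows previously loaded for this window.
        cur.execute(
            "DELETE FROM store.customers WHERE datetime_created >= %s AND datetime_created < %s",
            (window_start, window_end),
        )
        # 2. Insert the freshly extracted rows for the same window.
        execute_values(
            cur,
            "INSERT INTO store.customers (customer_id, first_name, last_name, datetime_created) VALUES %s",
            [(r["customer_id"], r["first_name"], r["last_name"], r["datetime_created"]) for r in records],
        )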

Please feel free to create an issue or open a PR here.

Conclusion

To recap, we saw:

  1. The key components of a data pipeline
  2. Generating fake data
  3. How to design your data project
  4. Future work & improvements

For your data project, choose the tools you know best or would like to learn. Use this project’s infrastructure and architecture as a starting point to build your data project.

I hope this article gives you a good understanding of the key components of a data pipeline and how you can set up your data project. If you have any questions, comments, or ideas, please leave them in the comment section below.

Further reading

  1. Want to understand what the data pipeline components are and how to choose them? Read this article.
  2. Read this article for a quick introduction to data warehouses.
  3. Curious about ways to load data into a data warehouse? Check out this article.
  4. Want to do a similar project with Airflow? Check out this article.

References

  1. Dagster docs
  2. Metabase docs
  3. FastAPI docker
  4. Dagster docker setup
  5. dbt docs

Please consider sharing; it helps out a lot!