Data Pipeline Design Patterns - #2. Coding patterns in Python


Using the appropriate code design pattern can make your code easier to read, extend, modify, and debug, and can help new developers onboard faster. If you have wondered

What design patterns do people follow when writing code for a typical data pipeline?

What do people mean by abstract and concrete implementation?

Why do data engineers prefer writing functional code?

What do the terms Factory, Strategy, Singleton, and Object pool mean, and when should you use them?

Then this post is for you. This post will cover the typical code design patterns for building data pipelines. We will learn about the pros and cons of each design pattern, when to use them, and, more importantly, when not to use them.

By the end of this post, you will have an overview of the typical code design patterns used for building data pipelines.

Note that this is part 2 of the data design pattern series. The first article is Data Pipeline Design Patterns - #1. Data flow patterns.

Sample project

To demonstrate code design patterns, we will build a simple ETL project that lets us pull data from Reddit, transform it and store it in a sqlite3 database.

Flowchart: API -> (Extract) -> Transform -> (Load) -> Database

The completed project, with setup instructions, is available at github.com/josephmachado/socialetl.

We will start with a simple ETL script and refactor it based on new requirements, which will be introduced in each section. By the end of the post, you should be able to identify the common data pipeline design patterns and when to & when not to use them.

Code design patterns

Let’s start with a simple Reddit ETL script, which we will refactor throughout this post.

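# For the complete code, check out the project repository linked above.
import os
import sqlite3

import praw


def extract():
    # The environment variable names are assumptions; use your own credentials.
    client = praw.Reddit(
        client_id=os.environ['REDDIT_CLIENT_ID'],
        client_secret=os.environ['REDDIT_CLIENT_SECRET'],
        user_agent=os.environ['REDDIT_USER_AGENT'],
    )
    subreddit = client.subreddit('dataengineering')
    # The listing type and limit are assumptions; adjust as needed.
    top_subreddit = subreddit.hot(limit=100)
    data = []
    for submission in top_subreddit:
        data.append(
            {
                'id': submission.id,
                'title': submission.title,
                'score': submission.score,
                'url': submission.url,
                'comments': submission.num_comments,
                'created': submission.created,
                'text': submission.selftext,
            }
        )
    return data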

def transform(data):
    """
    Function to only keep outliers.
    Outliers are based on num of comments > 2 standard deviations from mean
    """
    if not data:
        # avoid dividing by zero when there are no posts
        return []
    num_comments = [post.get('comments') for post in data]

    mean_num_comments = sum(num_comments) / len(num_comments)
    std_num_comments = (
        sum([(x - mean_num_comments) ** 2 for x in num_comments])
        / len(num_comments)
    ) ** 0.5
    return [
        post
        for post in data
        if post.get('comments') > mean_num_comments + 2 * std_num_comments
    ]

def load(data):
    # create a db connection
    conn = sqlite3.connect('./data/socialetl.db')
    cur = conn.cursor()
    try:
        # insert data into DB
        for post in data:
            cur.execute(
                """
                    INSERT INTO social_posts (
                        id, source, social_data
                    ) VALUES (
                        :id, :source, :social_data
                    )
                """,
                {
                    'id': post.get('id'),
                    # the SQL binds :source, so the key must be 'source'
                    'source': 'reddit',
                    'social_data': str(
                        {
                            'title': post.get('title'),
                            'url': post.get('url'),
                            # keys match the dicts built in extract()
                            'comments': post.get('comments'),
                            'created': post.get('created'),
                            'text': post.get('text'),
                        }
                    ),
                },
            )
    finally:
        conn.commit()
        conn.close()

def main():
    # pull data from Reddit
    data = extract()
    # transform reddit data
    transformed_data = transform(data)
    # load data into database
    load(transformed_data)


if __name__ == '__main__':
    main()

1. Functional design

As a DE, you might have heard people say, “write functional code.” Let’s break down what it means.

  1. Atomicity: A function should only do one task.
  2. Idempotency: If you run the code multiple times with the same input, the output should be the same. In the case of storing the output in an external data store, the output should not be duplicated.
  3. No side effects: A function should not affect any external data (variable or other) besides its output.

Note: Additional FP concepts include higher-order functions, function composition, and referential transparency.

Let’s examine the load method from the code above.

  1. Atomicity: No, because it does two things: it manages a database connection and loads data into a DB. We can use dependency injection to accept the DB connection as an input to the load function.
  2. Idempotency: No, because it inserts all the data into the social_posts table. If the input to the load function has duplicates or the load function is accidentally run twice, duplicates will be inserted into the social_posts table. We can prevent this using an UPSERT, which updates a record if it is already present (identified by a key) and inserts it otherwise.
  3. No side effects: The load function has no side effects. However, note that once the load function accepts a DB connection as an input parameter (dependency injection), we should not close the connection within the load function, since that would affect the state of a variable external to it.

Let’s look at what a functional load function will look like:

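import logging
from dataclasses import asdict

def load(social_data, db_conn) -> None:
    logging.info('Loading social data.')
    if db_conn is None:
        raise ValueError(
            'db_conn is None. Please pass a valid DatabaseConnection'
            ' object.'
        )
    cur = db_conn.cursor()
    try:
        for post in social_data:
            cur.execute(
                """
                INSERT OR REPLACE INTO social_posts (id, source, social_data)
                VALUES (:id, :source, :social_data)
                """,
                {
                    'id': post.id,
                    'source': post.source,
                    'social_data': str(asdict(post.social_data)),
                },
            )
    finally:
        # close only the cursor; the injected connection belongs to the caller
        cur.close()
        db_conn.commit()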

Note: If you are wondering whether the cursor should be created outside the load function, you are on the right path. We will see how to do this effectively in the context managers section.
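To see the idempotency point in action, here is a minimal, self-contained sketch. It assumes a social_posts table whose id column is the primary key (the real schema is not shown in this post) and uses an in-memory sqlite3 database with made-up rows. Running the load twice leaves the row count unchanged because INSERT OR REPLACE overwrites rows with the same key.

```python
import sqlite3


def load(rows, db_conn):
    """Upsert rows keyed on id; the caller owns (and closes) the connection."""
    cur = db_conn.cursor()
    try:
        cur.executemany(
            """
            INSERT OR REPLACE INTO social_posts (id, source, social_data)
            VALUES (:id, :source, :social_data)
            """,
            rows,
        )
        db_conn.commit()
    finally:
        cur.close()


# Assumed schema: id is the primary key, which is what makes the upsert work.
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE social_posts '
    '(id TEXT PRIMARY KEY, source TEXT, social_data TEXT)'
)
rows = [
    {'id': 'a1', 'source': 'reddit', 'social_data': "{'title': 'hello'}"},
    {'id': 'b2', 'source': 'reddit', 'social_data': "{'title': 'world'}"},
]
load(rows, conn)
load(rows, conn)  # simulate an accidental re-run
row_count = conn.execute('SELECT COUNT(*) FROM social_posts').fetchone()[0]
print(row_count)  # 2
```

With a plain INSERT, the second call would fail on the primary key (or duplicate the rows if the table had no key).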

2. Factory pattern

Let’s assume we have to design data pipelines to pull data from Twitter, Mastodon, LinkedIn, etc. All of these social data pipelines follow a similar pattern. We can use the Factory pattern to create a uniform interface for pulling social data. Let’s see how the factory pattern works:

  1. Use a factory pattern when multiple pipelines follow a similar pattern. E.g., if you want to add ETL for Twitter, Mastodon, LinkedIn, etc.
  2. A factory will be responsible for creating the appropriate (Reddit, Twitter, or Mastodon) ETL object. The code that calls the factory will use that ETL object, unaware of its internal implementation.
  3. It prevents complex if..else statements that are hard to manage and provides a standard interface to interact with multiple similar pipelines.
  4. You can define a standard set of methods that all the ETL classes must implement. The common methods’ names and signatures (inputs and outputs) are called the abstract interface, since they define how we interact with any ETL implementing the standard (e.g., see the SocialETL class). The actual implementation is called the concrete implementation (e.g., RedditETL).

Class diagram: main calls etl_factory(source), where source is one of Reddit, Twitter, or Mastodon. etl_factory creates a RedditETL, TwitterETL, or MastodonETL object. Each is a concrete implementation of the abstract class SocialETL, which declares the abstract methods extract(id, num_records, client), transform(social_data, transform_function), and load(social_data, db_cursor).

Let’s see how we can add an ETL factory to create Twitter and Reddit pipelines. The complete code is in the project repository.

import os
from abc import ABC, abstractmethod  # python module to define abstract interfaces

import praw
import tweepy  # assumption: the Twitter client library is not named in this post


# Abstract class with abstract methods
class SocialETL(ABC):
    @abstractmethod
    def extract(self, id, num_records, client):
        pass

    @abstractmethod
    def transform(self, social_data):
        pass

    @abstractmethod
    def load(self, social_data, db_conn):
        pass

    @abstractmethod
    def run(self, db_conn, client, id, num_records):
        pass


# Concrete implementation of the abstract Class
class RedditETL(SocialETL):
    def extract(self, id, num_records, client):
        # code to extract reddit data
        ...

    def transform(self, social_data):
        # code to transform reddit data
        ...

    def load(self, social_data, db_conn):
        # code to load reddit data into the final table
        ...

    def run(self, db_conn, client, id, num_records):
        # code to run extract, transform and load
        ...


# Concrete implementation of the abstract Class
class TwitterETL(SocialETL):
    def extract(self, id, num_records, client):
        # code to extract twitter data
        ...

    def transform(self, social_data):
        # code to transform twitter data
        ...

    def load(self, social_data, db_conn):
        # code to load twitter data into the final table
        ...

    def run(self, db_conn, client, id, num_records):
        # code to run extract, transform and load
        ...


# This "factory" will accept an input and give you the appropriate object that you can use to perform ETL
def etl_factory(source):
    # The environment variable names below are assumptions; use your own.
    factory = {
        'Reddit': (
            praw.Reddit(
                client_id=os.environ['REDDIT_CLIENT_ID'],
                client_secret=os.environ['REDDIT_CLIENT_SECRET'],
                user_agent=os.environ['REDDIT_USER_AGENT'],
            ),
            RedditETL(),
        ),
        'Twitter': (
            tweepy.Client(bearer_token=os.environ['BEARER_TOKEN']),
            TwitterETL(),
        ),
    }
    if source in factory:
        return factory[source]
    else:
        raise ValueError(
            f"source {source} is not supported. Please pass a valid source."
        )


# calling code (source and db_conn are defined elsewhere)
client, social_etl = etl_factory(source)
social_etl.run(db_conn, client, ...)

Separation of responsibility (definition, creation, & use) is crucial for code maintenance and testability and prevents modifying code in multiple places in case of a new feature. Note that even though we use classes, we still follow functional principles at a function level.


Pros:

  1. If you have multiple similar ETLs, a factory can significantly improve code consistency and make it much easier to evolve your pipelines.
  2. Using factory patterns to establish connections to external systems will enable easier testing. E.g., a db factory allows you to use sqlite3 for dev testing and Postgres for prod easily. You can also put your Spark session behind a factory to use fewer executors in dev and higher memory settings in prod, etc.


Cons:

  1. If you use the factory method to define data pipelines that are inherently different (e.g., ETL vs. ELT or API data pulls vs. S3 -> S3 data transfer, etc.), it will make the code highly complex and brittle. Only use a factory if the underlying data pipelines have a similar structure.
  2. Using the factory method when there are only one or two data pipelines (without any signs of more data pipelines) is premature optimization and can slow down development, since the abstract interface constrains how each pipeline can be written.

3. Strategy pattern

Our initial transformation function used standard deviation to identify outliers (see the transform function). Let’s assume we want to have the option to choose from multiple transformation functions, such as randomly selecting a few posts or not applying any transformations, based on inputs to the pipeline.

We can use a Strategy Pattern, which allows our code to choose one way of transformation among multiple methods of transformation (aka choose one strategy among various strategies). Seen below are some strategies for transforming the data pulled from either Reddit or Twitter.

import random
import logging


def no_transformation(social_data):
    logging.info('No transformation applied.')
    return social_data


def random_choice_filter(social_data):
    logging.info('Randomly choosing 2 social media data points.')
    return random.choices(social_data, k=2)


def standard_deviation_outlier_filter(social_data):
    # code to only keep standard deviation based outliers (elided here)
    return filtered_data


# a factory to return the appropriate transformation function
def transformation_factory(value):
    factory = {
        'sd': standard_deviation_outlier_filter,
        'no_tx': no_transformation,
        'rand': random_choice_filter,
    }
    return factory[value]


class RedditETL(SocialETL):

    def transform(self, social_data, transform_function):
        """ Function to transform reddit data by applying the given
        transformation function (e.g., only keeping the posts with number
        of comments greater than 2 standard deviations away from the mean).
        Args:
            social_data (List[RedditPostData]): List of reddit post data.
            transform_function (Callable): Transformation to apply.
        Returns:
            List[RedditPostData]: Filtered list of reddit post data.
        """
        logging.info('Transforming reddit data.')
        return transform_function(social_data)

    def run(self, db_cursor_context, client, transform_function, id='dataengineering', num_records=100):
        """ Function to run the ETL pipeline.
        Args:
            db_cursor_context (DatabaseConnection): Database connection.
            client (praw.Reddit): Reddit client.
            transform_function (Callable): Transformation to apply.
            id (str): Subreddit to get data from.
            num_records (int): Number of records to get.
        """
        logging.info('Running reddit ETL.')
        # extract, then transform with the injected function, then load
        self.load(
            social_data=self.transform(
                social_data=self.extract(
                    id=id, num_records=num_records, client=client
                ),
                transform_function=transform_function,
            ),
            db_cursor_context=db_cursor_context,
        )

    # other methods


# Calling code
transformation = 'sd'  # 'no_tx' or 'rand'
client, social_etl = etl_factory(source)
db = db_factory()
social_etl.run(
    db_cursor_context=db.managed_cursor(),
    client=client,
    transform_function=transformation_factory(transformation),
)

Note how the calling code injects a transformation function based on the transformation variable. The key idea is to have transformation functions with the same input and output parameters, which makes them interchangeable. The downside is that, without proper logging, it can be hard to tell which function was executed in a given run.
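The same idea can be tried out in isolation. The sketch below is a simplified, hypothetical version that works on plain lists of comment counts rather than the project's data classes, and the function and strategy names are made up for illustration. It also logs the name of the chosen function, which addresses the traceability concern above.

```python
import logging
import random
from typing import Callable, Dict, List

logging.basicConfig(level=logging.INFO)

# Every strategy shares this signature, which is what makes them swappable.
Transform = Callable[[List[int]], List[int]]


def no_transformation(values: List[int]) -> List[int]:
    return values


def keep_above_mean(values: List[int]) -> List[int]:
    mean = sum(values) / len(values)
    return [v for v in values if v > mean]


def random_pair(values: List[int]) -> List[int]:
    return random.choices(values, k=2)


STRATEGIES: Dict[str, Transform] = {
    'no_tx': no_transformation,
    'above_mean': keep_above_mean,
    'rand': random_pair,
}


def run_transform(values: List[int], strategy: str) -> List[int]:
    transform = STRATEGIES[strategy]
    # Log the chosen function so a run can be traced back to its strategy.
    logging.info('Applying strategy %r via %s', strategy, transform.__name__)
    return transform(values)


comment_counts = [1, 2, 3, 10]
print(run_transform(comment_counts, 'no_tx'))       # [1, 2, 3, 10]
print(run_transform(comment_counts, 'above_mean'))  # [10]
```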

4. Singleton, & Object pool patterns

Use a singleton pattern when your program should only have one object of a class for the entirety of its run. The singleton pattern is commonly used for database connections, logging, etc. However, it can make testing incredibly difficult since all your tests can only use one object. If not designed with sufficient guardrails, one can create multiple instances of singleton classes in Python. In general, Singleton is considered an anti-pattern.
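As a rough illustration, here is one common way to write a singleton in Python, by overriding __new__. The AppConfig class is hypothetical and not part of the project. The last lines show how easily the guardrail can be bypassed, which is one reason the pattern is fragile.

```python
class AppConfig:
    """Hypothetical singleton: every AppConfig() call returns one shared object."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.settings = {}
        return cls._instance


first = AppConfig()
second = AppConfig()
first.settings['env'] = 'dev'
print(first is second)         # True
print(second.settings['env'])  # dev

# The guardrail is easy to bypass: object.__new__ skips AppConfig.__new__.
rogue = object.__new__(AppConfig)
print(rogue is first)  # False
```

Because the shared state survives between calls, one test that changes settings can silently affect the next test.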

A pattern that builds on Singleton is called an Object pool pattern. In this pattern, instead of being able to use only one single object, you can use an object from a pool of objects. The pool size is set depending on the use case. The object pool pattern is commonly seen in applications that have multiple incoming requests and need to communicate with the database quickly (e.g., backend apps, stream processing). Having a pool of DB connections allows incoming requests to communicate with the DB without having to create a new connection (which takes longer) or wait for a singleton object to finish serving other requests. E.g., the Psycopg2 connection pool. However, note that connections must be reset to their initial state after use, before they are returned to the pool.
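A minimal sketch of the idea, using only the standard library, is shown below. The ConnectionPool class is hypothetical and far simpler than a production pool: it keeps a fixed number of sqlite3 connections in a queue, hands one out inside a with block, and rolls back any uncommitted work before putting it back.

```python
import queue
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class ConnectionPool:
    """Hypothetical fixed-size pool of sqlite3 connections."""

    def __init__(self, size: int = 2, db_file: str = ':memory:') -> None:
        self._pool: 'queue.Queue[sqlite3.Connection]' = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(sqlite3.connect(db_file, check_same_thread=False))

    @contextmanager
    def connection(self, timeout: float = 1.0) -> Iterator[sqlite3.Connection]:
        # Raises queue.Empty if no connection frees up within the timeout.
        conn = self._pool.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Discard uncommitted work so the next user starts clean.
            conn.rollback()
            self._pool.put(conn)

    def available(self) -> int:
        return self._pool.qsize()


pool = ConnectionPool(size=2)
with pool.connection() as conn:
    in_use = pool.available()
    result = conn.execute('SELECT 1').fetchone()[0]
print(in_use)            # 1
print(pool.available())  # 2
```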

More often than not, in batch data pipeline applications, it is better to have a factory method to create a DB connection since this will give you the flexibility to set your config depending on the environment, and you will not have to deal with cleaning up connections to be returned to a pool, etc.
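A DB connection factory along these lines could look like the sketch below. This is not the project's db_factory. The environment names, the PIPELINE_ENV and PROD_DB_FILE variables, and the use of a sqlite3 file as a stand-in for a production database are all assumptions made to keep the example self-contained.

```python
import os
import sqlite3
from typing import Callable, Dict, Optional


def _dev_connection() -> sqlite3.Connection:
    # Throwaway in-memory database for local development and tests.
    return sqlite3.connect(':memory:')


def _prod_connection() -> sqlite3.Connection:
    # Stand-in for a production database; a real pipeline might return a
    # Postgres connection here. PROD_DB_FILE is a hypothetical variable.
    return sqlite3.connect(os.environ['PROD_DB_FILE'])


_BUILDERS: Dict[str, Callable[[], sqlite3.Connection]] = {
    'dev': _dev_connection,
    'prod': _prod_connection,
}


def db_factory(env: Optional[str] = None) -> sqlite3.Connection:
    # PIPELINE_ENV is a hypothetical variable name; default to 'dev'.
    env = env or os.environ.get('PIPELINE_ENV', 'dev')
    if env not in _BUILDERS:
        raise ValueError(f'env {env} is not supported. Please pass a valid env.')
    return _BUILDERS[env]()


conn = db_factory('dev')
print(conn.execute('SELECT 1').fetchone())  # (1,)
conn.close()
```

Storing builder functions (rather than ready-made connections) in the dictionary means only the requested connection is created.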

Check out our DB connection factory in the project repository.

Reader exercise: Create an abstract interface for DB connection and create a DB class for Postgres. Please feel free to put up a PR in the project repository.

Python helpers

Python provides tools for working with data, checking types, and encapsulating common functionality. In this section, we will go over a few important ones which can significantly improve your code.

1. Typing

Although Python is a dynamic language, having type hints + a type checker (e.g., mypy) can save you from multiple type incompatibility issues at runtime.

E.g., here is the extract method from the RedditETL class that shows the expected input & output types.

from typing import List

import praw


class RedditETL(SocialETL):

    def extract(
        self,
        id: str,
        num_records: int,
        client: praw.Reddit,
    ) -> List[SocialMediaData]:
        # Code
        ...

We can see that the extract method expects three inputs, id (type string), num_records (type integer), and client (type praw.Reddit), and returns a list of SocialMediaData (a class we will see in the dataclass section).

Python functions can accept inputs and produce outputs which are functions themselves (aka higher order functions). We can also define function types as shown below.

from typing import Callable, List
# code

def transformation_factory(value: str) -> Callable[[List[SocialMediaData]], List[SocialMediaData]]:
    factory = {
        'sd': standard_deviation_outlier_filter,
        'no_tx': no_transformation,
        'rand': random_choice_filter,
    }
    return factory[value]

Callable[[List[SocialMediaData]], List[SocialMediaData]] represents a function.

  1. First parameter: Defines the type of inputs to the function ([List[SocialMediaData]]). The first part of Callable is a list to account for multiple possible inputs.
  2. Second parameter: Defines the output type of the function (List[SocialMediaData]).

Mypy is a static type checker for Python. We can run mypy to verify that our code respects the types we have defined. E.g., if we call a function with a different type than what it expects, mypy will report an error.
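Here is a small, hypothetical example of the kind of mistake a type checker catches (the function names are made up for illustration). The code runs as written. The commented-out call would also run, since Python does not enforce hints at runtime, but mypy would reject it because len does not match the declared Callable type.

```python
from typing import Callable, List


def keep_long(words: List[str]) -> List[str]:
    return [w for w in words if len(w) > 3]


def apply(
    transform: Callable[[List[str]], List[str]], words: List[str]
) -> List[str]:
    return transform(words)


print(apply(keep_long, ['etl', 'pipeline']))  # ['pipeline']

# Python would happily run the next line and return the int 2 instead of a
# list, but mypy flags it: len returns an int, not a List[str].
# apply(len, ['etl', 'pipeline'])
```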

2. Dataclass

Dataclasses are designed to store data as objects of the data class.


Pros:

  1. Designed to represent data in Python.
  2. Has type hints and enables code completion with IDEs.
  3. Enables setting default values.
  4. Ability to add custom processing of data on object creation. See post-init.
  5. Ability to emulate immutability with frozen dataclasses.
  6. Ability to inherit from other data classes.


Cons:

  1. Dataclasses are regular classes and include a slight overhead during creation.
  2. It is overkill for simple dictionary-based data. E.g., [{'name': 'Tanya'}, {'name': 'Sophia'}]
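Several of the features listed above can be seen in one short sketch. The Post and RedditPost classes here are hypothetical and separate from the project's data classes.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass, field
from typing import List


@dataclass(frozen=True)
class Post:
    id: str
    title: str
    tags: List[str] = field(default_factory=list)  # default value
    score: int = 0

    def __post_init__(self) -> None:
        # Custom validation that runs right after the object is created.
        if self.score < 0:
            raise ValueError('score must be non-negative')


@dataclass(frozen=True)
class RedditPost(Post):  # inherits the fields of Post
    subreddit: str = 'dataengineering'


post = RedditPost(id='a1', title='Hello')
print(asdict(post))
# {'id': 'a1', 'title': 'Hello', 'tags': [], 'score': 0, 'subreddit': 'dataengineering'}

try:
    post.score = 10  # frozen dataclasses reject attribute assignment
except FrozenInstanceError as err:
    print(type(err).__name__)  # FrozenInstanceError
```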

In our code, we use Dataclass to represent the data we get from Reddit and Twitter.

from dataclasses import asdict, dataclass


@dataclass
class RedditPostData:
    """Dataclass to hold reddit post data.

    Args:
        title (str): Title of the reddit post.
        score (int): Score of the reddit post.
        url (str): URL of the reddit post.
        comms_num (int): Number of comments on the reddit post.
        created (str): Datetime (string repr) of when the reddit
             post was created.
        text (str): Text of the reddit post.
    """

    title: str
    score: int
    url: str
    comms_num: int
    created: str
    text: str


@dataclass
class TwitterTweetData:
    """Dataclass to hold twitter post data.

    Args:
        text (str): Text of the twitter post.
    """

    text: str


@dataclass
class SocialMediaData:
    """Dataclass to hold social media data.

    Args:
        id (str): ID of the social media post.
        source (str): Source of the social media post.
        social_data: Reddit or Twitter post data.
    """

    id: str
    source: str
    # social_data can be one of the Reddit or Twitter data types
    # (the X | Y annotation syntax requires Python 3.10+)
    social_data: RedditPostData | TwitterTweetData

3. Context Managers

When establishing a connection to an external system, such as a database, file system, etc., we should clean up the connection when done (or on an error) to prevent memory leaks and to free up system resources.

In our example, we need to handle closing our database connection when the load is done (or in case it errors out). Typically this is done with a try..except..finally block, but this creates duplicate code wherever we connect to our database.

Instead, we can create context managers, which are called using with blocks. We will create a context manager for our DB connections which automatically close on success or failure. We can define a context manager using the contextmanager decorator, as shown below.

import sqlite3
from contextlib import contextmanager
from typing import Iterator


class DatabaseConnection:
    def __init__(
        self, db_type: str = 'sqlite3', db_file: str = 'data/socialetl.db'
    ) -> None:
        """Class to connect to a database.

        Args:
            db_type (str, optional): Database type.
                Defaults to 'sqlite3'.
            db_file (str, optional): Database file.
                Defaults to 'data/socialetl.db'.
        """
        self._db_type = db_type
        self._db_file = db_file

    @contextmanager
    def managed_cursor(self) -> Iterator[sqlite3.Cursor]:
        """ Function to create a managed database cursor.

        Yields:
            sqlite3.Cursor: A sqlite3 cursor.
        """
        if self._db_type != 'sqlite3':
            raise ValueError(f'db_type {self._db_type} is not supported.')
        _conn = sqlite3.connect(self._db_file)
        cur = _conn.cursor()
        try:
            yield cur
        finally:
            # runs on success and on error
            _conn.commit()
            cur.close()
            _conn.close()

    def __str__(self) -> str:
        return f'{self._db_type}://{self._db_file}'


db = DatabaseConnection()

with db.managed_cursor() as cur:  # cursor and connection are open
    cur.execute("YOUR SQL QUERY")
# cursor and connection are closed

When we use the with db.managed_cursor() as cur clause, the code

  1. Runs code in managed_cursor method up to the yield cur line.
  2. Makes the cursor available as cur inside the with block.
  3. After the with block, the code control is returned to the finally part of the managed_cursor method, committing the changes and closing the connection.

Sequence diagram: calling_func asks managed_cursor for a DB cursor. managed_cursor gets a DB connection from sqlite3, creates a cursor from it, and hands the cursor to calling_func. calling_func runs the code inside the with block and returns the cursor. managed_cursor then commits the transaction and closes the connection.
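The order of execution described above can be observed with a small stand-alone sketch. It uses a hypothetical managed_resource context manager that records events in a list instead of touching a database, and it shows that the cleanup step runs both on success and when the with block raises.

```python
from contextlib import contextmanager
from typing import Iterator, List

events: List[str] = []


@contextmanager
def managed_resource(name: str) -> Iterator[str]:
    events.append(f'open {name}')       # runs before the with block
    try:
        yield name                      # value bound by "as"
    finally:
        events.append(f'close {name}')  # runs on success and on error


with managed_resource('db') as res:
    events.append(f'use {res}')

try:
    with managed_resource('db') as res:
        raise RuntimeError('load failed')
except RuntimeError:
    events.append('error handled')

print(events)
# ['open db', 'use db', 'close db', 'open db', 'close db', 'error handled']
```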

4. Testing with pytest

Testing is critical to ensure that our code does what we expect it to and prevent errors when we modify our code. Placing all our test scripts under the tests folder will help keep tests separate from pipeline code.

In our project, we will use pytest to run tests. Let’s go over some critical concepts for testing.

  1. Test scripts: While we can create one test script and test all our functions in there, it’s best practice to have one test file per pipeline. Pytest uses file and function names (prefixed with test_) to identify test functions (see this link for more details).
  2. Schema setup: When testing locally, it’s ideal to create (set up) and drop (teardown) tables per test run. Recreating tables each time you run all the tests in your test directory will enable you to write more straightforward tests (e.g., if you are testing if a function inserts ten rows into a table without a teardown of the tables, the next run will show 20 rows). When setting up schema (or fake data) for testing, we can create them at different scopes. The different scopes are
    1. Session level: The setup is run once before we start testing all the test files, and the teardown is run once after all the testing is done.
    2. Class level: The setup and teardown are before and after each Class (usually named class Testxyz).
    3. Function level: The setup and teardown are before and after each function (usually named test_).
  3. Fixtures: When testing, we don’t want to hit external APIs (Reddit/Twitter) as it may be costly and may return different data each time, making testing almost impossible. We can create static data to replicate data from external systems, called fixtures. One way to use a fixture with Pytest is to create functions (mock_twitter_data & mock_reddit_data) that return the static data and add them as inputs (dependency injection) to the functions where we need to use them (e.g., test_transform).
  4. Mocking functions: When we run tests, we might want to override the behavior of a function. In such cases, we can use mocker to override the behavior of a function. E.g., we want to return a db connection to our testdb wherever we call the db_factory function. We can define this using mocker. Note that we define mocks for db_factory twice. This is because when we mock a function, we need to specify the location where it is used, not where it is defined, which gives us the flexibility to mock a function based on where it is used. We use a session_mocker to specify that the mock applies to the entire session (there is a mocker for non-session level mocking).
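The "mock where it is used" rule is easiest to see with a tiny experiment. The sketch below uses the standard library's unittest.mock, which pytest-mock's mocker wraps. The demo_db and demo_pipeline modules are throwaway, hypothetical modules built in memory so that the example is self-contained; in a real project they would be ordinary files.

```python
import sys
import types
from unittest import mock

# "demo_db" defines db_factory; "demo_pipeline" imports and uses it.
demo_db = types.ModuleType('demo_db')
exec("def db_factory():\n    return 'prod-db'", demo_db.__dict__)
sys.modules['demo_db'] = demo_db

demo_pipeline = types.ModuleType('demo_pipeline')
exec(
    "from demo_db import db_factory\n"
    "def run():\n"
    "    return db_factory()",
    demo_pipeline.__dict__,
)
sys.modules['demo_pipeline'] = demo_pipeline

# Patching where the function is defined does not affect demo_pipeline,
# which already holds its own reference to the original function.
with mock.patch('demo_db.db_factory', return_value='test-db'):
    patched_at_definition = demo_pipeline.run()

# Patching where the function is used replaces the name run() looks up.
with mock.patch('demo_pipeline.db_factory', return_value='test-db'):
    patched_at_use = demo_pipeline.run()

print(patched_at_definition)  # prod-db
print(patched_at_use)         # test-db
```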

We can run tests as shown below.

python -m pytest --log-cli-level info -p no:warnings -v ./tests

Note that we tell pytest to print all info-level logging, ignore warnings, and look for test scripts under the tests folder. conftest.py is used to define fixtures for an entire directory (the tests dir in our case). In our conftest.py, we define a mock_social_posts_table fixture function and set it to run once per session. When we run pytest, the setup part (pre-yield statement) is run, then all our tests are executed, and then the teardown part (post-yield statement) is run.

Sequence diagram: mock_social_posts_table runs its setup and tells the tests that the fixtures are ready. The tests run and report back when they are all done. mock_social_posts_table then runs its teardown.

5. Decorators

Decorators add functionality to other functions. E.g.,

def log_metadata(func):

    def log_wrapper():
        print(f'Adding functionality to {func.__name__}')
        # do some other thing
        return func()

    return log_wrapper

@log_metadata
def some_func():
    print('do some thing')

In the above example, we add functionality to the some_func function, by decorating it with the log_metadata function. The log_metadata function can be used as a decorator for multiple functions.

In our project, we use log_metadata to log information about the function name, the input arguments, and dump it into a log_metadata table for tracking data lineage. The inspect module helps us identify objects and their input parameters.

Storing the inputs to each function, the time it was run, and the name of the function helps us debug issues with our pipelines (aka data lineage).
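A simplified sketch of such a decorator is shown below. It is not the project's implementation: it appends to an in-memory list (run_log, a hypothetical stand-in for the log_metadata table) and uses inspect.signature to map the call's arguments to parameter names.

```python
import functools
import inspect
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List

run_log: List[Dict[str, Any]] = []  # stand-in for a log_metadata table


def log_metadata(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def log_wrapper(*args: Any, **kwargs: Any) -> Any:
        # Map positional and keyword arguments to their parameter names.
        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()
        run_log.append(
            {
                'function': func.__name__,
                'inputs': dict(bound.arguments),
                'run_at': datetime.now(timezone.utc).isoformat(),
            }
        )
        return func(*args, **kwargs)

    return log_wrapper


@log_metadata
def extract(id: str, num_records: int = 100) -> List[str]:
    return [f'{id}-{i}' for i in range(num_records)]


records = extract('dataengineering', num_records=2)
print(records)                 # ['dataengineering-0', 'dataengineering-1']
print(run_log[0]['function'])  # extract
print(run_log[0]['inputs'])    # {'id': 'dataengineering', 'num_records': 2}
```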


In addition to the above, here are a few more tips to help you:

  1. Project structure: A consistent project structure makes your imports sensible and helps you easily navigate the code base. This Python guide is a great starting point.
  2. Naming: Follow a naming standard for your code. The Google style guide is a great place to start.
  3. Automated formatting, lint checks, & type checks: Consistent code style helps you and your team with reading and understanding the code base and reduces nit comments on PRs. Automate them with black, isort, flake8, and mypy; see the Makefile for usage.
  4. Makefile: Make commands allow you to define aliases for longer commands. In our project, the Makefile helps us run tests, lint & type checks, formatting, run ETLs, and connect to our database. Most terminals also support tab completion for make commands defined in your Makefile.
  5. Githooks: While we can run the make commands before creating a PR, we can take this a step further by automating them with githooks. We can use a pre-commit git hook to run automated checks every time we create a commit. See the setup instructions to set up a pre-commit hook.
  6. dotenv: We use the dotenv module to read config variables from a .env file and load them into the OS environment without having to set them at the OS level. This enables us to have separate .env files when running locally vs. in production.
  7. Reproducible environment: We use venv to create reproducible environments. We can also use Docker to do the same, but it is optional for a simple project.


This article covered the commonly used design patterns and python techniques used to write clean data pipelines. To recap, we saw how

  1. Functional data pipelines produce consistent outputs on re-runs and lead to easily testable code.
  2. Factory patterns can create standard interfaces for similar pipelines, making using/switching them simple.
  3. Strategy pattern allows the code to choose from the multiple data processing options at runtime.
  4. Singleton pattern only allows the creation of one class object. Object pools offer a pool of objects that the program can reuse.
  5. In batch data pipelines, factory pattern connecting to external systems allows for easy testability.
  6. Python functionality helps keep code clean: type checks prevent runtime type issues, dataclasses help store and process data effectively, context managers handle closing connections and prevent memory leaks, and decorators enrich the functionality of other functions.
  7. Tests ensure that the code you write does what you expect it to do.

Note that the design patterns are suggestions and not absolutely required. When designing data pipelines, one must ask themselves if a design pattern can help keep their code clean, now and in the future; if the answer is a no or a maybe, it is better to implement a design pattern only when needed. E.g., if you only have two pipelines (and don’t see it increasing), you don’t need a factory pattern.

If you have any questions or comments, please leave them in the comment section below. If you have any issues with running the code, please open a GitHub issue in the project repository.

Further reading

  1. Design patterns
  2. Creating idempotent pipelines
  3. Data pipeline testing


  1. Python patterns
  2. Ecotrust canada TOC generator
  3. Arjan codes
