Writing memory efficient data pipelines in Python

Introduction

If you are

Wondering how to write memory efficient data pipelines in Python

Working with a dataset that is too large to fit into memory

Then this post is for you. We will go over how to write memory efficient data pipelines using generators and when to use distributed data processing frameworks.

1. Using generators

“Regular functions compute a value and return it, but generators return an iterator that returns a stream of values.” - Python Docs

Creating a generator is very similar to creating a function. You can use a generator expression (the () comprehension syntax) or define a function that uses yield instead of return.

Generators are usually used in a loop. For each iteration of the calling loop:

  1. Control passes from the calling loop to the generator function.
  2. The generator computes and yields the next value, and control passes back to the loop.
  3. Steps 1 and 2 repeat until the generator is exhausted.

You can also get data directly from a generator using next(generator_obj).
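
For instance, here is a minimal sketch (the variable names are just for illustration) showing both ways of consuming a generator expression:

# a generator expression: nothing is computed until values are requested
squares = (n * n for n in range(5))

next(squares)  # 0
next(squares)  # 1

# the remaining values (4, 9, 16) are produced one at a time by the loop
for value in squares:
    print(value)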

Let’s say our objective is to process the 2018 parking violations dataset with the following steps:

  1. Keep only the violations issued by the police (denoted by P in the data) to vehicles with the make FORD registered in NJ.
  2. Replace P with police.
  3. Concatenate the house number, street name, and registration state fields into a single address field.
  4. Write the results to a CSV file with the header vehicle_make,issuing_agency,address.

Using generator expression

import csv

input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./nj_ford_transportation_issued_pv_2018.csv"

# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)  # csv.reader returns a lazy iterator over rows

# 2. keep only required fields
# field index => field;
# 2 => registration state, 7 => vehicle make, 8 => issuing agency, 23 => house number, 24 => street name
col_filtered_stream = ([row[2], row[7], row[8], row[23], row[24]] for row in extractor)

# 3. keep only violations issued by police, to vehicles with the make FORD in NJ
value_filtered_stream = filter(
    lambda x: all([x[0] == "NJ", x[1] == "FORD", x[2] == "P"]), col_filtered_stream
)

# 4. replace P with police
transformed_stream = (
    [stream[0], stream[1], "police", stream[3], stream[4]]
    for stream in value_filtered_stream
)

# 5. concat house number, street name, registration state into a single address field
final_stream = (
    [stream[1], stream[2], ", ".join([stream[3], stream[4], stream[0]])]
    for stream in transformed_stream
)

final_stream  # this is a generator object and has not yet started generating data

# 6. write a header row for output data
write_file_object = open(output_file_name, "w")
loader = csv.writer(
    write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)
header = ["vehicle_make", "issuing_agency", "address"]
loader.writerow(header)

# 7. stream data into an output file
loader.writerows(final_stream)  # loader pulls rows from final_stream, which pulls from the upstream generators

# 8. close the file handles to flush the output
write_file_object.close()
read_file_object.close()

In the above example, we create generators with the () comprehension format. We also chain multiple generators together to form a data pipeline; each generator pulls rows lazily from the one before it, so rows stream through one at a time instead of being loaded all at once. The above example is simple, and the logic can be easily expressed with lambda functions.

Let’s see an example where we use a more complex function. This function will calculate the number of days between the violation issue date and the vehicle expiration date.

import csv
from datetime import datetime


input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./pv_2018_w_days_until_violation.csv"


def get_days_until_expiration(row):
    # field index => field; 4 => issue date, 12 => vehicle expiration date
    issue_date_str = row[4]
    vehicle_expiration_date_float = row[12]
    try:
        issue_date = datetime.strptime(issue_date_str[:10], "%Y-%m-%d")
        vehicle_expiration_date = datetime.strptime(
            str(vehicle_expiration_date_float).split(".")[0], "%Y%m%d"
        )
        date_diff = (vehicle_expiration_date - issue_date).days
    except ValueError:
        # rows with unparseable dates are flagged with -1
        date_diff = -1
    return date_diff


# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)
# skip header
next(extractor)

# 2. calculate days until expiration
final_stream = (row + [get_days_until_expiration(row)] for row in extractor)

# 3. stream data into an output file
write_file_object = open(output_file_name, "w")
loader = csv.writer(
    write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)

loader.writerows(final_stream)

# 4. close the file handles to flush the output
write_file_object.close()
read_file_object.close()

Using generator yield

You can also create your own generator as shown below.

def naive_generator(count=100000):
    for i in range(count):
        yield i


gen = naive_generator()  # create the generator object first
next(gen)  # 0
next(gen)  # 1
# ...

This is a simple example, but the principle remains the same for real use cases.
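
For a more realistic sketch, the same yield-based approach can stream matching rows from the parking violations file. Here, stream_ford_rows is a hypothetical helper, and column index 7 is the vehicle make field from the earlier example:

import csv


def stream_ford_rows(file_name):
    # rows are read and yielded one at a time; the whole file is never in memory
    with open(file_name, "r") as read_file_object:
        for row in csv.reader(read_file_object):
            if len(row) > 7 and row[7] == "FORD":
                yield row


ford_rows = stream_ford_rows("./parking-violations-issued-fiscal-year-2018.csv")
next(ford_rows)  # the first FORD row; nothing past it has been read yet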

Mini batching

Consider an example where your data pipeline has to call an external paid service, and assume that this service charges per call: whether you send it 1 row or 10,000 rows, the price is the same.

In this scenario you can accumulate rows in memory and, once they reach a specified batch size, make the service call.

import time
import csv

input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./pv_2018_ext_call_enriched.csv"


def service_call(rows):
    time.sleep(15)  # simulating an external service call
    return rows


def batched_service_transforms(rows, batch_size=10000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield from service_call(batch)
            batch = []
    if batch:  # flush the final, possibly partial, batch
        yield from service_call(batch)


# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)

# 2. make batched calls to the external service
final_stream = batched_service_transforms(extractor)
next(final_stream)  # you will notice a 15s wait, simulating the external service call
[
    next(final_stream) for i in range(9999)
]  # you will notice almost no wait, since this data is held in process memory

# we have iterated through the first batch of 10,000
# the next call will invoke the service_call function, thus sleeping for 15s
next(final_stream)  # you will notice a 15s wait

Note that yield from items is shorthand for:

for i in items:
    yield i

Reading in batches from a database

When pulling a large dataset (from a database or an external service) into your Python process, you will need to make a tradeoff between:

  1. Memory: pulling the entire dataset at once can cause out-of-memory errors.
  2. Speed: fetching one row at a time incurs an expensive network round trip per row.

A good tradeoff would be to fetch data in batches. The size of a batch will depend on the memory available and speed requirements of your data pipeline.

In the following code snippet, we fetch data from the database in batches of 10,000 rows. Each batch of 10,000 rows is fetched when required downstream, kept in memory, and served one row at a time until the batch is exhausted. This process repeats until the entire dataset has been traversed.

import psycopg2


def generate_from_db(username, password, host, port, dbname, batch_size=10000):
    conn_url = f"postgresql://{username}:{password}@{host}:{port}/{dbname}"

    conn = psycopg2.connect(conn_url)
    cur = conn.cursor(name="get_large_data")  # a named cursor creates a server-side cursor
    cur.execute(
        "SELECT c1,c2,c3 FROM big_table"
    )  # the query is declared on the db side; no rows are sent to the client yet

    while True:
        rows = cur.fetchmany(
            batch_size
        )  # fetch the next batch of rows from the server-side cursor
        if not rows:
            break
        yield from rows

    cur.close()
    conn.close()


next(generate_from_db("username", "password", "host", 5432, "database"))

Points to note in the above example

  1. Opening and closing a db connection is expensive, hence we keep the connection open for the lifetime of the generator.
  2. We use a server-side cursor (a named cursor in psycopg2), which keeps the result set on the database server and streams it to the client in batches. Alternatively, you can use sort, limit, and offset to page through the data, as sketched below.
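
For reference, here is a rough sketch of that limit/offset alternative; it assumes big_table has an indexed, stable sort key (the id column here is hypothetical):

def generate_with_offset(conn, batch_size=10000):
    # conn is an already-open psycopg2 connection
    offset = 0
    while True:
        cur = conn.cursor()  # a plain client-side cursor this time
        cur.execute(
            "SELECT c1, c2, c3 FROM big_table ORDER BY id LIMIT %s OFFSET %s",
            (batch_size, offset),
        )
        rows = cur.fetchall()  # at most batch_size rows are in memory at once
        cur.close()
        if not rows:
            break
        yield from rows
        offset += batch_size

Keep in mind that offset-based paging gets slower as the offset grows, since the database still has to walk past the skipped rows; the server-side cursor approach above avoids that cost.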

Pros & Cons

Pros

  1. No need to install or maintain external libraries.
  2. Native python modules have good documentation.
  3. Easy to use.
  4. Since most of the distributed data processing frameworks support python, it’s relatively easy to port this code over if needed.

Cons

  1. Parallelizing data processing is an involved process.
  2. Sorting and aggregating will require you to keep the data in memory (see the sketch after this list).
  3. Joining multiple datasets will require complex patterns and handling edge cases.
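
To illustrate the second point, sorting a stream forces everything into memory at once; a minimal sketch, reusing the extractor from the earlier examples:

# rows flow through the generator lazily, one at a time
lazy_rows = ([row[2], row[7]] for row in extractor)

# sorted() has to consume the entire generator and hold every row in a list
# before it can return, so the memory benefit of the pipeline is lost
rows_sorted_by_state = sorted(lazy_rows, key=lambda row: row[0])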

2. Using distributed frameworks

Another option is to leverage distributed frameworks like Spark, Flink, or Dask. While they are very powerful tools, you may not always need them. If you expect your data to grow significantly in size or complexity, or if the speed requirements of your data processing are high, these tools are definitely worth considering.
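
As a rough sketch of what the NJ/FORD/police filter from earlier might look like in one of these frameworks, here is a hedged Dask version; the column names are assumptions and would need to match the actual CSV header:

import dask.dataframe as dd

# the CSV is read lazily as a collection of partitions
df = dd.read_csv(
    "./parking-violations-issued-fiscal-year-2018.csv",
    dtype=str,  # treat every column as a string to avoid mixed-type inference issues
)

# the column names below are assumptions; adjust them to the real header
nj_ford = df[
    (df["Registration State"] == "NJ")
    & (df["Vehicle Make"] == "FORD")
    & (df["Issuing Agency"] == "P")
]

# nothing is computed until a write (or .compute()) is triggered
nj_ford.to_csv("./nj_ford_pv_2018_dask_*.csv")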

Pros & Cons

Pros

  1. Most data processing functions are in-built.
  2. Can easily scale to large data sets.
  3. If you are in a Python ecosystem, it’s very easy to use any of these frameworks.

Cons

  1. Installing, setting up clusters, and upgrading can be hard.
  2. If cluster resources are not allocated appropriately, the processing may fail.
  3. They have their own quirks and gotchas of which to be aware.

Conclusion

I hope this article gives you a good understanding of how to use generators to write memory efficient data pipelines. The next time you have to build a data pipeline to process a larger-than-memory dataset, try using generators.

As always, please leave any comments or questions in the comment section below.

Further reading

  1. Optimizing Spark code
  2. Updating MySQL in batches
  3. Pulling data from an API

References

  1. Postgres caching in depth
  2. Fetching rows in batches using psycopg2
  3. Understanding yield from