How to Write Integration Tests for Python Data Pipelines

Catch integration issues before production failures. Learn why integration tests prevent pipeline bugs and how to write high-ROI tests for Python data pipelines with pytest.

Author

Joseph Machado

Data pipelines fail after hours of processing because you didn’t insert columns in the right order

If you’ve been frustrated by pipeline issues where:

Long-running pipelines fail because you missed a comma or didn’t select columns in the right order

Output data schemas change unintentionally due to a small code modification

Your transformation logic breaks when the source schema changes unexpectedly

This post is for you. Imagine being confident that your data processing system will interact seamlessly with all external systems before pushing to production.

By the end of this post, you’ll master integration testing for data pipelines. You’ll learn the concepts, how to implement them in Python with pytest, and which tests deliver the highest ROI for catching bugs before production.

Test your code’s integration with external systems before a production run

Integration Test

From the image above, you can see the pipeline components that require integration testing. When systems interact (source, transformation, destination), create integration tests.

Most pipelines run extract, transform, and load steps in some order (ETL, ELT, ELTL, etc.). It’s a good idea to create integration tests for each interaction, especially when multiple services or custom code are involved. For example, assume we have a Python script → S3 → Snowflake pipeline. The testing priority should be:

  1. Python script → S3: test this first, as it involves custom code.
  2. S3 → Snowflake: Snowflake has an official connector (Snowpipe), so the test only needs to ensure that the Snowpipe configuration is correct.

Manually checking may suffice for a simple pipeline. But as the pipeline, inputs, and data team grow in size, it will become increasingly difficult to check every system interaction and to ensure that the output produced has the same schema we expect each time.

Note: Here, schema means the number of columns, column order, column data types, partitioning, cloud storage path, etc.
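To make that definition of schema concrete, here is a minimal, dependency-free sketch of a schema comparison that reports differences in column count, column order, and column type. The `Column` layout and the `diff_schema` helper are hypothetical names used only for illustration; they are not part of pytest or any other library.

```python
from typing import List, Tuple

# Hypothetical representation: a schema is an ordered list of (name, type) pairs.
Column = Tuple[str, str]


def diff_schema(expected: List[Column], actual: List[Column]) -> List[str]:
    """Return human-readable differences; an empty list means the schemas match."""
    problems: List[str] = []
    if len(expected) != len(actual):
        problems.append(f"column count: expected {len(expected)}, got {len(actual)}")

    expected_names = [name for name, _ in expected]
    actual_names = [name for name, _ in actual]
    # Same set of columns but in a different order.
    if sorted(expected_names) == sorted(actual_names) and expected_names != actual_names:
        problems.append(f"column order: expected {expected_names}, got {actual_names}")

    actual_types = dict(actual)
    for name, dtype in expected:
        if name not in actual_types:
            problems.append(f"missing column: {name}")
        elif actual_types[name] != dtype:
            problems.append(f"type of {name}: expected {dtype}, got {actual_types[name]}")
    return problems


expected = [("id", "int"), ("name", "string"), ("score", "decimal(10,2)")]
swapped = [("name", "string"), ("id", "int"), ("score", "decimal(10,2)")]
retyped = [("id", "int"), ("name", "string"), ("score", "double")]
```

In a test, `assert diff_schema(expected, actual) == []` fails with a list that names exactly what changed, which is easier to debug than a bare equality check.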

Simulate external systems with containers/mocks and write tests with pytest

Integration tests run locally and in CI environments. External systems are simulated with containers when they are open source (e.g., Spark and Iceberg) or mocked when they are not (e.g., Moto for AWS services).

Let’s look at an example of mocking an external API. Full code available here.

import requests
from typing import Any, Dict, List
import logging
import sys

def get_exchange_data() -> List[Dict[str, Any]]:
    url = "https://api.coincap.io/v2/exchanges"  # Note: this API is no longer active
    try:
        r = requests.get(url)
    except requests.ConnectionError as ce:
        logging.error(f"There was an error with the request, {ce}")
        sys.exit(1)
    return r.json().get("data", [])

Now let’s write a test case where our get_exchange_data function is patched to return data read from a CSV fixture instead of calling the API.

import csv

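def test_exchange_data_etl_run(self, mocker):
    # Method of a pytest test class (hence `self`); `mocker` is the pytest-mock fixture.
    # Load the fixture rows and close the file handle when done.
    with open("test/fixtures/sample_raw_exchange_data.csv") as f:
        fixture_rows = list(csv.DictReader(f))
    # Patch the API call so the pipeline reads the fixture instead of the network.
    mocker.patch(
        "bitcoinmonitor.exchange_data_etl.get_exchange_data",
        return_value=fixture_rows,
    )
    run()  # pipeline run function
    expected_result = []  # some expected data here
    # Assumption: self.get_exchange_data() is a helper on the test class that reads
    # back what the pipeline loaded into the destination (not the patched function).
    result = self.get_exchange_data()
    assert expected_result == result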
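The same patching idea can be tried without the project code. The sketch below is self-contained and uses the standard library’s `unittest.mock` in place of the pytest-mock `mocker` fixture. `ExchangeAPI`, `run_pipeline`, `run_with_fixture`, and the inline CSV are hypothetical stand-ins, not the post’s actual pipeline.

```python
import csv
import io
from typing import Dict, List
from unittest import mock

# Hypothetical fixture; in a real test this would live in a CSV file.
SAMPLE_CSV = "id,name,volume\nbinance,Binance,100.5\nkraken,Kraken,20.0\n"


class ExchangeAPI:
    """Stand-in for the client that calls the external API."""

    def get_exchange_data(self) -> List[Dict[str, str]]:
        raise RuntimeError("tests must not reach the real API")


def run_pipeline(api: ExchangeAPI) -> List[Dict[str, object]]:
    """Stand-in pipeline: extract rows, cast volume to float, return the result."""
    rows = api.get_exchange_data()
    return [{"id": r["id"], "volume": float(r["volume"])} for r in rows]


def run_with_fixture() -> List[Dict[str, object]]:
    """Run the pipeline with the API call replaced by fixture rows."""
    fixture_rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
    with mock.patch.object(
        ExchangeAPI, "get_exchange_data", return_value=fixture_rows
    ) as patched:
        result = run_pipeline(ExchangeAPI())
    # The pipeline should have asked the (patched) API for data exactly once.
    patched.assert_called_once()
    return result
```

Because the patch is applied inside a `with` block, the real method is restored as soon as the block exits, so one test cannot leak its mock into another.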

We also need input data (fixtures), which can be pulled from the production system (anonymized as needed) or faked with a library like Faker.

❗Fixtures are time-consuming to create when your pipeline involves multiple input sources and joins.
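One way to build fixtures from production rows is to replace identifying values with stable hashes, so that the same value maps to the same token in every fixture file and joins still line up. This is a minimal sketch: the `anonymize_rows` helper, the column names, and the salt are assumptions for illustration, and hashing alone may not satisfy your organization’s anonymization requirements.

```python
import hashlib
from typing import Any, Dict, Iterable, List


def anonymize_rows(
    rows: Iterable[Dict[str, Any]],
    pii_columns: List[str],
    salt: str = "test-fixture-salt",  # hypothetical salt; keep real salts out of source control
) -> List[Dict[str, Any]]:
    """Return copies of the rows with PII columns replaced by short, stable hashes."""
    anonymized = []
    for row in rows:
        clean = dict(row)  # copy so the input rows are not mutated
        for col in pii_columns:
            if clean.get(col) is not None:
                digest = hashlib.sha256(f"{salt}:{clean[col]}".encode("utf-8")).hexdigest()
                clean[col] = digest[:12]
        anonymized.append(clean)
    return anonymized


sample = [
    {"id": 1, "email": "a@example.com"},
    {"id": 2, "email": "a@example.com"},
]
fixture_rows = anonymize_rows(sample, ["email"])
```

Both rows above end up with the same token in `email`, so a join on that column behaves the same way it would on the original data.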

Integration tests verify that the code works end to end against the systems it touches. Running a pipeline manually and validating the output (e.g., against data constraints and business rules) can miss schema changes or fail to verify that inserts into a table work as expected.

Let’s look at an example where we verify that the destination data has the expected schema:

from pyspark.sql.types import (
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)


def test_student_table_schema_and_partition(spark):
    # `spark` is assumed to be a pytest fixture that provides a SparkSession
    # configured with the Iceberg catalog `local`.
    # Run pipeline
    run_pipeline(spark)  # Assume this function loads data to the destination table student

    # Read iceberg table
    df = spark.read.format("iceberg").load("local.default.student")

    # Expected schema (column names, order, types, and nullability)
    expected_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("score", DecimalType(10, 2), True),
        StructField("load_etl_ts", TimestampType(), True),
        StructField("load_etl_h", IntegerType(), True),
    ])

    # Check schema
    assert df.schema == expected_schema, f"Schema mismatch: expected {expected_schema}, got {df.schema}"

    # Check partitioning
    # Assume the table is to be partitioned by load_etl_h.
    # Iceberg's `partitions` metadata table nests the partition columns
    # inside a struct column named `partition`, so look there.
    metadata = spark.sql("SELECT * FROM local.default.student.partitions LIMIT 1")
    assert "partition" in metadata.columns, "Table is not partitioned"
    partition_fields = metadata.schema["partition"].dataType.fieldNames()
    assert "load_etl_h" in partition_fields, "Partition column load_etl_h not found"

    # Verify data exists
    assert df.count() > 0, "Table is empty"

Do not test every interface between systems

  1. Do not test integration features; test that you are using them correctly. For example, do not test whether a CREATE TABLE statement creates the destination table; instead, run your pipeline and check that the destination table contains the expected output with the expected schema.
  2. Do not test functionality that doesn’t matter. For example, if you are just dumping data to an S3 output path and don’t care about the schema, just test that the data is written to the right path.
  3. If your external systems are flaky (random failures, changing schemas, etc.), do not write integration tests; instead, work on setting up a data contract or evolving your pipeline to handle changing schemas.
  4. Do not test framework features (e.g., Airflow retry), but test that you have used the framework as expected (e.g., the task order of a pipeline).
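Point 2 in the list above can be sketched as a path-only check. The example writes to a local temporary directory as a stand-in for S3. The `write_output` function and the `exchange/dt=<date>/data.csv` layout are hypothetical and chosen only for illustration.

```python
import csv
import tempfile
from pathlib import Path
from typing import Dict, List


def write_output(rows: List[Dict[str, str]], base_dir: Path, run_date: str) -> Path:
    """Hypothetical writer: dump rows under <base_dir>/exchange/dt=<run_date>/data.csv."""
    out_path = base_dir / "exchange" / f"dt={run_date}" / "data.csv"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return out_path


def test_output_lands_in_expected_path() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        write_output([{"id": "1", "name": "a"}], base, "2024-01-01")
        expected = base / "exchange" / "dt=2024-01-01" / "data.csv"
        # Only the location and non-emptiness are checked, not the schema.
        assert expected.is_file()
        assert expected.stat().st_size > 0
```

The test deliberately ignores the file contents: when the schema does not matter downstream, asserting on it would only add maintenance cost.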

Conclusion

To recap, we saw:

  1. Why testing the interaction between systems matters
  2. What is integration testing?
  3. How to do integration testing for a Python data pipeline
  4. How to decide which system interactions do not need to be tested

While it may be tempting to write test cases for every integration, that is a lot of work. Start with the integrations whose breakage would cost the most time and effort to fix.

The next time you build a pipeline, ensure the data systems continue to work well together through integration testing. Use this post as a guide to help you define what tests to create and how to create them.
