```python
import logging
import sys
from typing import Any, Dict, List

import requests


def get_exchange_data() -> List[Dict[str, Any]]:
    url = "https://api.coincap.io/v2/exchanges"  # Note: this API is no longer active
    try:
        r = requests.get(url)
    except requests.ConnectionError as ce:
        logging.error(f"There was an error with the request, {ce}")
        sys.exit(1)
    return r.json().get("data", [])
```

Data pipelines fail after hours of processing because you didn’t insert columns in the right order
If you’ve been frustrated by pipeline issues where:
- Long-running pipelines fail because you missed a column, or didn’t select columns in the right order
- Output data schemas change unintentionally due to a small code modification
- Your transformation logic breaks when the source schema changes unexpectedly
This post is for you. Imagine being confident that your data processing system will interact seamlessly with all external systems before pushing to production.
By the end of this post, you’ll master integration testing for data pipelines. You’ll learn the concepts, how to implement them in Python with pytest, and which tests deliver the highest ROI for catching bugs before production.
Test your code’s integration with external systems before a production run
From the image above, you can see the pipeline components that require integration testing. When systems interact (source, transformation, destination), create integration tests.
Most pipelines have ETL in some order (ELTL, etc.). It’s a good idea to create integration tests for each interaction, especially when multiple services or custom code are involved. For example, assume we have a Python script → S3 → Snowflake. The priority of testing should be:
- Python script → S3: first, as it involves custom code (see the mocked-S3 sketch after this list)
- S3 → Snowflake: Snowflake has an official connector (Snowpipe); ensure that the Snowpipe configuration is correct.
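For the first interaction, here is a minimal sketch of a Python script → S3 test that uses Moto to fake S3 in-process. The upload_exchange_data function, bucket, and key are hypothetical placeholders for your own upload logic:

```python
import boto3
from moto import mock_aws  # Moto >= 5; older releases expose moto.mock_s3 instead


@mock_aws
def test_upload_exchange_data_to_s3():
    # Create the fake bucket the pipeline expects to write to
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="my-data-bucket")

    # Hypothetical function under test: uploads exchange data to the given bucket/key
    upload_exchange_data(bucket="my-data-bucket", key="raw/exchanges.json")

    # Verify the object landed at the expected path
    response = s3.list_objects_v2(Bucket="my-data-bucket", Prefix="raw/")
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    assert "raw/exchanges.json" in keys
```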
Manually checking may suffice for a simple pipeline. But as the pipeline, inputs, and data team grow in size, it will become increasingly difficult to check every system interaction and to ensure that the output produced has the same schema we expect each time.
Note: Here, the schema indicates the number of columns, column order, column data type, partitioning, cloud storage path, etc.
Simulate external systems with containers/mocks and write tests with Pytest
Integration tests run locally and in CI environments. External systems are simulated with containers if they are open source (e.g., Spark & Iceberg) or mocked if they are not (e.g., Moto for AWS services).
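For the local-simulation approach, a minimal sketch of a pytest fixture that starts a local Spark session with an Iceberg catalog might look like the following; the Iceberg package version and warehouse path are assumptions, and a fixture like this is what provides the spark argument used by the schema test later in this post:

```python
# conftest.py
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Local Spark session with an Iceberg "local" catalog backed by a temp directory.
    # Match the runtime jar version to your Spark/Scala install.
    session = (
        SparkSession.builder.master("local[*]")
        .appName("integration-tests")
        .config(
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
        )
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
        .getOrCreate()
    )
    yield session
    session.stop()
```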
Let’s look at an example of mocking an external API. Full code available here.
Now let’s write a test case where our get_exchange_data function (defined at the top of this post) gets overridden (patched) with the below logic, which reads data from a CSV file (fixture).
```python
import csv

from bitcoinmonitor.exchange_data_etl import run


class TestExchangeDataEtl:
    def test_exchange_data_etl_run(self, mocker):
        mocker.patch(
            "bitcoinmonitor.exchange_data_etl.get_exchange_data",
            return_value=[
                r
                for r in csv.DictReader(
                    open("test/fixtures/sample_raw_exchange_data.csv")
                )
            ],
        )
        run()  # pipeline run function
        expected_result = []  # some expected data here
        # self.get_exchange_data() is assumed to be a test helper that reads
        # the loaded rows back from the destination table
        result = self.get_exchange_data()
        print(result)
        assert expected_result == result
```

We’d also want input data (aka fixtures), which can be pulled from the production system (anonymized as needed) or faked with a library like Faker.
❗Fixtures are time-consuming to create when your pipeline involves multiple input sources and joins.
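If you fake the input data, a minimal sketch of a fixture generator using Faker could look like this (the column names are assumptions; match them to your real source schema):

```python
import csv

from faker import Faker

fake = Faker()


def make_exchange_fixture(path: str, rows: int = 10) -> None:
    # Write a CSV of fake exchange records usable as a test fixture
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["exchangeId", "name", "volumeUsd"])
        writer.writeheader()
        for _ in range(rows):
            writer.writerow(
                {
                    "exchangeId": fake.uuid4(),
                    "name": fake.company(),
                    "volumeUsd": fake.pyfloat(min_value=0, max_value=1_000_000),
                }
            )
```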
Integration tests are meant to test the code’s functionality. Running a pipeline manually and validating the output (e.g., against data constraints or business rules) can miss schema changes or fail to verify that inserts into a table work as expected.
Let’s look at an example where we verify that the destination data has the expected schema:
```python
from pyspark.sql.types import (
    DecimalType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)


def test_student_table_schema_and_partition(spark):
    # Run pipeline; assume this function loads data into the destination table student
    run_pipeline(spark)

    # Read the Iceberg table
    df = spark.read.format("iceberg").load("local.default.student")

    # Expected schema
    expected_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("score", DecimalType(10, 2), True),
        StructField("load_etl_ts", TimestampType(), True),
        StructField("load_etl_h", IntegerType(), True),
    ])

    # Check schema
    assert df.schema == expected_schema, (
        f"Schema mismatch: expected {expected_schema}, got {df.schema}"
    )

    # Check partitioning; assume the table is to be partitioned by load_etl_h.
    # Iceberg exposes a .partitions metadata table whose `partition` struct
    # has one field per partition column.
    partitions_meta = spark.sql("SELECT * FROM local.default.student.partitions LIMIT 1")
    partition_fields = [f.name for f in partitions_meta.schema["partition"].dataType.fields]
    assert "load_etl_h" in partition_fields, "Partition column load_etl_h not found"

    # Verify data exists
    assert df.count() > 0, "Table is empty"
```

Do not test every interface between systems
- Do not test integration features; test that you are using them correctly. For example, do not test whether a “create table” setting creates the destination table; instead, run your pipeline and check that the destination table produces the expected output with the expected schema.
- Do not test functionality that doesn’t matter. E.g., if you are just dumping data to an S3 output path and don’t care about the schema, just test that the data is written to the right path.
- If your external systems are flaky (random failures, changing schemas, etc.), do not write integration tests; instead, work on setting up a data contract or evolving your pipeline to handle changing schemas.
- Do not test framework features (e.g., Airflow retries), but test that you have used them as expected (e.g., the task order of a pipeline; see the sketch below).
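For the last point, a minimal sketch of a task-order test might look like this (the DAG id and task ids are assumptions):

```python
from airflow.models import DagBag


def test_dag_task_order():
    # Hypothetical DAG id and task ids; replace with your pipeline's
    dag = DagBag().get_dag("exchange_data_etl")
    extract = dag.get_task("extract_exchange_data")
    load = dag.get_task("load_to_warehouse")
    # The load task should run downstream of the extract task
    assert load.task_id in extract.downstream_task_ids
```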
Conclusion
To recap, we saw:
- Why testing the interaction between systems matters
- What integration testing is
- How to do integration testing for a Python data pipeline
- How to decide which system interactions do not need to be tested
While it may be tempting to write test cases for every integration, that is a lot of work. Start with the integrations whose breakage would cost the most time and effort to fix.
The next time you build a pipeline, ensure the data systems continue to work well together through integration testing. Use this post as a guide to help you define what tests to create and how to create them.
