Data Quality Tools are Expensive and Hard-to-use
Data quality tools are expensive and a real pain to use. If you are:
- Struggling with expensive, limited-feature, hard-to-use data quality tools
- Tired of paying $50k-$150k a year just for a data quality tool
- Adopting a tool designed to create vendor dependency just to execute simple data quality queries

This post is for you. Picture being able to define exactly how your data quality checks run, against whatever system you choose.
By the end, you’ll know how to run DQ checks without paying vendors thousands of dollars.
Note: If you are using dbt, dbt-utils and dbt-expectations cover most of what you need.
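For instance, the check implemented in this post corresponds to dbt's built-in accepted_values test, configured in a model's schema file (the model and column names here are illustrative):

```yaml
models:
  - name: employees
    columns:
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'pending']
```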
Prerequisite: the full script for this post, data_quality_implementation.py, is shown below.
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "duckdb",
# ]
# ///
from typing import Any

import duckdb
from duckdb import DuckDBPyConnection


def is_column_values_in(
    conn: DuckDBPyConnection,
    table_name: str,
    column_name: str,
    allowed_values: list[str],
) -> bool:
    """Check that every value in column {column_name} is within the {allowed_values} list.

    Args:
        conn: DuckDBPyConnection belonging to a db that has the table_name
        table_name: Name of the table to be validated
        column_name: Name of the column to be validated
        allowed_values: List of allowed values in the column

    Returns:
        True if every value in the column is in allowed_values, False otherwise
    """
    sql_list = ", ".join(f"'{v}'" for v in allowed_values)
    # Select a single failing row; the check passes when none exist
    sql_query = f"select * from {table_name} where {column_name} not in ({sql_list}) limit 1"
    rows = conn.sql(sql_query).fetchall()
    return len(rows) == 0
conn = duckdb.connect()

# Create a sample DuckDB table
conn.execute("""
    CREATE TABLE employees (
        id INTEGER,
        name VARCHAR,
        department VARCHAR,
        status VARCHAR
    )
""")

# Insert sample data
conn.execute("""
    INSERT INTO employees VALUES
        (1, 'Alice', 'Engineering', 'active'),
        (2, 'Bob', 'Marketing', 'inactive'),
        (3, 'Charlie', 'Engineering', 'active'),
        (4, 'Diana', 'HR', 'pending'),
        (5, 'Eve', 'Marketing', 'unknown') -- invalid status
""")
conn.commit()

# Run DQ check: status must be one of the allowed values
allowed_statuses = ["active", "inactive", "pending"]
# This will fail due to the 'unknown' status value in our sample data
result = is_column_values_in(conn, "employees", "status", allowed_statuses)
print(f"\nDQ Check — all 'status' values in {allowed_statuses}:")
print(f"  PASSED: {result}")

allowed_departments = ["Engineering", "Marketing", "HR"]
# This will pass, since all employees.department values conform to this list
result2 = is_column_values_in(conn, "employees", "department", allowed_departments)
print(f"\nDQ Check — all 'department' values in {allowed_departments}:")
print(f"  PASSED: {result2}")
def is_column_values_in_v2(
    conn: DuckDBPyConnection,
    table_name: str,
    column_name: str,
    allowed_values: list[str],
    pass_threshold: float = 0.9,
    sample_rows: int = 20,
) -> tuple[bool, dict[str, Any]]:
    """Check that the share of values in column {column_name} outside the
    {allowed_values} list stays within the pass threshold.

    Args:
        conn: DuckDBPyConnection belonging to a db that has the table_name
        table_name: Name of the table to be validated
        column_name: Name of the column to be validated
        allowed_values: List of allowed values in the column
        pass_threshold: Fraction of rows (0-1) that must pass. Defaults to 0.9 (90%)
        sample_rows: The number of sample rows to return when the DQ check fails

    Returns:
        (passed, details) where details holds the fail rate and a sample of violating rows
    """
    sql_list = ", ".join(f"'{v}'" for v in allowed_values)
    sql_violations = f"""
        SELECT * FROM {table_name}
        WHERE {column_name} NOT IN ({sql_list})
        USING SAMPLE {sample_rows}
    """
    sql_fail_rate = f"""
        SELECT ROUND(count_if({column_name} NOT IN ({sql_list})) * 1.0 / COUNT(*), 2) AS fail_rate
        FROM {table_name}
    """
    sample = conn.sql(sql_violations).fetchall()
    fail_rate = conn.sql(sql_fail_rate).fetchone()[0]
    passed = fail_rate <= round(1 - pass_threshold, 2)
    return passed, {"fail_rate": fail_rate, "violations": sample}
# Run DQ check: status must be one of the allowed values
allowed_statuses = ["active", "inactive", "pending"]
result = is_column_values_in_v2(conn, "employees", "status", allowed_statuses, 0.8)
# This will PASS since the 'unknown' status accounts for (1/5) or 20%
# of the data and our pass_threshold is 80%
print(f"\nDQ Check — all 'status' values in {allowed_statuses}:")
print(f"  PASSED: {result}")

Run this self-contained Python script with uv, which reads the inline script metadata: uv run data_quality_implementation.py
You should see the following results.

DQ Check — all 'status' values in ['active', 'inactive', 'pending']:
  PASSED: False

DQ Check — all 'department' values in ['Engineering', 'Marketing', 'HR']:
  PASSED: True

DQ Check — all 'status' values in ['active', 'inactive', 'pending']:
  PASSED: (True, {'fail_rate': 0.2, 'violations': [(5, 'Eve', 'Marketing', 'unknown')]})

Implementing Data Quality Checks in Python
Write a SQL query to select all the rows that fail your data quality check.
Shown below is an implementation of a function that checks whether the values in a column fall within an acceptable set of values.
import duckdb
from duckdb import DuckDBPyConnection


def is_column_values_in(
    conn: DuckDBPyConnection,
    table_name: str,
    column_name: str,
    allowed_values: list[str],
) -> bool:
    """Check that every value in column {column_name} is within the {allowed_values} list.

    Args:
        conn: DuckDBPyConnection belonging to a db that has the table_name
        table_name: Name of the table to be validated
        column_name: Name of the column to be validated
        allowed_values: List of allowed values in the column

    Returns:
        True if every value in the column is in allowed_values, False otherwise
    """
    sql_list = ", ".join(f"'{v}'" for v in allowed_values)
    # Select a single failing row; the check passes when none exist
    sql_query = f"select * from {table_name} where {column_name} not in ({sql_list}) limit 1"
    rows = conn.sql(sql_query).fetchall()
    return len(rows) == 0

Selecting the rows that fail is the technique most data quality tools use to implement their checks.
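Any check that can be phrased as "select the rows that violate the rule" fits this pattern. As a sketch, a hypothetical not-null check looks nearly identical (the has_no_nulls name is my own; conn is anything exposing DuckDB's .sql(...).fetchall() interface):

```python
def has_no_nulls(conn, table_name: str, column_name: str) -> bool:
    """Same pattern: select violating rows; the check passes when none exist."""
    sql_query = f"select * from {table_name} where {column_name} is null limit 1"
    rows = conn.sql(sql_query).fetchall()
    return len(rows) == 0
```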
Calling the is_column_values_in function with fake data.
conn = duckdb.connect()

# Create a sample DuckDB table
conn.execute("""
    CREATE TABLE employees (
        id INTEGER,
        name VARCHAR,
        department VARCHAR,
        status VARCHAR
    )
""")

# Insert sample data
conn.execute("""
    INSERT INTO employees VALUES
        (1, 'Alice', 'Engineering', 'active'),
        (2, 'Bob', 'Marketing', 'inactive'),
        (3, 'Charlie', 'Engineering', 'active'),
        (4, 'Diana', 'HR', 'pending'),
        (5, 'Eve', 'Marketing', 'unknown') -- invalid status
""")
conn.commit()

# Run DQ check: status must be one of the allowed values
allowed_statuses = ["active", "inactive", "pending"]
# This will fail due to the 'unknown' status value in our sample data
result = is_column_values_in(conn, "employees", "status", allowed_statuses)
print(f"\nDQ Check — all 'status' values in {allowed_statuses}:")
print(f"  PASSED: {result}")

allowed_departments = ["Engineering", "Marketing", "HR"]
# This will pass, since all employees.department values conform to this list
result2 = is_column_values_in(conn, "employees", "department", allowed_departments)
print(f"\nDQ Check — all 'department' values in {allowed_departments}:")
print(f"  PASSED: {result2}")

SQL logic for data quality checks varies by transformation engine. For example, DuckDB is used in the code block above.
Irrespective of the transformation engine used, you can inject the object representing the data processor into the function.
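One way to make that injection explicit is a small protocol that each engine adapter implements. This is a sketch; the QueryRunner and SQLiteRunner names are my own, not from a library:

```python
import sqlite3
from typing import Protocol


class QueryRunner(Protocol):
    """Minimal interface a check needs: run SQL, return rows."""

    def run(self, query: str) -> list[tuple]: ...


class SQLiteRunner:
    """Adapter wrapping a sqlite3 connection behind the QueryRunner interface."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def run(self, query: str) -> list[tuple]:
        return self.conn.execute(query).fetchall()


def is_column_values_in(
    runner: QueryRunner, table_name: str, column_name: str, allowed_values: list[str]
) -> bool:
    # Same failed-rows technique, but engine-agnostic via the injected runner
    sql_list = ", ".join(f"'{v}'" for v in allowed_values)
    query = f"select * from {table_name} where {column_name} not in ({sql_list}) limit 1"
    return len(runner.run(query)) == 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INTEGER, status TEXT)")
conn.executemany("INSERT INTO employees VALUES (?, ?)", [(1, "active"), (2, "unknown")])
print(is_column_values_in(SQLiteRunner(conn), "employees", "status", ["active", "inactive", "pending"]))  # False
```

A DuckDB adapter would differ only in its run method (conn.sql(query).fetchall()).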
Return a Sample of Failed Rows for Debugging
When a data quality check fails, a sample of failed rows helps with debugging. Most SQL variants allow you to choose a random sample of rows, as shown below.
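The v2 implementation below uses DuckDB's USING SAMPLE clause. In engines without a dedicated sample clause, ORDER BY RANDOM() LIMIT n is a common (if slower) equivalent; a sketch with SQLite:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(100)])

# SQLite has no SAMPLE/TABLESAMPLE clause; ORDER BY RANDOM() LIMIT n
# returns a uniform random sample of the failing rows instead.
rows = conn.execute(
    "SELECT * FROM t WHERE n % 2 = 0 ORDER BY RANDOM() LIMIT 5"
).fetchall()
print(len(rows))  # 5
```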
Use Failed Rows to Calculate Pass Rate
There are cases where not all rows need to pass data quality checks. For example, a certain failure rate may be acceptable.
Set a pass_threshold to specify what percent of rows must pass your checks.
The implementation for selecting a sample of failed rows and defining a pass_threshold is shown below.
from typing import Any


def is_column_values_in_v2(
    conn: DuckDBPyConnection,
    table_name: str,
    column_name: str,
    allowed_values: list[str],
    pass_threshold: float = 0.9,
    sample_rows: int = 20,
) -> tuple[bool, dict[str, Any]]:
    """Check that the share of values in column {column_name} outside the
    {allowed_values} list stays within the pass threshold.

    Args:
        conn: DuckDBPyConnection belonging to a db that has the table_name
        table_name: Name of the table to be validated
        column_name: Name of the column to be validated
        allowed_values: List of allowed values in the column
        pass_threshold: Fraction of rows (0-1) that must pass. Defaults to 0.9 (90%)
        sample_rows: The number of sample rows to return when the DQ check fails

    Returns:
        (passed, details) where details holds the fail rate and a sample of violating rows
    """
    sql_list = ", ".join(f"'{v}'" for v in allowed_values)
    sql_violations = f"""
        SELECT * FROM {table_name}
        WHERE {column_name} NOT IN ({sql_list})
        USING SAMPLE {sample_rows}
    """
    sql_fail_rate = f"""
        SELECT ROUND(count_if({column_name} NOT IN ({sql_list})) * 1.0 / COUNT(*), 2) AS fail_rate
        FROM {table_name}
    """
    sample = conn.sql(sql_violations).fetchall()
    fail_rate = conn.sql(sql_fail_rate).fetchone()[0]
    passed = fail_rate <= round(1 - pass_threshold, 2)
    return passed, {"fail_rate": fail_rate, "violations": sample}

Try it out with the same data as above.
# Run DQ check: status must be one of the allowed values
allowed_statuses = ["active", "inactive", "pending"]
result = is_column_values_in_v2(conn, "employees", "status", allowed_statuses, 0.8)
# This will PASS since the 'unknown' status accounts for (1/5) or 20%
# of the data and our pass_threshold is 80%
print(f"\nDQ Check — all 'status' values in {allowed_statuses}:")
print(f"  PASSED: {result}")

The is_column_values_in_v2 version:
- Returns a sample (default 20 rows) of failed rows for debugging
- Enables threshold-based checks, allowing for nuanced use cases
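In a pipeline, the returned tuple can gate downstream steps. A minimal sketch (the enforce helper is illustrative, not part of any library):

```python
from typing import Any


def enforce(check_name: str, passed: bool, details: dict[str, Any]) -> None:
    """Raise when a blocking DQ check fails, surfacing the failed-row sample."""
    if not passed:
        raise ValueError(f"DQ check '{check_name}' failed: {details}")


# Example: gate a load step on a v2-style result
passed, details = False, {"fail_rate": 0.2, "violations": [(5, "Eve", "Marketing", "unknown")]}
try:
    enforce("status_in_allowed", passed, details)
except ValueError as e:
    print(e)
```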
With more pipelines, you’ll have to repeat the DQ check logic across them.
Modularize the data quality checks and place them in their own folder. Read this post on how to implement a shared data quality module.
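A shared module can then expose the check functions plus a small runner. A sketch of what that might look like (the DQCheck and run_checks names are my own):

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class DQCheck:
    """One named check: a callable plus the arguments it needs."""

    name: str
    fn: Callable[..., bool]
    kwargs: dict[str, Any] = field(default_factory=dict)


def run_checks(conn: Any, checks: list[DQCheck]) -> dict[str, bool]:
    """Run every registered check against the given connection; return name -> passed."""
    return {check.name: check.fn(conn, **check.kwargs) for check in checks}


# Usage: register checks once, reuse across pipelines, e.g.
# checks = [DQCheck("status_in_allowed", is_column_values_in,
#                   {"table_name": "employees", "column_name": "status",
#                    "allowed_values": ["active", "inactive", "pending"]})]
# results = run_checks(conn, checks)
```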
You don’t need a $100k-a-year vendor subscription
To recap, we saw:
- How to use SQL to implement data quality checks
- How to extend those checks with quality thresholds and debuggable failed-row samples

When implementing data quality checks for your pipeline, roll out your own. It will be simpler, more extensible, and easier to use. Identifying data quality checks and working with stakeholders are the hard parts; keep the implementation simple.