Airflow is a must-know for data engineers
If you are a data engineer or looking to break into data engineering, Apache Airflow is a must-know. If you are worried that

- You don’t have enough time to learn Apache Airflow
- Just using cron jobs will not make your resume pop

this post is for you. Understanding the concepts of orchestration and scheduling with Apache Airflow will significantly improve your chances in your next data engineering interview.
By the end of this post, you will have learned how to use Airflow and how Airflow runs your code, giving you sufficient knowledge to dig into any Airflow system.
To follow along with the code, you need to do the following.
Prerequisites
Clone and start the container as shown below:
Open Airflow at http://localhost:8080 and stop containers after you are done with docker compose down.
Define pipelines with DAGs
A directed acyclic graph (DAG) is a one-way graph with no cycles, making it ideal for representing batch data pipelines.
The terms DAG and data pipeline are used interchangeably.
Use the DAG function or @dag decorator to define a DAG in Airflow. Define properties and attributes for your DAG, such as: dag_id, description, start_date, end_date, dagrun_timeout and many more shown here.
A DAG is made up of one or more tasks.
Tasks represent a unit of work in your pipeline. Tasks can be of 3 main types.
- Operators are prebuilt tasks for specific operations. E.g., S3CreateBucketOperator, SQLExecuteQueryOperator. You can see a list of all the active operators here.
- Sensors are tasks that keep checking for an event. A sensor will periodically poke the external system to see if an event occurred. E.g. S3KeySensor
- Any Python function can be used as a task with the @task decorator.
Similar to a DAG, a task lets you add a set of configs relevant to that task, such as retries. The available task configurations are listed here.
"""
Simple ETL DAG — Airflow 3
code at ./airlfow-tutorial/blob/main/airflow/dags/basic_example.py
"""
import pendulum
from airflow.sdk import dag, task
import time
@dag(
dag_id="simple_etl",
schedule="@daily",
start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
catchup=False,
tags=["etl", "example"],
)
def simple_etl():
@task(owner="ExternalSystem")
def extract() -> list[dict]:
"""Generate fake raw records."""
raw_data = [
{"id": 1, "name": "alice", "amount": "100.5"},
{"id": 2, "name": "bob", "amount": "200.0"},
{"id": 3, "name": "charlie", "amount": " 50.75 "},
]
print(f"Extracted {len(raw_data)} records")
return raw_data
@task(owner="StartDataEngineering", retries=2, retry_delay=2)
def transform(raw: list[dict]) -> list[dict]:
"""Clean and type-cast the raw records."""
transformed = [
{
"id": record["id"],
"name": record["name"].strip().title(),
"amount": round(float(record["amount"].strip()), 2),
}
for record in raw
]
print(f"Transformed {len(transformed)} records")
return transformed
@task()
def load(records: list[dict]) -> None:
"""Print records to simulate a load step."""
print(f"Loading {len(records)} records:")
for record in records:
print(f" → {record}")
# Wire up the pipeline
raw = extract()
cleaned = transform(raw)
load(cleaned)
simple_etl()The above code will create a DAG called simple_etl as shown below.
Use XComs to communicate across tasks
Tasks are meant to be self-contained. However, there are cases where cross-task communication is required. This is where XComs come in.
XComs is a key-value store into which a task can add a value, and another task can then read that value with its key. In the simple_etl DAG above, our tasks move data between each other in this code block:
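```python
# From the simple_etl DAG above: each TaskFlow return value is
# pushed to XComs, and the downstream task pulls it by key.
raw = extract()           # extract() pushes its return value to XComs
cleaned = transform(raw)  # transform() pulls `raw`, then pushes its own result
load(cleaned)             # load() pulls transform()'s result
```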
We can see the XComs value in the UI as shown below.
By default, XComs are stored in the Airflow metadata DB, so they are not recommended for large values. For large values, you would have to define a custom XCom backend, such as cloud storage.
Create DAGs dynamically
We can dynamically generate DAGs using standard Python (see ./airflow/dags/create_dynamic_dags.py in the repo for a full example).
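The idea is a DAG factory: a plain Python loop that calls a @dag-decorated function once per config entry, registering one DAG per entry. A minimal sketch of the pattern, with illustrative table names and dag_ids (the repo file may differ):

```python
import pendulum
from airflow.sdk import dag, task


def build_dag(table: str):
    """Build and register a DAG that loads one table; called once per table below."""

    @dag(
        dag_id=f"load_{table}",  # each generated DAG needs a unique dag_id
        schedule="@daily",
        start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
        catchup=False,
        tags=["dynamic", "example"],
    )
    def load_table():
        @task()
        def load():
            print(f"Loading table {table}")

        load()

    return load_table()


# One DAG per table; the dag-processor registers all of them from this single file
for table in ["orders", "customers", "products"]:
    build_dag(table)
```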
Within a DAG, we can dynamically create tasks as shown below.
"""
Simple Dynamic DAG — Airflow 3
code at: ./airlfow-tutorial/blob/main/airflow/dags/dynamic_dag.py
Each item in the list gets its own task instance at runtime.
"""
import pendulum
from airflow.sdk import dag, task
@dag(
dag_id="simple_dynamic_dag",
schedule="@daily",
start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
catchup=False,
tags=["dynamic", "example"],
)
def simple_dynamic():
@task()
def process(table: str) -> str:
"""
This task is dynamically mapped — one instance per item.
Airflow creates 3 task instances at runtime:
process-orders, process-customers, process-products
"""
result = f"Processed table: {table.upper()}"
print(result)
return result
@task()
def summarise(results: list[str]) -> None:
"""Receives all mapped outputs as a single list."""
print("Summary:")
for r in results:
print(f" ✓ {r}")
items = ["orders", "customers", "products"]
# .expand() is what makes it dynamic —
# one task instance is created per element in `items`
processed = process.expand(table=items)
summarise(processed)
simple_dynamic()We can see the process task having 3 instances, one per item.
We used a simple mapping over 3 elements in a list. However, there are more complex task-mapping strategies shown here.
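One common pattern is combining .partial() with .expand(): .partial() fixes arguments that are the same for every mapped instance, while .expand() varies the rest. A minimal sketch (the env parameter and dag_id are illustrative):

```python
import pendulum
from airflow.sdk import dag, task


@dag(
    dag_id="partial_expand_example",
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def partial_expand_example():
    @task()
    def process(table: str, env: str) -> str:
        return f"[{env}] processed {table}"

    # env is constant across all mapped instances; table varies per element
    process.partial(env="prod").expand(table=["orders", "customers", "products"])


partial_expand_example()
```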
DAGs can be scheduled or set to run when an asset is updated
DAGs can be set to run at a given frequency or start in response to an update to an asset.
Scheduling DAGs
Data pipelines are designed to process data for some chunk of time. This chunk of time is called the data interval.
When Data interval to be processed = pipeline frequency
Most data pipelines are designed to process data for the most recent time range. This means that if your pipeline runs every day at 12:00 A.M., it is meant to capture yesterday’s data from 12:00:00 AM to 11:59:59 PM yesterday for that run.
The data interval for a @daily run will be 12:00:00 AM yesterday to 12:00:00 AM today.
Even if the DAG does not start exactly at 12:00:00 AM every day, your pipeline will need to know the start and end times of the interval to be processed.
Airflow scheduling is built around this concept and provides the start and end times as variables in the DAG. Let’s look at an example.
```python
# Code at ./airlfow-tutorial/blob/main/airflow/dags/standard_data_interval_example.py
import pendulum
from airflow.sdk import dag, task, get_current_context
from airflow.timetables.interval import CronDataIntervalTimetable


@dag(
    dag_id="minutely_interval_printer",
    schedule=CronDataIntervalTimetable(
        "* * * * *",  # every minute
        timezone=pendulum.timezone("UTC"),
    ),  # this is equivalent to just using cron directly as schedule="* * * * *"
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "timetable"],
)
def minutely_interval_printer():
    @task()
    def print_interval() -> None:
        context = get_current_context()
        start = context["data_interval_start"]
        end = context["data_interval_end"]
        duration_seconds = (end - start).total_seconds()
        print(f"data_interval_start : {start}")
        print(f"data_interval_end   : {end}")
        print(f"Window duration     : {duration_seconds:.0f} seconds")

    print_interval()


minutely_interval_printer()
```

We can see the context["data_interval_start"] & context["data_interval_end"] values in the task-level logs as shown below.
We use the CronDataIntervalTimetable to define the pipeline frequency, and the pipeline frequency is used to derive the data interval.
While cron schedules snap to fixed clock times, you can use DeltaDataIntervalTimetable to tell the DAG to run every n (say 30) minutes from whenever it starts. Here’s a quick comparison of the two.
| | Cron (*/30 * * * *) | Delta (30 min) |
|---|---|---|
| DAG start | 12:38 PM | 12:38 PM |
| 1st run | 1:00 PM | 1:08 PM |
| 2nd run | 1:30 PM | 1:38 PM |
| 3rd run | 2:00 PM | 2:08 PM |
| Data window | Fixed clock intervals | Relative to last run |
| Misses 12:38–1:00 PM? | ✅ Yes | ❌ No |
Key takeaway: Cron snaps to the clock, so the first 22 minutes of data are lost. Delta starts counting from when you actually turn it on.
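To get the delta behaviour from the comparison above, a minimal sketch using DeltaDataIntervalTimetable (the dag_id is illustrative):

```python
import pendulum
from datetime import timedelta

from airflow.sdk import dag, task, get_current_context
from airflow.timetables.interval import DeltaDataIntervalTimetable


@dag(
    dag_id="every_30_min_delta",
    # run every 30 minutes relative to the previous run,
    # instead of snapping to :00 and :30 on the clock
    schedule=DeltaDataIntervalTimetable(timedelta(minutes=30)),
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "timetable"],
)
def every_30_min_delta():
    @task()
    def print_window() -> None:
        context = get_current_context()
        print(context["data_interval_start"], context["data_interval_end"])

    print_window()


every_30_min_delta()
```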
When Data interval != pipeline frequency
There are cases where you will want to run your pipelines, but process data for a different data interval (or not need interval logic).
E.g., consider a rolling-window data-interval processing or a pipeline that runs every month but only processes data for the last 4 days, etc.
We can define this with a trigger-based timetable. Let’s look at the example below.
```python
# code at ./airlfow-tutorial/blob/main/airflow/dags/custom_data_interval_example.py
import pendulum
from datetime import timedelta

from airflow.sdk import dag, task, get_current_context
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    dag_id="custom_data_interval_minutely_interval_printer",
    schedule=CronTriggerTimetable(
        "* * * * *",  # trigger every minute
        timezone="UTC",
        interval=timedelta(hours=1),  # data window = previous hour → now
    ),
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "timetable"],
)
def hourly_interval_printer():
    @task()
    def print_interval() -> None:
        context = get_current_context()
        start = context["data_interval_start"]
        end = context["data_interval_end"]
        print(f"data_interval_start : {start}")
        print(f"data_interval_end   : {end}")

        total = int((end - start).total_seconds())
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        print(f"Window duration     : {hours}h {minutes}m {seconds}s")

    print_interval()


hourly_interval_printer()
```

We can see how the pipeline is scheduled every minute, but the data interval is an hour.
Event Driven DAGs
There are many cases when you want to run a pipeline when the upstream data to that pipeline is updated.
With asset-based scheduling, you define the assets a pipeline produces and use them as a trigger for any downstream pipeline. Let’s look at the example below.
- The asset is identified by a unique URI
- A task outlet tells Airflow that the specified outlet asset is updated
- The consumer DAG will use the asset as a schedule
- The triggering_asset_event will be filled in by Airflow and will have the information about the asset and the DAG that updated it
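A minimal sketch of the producer/consumer pattern, assuming the Airflow 3 SDK’s Asset class and the triggering_asset_events context key; the asset name, URI, and dag_ids are illustrative, and the repo’s example may differ:

```python
import pendulum
from airflow.sdk import Asset, dag, task, get_current_context

# the asset is identified by a unique URI (illustrative here)
raw_orders = Asset(name="raw_orders", uri="s3://my-bucket/raw/orders")


@dag(
    dag_id="producer_dag",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def producer_dag():
    # the outlet tells Airflow that this task updates the raw_orders asset
    @task(outlets=[raw_orders])
    def update_orders() -> None:
        print("wrote new data for s3://my-bucket/raw/orders")

    update_orders()


@dag(
    dag_id="consumer_dag",
    schedule=[raw_orders],  # run whenever the asset receives an update
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def consumer_dag():
    @task()
    def process() -> None:
        context = get_current_context()
        # filled in by Airflow with the asset events that triggered this run
        print(f"Triggered by: {context['triggering_asset_events']}")

    process()


producer_dag()
consumer_dag()
```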
We can see how the completion of an upstream DAG run triggers the downstream DAG: when the upstream pipeline completes, the downstream pipeline gets triggered.
Before asset-based scheduling, Airflow used to use

- A TriggerDagRunOperator task in the upstream DAG to start the downstream DAG.
- An ExternalTaskSensor task in the downstream DAG to wait for the upstream DAG to finish successfully.
Both of these approaches are hard to maintain and debug.
We can also define the downstream DAG to wait for updates to multiple assets before starting.
We can also combine asset-based scheduling with time-based scheduling to ensure that the workflow remains responsive to data changes and consistently runs regular checks or updates. See AssetOrTimeSchedule for details.
Run tasks in parallel with executors
Airflow tasks are run in parallel using executors.
We use the LocalExecutor, which runs tasks as individual Python processes.
```bash
# bash into the running docker container
docker exec -ti airflow-tutorial bash

ps aux | grep '[a]irflow worker' | wc -l # This will show 32 workers

cat airflow.cfg | grep 'parallelism ='
# airflow.cfg contains Airflow's configuration, and
# this will show parallelism = 32, corresponding
# to the 32 we see from the previous command
```

See a list of executors available here. Shown below is a list of popular executors from Airflow’s 2025 survey.
The executors are swappable, so you can start with a LocalExecutor and scale to other executors as needed.
We can use pools to limit the number of tasks that can be run in parallel.
For example, if your Airflow is starting 1000s of tasks, your system resources may get overwhelmed. In such cases, you can create a pool and limit it to 100, so only 100 tasks will run in parallel at any given time.
You can assign higher priority weights to high-urgency tasks.
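A minimal sketch of both ideas, assuming a pool named api_pool has already been created (Admin → Pools in the UI, or `airflow pools set api_pool 100 "limit concurrent API calls"` from the CLI); the pool name, slot count, and dag_id are illustrative:

```python
import pendulum
from airflow.sdk import dag, task


@dag(
    dag_id="pool_priority_example",
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def pool_priority_example():
    # both tasks compete for slots in the same (hypothetical) pool
    @task(pool="api_pool", priority_weight=10)
    def urgent_call() -> None:
        print("scheduled ahead of lower-priority tasks waiting on api_pool")

    @task(pool="api_pool", priority_weight=1)
    def routine_call() -> None:
        print("waits for a free slot if the pool is full")

    urgent_call()
    routine_call()


pool_priority_example()
```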
Airflow’s DAG processor creates DAGs, and the scheduler starts them
When you create a Python file with a DAG definition, the dag-processor is responsible for parsing it and creating a DAG. The DAG information is stored in Airflow’s database.
Once the DAGs are created, they are monitored by the scheduler process, which checks (by default every minute) whether any DAGs need to be started.
Let’s take a look at these processes as shown below.
See run times, configs, DAG visualizations, run backfills with Airflow UI
The Airflow UI enables you to dig into individual DAGs and tasks, check logs, view the code used to create the DAG, etc.
You will also be able to see cross-DAG dependencies when using asset-based scheduling.
You can also trigger a backfill from the Airflow UI. Use the Airflow config tab to store variable values and connection credentials that can be reused across DAGs.
Conclusion
To recap, we saw:
- How to create pipelines with Airflow DAGs
- Scheduling a DAG
- Designing a DAG to start when there is an update to upstream data.
- How executors run your tasks
- How DAG processor and scheduler take your Python code and start a DAG
- How to see run times, configs, DAG visualizations, and run backfills with Airflow UI
Airflow is a very powerful (& complex) orchestrator and scheduler. Use this tutorial to understand how to use Airflow, and practice by building a portfolio project from one of these.