Free Airflow 3.0 Tutorial

Master the core concepts of Apache Airflow 3.0 — from your first DAG to advanced scheduling — with hands-on code examples.

Author: Joseph Machado

Published: February 15, 2026

Keywords: Apache Airflow 3.0, Airflow tutorial, Airflow DAG tutorial, how to create Airflow DAG, Airflow for beginners, data pipeline tutorial, Apache Airflow pipeline tutorial, data engineering tutorial

Airflow is a must-know for data engineers

If you are a data engineer or looking to break into data engineering, Apache Airflow is a must-know. If you are worried that:

  • You don’t have enough time to learn Apache Airflow
  • Just using cron jobs will not make your resume pop

this post is for you. Understanding the concepts of orchestration and scheduling with Apache Airflow will significantly improve your chances in your next data engineering interview.

By the end of this post, you will have learned how to use Airflow and how Airflow runs your code, giving you sufficient knowledge to dig into any Airflow system.

To follow along with the code, you will need the following.

Prerequisites

  1. Docker version >= 20.10.17 and Docker compose v2 version >= v2.10.2.

Clone and start the container as shown below:

git clone https://github.com/josephmachado/airflow-tutorial.git
cd airflow-tutorial
docker compose up -d --build

Open Airflow at http://localhost:8080 and, when you are done, stop the containers with docker compose down.

Define Pipelines With DAG

A directed acyclic graph (DAG) is a one-way graph with no cycles, making it ideal for representing batch data pipelines.

Note

The terms DAG and data pipeline are used interchangeably.

Use the DAG function or @dag decorator to define a DAG in Airflow. Define properties and attributes for your DAG, such as dag_id, description, start_date, end_date, dagrun_timeout, and many more shown here.

A DAG is made up of one or more tasks.

Tasks represent a unit of work in your pipeline. Tasks can be of 3 main types.

  1. Operators are prebuilt tasks for specific operations. E.g., S3CreateBucketOperator, SQLExecuteQueryOperator. You can see a list of all the active operators here.
  2. Sensors are tasks that keep checking for an event. A sensor will periodically poke an external system to see if the event has occurred. E.g., S3KeySensor (see the sketch after this list).
  3. Any Python function can be used as a task by using the @task decorator.
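
To make items 1 and 2 concrete, here is a short, illustrative sketch of an operator and a sensor. This is not part of the tutorial repo; it assumes the Amazon provider package is installed and an AWS connection is configured, and the bucket and key names are made up.

# Operator + sensor sketch (illustrative; assumes the Amazon provider is installed)
import pendulum
from airflow.sdk import dag
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


@dag(
    dag_id="operator_sensor_sketch",  # hypothetical dag_id
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def operator_sensor_sketch():
    # Operator: a prebuilt task that performs a specific operation
    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name="my-example-bucket",  # hypothetical bucket name
    )

    # Sensor: keeps poking S3 until the key appears (or the timeout is hit)
    wait_for_file = S3KeySensor(
        task_id="wait_for_file",
        bucket_name="my-example-bucket",
        bucket_key="landing/orders.csv",  # hypothetical object key
        poke_interval=60,  # check every 60 seconds
        timeout=60 * 60,   # give up after an hour
    )

    create_bucket >> wait_for_file


operator_sensor_sketch()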

Similar to a DAG, a task lets you add a set of configs relevant to that task, such as retries, retry delay, etc. See the available task configurations for the full list.

"""
Simple ETL DAG — Airflow 3
code at ./airlfow-tutorial/blob/main/airflow/dags/basic_example.py
"""

import pendulum
from airflow.sdk import dag, task
import time


@dag(
    dag_id="simple_etl",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["etl", "example"],
)
def simple_etl():
    @task(owner="ExternalSystem")
    def extract() -> list[dict]:
        """Generate fake raw records."""
        raw_data = [
            {"id": 1, "name": "alice", "amount": "100.5"},
            {"id": 2, "name": "bob", "amount": "200.0"},
            {"id": 3, "name": "charlie", "amount": "  50.75 "},
        ]
        print(f"Extracted {len(raw_data)} records")
        return raw_data

    @task(owner="StartDataEngineering", retries=2, retry_delay=2)
    def transform(raw: list[dict]) -> list[dict]:
        """Clean and type-cast the raw records."""
        transformed = [
            {
                "id": record["id"],
                "name": record["name"].strip().title(),
                "amount": round(float(record["amount"].strip()), 2),
            }
            for record in raw
        ]
        print(f"Transformed {len(transformed)} records")
        return transformed

    @task()
    def load(records: list[dict]) -> None:
        """Print records to simulate a load step."""
        print(f"Loading {len(records)} records:")
        for record in records:
            print(f"  → {record}")

    # Wire up the pipeline
    raw = extract()
    cleaned = transform(raw)
    load(cleaned)


simple_etl()

The above code will create a DAG called simple_etl as shown below.

DAG Grid View

Use XComs to communicate across tasks

Tasks are meant to be self-contained. However, there are cases where cross-task communication is required. This is where XComs (cross-communications) help.

An XCom is a key-value pair that one task writes and another task can then read back using its key. In the above simple_etl DAG, our tasks move data between each other in this code block:

    # Wire up the pipeline
    raw = extract()
    cleaned = transform(raw)
    load(cleaned)

We can see the XComs value in the UI as shown below.

XComs
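
The return values above are implicit XComs. A task can also push and pull XComs explicitly through the task instance in its context; below is a minimal, illustrative sketch (the dag_id and key name are made up and not in the tutorial repo).

# Explicit XCom push/pull sketch (illustrative; not part of the tutorial repo)
import pendulum
from airflow.sdk import dag, task, get_current_context


@dag(
    dag_id="explicit_xcom_example",  # hypothetical dag_id
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def explicit_xcom_example():
    @task()
    def producer() -> None:
        ti = get_current_context()["ti"]
        ti.xcom_push(key="row_count", value=42)  # store a value under an explicit key

    @task()
    def consumer() -> None:
        ti = get_current_context()["ti"]
        count = ti.xcom_pull(task_ids="producer", key="row_count")  # read it back by key
        print(f"Upstream produced {count} rows")

    producer() >> consumer()


explicit_xcom_example()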

By default, XComs are stored in the Airflow metadata database, so they are not recommended for large values. For large values, define a custom XCom backend, such as cloud storage.

Create DAGs dynamically

We can dynamically generate DAGs using standard Python. See the below code from ./airflow/dags/create_dynamic_dags.py.

Dynamic DAG
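
The screenshot above shows the repo's version. As a rough sketch of the same pattern (the table names and dag_ids below are illustrative and may differ from the repo's create_dynamic_dags.py):

# Dynamic DAG generation sketch (illustrative; the repo file may differ)
import pendulum
from airflow.sdk import dag, task

TABLES = ["orders", "customers", "products"]  # hypothetical table list


def build_dag(table: str):
    @dag(
        dag_id=f"load_{table}",  # each table gets its own DAG
        schedule="@daily",
        start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
        catchup=False,
        tags=["dynamic"],
    )
    def load_table_dag():
        @task()
        def load() -> None:
            print(f"Loading table {table}")

        load()

    return load_table_dag()


# Register one DAG per table at module scope so the dag processor picks them up
for table in TABLES:
    globals()[f"load_{table}"] = build_dag(table)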

Within a DAG, we can dynamically create tasks as shown below.

"""
Simple Dynamic DAG — Airflow 3
code at: ./airflow-tutorial/blob/main/airflow/dags/dynamic_dag.py
Each item in the list gets its own task instance at runtime.
"""

import pendulum
from airflow.sdk import dag, task


@dag(
    dag_id="simple_dynamic_dag",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dynamic", "example"],
)
def simple_dynamic():
    @task()
    def process(table: str) -> str:
        """
        This task is dynamically mapped — one instance per item.
        Airflow creates 3 task instances at runtime:
          process-orders, process-customers, process-products
        """
        result = f"Processed table: {table.upper()}"
        print(result)
        return result

    @task()
    def summarise(results: list[str]) -> None:
        """Receives all mapped outputs as a single list."""
        print("Summary:")
        for r in results:
            print(f"  ✓ {r}")

    items = ["orders", "customers", "products"]

    # .expand() is what makes it dynamic —
    # one task instance is created per element in `items`
    processed = process.expand(table=items)

    summarise(processed)


simple_dynamic()

We can see that the process task has 3 instances, one per item.

Task Mapping

We used a simple mapping over the 3 elements of a list; more complex task-mapping strategies are shown here. One common pattern is keeping some arguments constant with .partial() while mapping others with .expand(), as sketched below.
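
Below is a minimal, illustrative sketch of this partial-plus-expand pattern (the dag_id, schema, and table names are made up; this file is not in the tutorial repo).

# partial() + expand() sketch (illustrative; not part of the tutorial repo)
import pendulum
from airflow.sdk import dag, task


@dag(
    dag_id="partial_expand_example",  # hypothetical dag_id
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def partial_expand_example():
    @task()
    def load_table(schema: str, table: str) -> str:
        # `schema` is the same for every mapped instance; `table` varies per instance
        return f"Loaded {schema}.{table}"

    load_table.partial(schema="analytics").expand(
        table=["orders", "customers", "products"]
    )


partial_expand_example()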

DAGs can be scheduled or set to run when an asset is updated

DAGs can be set to run at a given frequency or start in response to an update to an asset.

Scheduling DAGs

Important

Data pipelines are designed to process data for some chunk of time. This chunk of time is called the data interval.

When Data interval to be processed = pipeline frequency

Most data pipelines are designed to process data for the most recent time range. This means that if your pipeline runs every day at 12:00 AM, that run is meant to capture yesterday’s data, from 12:00:00 AM to 11:59:59 PM.

Important

The data interval for a @daily run will be 12:00:00 AM yesterday to 12:00:00 AM today.

Data Interval

Even if the DAG does not start exactly at 12:00:00 AM every day, your pipeline will need to know the start and end times of the interval to be processed.

Airflow scheduling is built around this concept and provides the start and end times as variables in the DAG. Let’s look at an example.

# Code at ./airflow-tutorial/blob/main/airflow/dags/standard_data_interval_example.py
import pendulum
from airflow.sdk import dag, task, get_current_context
from airflow.timetables.interval import CronDataIntervalTimetable


@dag(
    dag_id="minutely_interval_printer",
    schedule=CronDataIntervalTimetable(
        "* * * * *",  # every minute
        timezone=pendulum.timezone("UTC"),
    ), # this is equivalent to just using cron directly as schedule="* * * * *"
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "timetable"],
)
def minutely_interval_printer():
    @task()
    def print_interval() -> None:
        context = get_current_context()

        start = context["data_interval_start"]
        end = context["data_interval_end"]
        duration_seconds = (end - start).total_seconds()

        print(f"data_interval_start : {start}")
        print(f"data_interval_end   : {end}")
        print(f"Window duration     : {duration_seconds:.0f} seconds")

    print_interval()


minutely_interval_printer()

We can see context["data_interval_start"] and context["data_interval_end"] in the task-level logs as shown below.

Data Interval

We use the CronDataIntervalTimetable timetable to define the pipeline frequency, and the pipeline frequency is used to derive the data interval.

While a cron schedule snaps runs to fixed clock times, you can also use DeltaDataIntervalTimetable to tell the DAG to run every n (say 30) minutes from the moment it is turned on. Here’s a quick comparison of the two.

                          | Cron (*/30 * * * *)   | Delta (30 min)
  DAG start               | 12:38 PM              | 12:38 PM
  1st run                 | 1:00 PM               | 1:08 PM
  2nd run                 | 1:30 PM               | 1:38 PM
  3rd run                 | 2:00 PM               | 2:08 PM
  Data window             | Fixed clock intervals | Relative to last run
  Misses 12:38–1:00 PM?   | ✅ Yes                | ❌ No

Key takeaway: Cron snaps to the clock, so the first 22 minutes of data are lost. Delta starts counting from when you actually turn it on.
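
As a minimal, illustrative sketch of the delta-based option (this file is not in the tutorial repo; passing a plain timedelta as the schedule is an equivalent shorthand):

# Delta-based schedule sketch (illustrative; not part of the tutorial repo)
import pendulum
from datetime import timedelta
from airflow.sdk import dag, task
from airflow.timetables.interval import DeltaDataIntervalTimetable


@dag(
    dag_id="delta_interval_example",  # hypothetical dag_id
    # run every 30 minutes, counted from when the DAG is turned on;
    # schedule=timedelta(minutes=30) is an equivalent shorthand
    schedule=DeltaDataIntervalTimetable(timedelta(minutes=30)),
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def delta_interval_example():
    @task()
    def report() -> None:
        print("Runs every 30 minutes, relative to the previous run")

    report()


delta_interval_example()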

When Data interval != pipeline frequency

There are cases where you will want to run your pipeline on one schedule but process data for a different data interval (or skip interval logic altogether).

E.g., consider rolling-window processing, or a pipeline that runs every month but only processes data for the last 4 days.

Overlapping Data Interval

We can define this with a trigger timetable (CronTriggerTimetable). Let’s look at the example below.

# code at ./airflow-tutorial/blob/main/airflow/dags/custom_data_interval_example.py
import pendulum
from datetime import timedelta
from airflow.sdk import dag, task, get_current_context
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    dag_id="custom_data_interval_minutely_interval_printer",
    schedule=CronTriggerTimetable(
        "* * * * *",  # every minute on the hour
        timezone="UTC",
        interval=timedelta(hours=1),  # data window = previous hour → now
    ),
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "timetable"],
)
def hourly_interval_printer():
    @task()
    def print_interval() -> None:
        context = get_current_context()

        start = context["data_interval_start"]
        end = context["data_interval_end"]

        print(f"data_interval_start : {start}")
        print(f"data_interval_end   : {end}")
        total = int((end - start).total_seconds())
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        print(f"Window duration : {hours}h {minutes}m {seconds}s")

    print_interval()


hourly_interval_printer()

Cron Trigger

We can see how the pipeline is scheduled every minute, but the data interval is an hour.

Event Driven DAGs

There are many cases where you want to run a pipeline when its upstream data is updated.

With asset-based scheduling, you define the assets a pipeline produces and use them as a trigger for any downstream pipeline. Let’s look at the example below.

Asset Dags
  1. The asset is identified by a unique URI
  2. A task outlet tells Airflow that the specified outlet asset has been updated (as sketched below this list)
  3. The consumer DAG uses the asset as its schedule
  4. The triggering_asset_events context variable is filled in by Airflow with information about the asset and the DAG run that updated it
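
Putting these pieces together, here is a minimal, illustrative sketch of a producer and consumer pair (the asset URI and DAG names are made up and are not from the tutorial repo):

# Asset-based scheduling sketch (illustrative; URI and DAG names are made up)
import pendulum
from airflow.sdk import Asset, dag, task

# 1. The asset, identified by a unique URI (hypothetical)
raw_orders = Asset("s3://data-lake/raw/orders")


@dag(
    dag_id="orders_producer",  # hypothetical dag_id
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def orders_producer():
    # 2. The outlet tells Airflow this task updates the asset
    @task(outlets=[raw_orders])
    def write_orders() -> None:
        print("Wrote new raw orders")

    write_orders()


@dag(
    dag_id="orders_consumer",  # hypothetical dag_id
    # 3. The consumer uses the asset as its schedule
    schedule=[raw_orders],
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
)
def orders_consumer():
    @task()
    def process_orders() -> None:
        print("Processing the updated orders asset")

    process_orders()


orders_producer()
orders_consumer()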

We can see how the completion of an upstream DAG run triggers the downstream DAG.

Asset Scheduling

When the upstream pipeline completes, the downstream pipeline gets triggered.

Before asset-based scheduling, Airflow relied on

  • TriggerDagRunOperator task in the upstream DAG to start the downstream DAG.
  • ExternalTaskSensor task for the downstream DAG to wait for the upstream DAG to finish successfully.

Both of these approaches are hard to maintain and debug.

We can also require the downstream DAG to wait until multiple assets have each been updated before starting.

Multiple Asset Scheduling

We can also combine asset-based scheduling with time-based scheduling to ensure that the workflow remains responsive to data changes and consistently runs regular checks or updates. See AssetOrTimeSchedule for details.

Run tasks in parallel with executors

Airflow runs tasks in parallel using executors.

We use the LocalExecutor, which runs tasks as individual Python processes.

# bash into the running docker container 
docker exec -ti airflow-tutorial bash 

ps aux | grep '[a]irflow worker' | wc -l # This will show 32 workers

cat airflow.cfg | grep 'parallelism ='
# airflow.cfg contains Airflow's configuration;
# this will show parallelism = 32, corresponding
# to the 32 workers from the previous command

See a list of executors available here. Shown below is a list of popular executors from Airflow’s 2025 survey.

Executor usage

The executors are swappable, so you can start with a LocalExecutor and scale to other executors as needed.

We can use pools to limit the number of tasks that can be run in parallel.

For example, if your Airflow instance starts thousands of tasks at once, your system resources may get overwhelmed. In such cases, you can create a pool with 100 slots, so only 100 tasks will run in parallel at any given time.

You can assign higher priority weights to high-urgency tasks.
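
As a minimal, illustrative sketch of both settings (the pool name etl_pool is hypothetical and must be created first, for example with airflow pools set etl_pool 100 "Cap heavy ETL tasks"):

# Pool and priority-weight sketch (illustrative; the pool name is hypothetical)
from airflow.sdk import task


# Runs only when a slot is free in the pre-created "etl_pool" pool;
# a higher priority_weight means the task is picked up earlier when slots are scarce.
# Define the task here and call it inside a @dag-decorated function to use it.
@task(pool="etl_pool", priority_weight=10)
def heavy_transform() -> None:
    print("Running inside etl_pool, capped at 100 concurrent slots")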

Airflow’s DAG processor creates DAGs, and the scheduler starts them

When you create a Python file with a DAG definition, the dag-processor is responsible for parsing it and creating a DAG. The DAG information is stored in Airflow’s database.

Once the DAGs are created, they are monitored by the scheduler process, which periodically checks whether any DAGs need to be started.

Let’s take a look at these processes as shown below.

docker exec -ti airflow-tutorial bash

ps aux | grep '[d]ag-processor' # You will see the dag-processor process 

ps aux | grep '[a]irflow scheduler' # You will see the scheduler process

See run times, configs, and DAG visualizations, and run backfills with the Airflow UI

The Airflow UI enables you to dig into individual DAGs and tasks, check logs, view the code used to create a DAG, etc.

UI

You will also be able to see cross-DAG dependencies when using asset-based scheduling.

Cross DAG Dependency

You can also trigger a backfill from the Airflow UI. Use the Airflow UI's Variables and Connections pages to store variable values and connection credentials that can be reused across DAGs.

Conclusion

To recap, we saw:

  1. How to create pipelines with Airflow DAGs
  2. How to schedule a DAG
  3. How to design a DAG to start when there is an update to upstream data
  4. How executors run your tasks
  5. How the DAG processor and scheduler take your Python code and start a DAG
  6. How to see run times, configs, and DAG visualizations, and run backfills with the Airflow UI

Airflow is a very powerful (and complex) orchestrator and scheduler. Use this tutorial to understand how Airflow works, and practice by building a portfolio project from one of these:

Read These

  1. DE101
  2. Data Projects