Scheduling a SQL script, using Apache Airflow, with an example

Author

Joseph Machado

Published

March 29, 2020

Keywords

Apache Airflow

One of the most common use cases for Apache Airflow is to run scheduled SQL scripts. Developers who start with Airflow often ask the following questions

“How to use airflow to orchestrate sql?” “How to specify date filters based on schedule intervals in Airflow?”

This post aims to cover the above questions. This post assumes you have a basic understanding of Apache Airflow and SQL. Let’s see how we can schedule a SQL script using Airflow, with an example. Let’s assume we want to run a SQL script every day at midnight. This SQL script performs data aggregation over the previous day’s data from event table and stores this data in another event_stats table.

Sample ETL

We can use Airflow to run the SQL script every day. In this example we use MySQL, but airflow provides operators to connect to most databases.

DAG: Directed Acyclic Graph, In Airflow this is used to denote a data pipeline which runs on a scheduled interval. A DAG can be made up of one or more individual tasks.

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator

default_arg = {'owner': 'airflow', 'start_date': '2020-02-28'}

dag = DAG('simple-mysql-dag',
          default_args=default_arg,
          schedule_interval='0 0 * * *')

mysql_task = MySqlOperator(dag=dag,
                           mysql_conn_id='mysql_default', 
                           task_id='mysql_task'
                           sql='<path>/sample_sql.sql',
                           params={'test_user_id': -99})

mysql_task

In the above script

  1. 0 0 * * * is a cron schedule format, denoting that the DAG should be run everyday at midnight, which is denoted by the 0th hour of every day. (note that Airflow by default runs on UTC time) mysql_conn_id is the connection id for your SQL database, you can set this in admin -> connections from airflow UI. There you will set the username and password that Airflow uses to access your database. ‍
  2. The SQL script to perform this operation is stored in a separate file sample_sql.sql. This file is read in when the DAG is being run.
USE your_database;

DROP TABLE IF EXISTS event_stats_staging;
CREATE TABLE event_stats_staging
AS SELECT date
        , user_id
        , sum(spend_amt) total_spend_amt
     FROM event
    WHERE date = {{ macros.ds }} 
      AND user_id <> {{ params.test_user_id }}
    GROUP BY date, user_id;

   INSERT INTO event_stats (
          date
        , user_id
        , total_spend_amt
   )
   SELECT date
        , user_id
        , total_spend_amt
     FROM event_stats_staging;

DROP TABLE event_stats_staging;

The values within { } are called templated parameters. Airflow replaces them with a variable that is passed in through the DAG script at run-time or made available via Airflow metadata macros. This may seem like overkill for our use case. But it becomes very helpful when we have more complex logic and want to dynamically generate parts of the script, such as where clauses, at run time.

There are 2 key concepts in the templated SQL script shown above

  1. Airflow macros: They provide access to the metadata that is available for each DAG run. We use the execution date as it provides the previous date over which we want to aggregate the data. ref: https://airflow.apache.org/docs/stable/macros.html

  2. Templated parameters: If we want our SQL script to have some parameters that can be filled at run time from the DAG, we can pass them as parameters to the task. In our example we filter out a specific user_id (-99) before aggregation.

In this post we saw

  1. How to schedule SQL scripts using Apache Airflow
  2. How Airflow connects to the database using the connection id
  3. How to pass in parameters at run-time using input parameters and macros

Hope this gives you an understanding of how to schedule SQL scripts in Airflow and how to use templating. If you have any questions or comments, please let me know below.

Back to top

Land your dream Data Engineering job with my free book!

Build data engineering proficiency with my free book!

Are you looking to enter the field of data engineering? And are you

> Overwhelmed by all the concepts/jargon/frameworks of data engineering?

> Feeling lost because there is no clear roadmap for someone to quickly get up to speed with the essentials of data engineering?

Learning to be a data engineer can be a long and rough road, but it doesn't have to be!

Imagine knowing the fundamentals of data engineering that are crucial to any data team. You will be able to quickly pick up any new tool or framework.

Sign up for my free Data Engineering 101 Course. You will get

✅ Instant access to my Data Engineering 101 e-book, which covers SQL, Python, Docker, dbt, Airflow & Spark.

✅ Executable code to practice and exercises to test yourself.

✅ Weekly email for 4 weeks with the exercise solutions.

Join now and get started on your data engineering journey!

    Testimonials:

    I really appreciate you putting these detailed posts together for your readers, you explain things in such a detailed, simple manner that's well organized and easy to follow. I appreciate it so so much!
    I have learned a lot from the course which is much more practical.
    This course helped me build a project and actually land a data engineering job! Thank you.

    When you subscribe, you'll also get emails about data engineering concepts, development practices, career advice, and projects every 2 weeks (or so) to help you level up your data engineering skills. We respect your email privacy.