How to submit Spark jobs to an EMR cluster from Airflow

Introduction

I have been asked, and have seen, questions like

How are others automating Apache Spark jobs on EMR?

How do I submit Spark jobs to an EMR cluster from Airflow?

In this post, we go over the Apache Airflow way to:

  1. Create an AWS EMR cluster.
  2. Submit Apache Spark jobs to the cluster using EMR’s Steps feature from Airflow.
  3. Wait for completion of the jobs.
  4. Terminate the AWS EMR cluster.


Design

Let’s build a simple DAG that uploads a local PySpark script and some data to an S3 bucket, starts an EMR cluster, submits a Spark job that uses the uploaded script, and terminates the EMR cluster once the job completes. If you have an always-on Spark cluster, you can skip the tasks that start and terminate the EMR cluster.

Spark submit design

Setup

Prerequisites

  1. docker (make sure to have docker-compose as well).
  2. git to clone the starter repo.
  3. An AWS account to set up the required cloud services.
  4. The AWS CLI installed and configured on your machine (e.g. by running aws configure).

If this is your first time using AWS, make sure to check for the presence of the EMR_EC2_DefaultRole and EMR_DefaultRole default roles, as shown below.

aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole'
# "RoleName": "EMR_DefaultRole",
# "RoleName": "EMR_EC2_DefaultRole",

If the roles are not present, create them using the following command.

aws emr create-default-roles

Also create a bucket using the following command.

aws s3api create-bucket --acl public-read-write --bucket <your-bucket-name>

Throughout this post, replace <your-bucket-name> with your bucket name. For example, if your bucket name is my-bucket, the above command becomes aws s3api create-bucket --acl public-read-write --bucket my-bucket. Press q to exit the prompt. In a real project the bucket would not be open to the public as it is here with public-read-write.

Clone repository

Let’s start by cloning the repository and switching to the start-here branch.

git clone git@github.com:josephmachado/spark_submit_airflow.git
cd spark_submit_airflow
git checkout start-here
rm -r .git # if you want to remove the git reference and initialize your own

In this branch, all the tasks are DummyOperator placeholders; throughout this post, we will replace them with the actual operators required, as sketched below.
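A minimal, self-contained sketch of that outline follows; the task ids match the snippets we build later, though the exact start-here branch may differ slightly, and the start_date is illustrative.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Placeholder DAG: every task is a DummyOperator for now; each one is
# replaced with a real operator as we work through the post.
dag = DAG(
    "spark_submit_airflow",
    start_date=datetime(2020, 10, 17),  # illustrative start date
    schedule_interval="0 10 * * *",  # 10:00 AM UTC every day
)

data_to_s3 = DummyOperator(task_id="data_to_s3", dag=dag)
script_to_s3 = DummyOperator(task_id="script_to_s3", dag=dag)
create_emr_cluster = DummyOperator(task_id="create_emr_cluster", dag=dag)
step_adder = DummyOperator(task_id="add_steps", dag=dag)
step_checker = DummyOperator(task_id="watch_step", dag=dag)
terminate_emr_cluster = DummyOperator(task_id="terminate_emr_cluster", dag=dag)

[data_to_s3, script_to_s3] >> create_emr_cluster >> step_adder
step_adder >> step_checker >> terminate_emr_cluster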

Get data

In your project directory, download the data and move it to the appropriate location as shown below.

mkdir ./dags/data
wget https://www.dropbox.com/sh/amdyc6z8744hrl5/AACZ6P5QnM5nbX4Gnk9_JW0Ma/movie_review/movie_review.csv?dl=0
mv movie_review* ./dags/data/movie_review.csv

The project folder structure should look as shown below.

Project folder structure

Code

Let’s start implementing the following sections.

  1. Move data and script to AWS S3
  2. Create an EMR cluster
  3. Run jobs in the EMR cluster and wait for them to complete
  4. Terminate the EMR cluster

random_text_classification.py is a naive PySpark script that reads in our data and classifies a review as positive if it contains the word good, and as negative otherwise. The code is self-explanatory. The DAG defined in spark_submit_airflow.py is the outline we will build on; it is a simple DAG scheduled to run at 10:00 AM UTC every day.
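As a rough idea of what the script does, here is a minimal sketch, assuming the input CSV has a review text column named review_str; the actual script in the repo may differ in column names and output format.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("random_text_classification").getOrCreate()

# Read the raw reviews; on the EMR cluster this is the HDFS path that the
# first step copies the S3 data into
movie_reviews = spark.read.option("header", True).csv("/movie")

# Naive rule: a review containing the word "good" is positive, else negative
classified = movie_reviews.withColumn(
    "classification",
    F.when(F.lower(F.col("review_str")).contains("good"), "positive").otherwise(
        "negative"
    ),
)

# Write the classified output; the last EMR step copies /output back to S3
classified.write.mode("overwrite").parquet("/output")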

Move data and script to the cloud

In most cases, the data to be processed is already present in AWS S3. We can also store our PySpark script in AWS S3 and let our Spark job know where it is located. We use Apache Airflow’s S3Hook to connect to our S3 bucket and move the data and the script to the required locations.

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

# Configurations
BUCKET_NAME = "<your-bucket-name>"
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"

# helper function
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    s3 = S3Hook()
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)

data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data,},
)
script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script,},
)

The above snippet moves the local script random_text_classification.py and the data file movie_review.csv into the S3 bucket we created. S3Hook picks up credentials from the aws_default Airflow connection, falling back to the standard boto3 credential chain if that connection has none. Once these tasks have run, you can verify the uploads with aws s3 ls s3://<your-bucket-name> --recursive.

Create an EMR cluster

Let’s create an EMR cluster. Apache Airflow provides an EmrCreateJobFlowOperator to do this. We define the cluster configuration, and the operator uses it to create the EMR cluster.

from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

JOB_FLOW_OVERRIDES = {
    "Name": "Movie review classifier",
    "ReleaseLabel": "emr-5.29.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}], # We want our EMR cluster to have HDFS and Spark
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}, # by default EMR uses py2, change it to py3
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT", # Spot instances are a "use as available" instances
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False, # this lets us programmatically terminate the cluster
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Create an EMR cluster
create_emr_cluster = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",
    dag=dag,
)

The EmrCreateJobFlowOperator creates the cluster and stores the EMR cluster id (its unique identifier, which looks like j-XXXXXXXXXXXXX) in XCom, a key-value store used to share small values across Airflow tasks.

Add steps and wait for completion

Let’s add the individual steps that we need to run on the cluster.

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

s3_clean = "clean_data/"
SPARK_STEPS = [ # Note the params values are supplied to the operator
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]

# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={ # these params are used to fill the parameterized values in SPARK_STEPS json
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)

last_step = len(SPARK_STEPS) - 1 # this value will let the sensor know the last step to watch
# wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

In the above code, we specify three steps in SPARK_STEPS. They are:

  1. Copy data from AWS S3 into the cluster’s HDFS location /movie.
  2. Run a naive text classification Spark script random_text_classification.py, which reads input from /movie and writes output to /output.
  3. Copy the data from the cluster’s HDFS location /output to the AWS S3 clean_data location, denoted by the s3_clean configuration variable.

We get the EMR cluster id from XCom, as shown in job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}". The params dictionary fills in the templated values; for example, with BUCKET_NAME set to my-bucket, the first step’s arguments render to s3-dist-cp --src=s3://my-bucket/data --dest=/movie. We also get the last step id from XCom in our EmrStepSensor, which periodically checks whether that last step has completed, been cancelled, or failed.

Terminate the EMR cluster

After the step sensor senses the completion of the last step, we can terminate our EMR cluster.

from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)

# Terminate the EMR cluster
terminate_emr_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_emr_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag,
)

In the above snippet, we get the cluster id from xcom and then terminate the cluster.
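With every placeholder replaced by a real operator, the dependency wiring stays the same as in the start-here sketch; as a recap, using the task variables defined in the snippets above (the repo may also add start and end dummy tasks):

# Create the EMR cluster only after both uploads finish, and terminate it
# only after the sensor confirms the last step is done.
[data_to_s3, script_to_s3] >> create_emr_cluster >> step_adder
step_adder >> step_checker >> terminate_emr_cluster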

Run the DAG

Let’s create our Apache Airflow instance from our project directory, as shown below.

docker-compose -f docker-compose-LocalExecutor.yml up -d

This will create the services needed to run Apache Airflow locally. Wait a couple of minutes (~1-2 min), then go to http://localhost:8080/admin/ to turn on the spark_submit_airflow DAG, which is set to run at 10:00 AM UTC every day. The DAG takes a while to complete, since

  1. The data needs to be copied to S3.
  2. An EMR cluster needs to be started (this generally takes around 8-10 minutes).

You can see the status of the DAG at http://localhost:8080/admin/airflow/graph?dag_id=spark_submit_airflow

Spark submit DAG

NOTE: If your job fails or you stop your Airflow instance, make sure to check the AWS EMR console and terminate any running EMR cluster (or run aws emr terminate-clusters --cluster-ids <your-cluster-id> from the CLI). You can also remove the S3 bucket you created, as shown below.

aws s3 rm s3://<your-bucket-name> --recursive
aws s3api delete-bucket --bucket <your-bucket-name>

Press q to exit the prompt. You can spin down the local Airflow instance as shown below.

docker-compose -f docker-compose-LocalExecutor.yml down

Conclusion

Hope this gives you an idea of how to create temporary AWS EMR clusters to run Spark jobs. One of the biggest issues with this approach is the time it takes to create the EMR cluster. If you are using an always-on EMR cluster, you can skip the tasks that create and terminate it. In the above code snippets the steps run serially; there are cases where running jobs in parallel is better, and EMR supports scheduling steps in parallel, as sketched below. If you have any questions or comments, please leave them in the comment section below.
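For example, EMR releases 5.28.0 and later accept a StepConcurrencyLevel setting in the job flow configuration, which allows multiple steps to run at once. A minimal sketch of the change, assuming your steps are independent of one another (the three steps in this post are not, since each depends on the previous one’s output):

# Assumption: requires an EMR release >= 5.28.0 (this post uses emr-5.29.0).
# Allows up to N steps to run concurrently on the cluster.
JOB_FLOW_OVERRIDES["StepConcurrencyLevel"] = 2  # run up to 2 steps in parallel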

References:

  1. https://airflow.readthedocs.io/en/latest/howto/operator/amazon/aws/emr.html
  2. https://github.com/puckel/docker-airflow