How to submit Spark jobs to an EMR cluster from Airflow

Introduction

I have often been asked, and seen others ask, questions like:

How are others automating Apache Spark jobs on EMR?

How do you submit Spark jobs to an EMR cluster from Airflow?

In this post we go over how to use Apache Airflow to:

  1. Create an AWS EMR cluster.
  2. Submit Apache Spark jobs to the cluster using EMR Steps from Airflow.
  3. Wait for completion of the jobs.
  4. Terminate the AWS EMR cluster.

Design

Let’s build a simple DAG that uploads a local PySpark script and some data to an S3 bucket, starts an EMR cluster, submits a Spark job that uses the uploaded script, and terminates the EMR cluster once the job is complete. If you have an always-on Spark cluster, you can skip the tasks that start and terminate the EMR cluster.

Spark submit design

Setup

Prerequisites

  1. docker (make sure to have docker-compose as well).
  2. git to clone the starter repo.
  3. An AWS account to set up the required cloud services.
  4. AWS CLI installed and configured on your machine.

If this is your first time using AWS, make sure to check for the presence of the EMR_EC2_DefaultRole and EMR_DefaultRole default roles, as shown below.

aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole'
# "RoleName": "EMR_DefaultRole",
# "RoleName": "EMR_EC2_DefaultRole",

If the roles are not present, create them using the following command:

aws emr create-default-roles

Also create an S3 bucket using the following command.

aws s3api create-bucket --acl public-read-write --bucket <your-bucket-name>

Throughout this post, replace <your-bucket-name> with your bucket name. For example, if your bucket name is my-bucket, the above command becomes aws s3api create-bucket --acl public-read-write --bucket my-bucket. Press q to exit the prompt. In a real project the bucket would not be open to the public, as it is here with public-read-write.
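For reference, here is a minimal sketch of how you might block public access on such a bucket with boto3; the bucket name is a placeholder and this is not part of the walkthrough.

import boto3

s3 = boto3.client("s3")
# block all public access on the bucket; replace the placeholder bucket name
s3.put_public_access_block(
    Bucket="<your-bucket-name>",
    PublicAccessBlockConfiguration={
        "BlockPublicAcls": True,
        "IgnorePublicAcls": True,
        "BlockPublicPolicy": True,
        "RestrictPublicBuckets": True,
    },
)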

Clone repository

Let’s start by cloning the repository and switching to the start-here branch.

git clone https://github.com/josephmachado/spark_submit_airflow.git
cd spark_submit_airflow
git checkout start-here
rm -r .git # if you want to remove the git reference and initialize your own

In this branch all the tasks are DummyOperator placeholders; throughout this post they will be replaced with the actual operators required.
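For orientation, the placeholder DAG looks roughly like the sketch below; the exact default_args and start_date in the repo may differ, but the 10:00 AM UTC schedule matches what we use later in this post.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 10, 17),  # assumed start date, use your own
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "spark_submit_airflow",
    default_args=default_args,
    schedule_interval="0 10 * * *",  # 10:00 AM UTC every day
    max_active_runs=1,
)

# placeholder tasks; each will be replaced with a real operator later in this post
data_to_s3 = DummyOperator(task_id="data_to_s3", dag=dag)
script_to_s3 = DummyOperator(task_id="script_to_s3", dag=dag)
create_emr_cluster = DummyOperator(task_id="create_emr_cluster", dag=dag)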

Get data

In your project directory, download the data and move it to the appropriate location as shown below.

mkdir ./dags/data
wget https://www.dropbox.com/sh/amdyc6z8744hrl5/AACZ6P5QnM5nbX4Gnk9_JW0Ma/movie_review/movie_review.csv?dl=0
mv movie_review* ./dags/data/movie_review.csv

The project folder structure should look as shown below.

Project folder structure

Code

Let’s start implementing the following sections.

  1. Move data and script to AWS S3
  2. Create an EMR cluster
  3. Run jobs in the EMR cluster and wait for it to complete
  4. Terminate the EMR cluster

random_text_classification.py is a naive PySpark script that reads in our data and, if a review contains the word good, classifies it as positive, otherwise as negative. The code is self-explanatory. The DAG defined in spark_submit_airflow.py is the outline we will build on; it is a simple DAG scheduled to run at 10:00 AM UTC every day.
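As a rough sketch, the core of such a script might look like this; the actual column names and transformations in the repo's random_text_classification.py may differ, and /movie and /output are the HDFS paths used later in this post.

import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def random_text_classifier(input_loc, output_loc):
    spark = SparkSession.builder.appName("RandomTextClassifier").getOrCreate()
    # read the raw reviews that will be copied into HDFS by the first EMR step
    df = spark.read.csv(input_loc, header=True)
    # naive rule: a review that mentions "good" is positive, everything else is negative
    # "review_str" is an assumed column name for the review text
    df_out = df.withColumn(
        "positive_review",
        F.when(F.lower(F.col("review_str")).contains("good"), 1).otherwise(0),
    )
    df_out.write.mode("overwrite").parquet(output_loc)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, default="/movie")
    parser.add_argument("--output", type=str, default="/output")
    args = parser.parse_args()
    random_text_classifier(args.input, args.output)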

Move data and script to the cloud

In most cases, the data that needs to be processed is already present in AWS S3. We can also store our PySpark script in AWS S3 and point our Spark job to its location. We use Apache Airflow's S3Hook to connect to our S3 bucket and move the data and script to the required locations.

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

# Configurations
BUCKET_NAME = "<your-bucket-name>"
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"

# helper function
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    s3 = S3Hook()
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)

data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data,},
)
script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script,},
)

In the above snippet, the local script random_text_classification.py and the data file movie_review.csv are moved to the S3 bucket we created.

Create an EMR cluster

Let’s create an EMR cluster. Apache Airflow has an EmrCreateJobFlowOperator operator to create an EMR cluster. We define the cluster configuration, and the operator uses it to create the EMR cluster.

from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

JOB_FLOW_OVERRIDES = {
    "Name": "Movie review classifier",
    "ReleaseLabel": "emr-5.29.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}], # We want our EMR cluster to have HDFS and Spark
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}, # by default EMR uses py2, change it to py3
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT", # Spot instances are a "use as available" instances
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False, # this lets us programmatically terminate the cluster
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Create an EMR cluster
create_emr_cluster = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",
    dag=dag,
)

The EmrCreateJobFlowOperator creates a cluster and stores the EMR cluster id (its unique identifier) in XCom, a key-value store used to share values across Airflow tasks.

Add steps and wait for completion

Let’s add the individual steps that we need to run on the cluster.

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

s3_clean = "clean_data/"
SPARK_STEPS = [ # Note the params values are supplied to the operator
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]

# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={ # these params are used to fill the parameterized values in the SPARK_STEPS JSON
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)

last_step = len(SPARK_STEPS) - 1 # this value will let the sensor know the last step to watch
# wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

In the above code we specify 3 steps in the SPARK_STEPS JSON:

  1. Copy data from AWS S3 into the cluster's HDFS location /movie.
  2. Run a naive text classification Spark script random_text_classification.py, which reads input from /movie and writes output to /output.
  3. Copy the data from the cluster's HDFS location /output to the AWS S3 clean_data location, denoted by the s3_clean configuration variable.

We get the EMR cluster id from xcom as shown in job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}". We also get the last step id from xcom in our EmrStepSensor. The step sensor will periodically check if that last step is completed or skipped or terminated.

Terminate the EMR cluster

After the step sensor senses the completion of the last step, we can terminate our EMR cluster.

from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)

# Terminate the EMR cluster
terminate_emr_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_emr_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag,
)

In the above snippet, we get the cluster id from xcom and then terminate the cluster.
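The last thing is to wire the tasks together. A minimal sketch of the dependency chain, assuming the task variable names from the snippets above (the start and end DummyOperator tasks are optional markers):

from airflow.operators.dummy_operator import DummyOperator

start_data_pipeline = DummyOperator(task_id="start_data_pipeline", dag=dag)
end_data_pipeline = DummyOperator(task_id="end_data_pipeline", dag=dag)

# upload the script and the data in parallel, then create the cluster,
# add the steps, wait for the last step and finally terminate the cluster
start_data_pipeline >> [data_to_s3, script_to_s3]
[data_to_s3, script_to_s3] >> create_emr_cluster
create_emr_cluster >> step_adder >> step_checker >> terminate_emr_cluster
terminate_emr_cluster >> end_data_pipeline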

Run the DAG

Let’s start our local Apache Airflow instance from the project directory, as shown below.

docker-compose -f docker-compose-LocalExecutor.yml up -d

This will create the services needed to run Apache Airflow locally. Wait for a couple of minutes (~1-2 min), then go to http://localhost:8080/admin/ and turn on the spark_submit_airflow DAG, which is set to run at 10:00 AM UTC every day. The DAG takes a while to complete since

  1. The data needs to be copied to S3.
  2. An EMR cluster needs to be started (this generally takes around 8-10 minutes).

You can see the status of the DAG at http://localhost:8080/admin/airflow/graph?dag_id=spark_submit_airflow

Spark submit DAG

NOTE: If your job fails or you stop your Airflow instance, make sure to check the AWS EMR console and terminate any running EMR cluster. You can also remove the S3 bucket you created, as shown below.

aws s3 rm s3://<your-bucket-name> --recursive
aws s3api delete-bucket --bucket <your-bucket-name>

Press q to exit the prompt. You can spin down the local Airflow instance as shown below.

docker-compose -f docker-compose-LocalExecutor.yml down

Conclusion

Hope this gives you an idea of how to create temporary AWS EMR clusters to run Spark jobs. One of the biggest issues with this approach is the time it takes to create the EMR cluster. If you are using an always-on EMR cluster, you can skip the tasks that create and terminate the cluster. In the above code the steps run serially; there may be cases where running jobs in parallel is better, which you can do by enabling step concurrency on the cluster, as sketched below. If you have any questions or comments, please leave them in the comment section below.
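A minimal sketch of that option, assuming an EMR release of 5.28.0 or later (this post uses 5.29.0): add StepConcurrencyLevel to the cluster configuration passed to EmrCreateJobFlowOperator, keeping everything else as shown earlier.

JOB_FLOW_OVERRIDES = {
    "Name": "Movie review classifier",
    "ReleaseLabel": "emr-5.29.0",
    # keep the Applications, Configurations, Instances and role settings shown earlier
    "StepConcurrencyLevel": 2,  # let up to 2 steps run at the same time
}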

Further reading

  1. Airflow pros and cons
  2. Scheduling SQL script with Airflow

Comments

    Justin Wong · 2 points · 4 years ago

    Questions

    1. What is the point of argparse in random_text_classification? Why not just hardcode as /movies, and /output
    2. What is the best way to debug Spark script? Is it better to run the code through Jupyter notebook before integrating with Airflow?

    Learning lessons

    I took this base code and extended it to my own yelp-spark project. Here are some major learning lessons:

    1. When using s3-dist-cp, the --src arg MUST be a directory. If you want to filter for specific files, you would add another regex argument
    2. If you want to ssh into the EMR cluster, you need to update JOB_FLOW_OVERRIDES with this argument {Ec2KeyName: <key>}
    3. If you run into any issues with your steps, the console provides very poor logging; your best bet is to SSH into the cluster itself. The step logs are located at /mnt/var/log/hadoop/steps/. To figure out the step ID, go to the EMR console and open the Steps tab.
    4. When you ssh into EMR, if you want to locate the files you moved from S3 >> HDFS, you need to run this command: hadoop fs -ls hdfs:///
    5. Inside pyspark program, I had a hard time figuring out how to reference the HDFS files that were moved earlier. This stackoverflow answer helped me understand.
    Joseph Machado · 0 points · 4 years ago
    1. Yes, we could hardcode the values, but argparse lets us pass different values if we choose to. In this case we could have just hardcoded them, but using argparse is generally considered best practice.

    2. I do exactly what you described: I SSH into the cluster and get the code and executor sizes working as expected, and once I have that down I automate it with Airflow.

    Those are some great notes Justin. Do you mind if I add them as a separate section to this article?

    Justin Wong · 0 points · 4 years ago

    I see, thanks for the explanations. Sure thing, I would be honored!

    PanzerFlow · 0 points · 3 years ago

    Thank you for lessons learned #1; I was stuck for a while until I tried that.

    Anonymous · 1 point · 4 years ago

    Hi, thanks for the wonderful guide, but I am facing this error when running my DAGs. I looked it up and tried to solve it, but couldn't figure it out. Can you please help me?

    Invalid bucket name "< mobeenmehdisparkjob>": Bucket name must match the regex "^[a-zA-Z0-9.-_]{1,255}$". ,

    Joseph Machado · 0 points · 4 years ago

    Can you remove the < and > from your bucket name? The error says they are not allowed in the bucket name.

    i.e., replace <your-bucket-name> with mobeenmehdisparkjob

    Jessica Hitchcock · 1 point · 3 years ago

    Thanks for this! I am struggling to get this to work because the airflow I am using (v2.0.2) doesn't have a connection set up for emr_default. I get an error saying that the emr_default connection does not exist.

    Could someone please share what their emr_default connection looks like in the Airflow connections UI so that I can manually add it?

    Joseph Machado · 1 point · 3 years ago

    Ah, I see. The version of Airflow used here comes with a default emr_default connection.

    Go to the Admin -> Connections tab and you can create an emr_default connection in the Airflow UI. If you are not using the docker compose file provided in the repo, you will have to enter your AWS creds in the UI. Hope this helps, let me know if you have more questions.

    Justin Wong · 1 point · 4 years ago

    Forgot to say, thank you for the great article as usual Joseph!

    One additional note...

    I had to enter the following into Airflow UI (admin -> connections -> aws_default) before the load_to_s3 and EMR Operator commands would work:

    a. AWS key
    b. pw
    c. region name (https://airflow.apache.org/...)

    Caveat: I'm running the latest apache/airflow Docker image.

    Joseph Machado · 2 points · 4 years ago

    Hi Justin, thank you. In the repository we copy the local AWS credentials into the container: https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39

    Airflow uses boto3, which looks for AWS credentials in the default location; I am assuming that location is not available in the other docker image.

    Christopher Kindl · 1 point · 4 years ago

    Hi Joseph,

    I followed your tutorial so far. However, when the EMR cluster is created I get an error message saying: "The VPC/subnet configuration was invalid: Subnet is required : The specified instance type m5.xlarge can only be used in a VPC"

    I have the IDs for the VPC and subnet, but how can I incorporate those values into the JOB_FLOW_OVERRIDES JSON? There are no examples out there when it comes to this. Many thanks for helping!

    Chris · 0 points · 4 years ago

    What is the difference between deploy-mode=client and deploy-mode=cluster with EMR? Should it always be cluster? Why do you use client?

    Joseph Machado · 0 points · 4 years ago

    Hi Chris, Spark has a driver process; client mode runs the driver outside the Spark cluster's workers. More details here: https://spark.apache.org/docs/latest/submitting-applications.html

    Chris · 0 points · 4 years ago

    Can you please explain the line in your text classifier py code:
    if __name__ == "__main__":

    nvm, I found out here: https://stackoverflow.com/q...

    Justin Wong · 0 points · 4 years ago

    Quick note:
    This didn't return anything for me:
    aws iam list-roles | grep 'EMR_DefaultRole|EMR_EC2_DefaultRole'

    but this did:
    aws iam list-roles | grep 'EMR_DefaultRole'

    Joseph Machado · 0 points · 4 years ago

    Interesting. The "\|" in the grep command is an OR condition. We use aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole' to check if your account has the necessary roles. If not, we create them with aws emr create-default-roles. After you create the roles, that command should return values.

    Justin Wong · 0 points · 4 years ago

    I had to take out the escape char '\':
    aws iam list-roles | grep 'EMR_DefaultRole|EMR_EC2_DefaultRole'

    I'm running OS X, maybe it's different on Linux?

    Anonymous · 0 points · 3 years ago

    When I use your setup, I don't get any errors, but I want to use a more recent version of Airflow to sync with my other DAGs (which do run via Airflow 2.2.3). This results in a "botocore.exceptions.NoCredentialsError: Unable to locate credentials" error. Any solutions?

    Anonymous · 0 points · 3 years ago

    It has something to do with how you configure Airflow in Docker versus how I installed it via https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

    Joseph Kevin Machado · 0 points · 3 years ago

    Hi, In the setup we mount the local aws credentials into the docker container using volumes, as shown here https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39 . If you are setting up a separate docker container make sure to mount your local aws creds as shown in that link. Without mounting it, boto3 running within your container will not be able to locate the aws credentials.

    Alternatively you can also supply the creds in python, but that is generally not a safe practice. Hope this helps. LMK if you have more questions.

    Anonymous · 0 points · 4 years ago

    How do you add an argument to the python script in SPARK_STEPS? I tried adding the following but it does not work:

    "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
    "--<ARGUMENT>",
    "<VALUE>"
    
    Joseph Machado · 0 points · 4 years ago

    That should work, as long as you have the python script set to receive that argument. Could you please post the python code which accepts the argument and the spark step definition ?

    Jose · 0 points · 4 years ago

    For testing purposes I tried using an EmrAddStepsOperator to submit a python script that takes a macro as an argument:

    Operator:

    step_adder = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        steps=SPARK_STEPS,
        params={
            "BUCKET_NAME": BUCKET_NAME,
            "s3_script": s3_script,
            "year": "{execution_date.year}",
        },
        dag=dag,
    )
    

    SPARK_STEPS:

    SPARK_STEPS = [
        {
            "Name": "Run script",
            "ActionOnFailure": "CANCEL_AND_WAIT",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode",
                    "client",
                    "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
                    "--year",
                    "{{params.year}}"
                ],
            },
        }
    ]
    

    Python script:

    import argparse
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument("--year", type=str, help="execution year", default="2016")
        args = parser.parse_args()
        year = args.year
    

    I got an unknown error on the watch_step. So I got another question: How to catch errors in a python script using the EmrAddStepsOperator?

    Joseph Machado · 0 points · 4 years ago

    Hi Jose,

    1. This "{execution_date.year}" may not be giving you what you want.
    2. I got an unknown error on the watch_step => Hmm, can you check from the AWS EMR UI whether the steps ran? The watch_step task waits for the EMR cluster to start and then waits for the last step to complete.
    3. How to catch errors in a python script using the EmrAddStepsOperator => Standard python testing would be the easiest option here.

    Cương Nguyễn Ngọc · 0 points · 3 years ago

    Hello, I just wonder why the S3Hook doesn't take any parameter for the connection? Thank you for your help ^^

    Joseph Kevin Machado · 0 points · 3 years ago

    Hi, the S3Hook uses boto3 internally. Boto3 looks for AWS credentials in the /usr/local/airflow/.aws/ location within the Docker container. We copy our local AWS credentials to this location in the docker compose file here: https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39

    Hope this helps. Please LMK if you have any questions.