How to submit Spark jobs to EMR cluster from Airflow
Introduction
I have been asked, and have seen, questions like:
- How are others automating Apache Spark jobs on EMR?
- How do I submit Spark jobs to an EMR cluster from Airflow?
In this post we go over the Apache Airflow way to:
- Create an AWS EMR cluster.
- Submit Apache Spark jobs to the cluster using EMR steps from Airflow.
- Wait for completion of the jobs.
- Terminate the AWS EMR cluster.
Design
Let’s build a simple DAG that uploads a local PySpark script and some data to an S3 bucket, starts an EMR cluster, submits a Spark job that uses the uploaded script in the S3 bucket, and terminates the EMR cluster once the job completes. If you have an always-on Spark cluster, you can skip the tasks that start and terminate the EMR cluster.
Setup
Prerequisites
- docker (make sure to have docker-compose as well).
- git to clone the starter repo.
- AWS account to set up required cloud services.
- AWS CLI installed and configured on your machine.
If this is your first time using AWS, make sure to check for the presence of the EMR_EC2_DefaultRole and EMR_DefaultRole default roles, as shown below.
aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole'
# "RoleName": "EMR_DefaultRole",
# "RoleName": "EMR_EC2_DefaultRole",
If the roles are not present, create them using the following command.
aws emr create-default-roles
Also create an S3 bucket using the following command.
aws s3api create-bucket --acl public-read-write --bucket <your-bucket-name>
Throughout this post, replace <your-bucket-name> with your bucket name. E.g., if your bucket name is my-bucket, the above command becomes aws s3api create-bucket --acl public-read-write --bucket my-bucket. Press q to exit the prompt. In a real project the bucket would not be opened to the public the way public-read-write does here.
Clone repository
Let’s start by cloning the repository and switching to the start-here
branch.
git clone https://github.com/josephmachado/spark_submit_airflow.git
cd spark_submit_airflow
git checkout start-here
rm -r .git # if you want to remove the git reference and initialize your own
In this branch all of the tasks are DummyOperator placeholders; throughout this post we will replace them with the actual operators required.
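For reference, the skeleton in the start-here branch looks roughly like the sketch below. The DAG name and schedule come from this post; the default arguments and the exact placeholder task ids are assumptions, so check the repo for the real values.
# A rough sketch of the start-here skeleton; dates, default_args and the exact
# set of placeholder task ids are assumptions.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 10, 17),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "spark_submit_airflow",
    default_args=default_args,
    schedule_interval="0 10 * * *",  # 10:00 AM UTC every day
    max_active_runs=1,
)

# every task starts out as a DummyOperator placeholder, for example:
data_to_s3 = DummyOperator(task_id="data_to_s3", dag=dag)
script_to_s3 = DummyOperator(task_id="script_to_s3", dag=dag)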
Get data
In your project directory, download the data and move it to the appropriate location as shown below.
mkdir ./dags/data
wget https://www.dropbox.com/sh/amdyc6z8744hrl5/AACZ6P5QnM5nbX4Gnk9_JW0Ma/movie_review/movie_review.csv?dl=0
mv movie_review* ./dags/data/movie_review.csv
The project folder structure should look like the one shown below.
Code
Let’s start implementing the following sections.
- Move data and script to AWS S3
- Create an EMR cluster
- Run jobs in the EMR cluster and wait for them to complete
- Terminate the EMR cluster
The random_text_classification.py script is a naive PySpark script that reads in our data and, if a review contains the word good, classifies it as positive, otherwise as negative. The code is self-explanatory. The DAG defined in spark_submit_airflow.py is the outline we will build on. This is a simple DAG scheduled to run at 10:00 AM UTC every day.
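For context, a minimal sketch of what such a naive classifier could look like is shown below; the column names, the exact classification rule, and the default input/output paths are assumptions, so the real script in the repo will differ in the details.
# Sketch of a naive "contains the word good => positive" classifier.
# Column names (cid, review_str) and default paths are assumptions.
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def random_text_classifier(input_loc, output_loc):
    spark = SparkSession.builder.appName("Random Text Classifier").getOrCreate()
    df_raw = spark.read.option("header", True).csv(input_loc)
    # naive rule: a review containing the word "good" is positive, else negative
    df_clean = df_raw.withColumn(
        "positive_review", F.col("review_str").contains("good").cast("int")
    )
    df_clean.write.mode("overwrite").parquet(output_loc)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="/movie")
    parser.add_argument("--output", default="/output")
    args = parser.parse_args()
    random_text_classifier(args.input, args.output)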
Move data and script to the cloud
In most cases the data that needs to be processed is already present in AWS S3. We can also store our PySpark script in AWS S3 and let our Spark job know where it is located. We use Apache Airflow’s S3Hook to connect to our S3 bucket and move the data and script to the required locations.
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

# Configurations
BUCKET_NAME = "<your-bucket-name>"
local_data = "./dags/data/movie_review.csv"
s3_data = "data/movie_review.csv"
local_script = "./dags/scripts/spark/random_text_classification.py"
s3_script = "scripts/random_text_classification.py"


# helper function
def _local_to_s3(filename, key, bucket_name=BUCKET_NAME):
    s3 = S3Hook()
    s3.load_file(filename=filename, bucket_name=bucket_name, replace=True, key=key)


data_to_s3 = PythonOperator(
    dag=dag,
    task_id="data_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_data, "key": s3_data,},
)

script_to_s3 = PythonOperator(
    dag=dag,
    task_id="script_to_s3",
    python_callable=_local_to_s3,
    op_kwargs={"filename": local_script, "key": s3_script,},
)
From the above code snippet, we see how the local script file random_text_classification.py and the data at movie_review.csv are moved to the S3 bucket that was created.
Create an EMR cluster
Let’s create an EMR cluster. Apache Airflow has an EmrCreateJobFlowOperator operator to create an EMR cluster. We define the cluster configuration, and the operator uses it to create the EMR cluster.
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

JOB_FLOW_OVERRIDES = {
    "Name": "Movie review classifier",
    "ReleaseLabel": "emr-5.29.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],  # We want our EMR cluster to have HDFS and Spark
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},  # by default EMR uses py2, change it to py3
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",  # Spot instances are "use as available" instances
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,  # this lets us programmatically terminate the cluster
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Create an EMR cluster
create_emr_cluster = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",
    dag=dag,
)
The EmrCreateJobFlowOperator creates a cluster and stores the EMR cluster id (a unique identifier) in XCom, which is a key-value store used to access variables across Airflow tasks.
Add steps and wait for completion
Let’s add the individual steps that we need to run on the cluster.
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

s3_clean = "clean_data/"

SPARK_STEPS = [  # Note the params values are supplied to the operator
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/data",
                "--dest=/movie",
            ],
        },
    },
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move clean data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=/output",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_clean }}",
            ],
        },
    },
]

# Add your steps to the EMR cluster
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    params={  # these params are used to fill the parameterized values in the SPARK_STEPS json
        "BUCKET_NAME": BUCKET_NAME,
        "s3_data": s3_data,
        "s3_script": s3_script,
        "s3_clean": s3_clean,
    },
    dag=dag,
)

last_step = len(SPARK_STEPS) - 1  # this value will let the sensor know the last step to watch

# wait for the steps to complete
step_checker = EmrStepSensor(
    task_id="watch_step",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
    + str(last_step)
    + "] }}",
    aws_conn_id="aws_default",
    dag=dag,
)
In the above code we can see that we specify 3 steps in the SPARK_STEPS json. They are:
- Copy data from AWS S3 into the cluster’s HDFS location /movie.
- Run a naive text classification Spark script random_text_classification.py, which reads input from /movie and writes output to /output.
- Copy the data from the cluster’s HDFS location /output to the AWS S3 clean_data location, denoted by the s3_clean configuration variable.
We get the EMR cluster id from XCom, as shown in job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}". We also get the last step id from XCom in our EmrStepSensor. The step sensor will periodically check whether that last step has completed, been skipped, or been terminated.
Terminate the EMR cluster
After the step sensor senses the completion of the last step, we can terminate our EMR cluster.
from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)

# Terminate the EMR cluster
terminate_emr_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_emr_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag,
)
In the above snippet, we get the cluster id from XCom and then terminate the cluster.
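Finally, the tasks need to be chained together. A sketch of the dependency wiring is shown below; the start_data_pipeline and end_data_pipeline DummyOperator tasks are assumptions (the repo may name or arrange them differently), the rest are the operators defined above.
from airflow.operators.dummy_operator import DummyOperator

# hypothetical bracketing tasks; the repo may use different names
start_data_pipeline = DummyOperator(task_id="start_data_pipeline", dag=dag)
end_data_pipeline = DummyOperator(task_id="end_data_pipeline", dag=dag)

# the two uploads can run in parallel, everything else is sequential
start_data_pipeline >> [data_to_s3, script_to_s3] >> create_emr_cluster
create_emr_cluster >> step_adder >> step_checker >> terminate_emr_cluster
terminate_emr_cluster >> end_data_pipeline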
Run the DAG
Let’s create our local Apache Airflow instance from our project directory, as shown below.
docker-compose -f docker-compose-LocalExecutor.yml up -d
This will create the services needed to run Apache Airflow locally. Wait for a couple of minutes (~1-2 min) and then you can go to http://localhost:8080/admin/
to turn on the spark_submit_airflow DAG, which is set to run at 10:00 AM UTC every day. The DAG takes a while to complete since:
- The data needs to be copied to S3.
- An EMR cluster needs to be started (this generally takes around 8-10 min).
You can see the status of the DAG at http://localhost:8080/admin/airflow/graph?dag_id=spark_submit_airflow
NOTE: If your job fails or you stop your Airflow instance, make sure to check your AWS EMR console and terminate any running EMR cluster. You can also remove the S3 bucket you created, as shown below.
aws s3 rm s3://<your-bucket-name> --recursive
aws s3api delete-bucket --bucket <your-bucket-name>
Press q to exit the prompt. You can spin down the local Airflow instance as shown below.
docker-compose -f docker-compose-LocalExecutor.yml down
Conclusion
Hope this gives you an idea of how to create a temporary AWS EMR cluster to run Spark jobs. One of the biggest issues with this approach is the time it takes to create the EMR cluster. If you are using an always-on EMR cluster, you can skip the tasks that create and terminate the EMR cluster. In the above code the jobs run serially; there may be cases where running jobs in parallel is better, and in such cases you can schedule the steps to run in parallel (see the sketch below). If you have any questions or comments, please leave them in the comment section below.
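As a hedged sketch of the parallel-steps idea (not part of the repo): EMR releases 5.28.0 and later let a cluster run several steps concurrently via StepConcurrencyLevel, so one option is to raise that value in the job flow config and submit independent step lists from separate add-step/sensor pairs. SPARK_STEPS_A below is a stand-in for a second, independent list of steps.
# Sketch only: run independent step groups in parallel on one cluster.
JOB_FLOW_OVERRIDES["StepConcurrencyLevel"] = 2  # requires emr-5.28.0 or later

SPARK_STEPS_A = SPARK_STEPS  # stand-in; in practice a separate list of steps

step_adder_a = EmrAddStepsOperator(
    task_id="add_steps_a",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS_A,
    params={"BUCKET_NAME": BUCKET_NAME, "s3_script": s3_script, "s3_clean": s3_clean},
    dag=dag,
)

step_checker_a = EmrStepSensor(
    task_id="watch_step_a",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps_a', key='return_value')[-1] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

# add a second pair (add_steps_b / watch_step_b) the same way and terminate the
# cluster only after every sensor has completed, e.g.
# [step_checker_a, step_checker_b] >> terminate_emr_cluster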
Questions
Learning lessons
I took this base code and extended it to my own yelp-spark project. Here are some major learning lessons:
- Step logs can be found at /mnt/var/log/hadoop/steps/. To figure out the step ID, go to the EMR console and open the Steps tab.
- You can inspect the cluster's HDFS with hadoop fs -ls hdfs:///.
- Yes, we can hardcode the values, but argparser enables us to use different values if we choose to.
True, in this case we could have just hardcoded the values, but having an argparser is generally considered best practice.
I do exactly what you explained: I ssh into the cluster and get the code and executor sizes working as expected. Then, once I have that down, I automate with Airflow.
Those are some great notes Justin. Do you mind if I add them as a separate section to this article ?
I see, thanks for the explanations. Sure thing, I would be honored!
Thank you for lessons learned #1, I was stuck for a while until I tried that.
Hi, thanks for the wonderful guide, but I am facing this error when running my DAGs. I looked it up and tried to solve it, but couldn't. Can you please help me?
Invalid bucket name "< mobeenmehdisparkjob>": Bucket name must match the regex "^[a-zA-Z0-9.-_]{1,255}$".
Can you remove the < and > from your bucket name? The error says those characters are not allowed in the bucket name, i.e. replace <your-bucket-name> with mobeenmehdisparkjob.
Thanks for this! I am struggling to get this to work because the Airflow version I am using (v2.0.2) doesn't have a connection set up for emr_default. I get an error saying that the emr_default connection does not exist. Could someone please share what their emr_default connection looks like in the Airflow connections UI so that I can manually add this?
Ah, I see, the version of Airflow I used here comes with a default emr_default connection.
Go to the Admin -> Connections tab and you can create an emr_default connection in the Airflow UI. If you are not using the docker compose file provided in the repo, you will have to enter your AWS creds in the UI. Hope this helps, lmk if you have more questions.
Forgot to say, thank you for the great article as usual Joseph!
One additional note...
I had to enter the following into Airflow UI (admin -> connections -> aws_default) before the load_to_s3 and EMR Operator commands would work:
a. AWS key
b. pw
c. region name (https://airflow.apache.org/...)
Caveat: I'm running the latest apache/airflow Docker image.
Hi Justin, Thank you. In the repository we are copying over local AWS credentials into the container https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39
Airflow uses boto3, which looks for the default AWS credentials in the default location, which I am assuming is not available in the other Docker image.
Hi Joseph,
I followed your tutorial so far. However, when the EMR cluster is created I get an error message saying: "The VPC/subnet configuration was invalid: Subnet is required : The specified instance type m5.xlarge can only be used in a VPC"
I have the IDs for the VPC and subnet, but how can I incorporate those values into the JOB_FLOW_OVERRIDES json? There are no examples out there when it comes to this. Many thanks for helping!
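(The post itself does not cover this, but for readers with the same question: the EMR RunJobFlow API takes the subnet under the Instances block, so a minimal sketch is shown below; the subnet id is a placeholder.)
# Sketch: pin the cluster to a specific subnet; "subnet-0abc1234" is a placeholder
JOB_FLOW_OVERRIDES["Instances"]["Ec2SubnetId"] = "subnet-0abc1234"
# optionally also set an EC2 key pair if you want to ssh into the cluster
# JOB_FLOW_OVERRIDES["Instances"]["Ec2KeyName"] = "<your-key-pair-name>"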
What is the difference between deploy-mode = client and deploy-mode=cluster with EMR? Should it always be cluster? Why do you use client?
Hi Chris, Spark has a driver process; client mode runs the driver outside your Spark cluster's worker nodes (on the machine that invoked spark-submit). More details here: https://spark.apache.org/docs/latest/submitting-applications.html
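For reference, switching to cluster mode only changes the spark-submit arguments in the step definition; a sketch (not something the post itself uses):
# Sketch: the same classify step, but with the driver running inside the
# cluster ("cluster" deploy mode) instead of on the submitting machine.
classify_in_cluster_mode = {
    "Name": "Classify movie reviews (cluster mode)",
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode",
            "cluster",
            "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
        ],
    },
}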
Can you please explain the line in your text classifier py code:
if __name__ == "__main__":
nvm, I found out here: https://stackoverflow.com/q...
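(For anyone else wondering: the guard simply means "run this block only when the file is executed directly, e.g. by spark-submit, and not when it is imported". A tiny standalone example:)
def main():
    print("running as a script")


# this runs for `python my_file.py` or `spark-submit my_file.py`,
# but not for `import my_file`
if __name__ == "__main__":
    main()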
Quick note:
This didn't return anything for me:
aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole'
but this did:
aws iam list-roles | grep 'EMR_DefaultRole'
Interesting, the "\|" in the grep command is an OR condition. We use aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole' to check if your account has the necessary roles. If not, we create them with aws emr create-default-roles. After you create the roles, the command aws iam list-roles | grep 'EMR_DefaultRole\|EMR_EC2_DefaultRole' should return values.
I had to take out the escape char '\':
aws iam list-roles | grep 'EMR_DefaultRole|EMR_EC2_DefaultRole'
I'm running OSX, maybe different on linux?
When I use your set-up, I don't get any errors, but I want to use a more recent version of Airflow to sync with my other DAGs (which do run via Airflow 2.2.3). That results in a "botocore.exceptions.NoCredentialsError: Unable to locate credentials" error. Any solutions?
It has something to do with the way you configure Airflow in Docker versus how I installed it via https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
Hi, in the setup we mount the local AWS credentials into the Docker container using volumes, as shown here: https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39 . If you are setting up a separate Docker container, make sure to mount your local AWS creds as shown in that link. Without mounting them, boto3 running within your container will not be able to locate the AWS credentials.
Alternatively, you can also supply the creds in Python, but that is generally not a safe practice. Hope this helps. LMK if you have more questions.
How do you add an argument to the python script in SPARK_STEPS? I tried adding the following but it does not work:
That should work, as long as you have the python script set up to receive that argument. Could you please post the python code which accepts the argument and the Spark step definition?
For testing purposes I tried using an EmrAddStepsOperator to submit a python script that gets a macro as an argument:
Operator:
SPARK_STEPS:
Python script:
I got an unknown error on the watch_step. So I have another question: how do I catch errors in a python script when using the EmrAddStepsOperator?
Hi Jose, "{execution_date.year}" may not be giving you what you want.
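(Since the code in the comment above did not survive, here is a hedged sketch of one way to pass a date argument: steps is a templated field of EmrAddStepsOperator, so a Jinja macro placed in Args is rendered before the step is submitted, and the script can read it back with argparse. The argument name --run-date is illustrative.)
# Sketch: pass the DAG run date to the PySpark script as a CLI argument.
SPARK_STEPS_WITH_ARG = [
    {
        "Name": "Classify movie reviews",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
                "--run-date",
                "{{ ds }}",  # rendered by Airflow, e.g. 2021-01-01
            ],
        },
    }
]

# and inside the PySpark script:
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--run-date", required=True)
args, _ = parser.parse_known_args()
print(args.run_date)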
Hello, I just wonder why the S3Hook doesn't take any parameter for the connection? Thank you for the help ^^
Hi, the S3Hook uses boto3 internally. Boto3 looks for AWS credentials in the /usr/local/airflow/.aws/ location within the Docker container. We copy our local AWS credentials to this location in the docker compose file here: https://github.com/josephmachado/spark_submit_airflow/blob/21408da9a10f70143e3d7d608ab261dead059f90/docker-compose-LocalExecutor.yml#L39
Hope this helps. Please LMK if you have any questions.