Assignment 5
Introduction
In this assignment, you will run Spark jobs on Amazon Web Services' (AWS) Elastic MapReduce (EMR) service, a cluster computing facility in the AWS cloud. It was named for MapReduce, the original framework it offered, but the service has continually expanded to cover emerging workloads, applications, and frameworks. Spark was added in EMR 4 and the service has tracked the evolution of Spark ever since; you can transfer the lessons you learned on the local cluster directly over to AWS.
Some advantages of an EMR cluster over a local cluster:
- It can read directly from S3 object storage, a widely-used AWS service.
- Many organizations already store their data on AWS, so running the analytics there as well makes sense.
- It is optimized for a cloud environment.
- It integrates with other AWS services (metrics, machine learning, queuing, ...)
- It scales on demand—pay for what you need, when you use it.
- It features specialized hardware that can accelerate tasks such as machine learning.
Some disadvantages:
- It has a complex console for interactive use.
- You have to turn clusters off when not in use if you do not want to pay for idle capacity.
Although we will focus on Amazon's service, the other major cloud vendors offer similar tools (for example, Google Cloud Dataproc and Azure HDInsight).
In addition, Databricks, the company founded by the creator of Spark, offers a managed Spark service that runs on all the above vendors.
One important point that we will return to several times in this assignment: When an EMR cluster (or indeed any cloud computing resource) is running, you are charged for every minute, whether or not it is performing any productive work for you in your application. When you are done working with a cluster, either to take a break during the assignment or when you are completely done, terminate the cluster to stop the charges. We will describe how to do that in a later section.
Prerequisite 1: Code that you will need from Assignment 4
Before starting this assignment, you will need the following programs from Assignment 4:
- `relative_score_bcast.py`
- `weather_etl.py`
Both should be correct, of course. If you have doubts, test them on the local cluster before running them on EMR.
Prerequisite 2: Tunnelling Spark History for the local cluster
In Section 3 of this assignment, you will look up the Spark History for an application run on the local cluster. In order to do this, you will need to configure tunnelling for this feature.
If you have not already done so, add the following line to your entry in `~/.ssh/config` for the local cluster:
LocalForward 18080 controller.local:18080
Preparing your S3 bucket
Most EMR applications take their input from Amazon S3, the object storage service for AWS. S3 can inexpensively store very large datasets and is often used for data lakes on AWS. Analytics tools such as Amazon Redshift, Amazon EMR Spark, and Snowflake can all take input from S3. We will use EMR Spark in this assignment.
The top-level unit of S3 is a bucket, which is located in an AWS Region. Unlike many other AWS services, an S3 bucket is not further localized by Availability Zone within a Region.
There is one further complication: S3 bucket names are globally unique. And when Amazon says "global", they mean it. If you successfully create a bucket named `my-oh-so-marvellous-bucket-yes-yes`, no one else can use that bucket name. Not any other user. Not in any region. So long as you have that bucket name, no one else can use it anywhere in the world. (There are some specialized AWS "partitions", such as the one for the US Government, that have their own namespaces, but virtually all applications run in the general partition that we will use.)
For this assignment, we will write BUCKET anywhere you need to place your bucket name. For example, if we ask you to submit the parameters,
--conf spark.yarn.maxAppAttempts=1 s3://BUCKET/relative_score_bcast.py s3://BUCKET/reddit-1 s3://BUCKET/output/reddit-1-1
and your bucket name is `my-bouquet`, you would write
--conf spark.yarn.maxAppAttempts=1 s3://my-bouquet/relative_score_bcast.py s3://my-bouquet/reddit-1 s3://my-bouquet/output/reddit-1-1
The best practice for naming your AWS resources is to adopt a naming convention that documents the purpose, type, and scope of each resource. AWS (like other cloud services) also has a tag feature that offers a more programmatic approach, but we will not be using it here. A web search for cloud resource naming conventions is a good starting point.
For this course, start with the bucket name `c732-SFU-ID-a5`.
Create an S3 bucket for your course work
To create your bucket, do the following steps:
- Sign on to your IAM id for AWS.
- Go to the S3 Console:
  - Drop down the menu labelled `Services` (in the top left corner of the AWS landing page).
  - Locate the `Storage` category in the menu and click on `S3` within that category.
- Click `Create bucket` (middle right), producing a dialog box.
- In the dialog box, enter:
  - Bucket name (Text): `BUCKET` (start with the name `c732-SFU-ID-a5` and add qualifiers as required to make your name unique)
  - AWS Region (Dropdown): `US West (Oregon) us-west-2`
  - Accept defaults for all other items.
- Scroll to the bottom of the page and click `Create bucket`.
- A status bar will appear at the top of the page. If it is green with `Successfully created ...`, close the dialog by clicking the `X`.
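If you prefer to script bucket creation rather than click through the console, the following is a minimal boto3 sketch of the same operation. It assumes your AWS credentials are already configured on your machine; the bucket name shown is only a placeholder for your own BUCKET.

```python
import boto3

s3 = boto3.client('s3', region_name='us-west-2')

# Placeholder bucket name -- substitute your own globally-unique BUCKET.
s3.create_bucket(
    Bucket='c732-my-sfu-id-a5',
    CreateBucketConfiguration={'LocationConstraint': 'us-west-2'},
)
```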
Your EMR Spark jobs will get their source code and input data from your new bucket, as well as send output there.
Upload your code to the S3 bucket
The first step is to upload your two Spark programs from Assignment 4 to your new S3 bucket.
- Go to the S3 top-level Console. You should see `BUCKET` in the list of Buckets.
- Click `BUCKET`.
- Click `Upload` (upper left) to go to the `Upload` dialog.
- Click `Add files`.
- Using your operating system's selection tool, locate and select `relative_score_bcast.py` on your computer.
- Use the same process to select `weather_etl.py` for upload.
- The S3 `Upload` dialog should now list the two Python programs in its `Files and folders` section and `s3://BUCKET` in its `Destination` section.
- Click `Upload`.
- A status bar will appear at the top of the page. If it is green with `Upload succeeded`, click `Close`.
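Equivalently, you could upload the two programs with boto3 instead of the console. A minimal sketch, assuming the files are in your current directory and that the bucket name is a placeholder for your BUCKET:

```python
import boto3

s3 = boto3.client('s3')
bucket = 'c732-my-sfu-id-a5'   # placeholder: use your BUCKET

# upload_file(local_path, bucket_name, object_key)
s3.upload_file('relative_score_bcast.py', bucket, 'relative_score_bcast.py')
s3.upload_file('weather_etl.py', bucket, 'weather_etl.py')
```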
Upload the reddit-1 dataset to the S3 bucket
You will also need to get a copy of the `reddit-1` dataset on your own machine:
- From the list of files at ggbaker.ca/732-datasets/, select `reddit-1.zip` and download it.
- Unzip the file, creating a directory named `reddit-1`.
Now upload that directory to S3:
- From the `BUCKET` bucket, click `Upload`.
- Click `Add folder`.
- Using your operating system's selection tool, select the `reddit-1` directory.
- Confirm that you want to upload 10 files.
- The S3 `Upload` dialog should now list all 10 partitions of `reddit-1` in its `Files and folders` section and `s3://BUCKET` in its `Destination` section.
- Click `Upload`.
- A blue status bar will appear, turning green on success. Click `Close`.
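The console's `Add folder` upload can also be approximated in boto3 by walking the local directory and uploading each partition under a matching key prefix. A sketch, again with a placeholder bucket name:

```python
import os
import boto3

s3 = boto3.client('s3')
bucket = 'c732-my-sfu-id-a5'   # placeholder: use your BUCKET

# Upload every file under the local reddit-1/ directory, preserving the
# reddit-1/... prefix in the object keys.
for dirpath, _dirnames, filenames in os.walk('reddit-1'):
    for name in filenames:
        local_path = os.path.join(dirpath, name)
        key = os.path.relpath(local_path).replace(os.sep, '/')
        s3.upload_file(local_path, bucket, key)
```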
Create an output folder in the bucket
The last preparatory step is to create a folder for output from your Spark runs:
- From the `BUCKET` bucket, click `Create folder`.
- Enter the name `output`.
- Click `Create folder`.
- Once again you should see a green status bar. Close it.
Verify the bucket contents
Verify that your `BUCKET` bucket contains the following:
- `relative_score_bcast.py`
- `weather_etl.py`
- A `reddit-1` folder with 10 partitions inside it.
- An `output` folder.
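You can also check the bucket contents programmatically. The sketch below lists every key and its size; the "folder" entries appear as zero-byte keys ending in `/` because S3 folders are only a console convention. The bucket name is a placeholder.

```python
import boto3

s3 = boto3.client('s3')
bucket = 'c732-my-sfu-id-a5'   # placeholder: use your BUCKET

listing = s3.list_objects_v2(Bucket=bucket)
for obj in listing.get('Contents', []):
    print(f"{obj['Size']:>12}  {obj['Key']}")
```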
Starting an EMR cluster
Now that you have your code and input data ready, you can run your first Spark job on EMR.
As mentioned in the introduction, the consoles for AWS services tend to be complex and busy, and most of the information they display is irrelevant to your purposes. The key is to focus on the few items that matter to you and ignore the rest. This is a useful professional skill for working with AWS specifically and cloud systems more generally.
To start your first cluster:
- Sign on to your IAM id for AWS.
- Go to the EMR Console:
  - Enter `emr` in the search bar at the top of the page.
  - Click on the item labelled `EMR`.
- In the upper right corner, select `Oregon` from the dropdown list of AWS Regions.
- In the sidebar menu, under the `EMR on EC2` section, click on `Clusters`.
- Click `Create cluster`.
For this step of the assignment, we're going to create a small, simple cluster, using the `Quick Options` dialog.
- `General configuration` section:
  - Cluster name (Text): `c732-emr-2x-m5.2xl` (Most other AWS resources, including EMR clusters, are only scoped within your account and need not be globally unique.)
  - Logging (Checkbox): Checked
- `Software configuration` section:
  - Release (Dropdown): choose the most recent EMR release
  - Applications (Radio button): `Spark`
- `Cluster configuration` section:
  - Instance type (Dropdown): `m5.2xlarge` (for Primary and Core)
  - Delete the `Task` instance group.
- `Cluster scaling and provisioning option` section:
  - Instance(s) of core nodes: `2`
- `Security configuration and EC2 key pair` section:
  - Amazon EC2 key pair for SSH: you do not need to specify a key pair. (If you have created a key pair in your AWS account previously, you do not need to use it at this time.)
- `Identity and Access Management (IAM) roles` section:
  - Service role: `EMR_DefaultRole`
  - Instance profile: `EMR_EC2_DefaultRole`
- Click `Create cluster`.
It can take 5–15 minutes to create a cluster, depending upon the current AWS load. Near the top of the status screen, you will see a green `Starting` status. Wait until the status changes to `Running`.
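For reference, the Quick Options settings above correspond roughly to the boto3 call sketched below. This is not required for the assignment; the release label, log location, and bucket name are assumptions you would adjust to your own values.

```python
import boto3

emr = boto3.client('emr', region_name='us-west-2')

response = emr.run_job_flow(
    Name='c732-emr-2x-m5.2xl',
    ReleaseLabel='emr-7.1.0',                  # assumption: pick the most recent release
    LogUri='s3://c732-my-sfu-id-a5/logs/',     # placeholder log location
    Applications=[{'Name': 'Spark'}],
    Instances={
        'InstanceGroups': [
            {'Name': 'Primary', 'InstanceRole': 'MASTER',
             'InstanceType': 'm5.2xlarge', 'InstanceCount': 1},
            {'Name': 'Core', 'InstanceRole': 'CORE',
             'InstanceType': 'm5.2xlarge', 'InstanceCount': 2},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,   # stay up so steps can be added later
    },
    ServiceRole='EMR_DefaultRole',
    JobFlowRole='EMR_EC2_DefaultRole',         # the EC2 instance profile
)
cluster_id = response['JobFlowId']             # the cluster's j-XXXXXXXXXXXXX id
print(cluster_id)
```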
Terminating the cluster if you want to take a break
This assignment can take considerable time to complete. If you want to break and resume later, you will want to terminate the cluster so that you do not accumulate charges while you are away.
To terminate the cluster and stop charges, click `Terminate` at the top of the cluster window.
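If you are scripting your clusters with boto3 (as in the earlier sketch), the equivalent of the `Terminate` button is a single call. A minimal sketch, assuming `cluster_id` holds the cluster's `j-...` id:

```python
import boto3

emr = boto3.client('emr', region_name='us-west-2')
emr.terminate_job_flows(JobFlowIds=[cluster_id])   # cluster_id: 'j-...' id
```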
Starting a clone of a terminated cluster
When you want to restart work, instead of creating a new cluster from scratch, you can start a clone:
- Open the list of clusters by clicking on `Clusters` in the sidebar, under the `EMR on EC2` header.
- In the list of clusters, click the box to the left of the one you would like to clone.
- Press `Clone`.
- When asked, "Would you like to include steps?", check `No` and press `Clone`.
- After a pause, you will see a page summarizing the cluster.
- The cluster is specified by a sequence of four pages, of which you are currently seeing the last. If you want to modify a parameter of the cluster, you can press `Previous` and `Next` to locate the page on which to modify it, then press `Next` to return to the fourth page.
- Once you are satisfied with the cluster specification, press `Create cluster`.
The new cluster will take several minutes to start. When its status becomes `Waiting`, you can resume the assignment where you left off.
Section 1: Submitting a first Spark application
Once the status line reads `Waiting` in green, you can submit Spark applications. On EMR, each application is called a "Step". For interactive use, you start up a cluster, submit steps one at a time, then terminate the cluster when you are done. In batch use, you would instead predefine the list of steps; they would begin to run as soon as the cluster was ready, after which it would terminate.
- Select the `Steps` tab.
- Click `Add step`.
- In the `Add step` dialog:
  - Step type (Dropdown): `Spark Application`
  - Name (Text): `Relative average Reddit-1`
  - Deploy mode (Dropdown): `Client`
  - Application location (Text): `s3://BUCKET/relative_score_bcast.py`
  - Spark-submit options (Text): `--conf spark.yarn.maxAppAttempts=1`
  - Arguments (Text): `s3://BUCKET/reddit-1 s3://BUCKET/output/reddit-1-1` (Note: the text box is narrow and will wrap your arguments at spaces and hyphens. The arguments will nonetheless be correctly formatted when submitted to Spark.)
  - Action on failure (Dropdown): `Continue` [Default]
- Click `Add step` (lower right).
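For reference, the same step can be submitted with boto3. The sketch below mirrors the dialog settings; `cluster_id` and the bucket name are placeholders for your own values.

```python
import boto3

emr = boto3.client('emr', region_name='us-west-2')

response = emr.add_job_flow_steps(
    JobFlowId=cluster_id,                  # placeholder: your cluster's j-... id
    Steps=[{
        'Name': 'Relative average Reddit-1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',   # EMR's generic command runner
            'Args': [
                'spark-submit',
                '--deploy-mode', 'client',
                '--conf', 'spark.yarn.maxAppAttempts=1',
                's3://c732-my-sfu-id-a5/relative_score_bcast.py',
                's3://c732-my-sfu-id-a5/reddit-1',
                's3://c732-my-sfu-id-a5/output/reddit-1-1',
            ],
        },
    }],
)
step_id = response['StepIds'][0]
```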
Wait for change of Status
EMR steps proceed through three `Status` values: from `Pending` to `Running`, concluding in either `Completed` (in gray) or `Failed` (in red).
Your step will start with a `Pending` status. The display does not refresh automatically, so you will have to press the "Refresh" icon (the clockwise circle) at the top of the step list and wait for the status to change to `Running` and eventually to one of the final values.
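If you submitted the step with boto3 as in the earlier sketch, you can poll its status instead of refreshing the console. A minimal sketch, assuming `cluster_id` and `step_id` from above:

```python
import time
import boto3

emr = boto3.client('emr', region_name='us-west-2')

while True:
    state = emr.describe_step(ClusterId=cluster_id,
                              StepId=step_id)['Step']['Status']['State']
    print(state)                         # PENDING -> RUNNING -> COMPLETED/FAILED
    if state in ('COMPLETED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(30)
```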
Diagnosing a Failed Status
If your step failed, it was most likely for one of two reasons. You can diagnose these cases with a few operations on the console.
- The output folder already exists. An annoying feature of Spark is that its jobs can fail in a write stage if you specify an output directory that already exists. Click the leftmost triangle on the step to open up the details. In this case, the `Reason` section will state that the output directory already exists.
- There is an error in the Python code. If you have a Python error, the step details will have a `Reason` section reading `Unknown error` and you will have to look at the `stdout` log for details. It can take several minutes for that log to become available from the EMR Console; initially the console will report "No logs created yet" in the upper right. (That message is misleading: the logs have been created on the cluster but not yet transferred to the management console.) Occasionally press the "refresh" icon (the clockwise arrow) until the logs become available, then click `stdout` to see the output, which should indicate the type of error and its line number.
Viewing the results of a success
If the step ended with a "Completed" status, it will have produced output partitions in the S3 directory you specified, `output/reddit-1-1`. In the S3 console, navigate to that directory. Download the `part-00000` partition and verify that it contains the values you expect from Assignment 4.
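If you would rather fetch the partition programmatically than through the console, a boto3 sketch follows. The bucket name is a placeholder, and you should confirm the exact object key in the console first, since Spark may append a suffix to `part-00000`.

```python
import boto3

s3 = boto3.client('s3')
# Placeholder bucket and key -- confirm the real key in the S3 console.
s3.download_file('c732-my-sfu-id-a5',
                 'output/reddit-1-1/part-00000',
                 'part-00000')
```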
We won't be using the output files any more, so you can delete them in the
S3 console when you're done reviewing them. But leave the output
directory, which we'll continue to need.
Section 2: Pushing the filter closer to the data
In this section, we will see how pushing some of the computation closer to the data can dramatically reduce the amount of data that we have to process. S3 is a more sophisticated data store than HDFS used on the local cluster and can do important parts of the computation for us before the data is sent to Spark.
A fascinating feature of EMR's adaptation of Spark is that if the query planner is informed that input is coming from S3, the planner will automatically push as much filtering as possible out to S3. The only change you have to make in your code is to specify S3 as the input source.
Loading the input data to S3
This section will read a different dataset, `weather-1`. As you did in the first section, download the dataset to your machine from ggbaker.ca/732-datasets/ and upload it to S3 in your bucket.
Running the original application without S3 filtering
Add a new step to your cluster with the following parameters:
- Step type (Dropdown): `Spark Application`
- Name (Text): `Weather, no S3 filtering`
- Deploy mode (Dropdown): `Client`
- Spark-submit options (Text): `--conf spark.yarn.maxAppAttempts=1`
- Application location (Text): `s3://BUCKET/weather_etl.py`
- Arguments (Text): `s3://BUCKET/weather-1 s3://BUCKET/output/weather-1`
- Action on failure (Dropdown): `Continue` [Default]
The step should run quickly.
Viewing the read size in the Spark history
EMR saves a step's Spark history for a week after it has completed. You can review that history during that week, even after the cluster has been terminated. There is a tremendous amount of detail in a Spark history, which you can use to optimize your applications. We'll dip into those details a few times in this assignment. In this section, we'll just look at the size of the input data.
- Select the `Summary` tab in the cluster page.
- Click the `Spark history server` link, located under the `Application user interfaces` header. It may take a minute or two for the history to load into a new browser tab.
- Look for the application named `weather ETL` (or whatever you named the application in your call to `SparkSession.builder.appName()`). Click on the link in the `App ID` column. Note: in some cases, you may have to go to the `Show incomplete applications` page to see your application, even though it has in fact completed.
- Click on the `Stages` item in the top menu bar.
- Click on the lowest "Completed stage", which should have a Stage ID of 0 and be labelled something like `json at NativeMethodAccessorImpl.java:N` (where N is a line number).
Near the top left of the screen, you will see counts of input size, number of input records, output size, and number of output records. Record these values. [❓]
Rerunning the application with S3 filtering
Spark on EMR can make use of S3 Select to limit the amount of data that must be transferred from S3 into Spark. To see it work, make the following two changes to your copy of `weather_etl.py`:
- Change the name specified in `SparkSession.builder.appName` to `weather ETL S3 select`.
- Change the data load call in the main routine to `spark.read.format("s3selectCSV").schema(observation_schema).options(compression='gzip').load(inputs)`, where `observation_schema` is the variable containing the input schema and `inputs` is the variable containing the input file directory. (A sketch of the revised call in context follows this list.)
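Here is a sketch of how the revised load might look in context. It assumes your `weather_etl.py` defines `observation_schema` and receives `inputs` on the command line, as in Assignment 4; your original read call may differ slightly, but only the read changes.

```python
# Before (assumed original: reads every record from S3 into Spark):
# weather = spark.read.csv(inputs, schema=observation_schema)

# After: the s3selectCSV source lets S3 Select filter and project rows
# server-side, so far less data is transferred to the Spark executors.
weather = spark.read.format("s3selectCSV") \
    .schema(observation_schema) \
    .options(compression='gzip') \
    .load(inputs)
```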
Save the revised program as `weather_etl_s3_select.py` and upload it to your bucket in S3.
Run a new step with the same parameters as above, except:
- Name (Text): `Weather, with S3 filtering`
- Application location (Text): `s3://BUCKET/weather_etl_s3_select.py`
- Output directory: `s3://BUCKET/output/weather-2`
This step should run in about the same time as the first. However, we will see that it read a much smaller amount of data from S3.
Look up this application's history in the Spark history server (recall that it is named `weather ETL S3 select`) and record its input and output counts as you did for the version without S3 filtering. [❓]
Terminate this cluster
Terminate this cluster. In the next section, we will use a different configuration.
Section 3: Exploiting the elasticity of cloud services
In a cloud service, you pay for what you use when you use it. You can take advantage of this to expand your capacity temporarily when you need more. This property is called elasticity.
In this final section, you will use EMR to temporarily obtain some extra capacity to complete a job more quickly than you could achieve it on the standard cluster used in the course.
For comparison, we're going to run the same job on the local cluster first. Sign on to the local cluster and run the following job:
spark-submit --conf spark.dynamicAllocation.enabled=false --num-executors=4 --executor-cores=1 relative_score_bcast.py /courses/732/reddit-5 reddit-5-1
Bring up the Spark History at localhost:18080 (you will need to have an active SSH connection and have met Prerequisite 2 above). Locate your application on the list and click on its App ID link. From the Spark Jobs page, note the `Total Uptime` (top left) and record the durations of each job listed in the table. [❓]
Running an on-demand cluster from EMR
Imagine that the run you just made on the local cluster was the largest available to you. For example, the cluster might be shared by multiple teams, with your team allocated no more than this. What can you do if you need results faster than you can get them from your allocated local resources? You scale up by running an on-demand cluster from a cloud vendor such as AWS.
To do this, start a clone of the last cluster (as usual, not copying any steps from it), but this time change the hardware specification (on page "Step 2: Hardware" of the cloning sequence) of Node type `Core`:
- Amend the name of the clone to `c732-emr-4x-m7gd.xl`.
- Set the Instance type of the "Core" instance group to `m7gd.xlarge` (note the `d`).
- Set the Instance count to `4`.
We are requesting 4 instances, each with 4 cores, rather than the 4 instances with 1 core that we were allocated on the local cluster.
Once you have made this change, advance to the fourth page ("Step 4") and press `Create cluster`. You will see that these more powerful instances also start up far more quickly than the weaker `m5.2xlarge` instances we used on our earlier EMR clusters.
When the cluster is ready, submit a Spark job running `relative_score_bcast.py`, reading input from `s3://sfu-cmpt-732-redshift/reddit-5/` (a publicly-accessible S3 bucket). Remember to specify a brand-new output directory so that your application does not fail on its write.
When the job has completed, open up its Spark history and select the Jobs page. Record the `Total Uptime`, as well as the time of each job when running on this cluster. [❓]
Once you have run the job, terminate the cluster. These more powerful instances are more expensive and you do not want to run them for longer than necessary.
Questions
Use a word processor of your choice to generate `answers.pdf` and fill in the following:
- Take a screen shot of your list of EMR clusters (if more than one page, only the page with the most recent), showing that all have `Terminated` status.
- For Section 2:
  - What fraction of the input file was prefiltered by S3 before it was sent to Spark?
  - Comparing the input numbers for the regular version versus the prefiltered one, which operations were performed by S3 and which were performed in Spark?
- For Section 3:
  - Reviewing the job times in the Spark history, which operations took the most time? Is the application IO-bound or compute-bound?
  - Look up the hourly cost of the `m7gd.xlarge` instance on the EC2 On-Demand Pricing page. Estimate the cost of processing a dataset ten times as large as `reddit-5` using just those 4 instances. If you wanted instead to process this larger dataset making full use of 16 instances, how would it have to be organized?
Submission
Submit your files to the CourSys activity Assignment 5.