Migrating Apache Spark workloads from AWS EMR to Kubernetes

Dima Statz · Published in ITNEXT · 10 min read · Sep 30, 2020

Introduction

ESG research found that 43% of respondents consider the cloud their primary deployment environment for Apache Spark. That makes a lot of sense, because the cloud provides scalability, reliability, availability, and massive economies of scale. Another strong selling point of cloud deployment is the low barrier to entry in the form of managed services. Each of the ‘Big Three’ cloud providers has its own offering for running Apache Spark as a managed service: you have probably heard of AWS Elastic MapReduce, Azure Databricks, Azure HDInsight, and Google Dataproc.

I will focus on AWS Elastic MapReduce (EMR), since we run our Spark workloads on AWS. We use Apache Airflow for workflow orchestration.

Data Flow

The data comes from different sources that are spread across different geo regions and do not necessarily run on the AWS cloud. For example, some of the data sources are web apps running in browsers, others are mobile applications, some are external data pipelines, and so on. Here and here you can see how we implemented our data ingestion steps. All input data is collected in S3 buckets and indexed by creation date in AWS DynamoDB, which allows us to process data batches for any given time interval. We process roughly 2 TB of data per day, with occasional ‘special event’ days when the amount of data can be much bigger.

Problem Statement

Overall, AWS EMR does a great job. It is a reliable, scalable, and flexible tool for managing Apache Spark clusters. AWS EMR comes with out-of-the-box monitoring in the form of AWS CloudWatch, provides a rich toolbox that includes Zeppelin, Livy, Hue, and more, and has very good security features. But AWS EMR has its downsides as well.

Portability: if you are building a multi-cloud or hybrid (cloud/on-prem) solution, be aware that migrating Spark applications off AWS EMR can be a significant effort. After running for a while on AWS EMR, you can find yourself tightly coupled to AWS-specific features. It can be something simple, like logging and monitoring, or something more involved, like the auto-scaling mechanism, custom master/worker AMIs, AWS security features, etc.

Cost overhead: the Amazon EMR price is charged in addition to the Amazon EC2 price. Take a look at the pricing example here:

AWS EMR Pricing

As you can see, you have to add 25% to the on-demand price of the underlying EC2 instances when using m5.xlarge, m5.2xlarge, or m5.4xlarge machines. When working with bigger machines, the EMR fee drops to between 6% and 12%. When using spot instances, the EMR fee can reach up to 35% of the price of the underlying EC2 spot instances. For example, see the attached 3-day bill taken from a real production environment. Here, the AWS EMR cluster consists of spot instances, and you can see that the EMR service is quite expensive.

A 3-day bill ($)

Projecting the given three-day bill over a full year shows that, in this specific case, the EMR service alone can cost as much as $300,000 per year.

Objectives

Summarizing the above, the desired framework for running Apache Spark should meet the following objectives:

Scalability:

The new solution should provide the ability to scale the Apache Spark cluster out and in as computing needs change.

Reliability:

The new solution should monitor compute nodes and automatically terminate and replace instances in case of failure.

Portability:

The new solution should eliminate the dependency on the underlying infrastructure.

Cost-effectiveness:

The new solution should reduce the cost of managed services.

Running Apache Spark on Kubernetes

Apache Spark currently supports four different cluster managers: Standalone, Apache Mesos, Hadoop YARN, and Kubernetes. The first three are well known and have been around for a while; you can find a great article that covers all of them here. Kubernetes support is newer: it was introduced in Spark 2.3 (February 2018), and it is a big step forward from the portability point of view. Kubernetes provides a powerful abstraction for managing containerized applications. It eliminates infrastructure lock-in and allows running applications across multiple operating environments, including dedicated on-prem servers, virtualized private clouds, and public clouds.

“Kubernetes allows you to deploy cloud-native applications anywhere and manage them exactly as you like everywhere”

Kubernetes offers two options for running Apache Spark applications: spark-submit or the Kubernetes Operator for Spark. The latter brings a lot in terms of management and monitoring. I will focus on the first one, as it is very simple, natively supported by Spark, and integrates well with Apache Airflow.

How it works

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster using various resource managers (YARN, Apache Mesos, Kubernetes):

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

When the master URL starts with k8s://[Kubernetes_DNS], Spark creates a Spark driver running within a Kubernetes pod on the cluster specified by Kubernetes_DNS. The driver then creates executors, also in Kubernetes pods, and executes the application code. When the application completes, the executor pods terminate and are cleaned up, but the driver pod persists its logs and remains in the “completed” state until it is eventually garbage collected or manually cleaned up.
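For example, a minimal submission against a Kubernetes cluster could look like the following sketch (the API server endpoint, image name, class, and JAR name are placeholders):

./bin/spark-submit \
--class <main-class> \
--master k8s://https://<API_SERVER_ENDPOINT>:443 \
--deploy-mode cluster \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.container.image=<REPO_NAME>/<IMAGE_NAME>:v1.0 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
local:///opt/spark/jars/<app>-assembly-v1.0.0.jar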

Performing spark-submit in K8s cluster

Now that we have a clear picture of how running Apache Spark on K8s works, we can start the migration process.

Run an AWS EKS cluster

The first step is to create the Kubernetes cluster. Here we will use EKS, AWS’s fully managed Kubernetes service. AWS has solid documentation on how to get started with EKS; for example, you can find step-by-step guides here and here. I will just summarize the whole process. Start by cloning the following repository:

git clone https://github.com/aws-samples/amazon-eks-apache-spark-etl-sample.git

Open and edit the example/eksctl.yaml file. It comes with the following content:

eksctl.yaml
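For reference, a minimal eksctl cluster config of this kind looks roughly like the sketch below; the cluster name, instance type, and capacity are illustrative placeholders, not necessarily the exact values from the repository:

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: spark-eks-cluster      # placeholder cluster name
  region: us-east-1

nodeGroups:
  - name: spark-nodes
    instanceType: m5.xlarge    # placeholder machine type
    desiredCapacity: 3         # placeholder capacity
    iam:
      attachPolicyARNs:
        - arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
        - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
        - <IAM_POLICY_ARN>     # set your own IAM policy ARN here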

Edit this file if you want to create the EKS cluster in a region other than us-east-1 (N. Virginia), or if you prefer a different machine type or capacity. Change line 22 and set your IAM_POLICY_ARN. Then create the EKS cluster by running the following command:

eksctl create cluster -f example/eksctl.yaml

It takes around 15 minutes to set up the new EKS cluster. Finally, you should be able to see the following information on the EKS cluster dashboard:

EKS Dashboard

The API server endpoint (in the orange rectangle) is important: it will be used by Apache Airflow to submit Spark jobs. Once the cluster is running, create the Kubernetes service account and the cluster role binding that grant edit permissions in Kubernetes to the Spark job:

kubectl apply -f example/kubernetes/spark-rbac.yaml

Note that here we assume Spark applications will run in the default namespace. Change the spark-rbac.yaml file if you are going to run Spark applications in a dedicated namespace.

spark-rbac.yaml
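In essence, this manifest creates a “spark” service account and binds it to the “edit” cluster role in the default namespace. A roughly equivalent sketch looks like this (resource names may differ from the repository’s file):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-role
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default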

Build the Docker image for Spark applications

By definition, Kubernetes is a container-orchestration system, so we have to create a Docker image for our Spark applications. Here I will describe how it can be done for Scala applications. The idea is to create a Docker image that contains a distribution of Apache Spark and Hadoop, plus a fat JAR holding all project class files and resources packed together with all of their dependencies. The first step is to add the sbt-assembly plugin (if it is not there already) to the project.sbt file:

project.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

The goal of using this plugin is simple: create a fat JAR of your Scala project with all of its dependencies. The next step is to review the SBT file for missing dependencies. AWS EMR comes with a lot of preinstalled libraries that will be missing when running on Kubernetes, so we have to bundle them into the Docker image. Of course, the number and types of libraries will differ from project to project, but at a minimum we need the following:

"com.amazonaws" % "aws-java-sdk" % "1.7.4"
"org.apache.hadoop" % "hadoop-aws" % "2.7.1"

These libraries are used to consume AWS services like S3, DynamoDB, etc. The last step is to create the Dockerfile. Add a new file:

Dockerfile

Use the Dockerfile from the cloned repository (https://github.com/aws-samples/amazon-eks-apache-spark-etl-sample.git) as a blueprint. This Dockerfile installs the following distribution of Apache Spark:

https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

Be careful when changing Spark or Hadoop versions in the SBT file: each Apache Spark version is compatible with very specific versions of Hadoop. Other important lines in the Dockerfile that you should be aware of:

# Define working directory
WORKDIR /opt/input

# Project Definition layers change less often than application code
COPY build.sbt ./

WORKDIR /opt/input/project
COPY project/build.properties ./
COPY project/*.sbt ./

WORKDIR /opt/input

# Copy rest of application
COPY . ./
RUN SBT_OPTS="-Xms4096M -Xmx4096M -Xss1024M -XX:MaxMetaspaceSize=4096M" sbt clean assembly

This copies all of the project’s source files and resources into the Docker image and builds the fat JAR. Build the Docker image:

docker build -t <REPO_NAME>/<IMAGE_NAME>:v1.0 .

Ensure that you can see the following file in your Docker image:

/opt/spark/jars/*-assembly-v1.0.0.jar

This file contains your Spark application code. Finally, you can upload your Docker image to AWS ECR:

docker push <REPO_NAME>/<IMAGE_NAME>:v1.0
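Note that pushing to ECR also requires authenticating Docker against your registry and using the full ECR repository URI. Assuming the AWS CLI v2 and the us-east-1 region, that looks roughly like this (the account ID is a placeholder):

aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com
docker tag <REPO_NAME>/<IMAGE_NAME>:v1.0 <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/<IMAGE_NAME>:v1.0
docker push <ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/<IMAGE_NAME>:v1.0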

Airflow KubernetesPodOperator

Now we are ready to perform spark-submit from code in Apache Airflow. Apache Airflow supports starting new Kubernetes pods either on the underlying Kubernetes cluster or on a remote one. For example, you can use the KubernetesPodOperator in the following way:
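Here is a minimal sketch, assuming Airflow 1.10 with the Kubernetes contrib operator; the variable values, cluster endpoint, ARN, and image names below are illustrative placeholders rather than our production code:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Illustrative values, replace them with your own cluster and image details.
eks_in_cluster = True                        # Airflow runs inside the same EKS cluster
eks_namespace = "default"
eks_service_account_name = "spark"
eks_context = "<CLUSTER_ARN>"                # only used when in_cluster is False
eks_image = "<REPO_NAME>/<IMAGE_NAME>:v1.0"  # the Spark image built above

spark_submit_cmd = (
    "/opt/spark/bin/spark-submit "
    "--master k8s://<API_SERVER_ENDPOINT> --deploy-mode cluster "
    "--conf spark.kubernetes.container.image=<REPO_NAME>/<IMAGE_NAME>:v1.0 "
    "--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark "
    "--class <main-class> local:///opt/spark/jars/<app>-assembly-v1.0.0.jar"
)

dag = DAG("spark_on_eks", start_date=datetime(2020, 9, 1), schedule_interval="@daily")

spark_job = KubernetesPodOperator(
    task_id="spark_submit_job",
    name="spark-submit-job",
    namespace=eks_namespace,
    in_cluster=eks_in_cluster,
    cluster_context=eks_context,
    service_account_name=eks_service_account_name,
    image=eks_image,
    cmds=["/bin/sh", "-c"],
    arguments=[spark_submit_cmd],
    get_logs=True,                  # stream the pod's stdout into the Airflow logs
    is_delete_operator_pod=False,   # keep the submitter pod around for debugging
    dag=dag,
)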

The code itself is pretty straightforward, so I will focus on the important parameters only. We will run Spark applications on the underlying Kubernetes cluster, in the default namespace, under the “spark” service account.

eks_in_cluster = True
eks_namespace = "default"
eks_service_account_name = "spark"

Note that it usually makes sense to isolate Spark applications and run them in a dedicated namespace. So if the default namespace is in use by other applications, it is better to create a dedicated namespace for Spark apps.

eks_context = CLUSTER_ARN

The cluster ARN can be found on the EKS dashboard:

EKS Dashboard

The eks_command parameter is just your spark-submit command and it should have the following form:

/bin/sh,-c,/opt/spark/bin/spark-submit --master {} --deploy-mode cluster --name {} --conf spark.driver.memory={} --conf spark.executor.instances={} --conf spark.executor.memory={} --conf spark.executor.cores={}  --conf spark.kubernetes.container.image={}  --conf spark.kubernetes.authenticate.driver.serviceAccountName={} {} --class {} {} {}

Use k8s://API_SERVER_ENDPOINT (from the EKS dashboard) as the cluster’s master.

Now, when Apache Airflow executes the KubernetesPodOperator, you will see new pods for the driver and the executors being created. All logs can be fetched from the pods’ stdout. Use Kubernetes port forwarding to monitor the job via the Spark UI hosted on the Spark driver. Just run

kubectl port-forward <SPARK_DRIVER_POD_NAME> 4040:4040

and navigate from your browser to localhost:4040 to see the Spark UI.
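The driver and executor pods can also be inspected directly with kubectl (the pod name below is a placeholder):

kubectl get pods
kubectl logs -f <SPARK_DRIVER_POD_NAME>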

Monitoring with Prometheus and Grafana

One of EMR’s strongest features is its built-in monitoring and alerting capabilities in the form of CloudWatch metrics and alarms. Metrics are updated every five minutes and automatically collected and pushed to CloudWatch for every EMR cluster. The metrics reported by Amazon EMR provide information that can be used to track the progress of Apache Spark workloads, analyze memory and CPU usage, detect unhealthy nodes, etc. Obviously, when moving to Kubernetes, we lose all of these capabilities. Fortunately, there are plenty of monitoring and observability tools that can be installed on K8s clusters, and one of the most popular combinations is Prometheus and Grafana. The setup of these tools is out of the scope of this article, but you can find a good introduction to Prometheus and Grafana on K8s here. Once these tools are set up, we can easily define the golden signals for our Spark applications (latency, traffic, errors, saturation) using Grafana queries against the Prometheus data source.

1. Saturation — a measure of used resources

node:node_cpu_saturation_load1:
node:node_memory_utilisation:ratio*100
CPU and Memory utilization

2. Latency — the time it takes to service a request (batch)

avg(kube_pod_completion_time{namespace="default", pod=~".*driver.*"} - kube_pod_start_time{namespace="default", pod=~".*driver.*"})
Spark App Duration

3. Traffic — defined as a custom application metric, showing the number of rows in the input Spark DataFrame (see the example query after this list)

Spark Input Size

4. Errors — the number of failed pods; can be aggregated by termination reason.

count(kube_pod_container_status_terminated_reason{namespace="default", container=~".*driver.*"})
Spark Errors
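For the traffic signal, the exact query depends on how the application exposes its custom metric. Assuming a counter named spark_input_rows_total (a hypothetical name) is published to Prometheus, the query could be as simple as:

sum(spark_input_rows_total{namespace="default"})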

Summary

Migrating Apache Spark workloads from EMR to Kubernetes brings a lot in terms of portability, cost, and efficiency of resource sharing.

Cost

Compared to EMR, the cost of running the same Spark workloads on Kubernetes is dramatically lower. An AWS EKS cluster costs only $0.10/hour (roughly $72/month). On the other hand, the AWS EMR price is always a function of the cost of the underlying EC2 machines. As shown at the beginning of the article, EMR’s contribution to the total cost is significant, especially when Spark workloads are massive. As a rough estimate, you will save up to 20% compared to EMR running on on-demand EC2 instances and up to 30% compared to EMR running on spot instances.

Portability

When running on EMR, Spark applications and the tooling that supports them are closely tied to the underlying AWS infrastructure: proprietary scaling techniques, dependencies on AMIs, security features, logging and monitoring systems, etc. Kubernetes eliminates AWS lock-in by providing core capabilities for containers without imposing restrictions. When using Kubernetes, you can run your Apache Spark workloads across multiple operating environments, including GCP, AWS, Azure, on-prem servers, and private clouds.

Resource Sharing

Kubernetes makes highly efficient use of the underlying infrastructure, since containers are much lighter than virtual machines. Sharing the same EC2 machines between Spark applications and other services reduces the overall need for computing power.
