Spark on K8s: Run a Spark job on an Amazon EKS cluster
It took me 2 weeks to successfully submit a Spark job on an Amazon EKS cluster because of the lack of documentation: most of what exists covers running Spark on Kubernetes with kops or GKE, not Amazon EKS.
In this tutorial, I will show you how to run a sample Spark job on an EKS cluster.
What is EKS?
According to the official AWS documentation:
Amazon Elastic Kubernetes Service (Amazon EKS) makes it easy to deploy, manage, and scale containerized applications using Kubernetes on AWS.
Amazon EKS runs the Kubernetes management infrastructure for you across multiple AWS availability zones to eliminate a single point of failure. Amazon EKS is certified Kubernetes conformant so you can use existing tooling and plugins from partners and the Kubernetes community. Applications running on any standard Kubernetes environment are fully compatible and can be easily migrated to Amazon EKS.
Amazon EKS is generally available for all AWS customers.
Why choose EKS?
We chose EKS instead of kops (Kubernetes Operations) because:
- We want to migrate our existing EMR clusters and all the Spark jobs to Kubernetes (K8s). We already have several production K8s clusters provisioned by kops, but we want to explore AWS EKS and compare it with other approaches.
- No control plane to manage.
Amazon EKS runs the Kubernetes management infrastructure (kube-apiserver, etcd, kube-scheduler, kube-controller-manager, etc.) across multiple AWS Availability Zones, automatically detects and replaces unhealthy control plane nodes, and provides on-demand upgrades and patching. You simply provision worker nodes and connect them to the provided Amazon EKS endpoint.
- Easy to integrate with other AWS services (IAM, Load Balancer, EC2 Spot Instances, CloudWatch Logs, etc.).
- Our company has 24/7 Premium Support from AWS 😛.
You can read the AWS EKS features page for more details.
Requirements
- Basic knowledge of K8s, Docker and Spark.
- Administrator permission on a running AWS EKS cluster. You can visit https://eksworkshop.com to learn how to set one up.
- kubectl to manage K8s cluster (https://kubernetes.io/docs/tasks/tools/install-kubectl/).
Below is some general information about my EKS cluster (the IP addresses are fake):
➜ kubectl cluster-info
Kubernetes master is running at https://4A5<i_am_tu>545E6.sk1.ap-southeast-1.eks.amazonaws.com
CoreDNS is running at https://4A5<i_am_tu>545E6.sk1.ap-southeast-1.eks.amazonaws.com/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
➜ kubectl get nodes
NAME STATUS ROLES AGE VERSION
ip-10-0-0-3.ap-southeast-1.compute.internal Ready <none> 2d9h v1.12.7
ip-10-0-0-5.ap-southeast-1.compute.internal Ready <none> 2d9h v1.12.7
ip-10-0-0-7.ap-southeast-1.compute.internal Ready <none> 2d9h v1.12.7
➜ kubectl get all -n kube-system
NAME READY STATUS RESTARTS AGE
pod/aws-node-fb5kv 1/1 Running 0 2d9h
pod/aws-node-ldkqc 1/1 Running 0 2d9h
pod/aws-node-vx95t 1/1 Running 0 2d9h
pod/coredns-78966b4675-c7bms 1/1 Running 0 2d9h
pod/coredns-78966b4675-wd6hx 1/1 Running 0 2d9h
pod/kube-proxy-48g97 1/1 Running 0 2d9h
pod/kube-proxy-9tvfk 1/1 Running 0 2d9h
pod/kube-proxy-zk5z6 1/1 Running 0 2d9h
pod/tiller-deploy-56d7789fd4-nc7z7 1/1 Running 0 2d7h

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kube-dns ClusterIP 172.20.0.10 <none> 53/UDP,53/TCP 2d9h
service/tiller-deploy ClusterIP 172.20.63.157 <none> 44134/TCP 2d9h

NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
daemonset.apps/aws-node 3 3 3 3 3 <none> 2d9h
daemonset.apps/kube-proxy 3 3 3 3 3 <none> 2d9h

NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
deployment.apps/coredns 2 2 2 2 2d9h
deployment.apps/tiller-deploy 1 1 1 1 2d9h

NAME DESIRED CURRENT READY AGE
replicaset.apps/coredns-78966b4675 2 2 2 2d9h
replicaset.apps/tiller-deploy-56d7789fd4 1 1 1 2d7h
replicaset.apps/tiller-deploy-6656d56444 0 0 0 2d9h
How it works
From the official Spark documentation:
spark-submit can be directly used to submit a Spark application to a Kubernetes cluster. The submission mechanism works as follows:
- Spark creates a Spark driver running within a Kubernetes pod.
- The driver creates executors which are also running within Kubernetes pods and connects to them, and executes application code.
- When the application completes, the executor pods terminate and are cleaned up, but the driver pod persists logs and remains in “completed” state in the Kubernetes API until it’s eventually garbage collected or manually cleaned up.
Note that in the completed state, the driver pod does not use any computational or memory resources.
The driver and executor pod scheduling is handled by Kubernetes. It is possible to schedule the driver and executor pods on a subset of available nodes through a node selector using the configuration property for it. It will be possible to use more advanced scheduling hints like node/pod affinities in a future release.
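As a small illustration of the node selector mentioned above: in Spark 2.4 the property is spark.kubernetes.node.selector.[labelKey]. The sketch below only builds the extra --conf flag. The helper name node_selector_conf and the label workload=spark are made-up examples, so substitute a label that actually exists on your worker nodes.

```shell
# Build the --conf flag that restricts driver and executor pods to labelled nodes.
# "workload=spark" is a hypothetical label; list the real ones with:
#   kubectl get nodes --show-labels
node_selector_conf() {
  label_key="$1"
  label_value="$2"
  printf '%s\n' "--conf spark.kubernetes.node.selector.${label_key}=${label_value}"
}

# Prints the flag to append to a spark-submit command line.
node_selector_conf workload spark
```

The printed flag can be added to spark-submit like any other --conf option.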
A sample spark-submit command looks like this:
➜ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar
Create a namespace, service account and role for the jump pod
The jump pod is a special pod that helps us deploy cluster-mode or client-mode Spark applications into the EKS cluster.
- Create a file named spark_role.yml as follows:
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-pi
  namespace: spark-pi
automountServiceAccountToken: true
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-pi-role
  namespace: spark-pi
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-pi-role-binding
  namespace: spark-pi
subjects:
- kind: ServiceAccount
  name: spark-pi
  namespace: spark-pi
roleRef:
  kind: Role
  name: spark-pi-role
  apiGroup: rbac.authorization.k8s.io
- Run kubectl to create the namespace, service account, role and role binding:
➜ kubectl create namespace spark-pi
namespace/spark-pi created
➜ kubectl apply -f ~/lab/k8s/spark_role.yml
serviceaccount/spark-pi created
role.rbac.authorization.k8s.io/spark-pi-role created
rolebinding.rbac.authorization.k8s.io/spark-pi-role-binding created
- Verify that the new service account has permission to create/delete pods:
➜ kubectl auth can-i create pod --as=system:serviceaccount:spark-pi:spark-pi -n spark-pi
yes
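To check more than one permission, the same kubectl auth can-i command can be looped over every resource and verb listed in the Role above. This is a minimal sketch: the function name check_permissions and the KUBECTL variable are additions for illustration, the latter so the loop can be dry-run with echo on a machine that has no cluster access.

```shell
SA_NAMESPACE="spark-pi"
SA_NAME="spark-pi"
KUBECTL="${KUBECTL:-kubectl}"   # set KUBECTL=echo to only print the commands

# Ask the API server, for each verb/resource pair granted by spark-pi-role,
# whether the service account is allowed to perform it.
check_permissions() {
  for resource in pods services configmaps; do
    for verb in get list watch create delete update patch; do
      printf '%s %s: ' "$verb" "$resource"
      # "kubectl auth can-i" exits non-zero on "no"; keep looping regardless.
      $KUBECTL auth can-i "$verb" "$resource" \
        --as="system:serviceaccount:${SA_NAMESPACE}:${SA_NAME}" \
        -n "$SA_NAMESPACE" || true
    done
  done
}

# Usage against a real cluster:  check_permissions
# Dry run without a cluster:     KUBECTL=echo check_permissions
```

Every line should end in yes; a no points at a missing verb or resource in the Role.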
Build a Docker image to run spark-submit
I wrote a simple script that downloads the Spark source code, runs mvn compile, then builds and pushes the Docker images.
I’m using the docker:dind image to build the Docker image inside a Docker container:
➜ docker container run \
--privileged -it \
--name spark-build \
-v /var/run/docker.sock:/var/run/docker.sock \
-v ${PWD}:/tmp \
-e USER=<your_docker_username> \
-e PASSWORD=<your_docker_password> \
-w /opt \
docker:dind \
sh /tmp/build.sh
You can use the Docker image I’ve built with the above script, tagged vitamingaugau/spark:spark-2.4.4.
Special thanks to Andy Grove and his post for pointing out the issue with the Kubernetes client. We need to patch the Kubernetes client version before building Spark from source. The fix should come with Spark 2.4.5, hopefully 🤞. Without the patch, spark-submit fails with an error like this:
java.net.ProtocolException: Expected HTTP 101 response but was '403 Forbidden'
at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:216)
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:183)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
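Going back to the build itself, the steps described earlier (download the source, compile, build and push the images) might look roughly like the sketch below. It is illustrative and is not the exact build.sh: REPO, TAG, DRY_RUN and the function names are assumptions, it leaves the Kubernetes client patch as a comment, and it assumes a JDK, bash, the Docker CLI and a prior docker login are already in place. By default it only prints the commands.

```shell
SPARK_VERSION="2.4.4"
REPO="${REPO:-your-docker-username}"    # hypothetical Docker Hub namespace
TAG="${TAG:-spark-${SPARK_VERSION}}"
DRY_RUN="${DRY_RUN:-1}"                 # 1 = print commands, 0 = execute them

# Print the command in dry-run mode, otherwise run it in the current shell.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '+ %s\n' "$*"
  else
    "$@"
  fi
}

build_spark_image() {
  run wget "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}.tgz"
  run tar -xzf "spark-${SPARK_VERSION}.tgz"
  run cd "spark-${SPARK_VERSION}"
  # The Kubernetes client version would be patched here, before compiling.
  run ./build/mvn -Pkubernetes -DskipTests clean package
  # Produces and pushes ${REPO}/spark:${TAG}
  run ./bin/docker-image-tool.sh -r "$REPO" -t "$TAG" build
  run ./bin/docker-image-tool.sh -r "$REPO" -t "$TAG" push
}

build_spark_image
```

Set DRY_RUN=0 to actually execute the steps once the printed commands look right.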
It’s show time!
Now we’re ready to submit the example Spark application to our EKS cluster.
- Run the jump pod from my Docker image:
➜ kubectl run --generator=run-pod/v1 jump-pod --rm -i --tty --serviceaccount=spark-pi --namespace=spark-pi --image vitamingaugau/spark:spark-2.4.4 sh
- Inside the pod shell, prepare some variables
sh-4.4# export SA=spark-pi
sh-4.4# export NAMESPACE=spark-pi
sh-4.4# export TOKEN=/var/run/secrets/kubernetes.io/serviceaccount/token
sh-4.4# export CACERT=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
- Run spark-submit command inside jump pod:
sh-4.4# /opt/spark/bin/spark-submit \
--master=k8s://https://4A5<i_am_tu>545E6.sk1.ap-southeast-1.eks.amazonaws.com:443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.driver.pod.name=spark-pi-driver \
--conf spark.kubernetes.container.image=vitamingaugau/spark:spark-2.4.4 \
--conf spark.kubernetes.namespace=$NAMESPACE \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=$SA \
--conf spark.kubernetes.authenticate.submission.caCertFile=$CACERT \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=$TOKEN \
--conf spark.executor.instances=2 \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.4.jar 20000
Note: Remember to specify the K8s apiserver port (e.g. 443) when running the spark-submit command.
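One way to avoid forgetting the port is to normalise the API server URL before passing it to --master. The helper below is a sketch and not part of Spark: k8s_master_url is a hypothetical function name, example-cluster.eks.amazonaws.com is a placeholder host, and it assumes the URL has no path component and is not an IPv6 literal.

```shell
# Turn an API server address into a Spark master URL, adding the scheme and
# a default port (443) when they are missing.
k8s_master_url() {
  server="${1%/}"                 # drop a trailing slash, if any
  default_port="${2:-443}"
  case "$server" in
    http://*|https://*) ;;        # scheme already present
    *) server="https://$server" ;;
  esac
  hostport="${server#*://}"
  case "$hostport" in
    *:*) ;;                       # port already present
    *) server="${server}:${default_port}" ;;
  esac
  printf 'k8s://%s\n' "$server"
}

# Placeholder host for illustration. On a real cluster the address can be read with:
#   kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}'
k8s_master_url "https://example-cluster.eks.amazonaws.com"
```

The result can be passed as --master="$(k8s_master_url <apiserver-url>)".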
- The command keeps watching the state of the driver pod as it moves from Creating -> Running -> Succeeded -> Terminated. Sample output looks like this:
19/09/05 20:20:08 INFO LoggingPodStatusWatcherImpl: State changed, new state:
pod name: spark-pi-driver
namespace: spark-pi
labels: spark-app-selector -> spark-1d3977f57e224a3e8bd8429f78b83076, spark-role -> driver
pod uid: 8cd8e900-d01a-11e9-9747-028eb46babf0
creation time: 2019-09-05T20:20:06Z
service account name: spark-pi
volumes: spark-local-dir-1, spark-conf-volume, spark-pi-token-qp4d4
node name: ip-10-150-58-108.ap-southeast-1.compute.internal
start time: 2019-09-05T20:20:06Z
container images: vitamingaugau/spark:spark-2.4.4
phase: Running
status: [ContainerStatus(containerID=docker://c115b2b8a29a1d5293449761cf39cd8b030bdb8290ef247fc7d6bfff4654053c, image=vitamingaugau/spark:spark-2.4.4, imageID=docker-pullable://vitamingaugau/spark@sha256:3e26a8bae5e210f26be5c64d33b110653bb97630ad19be9e415d20576ad75e91, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=spark-kubernetes-driver, ready=true, restartCount=0, state=ContainerState(running=ContainerStateRunning(startedAt=2019-09-05T20:20:07Z, additionalProperties={}), terminated=null, waiting=null, additionalProperties={}), additionalProperties={})]
19/09/05 20:21:26 INFO LoggingPodStatusWatcherImpl: State changed, new state:
pod name: spark-pi-driver
namespace: spark-pi
labels: spark-app-selector -> spark-1d3977f57e224a3e8bd8429f78b83076, spark-role -> driver
pod uid: 8cd8e900-d01a-11e9-9747-028eb46babf0
creation time: 2019-09-05T20:20:06Z
service account name: spark-pi
volumes: spark-local-dir-1, spark-conf-volume, spark-pi-token-qp4d4
node name: ip-10-150-58-108.ap-southeast-1.compute.internal
start time: 2019-09-05T20:20:06Z
container images: vitamingaugau/spark:spark-2.4.4
phase: Succeeded
status: [ContainerStatus(containerID=docker://c115b2b8a29a1d5293449761cf39cd8b030bdb8290ef247fc7d6bfff4654053c, image=vitamingaugau/spark:spark-2.4.4, imageID=docker-pullable://vitamingaugau/spark@sha256:3e26a8bae5e210f26be5c64d33b110653bb97630ad19be9e415d20576ad75e91, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=spark-kubernetes-driver, ready=false, restartCount=0, state=ContainerState(running=null, terminated=ContainerStateTerminated(containerID=docker://c115b2b8a29a1d5293449761cf39cd8b030bdb8290ef247fc7d6bfff4654053c, exitCode=0, finishedAt=2019-09-05T20:21:25Z, message=null, reason=Completed, signal=null, startedAt=2019-09-05T20:20:07Z, additionalProperties={}), waiting=null, additionalProperties={}), additionalProperties={})]
19/09/05 20:21:26 INFO LoggingPodStatusWatcherImpl: Container final statuses:
Container name: spark-kubernetes-driver
Container image: vitamingaugau/spark:spark-2.4.4
Container state: Terminated
Exit code: 0
19/09/05 20:21:26 INFO Client: Application spark-pi finished.
19/09/05 20:21:26 INFO ShutdownHookManager: Shutdown hook called
19/09/05 20:21:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-48ee836f-53af-497e-a9ba-472a8106e5d5
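The output above is verbose, so it can help to reduce it to the sequence of driver phases. The filter below is a small sketch that relies only on the "phase:" lines shown in the log; driver_phases is a hypothetical helper name, and the here-document is a shortened copy of the sample output.

```shell
# Read spark-submit output on stdin and print each driver pod phase once,
# in the order it first appears.
driver_phases() {
  sed -n 's/^[[:space:]]*phase:[[:space:]]*//p' | awk '!seen[$0]++'
}

# Shortened sample of the log above; prints "Running" then "Succeeded".
driver_phases <<'EOF'
19/09/05 20:20:08 INFO LoggingPodStatusWatcherImpl: State changed, new state:
  pod name: spark-pi-driver
  phase: Running
19/09/05 20:21:26 INFO LoggingPodStatusWatcherImpl: State changed, new state:
  pod name: spark-pi-driver
  phase: Succeeded
EOF
```

With a real submission, pipe the command into it, for example spark-submit ... 2>&1 | driver_phases. The current phase can also be read directly with kubectl -n spark-pi get pod spark-pi-driver -o jsonpath='{.status.phase}'.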
- Open another terminal and use kubectl to check:
➜ kubectl -n spark-pi get all
NAME READY STATUS RESTARTS AGE
pod/jump-pod 1/1 Running 0 70m
pod/spark-pi-1567714805266-exec-1 1/1 Running 0 2s
pod/spark-pi-1567714805266-exec-2 1/1 Running 0 2s
pod/spark-pi-driver 1/1 Running 0 7s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/spark-pi-1567714805266-driver-svc ClusterIP None <none> 7078/TCP,7079/TCP 8s
- Check the driver pod logs. You will see that a SparkUI endpoint has been created; you can access the UI to check the status of the Spark application:
➜ kubectl -n spark-pi logs pod/spark-pi-driver
..................
19/09/05 20:17:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/09/05 20:17:34 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://spark-pi-1567714650240-driver-svc.spark-pi.svc:4040
..................
➜ kubectl -n spark-pi port-forward pod/spark-pi-driver 4040:4040
Now you can access the Spark UI via http://127.0.0.1:4040
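While the port-forward is active, the same port also serves Spark's monitoring REST API under /api/v1, which is handy for scripted checks. A minimal sketch, assuming the port-forward above is still running in another terminal; spark_api and the CURL variable are hypothetical names, the latter only there so the call can be dry-run with echo.

```shell
SPARK_UI="${SPARK_UI:-http://127.0.0.1:4040}"
CURL="${CURL:-curl}"   # set CURL=echo to print the request instead of sending it

# Query an endpoint below /api/v1 on the forwarded Spark UI port.
# $1 is the path, e.g. "applications".
spark_api() {
  $CURL --silent "${SPARK_UI}/api/v1/$1"
}

# Usage while the driver is running:
#   spark_api applications                       # list the application and its id
#   spark_api "applications/<app-id>/executors"  # executor details for that id
```

The UI and the API disappear once the driver pod completes, so query them while the job is still running.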
What next?!
Check out my new tutorials in the Spark on K8s series:
Feel free to let me know if you face any issues when following this tutorial.