Apache Spark on Kubernetes - Submitting a job to Spark on Kubernetes (Part 2)


This is Part 2 of the article series: Submitting a job to Spark on Kubernetes (see Part 1).

In this article we explain two options for submitting a job to the cluster:

  1. Create an inherited custom Docker image that contains the job Jar and can submit it.
  2. Mount a volume containing the job Jar into the original image.

Introduction

Given the Spark Standalone cluster we built in the previous article, we now create a sample Spark job to be submitted to the cluster. Then we describe two options for getting the Jar file to the cluster: via an inherited Docker image or via a Kubernetes volume.

Sample Spark Job

We have prepared an example Spark job in the examples/sql-example directory. The job has a simple pom.xml with only two dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Both are declared with the provided scope, since these Jars are part of the Spark distribution. Pay attention to the versions, however. The pom.xml defines these properties:

    <properties>
        <project.java.version>1.8</project.java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.1</spark.version>
    </properties>

The spark.version and scala.binary.version values should match the Spark distribution used in the Docker images.
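
A quick way to catch a version mismatch is to read the property straight from pom.xml and derive the image tag from it. The following is only a minimal sketch: the helper names pom_property and spark_image_tag are hypothetical, and it assumes that each property sits on a single line, as in the listing above.

```shell
#!/bin/sh
# Hypothetical helper: print the value of a single-line <properties> entry.
# $1 = path to pom.xml, $2 = property name (for example spark.version)
# Note: the dots in the property name are treated as regex wildcards by sed,
# which is good enough for this sketch.
pom_property() {
  sed -n "s|.*<$2>\(.*\)</$2>.*|\1|p" "$1" | head -n 1
}

# Hypothetical helper: build the base image tag that matches the pom.xml.
# The Hadoop version is not stored in the pom.xml, so it is passed as $2.
spark_image_tag() {
  printf '%s-hadoop%s\n' "$(pom_property "$1" spark.version)" "$2"
}
```

With the properties shown above, running spark_image_tag pom.xml 2.7 in examples/sql-example prints 3.0.1-hadoop2.7, which should equal the tag of the michalklempa/spark image in use.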

Source code of the project is very simple. It just prints a small Dataset with Spark SQL.

Building the project with Maven:

examples/sql-example > mvn package

produces a tiny 5 KB Jar file:

-rw-rw-r-- 1 michalklempa michalklempa 4,7K jan  6 13:10 spark-sql-example-1.0.0-SNAPSHOT.jar

Submitting a job

We describe two ways of providing the Jar file to the driver container:

  1. Inherited Docker image
  2. Mounting Kubernetes volume

Depending on your environment and circumstances, each of them may be useful in different scenarios. If your build system keeps versions of Docker images built from your source code, the former option may be suitable. If your build system keeps track of Jar files and their versions, and your deployment scripts can provide volumes to Kubernetes dynamically, the latter is the way to go.

Inherited Docker image

We create a multi-stage Docker image for our example Spark SQL job. The first stage (called build) builds the Jar file and the second stage places the Jar inside the inherited Docker image:

ARG SPARK_VERSION=3.0.1
ARG HADOOP_VERSION=2.7
ARG ARG_JAR_NAME=spark-sql-example-1.0.0-SNAPSHOT.jar
ARG ARG_MAIN_CLASS=com.michalklempa.spark.sql.example.Main

FROM maven:3-openjdk-8 as build
ARG ARG_JAR_NAME
ARG ARG_MAIN_CLASS

RUN mkdir /work
COPY pom.xml /work/pom.xml
COPY src /work/src

WORKDIR /work
RUN mvn clean package

FROM michalklempa/spark:${SPARK_VERSION}-hadoop${HADOOP_VERSION}
ARG ARG_JAR_NAME
ARG ARG_MAIN_CLASS

COPY --from=build /work/target/${ARG_JAR_NAME} /${ARG_JAR_NAME}

ENV MAIN_CLASS=${ARG_MAIN_CLASS}
ENV JAR="/${ARG_JAR_NAME}"

The build arguments for the Spark and Hadoop versions are not used in the first stage, but they must be declared before the first FROM instruction of the Dockerfile, because they are used in the FROM instruction of the second stage. Besides building the Jar, the only thing we do is place the Jar in the root directory of the final image and set two environment variables:

  • MAIN_CLASS
  • JAR

Both are used by the submit.sh script from the original image. The script is worth a closer look:


env

export SPARK_PUBLIC_DNS=$(hostname -i)

java ${JAVA_OPTS} \
  -cp "${JAR}:${SPARK_HOME}/conf:${SPARK_HOME}/jars/*" \
  org.apache.spark.deploy.SparkSubmit \
  --deploy-mode client \
  --master spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} \
  --class ${MAIN_CLASS} \
  ${SUBMIT_OPTS} \
  --conf 'spark.driver.host='$(hostname -i) \
  local://${JAR}

Custom JVM options can be passed by setting the JAVA_OPTS environment variable, and custom options for the SparkSubmit class by setting SUBMIT_OPTS.

The Jar URL uses the local:// scheme, which requires the Jar file to be present at the same path on every worker node when the job is submitted to the Spark cluster. Usually the Jar is distributed by Spark itself, by YARN, or by some other means (HDFS, S3, Azure Blob Storage). Since in this setup we have complete control over the Docker image, we can simply place the Jar in the image and do not need to manage Jar distribution at all. If your build system versions Docker images built from your artifacts, putting the Jar file inside the image makes perfect sense.
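
To see how the environment variables end up on the command line, the argument list can be printed without starting a JVM. This is a dry-run sketch, not the submit.sh shipped in the image: the function name submit_args is hypothetical, the spark.driver.host setting is left out, and the SUBMIT_OPTS value is only an illustrative assumption.

```shell
#!/bin/sh
# Dry-run sketch, not the submit.sh from the image: print the SparkSubmit
# arguments assembled from the same environment variables.
# The spark.driver.host setting is left out to keep the output deterministic.
submit_args() {
  # ${SUBMIT_OPTS} is deliberately unquoted so that it splits into separate
  # words, like the unquoted ${SUBMIT_OPTS} in the listing above.
  set -- --deploy-mode client \
    --master "spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}" \
    --class "${MAIN_CLASS}" \
    ${SUBMIT_OPTS} \
    "local://${JAR}"
  printf '%s\n' "$*"
}

# Example values; the SUBMIT_OPTS content is an illustrative assumption.
SPARK_MASTER_HOST=spark-master
SPARK_MASTER_PORT=7077
MAIN_CLASS=com.michalklempa.spark.sql.example.Main
JAR=/spark-sql-example-1.0.0-SNAPSHOT.jar
SUBMIT_OPTS="--executor-memory 1g --total-executor-cores 2"
submit_args
```

Because JAR holds an absolute path, the resulting URL has three slashes (local:///spark-sql-example-1.0.0-SNAPSHOT.jar): the local:// scheme followed by the path.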

Using the Dockerfile above, let's build the image:

> eval $(minikube docker-env)
> docker build -t michalklempa/spark-sql-example .
...
 ---> Running in 5990a696bf1c
Removing intermediate container 5990a696bf1c
 ---> 1c52373acb80
Step 16/18 : COPY --from=build /work/target/${ARG_JAR_NAME} /${ARG_JAR_NAME}
 ---> 83a7fae7826f
Step 17/18 : ENV MAIN_CLASS=${ARG_MAIN_CLASS}
 ---> Running in 7c206f9e6499
Removing intermediate container 7c206f9e6499
 ---> 9927df4584e1
Step 18/18 : ENV JAR="/${ARG_JAR_NAME}"
 ---> Running in 26992958fe9e
Removing intermediate container 26992958fe9e
 ---> 2233ba372a4c
Successfully built 2233ba372a4c
Successfully tagged michalklempa/spark-sql-example:latest
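
The build arguments declared at the top of the Dockerfile can be overridden at build time with docker build --build-arg. A sketch, wrapped in a hypothetical function build_example_image: the version-specific tag given to the resulting image is an assumption made for illustration, and a michalklempa/spark base image with the requested tag has to exist.

```shell
#!/bin/sh
# Hypothetical wrapper: build the example image against another Spark/Hadoop
# combination by overriding the ARG defaults from the Dockerfile.
# $1 = Spark version, $2 = Hadoop version
build_example_image() {
  docker build \
    --build-arg "SPARK_VERSION=$1" \
    --build-arg "HADOOP_VERSION=$2" \
    -t "michalklempa/spark-sql-example:$1-hadoop$2" \
    .
}
```

For example, build_example_image 3.0.1 2.7 builds with the default versions. When changing the Spark version, keep spark.version in pom.xml in step with it.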

Once the image is built, we can test the job with manifest.yml from the examples directory.

> kubectl apply -f manifest.yml
> kubectl get pod
NAME                                       READY   STATUS      RESTARTS   AGE
spark-driver-z5542                         1/1     Running     0          3m
spark-master-deployment-6ff87fc9df-n5g2s   1/1     Running     0          3m
spark-worker-deployment-56fb57c88d-5jhgc   1/1     Running     0          3m
spark-worker-deployment-56fb57c88d-xbwx4   1/1     Running     0          3m

The driver pod should log the output Dataset:

> kubectl logs spark-driver-z5542
KUBERNETES_SERVICE_PORT_HTTPS=443
KUBERNETES_SERVICE_PORT=443
HOSTNAME=spark-driver
LANGUAGE=en_US:en
JAVA_HOME=/usr/lib/jvm/zulu11-ca-amd64
SPARK_MASTER_HOST=spark-master
PWD=/opt/spark
HOME=/root
LANG=en_US.UTF-8
KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443
SPARK_MASTER_PORT=7077
SHLVL=1
SPARK_HOME=/opt/spark
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1
MAIN_CLASS=com.michalklempa.spark.sql.example.Main
...
21/01/06 19:41:14 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
21/01/06 19:41:34 INFO TransportClientFactory: Successfully created connection to spark-master/10.97.134.250:7077 after 2 ms (0 ms spent in bootstraps)
21/01/06 19:41:34 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20210106194134-0000
21/01/06 19:41:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45031.
...
21/01/06 19:41:54 INFO CodeGenerator: Code generated in 27.510135 ms
+-----------+----+
|temperature|snow|
+-----------+----+
|        1.0|30.0|
|        1.5|60.0|
|       -5.5|70.0|
|       -7.0|55.0|
+-----------+----+

21/01/06 19:41:54 INFO SparkUI: Stopped Spark web UI at http://172.18.0.4:4040

After the job finishes, the status of the driver pod is Completed:

> kubectl get pod
NAME                                       READY   STATUS      RESTARTS   AGE
spark-driver-z5542                         0/1     Completed   0          39m
spark-master-deployment-6ff87fc9df-n5g2s   1/1     Running     0          39m
spark-worker-deployment-56fb57c88d-5jhgc   1/1     Running     0          39m
spark-worker-deployment-56fb57c88d-xbwx4   1/1     Running     0          39m
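
Instead of polling kubectl get pod, waiting for the driver and fetching its log can be scripted. The sketch below assumes that the driver is a Kubernetes Job named spark-driver (inferred from the pod name spark-driver-z5542; check manifest.yml for the actual name). The function name and the default timeout are illustrative.

```shell
#!/bin/sh
# Hypothetical helper: block until the driver Job completes, then print its log.
# $1 = Job name (assumed here: spark-driver), $2 = timeout (default 300s)
driver_logs_when_done() {
  kubectl wait --for=condition=complete "job/$1" --timeout="${2:-300s}" &&
    kubectl logs "job/$1"
}
```

Usage: driver_logs_when_done spark-driver. To submit the job again, delete the Job first (kubectl delete job spark-driver) and then re-apply the manifest, since re-applying an unchanged manifest does not re-run a completed Job.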

Mounting Kubernetes volume

Assuming minikube with the docker driver, we first need to mount our examples/sql-example directory into the minikube cluster:

> pwd
/home/michalklempa/github/docker-spark/examples/sql-example
> minikube mount $(pwd):/mnt
📁  Mounting host path /home/michalklempa/github/docker-spark/examples/sql-example into VM as /mnt ...
    ▪ Mount type:   
    ▪ User ID:      docker
    ▪ Group ID:     docker
    ▪ Version:      9p2000.L
    ▪ Message Size: 262144
    ▪ Permissions:  755 (-rwxr-xr-x)
    ▪ Options:      map[]
    ▪ Bind Address: 172.17.0.1:39291
🚀  Userspace file server: ufs starting
✅  Successfully mounted /home/michalklempa/github/docker-spark/examples/sql-example to /mnt

📌  NOTE: This process must stay alive for the mount to be accessible ...

This mounts our working source code directory into the running minikube Docker container at /mnt. When we build the project on the host machine, the file target/spark-sql-example-1.0.0-SNAPSHOT.jar appears to Kubernetes as /mnt/target/spark-sql-example-1.0.0-SNAPSHOT.jar.
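
Before wiring the Jar into a manifest, it is worth checking that the file is really visible inside the minikube node. A sketch using minikube ssh, run from a second terminal because the mount process has to keep running; the function name jar_visible_in_minikube is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: list a file as seen from inside the minikube node.
# $1 = path inside minikube (for example the Jar under /mnt/target)
jar_visible_in_minikube() {
  minikube ssh -- ls -l "$1"
}
```

Usage: jar_visible_in_minikube /mnt/target/spark-sql-example-1.0.0-SNAPSHOT.jar. If the listing fails, run mvn package on the host first: a pod whose hostPath volume is declared with type: File will not start when the file is missing.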

We will use the stock Docker image michalklempa/spark:3.0.1-hadoop2.7 to run the Spark cluster and mount the Jar file as a volume. We need to add a volume specification to the Pod template spec:

          volumes:
            - name: jar
              hostPath:
                path: /mnt/target/spark-sql-example-1.0.0-SNAPSHOT.jar
                type: File

And volume mount to container spec:

              volumeMounts:
                - mountPath: /spark-sql-example-1.0.0-SNAPSHOT.jar
                  name: jar

These two snippets are needed for both the worker Deployment pods and the driver pod. The complete worker Deployment spec:

  - apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: spark-worker-deployment
      labels:
        app: spark
        role: worker
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: spark
          role: worker
      template:
        metadata:
          labels:
            app: spark
            role: worker
        spec:
          restartPolicy: Always
          enableServiceLinks: false
          containers:
            - name: spark-worker
              image: michalklempa/spark:3.0.1-hadoop2.7
              imagePullPolicy: Never
              command:
                - /worker.sh
              env:
                - name: SPARK_MASTER_HOST
                  value: "spark-master"
                - name: SPARK_MASTER_PORT
                  value: "7077"
              volumeMounts:
                - mountPath: /spark-sql-example-1.0.0-SNAPSHOT.jar
                  name: jar
          volumes:
            - name: jar
              hostPath:
                path: /mnt/target/spark-sql-example-1.0.0-SNAPSHOT.jar
                type: File

and driver spec:

spec:
  completions: 1
  template:
    metadata:
      labels:
        app: spark
        role: driver
    spec:
      restartPolicy: OnFailure
      enableServiceLinks: false
      hostname: spark-driver
      containers:
        - name: spark-driver
          image: michalklempa/spark:3.0.1-hadoop2.7
          imagePullPolicy: Never
          command:
            - /submit.sh
          env:
            - name: SPARK_MASTER_HOST
              value: "spark-master"
            - name: SPARK_MASTER_PORT
              value: "7077"
            - name: MAIN_CLASS
              value: "com.michalklempa.spark.sql.example.Main"
            - name: JAR
              value: "/spark-sql-example-1.0.0-SNAPSHOT.jar"
          volumeMounts:
            - mountPath: /spark-sql-example-1.0.0-SNAPSHOT.jar
              name: jar
      volumes:
        - name: jar
          hostPath:
            path: /mnt/target/spark-sql-example-1.0.0-SNAPSHOT.jar
            type: File

The driver pod also needs the environment variables JAR and MAIN_CLASS for submit.sh to work. A complete manifest example is available as manifest-volume.yml.

Apply this manifest:

> kubectl apply -f manifest-volume.yml

That's it.

Conclusion

All the project files and roles are available in the docker-spark GitHub repository. Pre-built Docker images are available at hub.docker.com/r/michalklempa/spark.