Using Spark with GPFS on the ACCRE Cluster
Motivation
Spark is a very versatile tool for big data applications because:
- it distributes jobs transparently to the application
- there’s no MPI programming to worry about; nodes communicate via ssh
- it runs locally, on distributed filesystems, or HDFS
This last point is key, because ACCRE’s HPC cluster utilizes a distributed, or shared, filesystem to make data available to every node in the cluster. Thus, Spark can send an application to an arbitrary number of nodes and read the necessary data from the distributed filesystem as if it were present locally. In essence, we’re trading the local disk reads of the HDFS paradigm for reads across the network.
In this post, we demonstrate how to analyze files on a shared network drive using Spark. Note that we’re not using the BigData Cluster for this job; rather, we’re running Spark on the production partition of the ACCRE cluster to reveal the inner workings of Spark’s standalone cluster model.
GPFS (General Parallel File System) is a proprietary, resilient, replicated network filesystem developed at IBM. The traditional ACCRE HPC cluster uses GPFS to share files in the /home, /scratch, and /data directories. The test application we’re running is wordcount, which will be the subject of another post. Feel free to follow along with the code repo.
SLURM Configuration
SLURM is the scheduler and resource manager on the ACCRE HPC cluster, so in order to provision a Spark cluster, we have to go through SLURM. The credit for working out most of the logic goes to this post on serverfault.
The mechanics of launching a Spark cluster are a little more complicated than most SLURM workflows, but the idea is pretty straightforward: we request a SLURM job using sbatch that blocks out all the resources we’ll need for starting the cluster. Within this allocation, we launch a Spark standalone cluster, which has the distinct roles of Client, Master, and Worker. The code that each of these roles executes is defined in the task-roles.sh shell script; this script takes a single argument that indicates which role is to be executed. Let’s look at the sbatch job and these roles in depth.
Provisioning resources with SLURM
The resource request for our Spark cluster is provisioned through the SLURM script batch-job.slurm; this script requests, along with memory and time, a total of 8 tasks corresponding to one task for the Client, one task for the Master, and six tasks for Workers. Currently, SLURM only supports homogeneous resource allocation, so there’s no way to customize the resources depending on the task type. It’s a bit of a waste of resources, but it’s pretty much necessary at this point.
#!/bin/bash
#SBATCH --ntasks=8
#SBATCH --cpus-per-task=2
#SBATCH --time=00:10:00
#SBATCH --mem-per-cpu=8G
#SBATCH --partition=debug
# Create a directory on /scratch to hold some info for this job
export JOB_ID=$SLURM_JOB_ID
export JOB_HOME="/scratch/$USER/$JOB_ID"
echo "JOB_HOME=$JOB_HOME"
mkdir -p $JOB_HOME
# Tasks 0 and 1 are the Client and Master; the rest are Workers
export NWORKERS=$(( $SLURM_NTASKS - 2 ))
echo "NWORKERS=$NWORKERS"
# Highest task ID in this allocation (Workers are tasks 2 through LAST_PROC)
export LAST_PROC=$(( $SLURM_NTASKS - 1 ))
In addition to the SLURM directives requesting resources, there are a number of global variables set in batch-job.slurm which are necessary to configure the Spark setup. The number of cores and the amount of memory allocated to the Spark job are calculated here, which prevents Spark from grabbing more resources than SLURM has allocated. So, in order to create a larger cluster, just change the SBATCH directives for ntasks and, possibly, mem-per-cpu.
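For instance, a hypothetical 16-Worker cluster would only require bumping the task count (and, possibly, the memory per CPU):
#SBATCH --ntasks=18          # 1 Client + 1 Master + 16 Workers
#SBATCH --cpus-per-task=2
#SBATCH --mem-per-cpu=8G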
# Don't mess with these unless you know what you are doing
export SPARK_WORKER_CORES=$SLURM_CPUS_PER_TASK
# SLURM_MEM_PER_CPU is in MB; leave ~1 GB of headroom below the task's allocation
export SPARK_DAEMON_MEMORY=$(( SLURM_MEM_PER_CPU * $SLURM_CPUS_PER_TASK - 1000 ))m
export SPARK_MEMORY=$SPARK_DAEMON_MEMORY
# These are the defaults anyways, but configurable
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
# Load the Spark package
setpkgs -a spark_2.1.0 # sets JAVA_HOME and SPARK_HOME
# Try to load stuff that the spark scripts will load
source "$SPARK_HOME/sbin/spark-config.sh"
source "$SPARK_HOME/bin/load-spark-env.sh"
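As a quick sanity check (a sketch, assuming the directives above, where SLURM exposes --mem-per-cpu=8G as SLURM_MEM_PER_CPU=8192 in MB), you can echo the derived values once the environment is set:
echo "SPARK_WORKER_CORES=$SPARK_WORKER_CORES"    # 2
echo "SPARK_DAEMON_MEMORY=$SPARK_DAEMON_MEMORY"  # (2 * 8192 - 1000)m = 15384m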
Once our environment is set, we want to launch parallel tasks for our Client, Master, and Workers using SLURM’s srun command. To do this, we use the --multi-prog option of srun, which allows specifying a different executable for each task using a plain-text config file. Rather than updating this file every time we wish to change the number of tasks (with our #SBATCH directives), we simply generate this file on the fly from within the batch-job.slurm script:
# Create the configuration file specific to this batch job
cluster_conf=$"# This file has been generated by $0\n"
cluster_conf+=$"0 ./task-roles.sh CLIENT\n"
cluster_conf+=$"1 ./task-roles.sh MASTER\n"
cluster_conf+=$"2-$LAST_PROC ./task-roles.sh WORKER"
echo -e $cluster_conf > cluster.conf
The file cluster.conf contains on each line a task number or range of task numbers and the executable to be run on those tasks:
# This file has been generated by /usr/spool/slurm/job12609822/slurm_script
0 ./task-roles.sh CLIENT
1 ./task-roles.sh MASTER
2-7 ./task-roles.sh WORKER
Finally, we pass the cluster.conf filename to srun:
# Start the CLIENT, MASTER, and WORKERS
srun -l -o slurm-%j-task-%t.out --multi-prog cluster.conf
In the next section, we’ll look at what each of these tasks actually does.
Assigning roles to tasks
The file task-roles.sh stores the executable code called by srun for each process in our job.
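The top of task-roles.sh (a minimal sketch; only the role argument matters here) reads the role name that srun passes in and stores it in LEVEL, which the conditionals below switch on:
#!/bin/bash
# The single argument selects this task's role: CLIENT, MASTER, or WORKER
LEVEL=$1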
The Client
The first code block in task-roles.sh is the Client block. The SLURM job ID and the Spark-specific variables exported in batch-job.slurm are inherited by each task that srun launches, so they are available to the Client (and to the Master and Workers) without any further setup.
If we don’t specify a node to SLURM, then we don’t know a priori on which node the Master task will land. So, instead, we have the Master write its hostname to a GPFS-shared location, in this case $JOB_HOME/master_host, and have the Client poll that location, with a timeout of 100 seconds.
if [ $LEVEL == "CLIENT" ]; then
# Wait for the master to signal back
getMasterURL
echo "Master Host: $MASTER_HOST"
msg1="To tunnel to MasterUI and JobUI -> ssh "
msg1+="-L $SPARK_MASTER_WEBUI_PORT:$MASTER_HOST:$SPARK_MASTER_WEBUI_PORT "
msg1+="-L 4040:$MASTER_HOST:4040 "
msg1+="$USER@login.accre.vanderbilt.edu"
echo $msg1
# Wait for workers to signal back
confirmWorkers
# Submit the Spark application, as defined in APP
$SPARK_HOME/bin/spark-submit \
--master $MASTER_URL \
--deploy-mode client \
--total-executor-cores $(( NWORKERS * SPARK_WORKER_CORES )) \
$APP
The function getMasterURL waits for the Master to write its hostname to the shared location and, once it has, sets the variables MASTER_HOST and MASTER_URL:
# Reads the Master's hostname from a shared location (written once the
# Master process has started) and builds the master URL
function getMasterURL {
# Read the master url from shared location
MASTER_HOST=''
i=0
while [ -z "$MASTER_HOST" ]; do
if (( $i > 100 )); then
echo "Starting master timed out";
exit 1
fi
sleep 1s
local flag_path=$JOB_HOME/master_host
if [ -f $flag_path ]; then
MASTER_HOST=$(head -1 $flag_path)
else
echo "Master host not yet intialized"
fi
((i++))
done
MASTER_URL="spark://$MASTER_HOST:$SPARK_MASTER_PORT"
}
The function confirmWorkers waits until every Worker has written its hostname to a file named after its SLURM process ID in the shared location, and then releases execution back to the calling script:
# Reads files named after worker process IDs
# Once each worker has written its hostname to file,
# this function terminates
function confirmWorkers {
local i=0
while (( i < $NWORKERS )); do
i=0
for w_procid in `seq 2 $LAST_PROC`; do
echo "Checking for workers"
if [ -f $JOB_HOME/$w_procid ]; then
(( i++ ))
fi
done
sleep 1s
done
}
Note that we keep the files used to coordinate the job together under a parent directory named after the SLURM job ID ($JOB_HOME). Once this setup is out of the way, the Master and Workers come up in their own tasks on whichever nodes SLURM assigned, and the Client simply waits for them.
Once the Workers are up, we are ready to submit our application using spark-submit. The options we’re passing to spark-submit specify the address of the Master and that the driver will run on the Client node. Here is the submission step in full, including the application jar, its arguments, and the teardown of the cluster:
# Workers have already confirmed via confirmWorkers; give them a moment to register with the Master
sleep 5s
# Specify the application jar, its input file, and its output directory
INPUT="README.md"
OUTPUT="wordcount_$(date +%Y%m%d_%H%M%S)"
APP="spark-wc_2.11-1.0.jar"
# Submit the Spark jar
$SPARK_HOME/bin/spark-submit \
--master $MASTER_URL \
--deploy-mode client \
--total-executor-cores $(( NWORKERS * SPARK_WORKER_CORES )) \
$APP $INPUT $OUTPUT
# Tear down the whole allocation (Master, Workers, and this Client task)
scancel $SLURM_JOB_ID
At the end of the Client block, we cancel the SLURM allocation, since the Master and Worker tasks would otherwise keep running until the job’s time limit is reached.
The Master
elif [ $LEVEL == MASTER ]; then
export SPARK_MASTER_HOST=$(hostname)
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export MASTER_URL="spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
export MASTER_WEBUI_URL="http://$SPARK_MASTER_HOST:$SPARK_MASTER_WEBUI_PORT"
# Try to load stuff that the spark scripts will load
source "$SPARK_HOME/sbin/spark-config.sh"
source "$SPARK_HOME/bin/load-spark-env.sh"
# Write the Master's hostname to shared disk so that the Client and Workers can read it
echo $SPARK_MASTER_HOST > $JOB_HOME/master_host
# Start the Spark master
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master \
--ip $SPARK_MASTER_HOST \
--port $SPARK_MASTER_PORT \
--webui-port $SPARK_MASTER_WEBUI_PORT
The Worker
We force the Workers to wait for the Master to publish its hostname before they start themselves, even though a Worker will automatically retry the connection if the Master is not yet up. Then, we simply need to start the Worker through its class.
elif [ $LEVEL == "WORKER" ]; then
getMasterURL
export SPARK_WORKER_DIR=/tmp/$USER/$JOB_ID/work
export SPARK_LOCAL_DIR=/tmp/$USER/$JOB_ID
mkdir -p $SPARK_WORKER_DIR $SPARK_LOCAL_DIR
echo "Master URL = $MASTER_URL"
# Signal back to the Client that this Worker is up
hostname > $JOB_HOME/$SLURM_PROCID
# Start the Worker
"$SPARK_HOME/bin/spark-class" \
org.apache.spark.deploy.worker.Worker $MASTER_URL
Running the job
To kick things off, all that’s necessary is sbatch batch-job.slurm. Once the job launches, a series of output files will be generated, one for each task and one for the parent batch job itself.
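For example, with the job ID from the cluster.conf snippet above, the working directory would contain files along these lines:
slurm-12609822.out            # output of batch-job.slurm itself
slurm-12609822-task-0.out     # Client
slurm-12609822-task-1.out     # Master
slurm-12609822-task-2.out     # Workers
...
slurm-12609822-task-7.out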
In your slurm-#########-task-0.out file, you’ll see the instructions for port forwarding the Spark UIs to your local machine. Simply run this command from a new terminal session, for instance:
ssh -L 8080:vmpXXXX:8080 -L 4040:vmpXXXX:4040 vunetid@login.accre.vanderbilt.edu
The Spark Master UI
In your favorite web browser, navigate to localhost:8080; this UI is useful for making sure that your cluster has been set up appropriately. It lists the Workers deployed, their memory allocation, and any jobs running on the cluster:
The Spark Job UI
At localhost:4040 you’ll find this UI (provided an application is running):
This UI is particularly cool because it renders nice SVG graphics depicting all the steps of your job:
And finally, here’s a video of clicking through the web UIs, but trust me, it’s much more fun to look at your own job.
Final thoughts
Since we’ve started the Master and Workers as non-terminating processes, the SLURM job would run until its time limit if the Client never cancelled it. There’s no reason the Client is limited to a single application submission, however.
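For example, the Client block could submit several applications back to back against the same standing cluster before tearing it down (a sketch, with hypothetical application jars):
# Run several applications against the same cluster
for app in first-analysis.jar second-analysis.jar; do    # hypothetical jars
    $SPARK_HOME/bin/spark-submit \
        --master $MASTER_URL \
        --deploy-mode client \
        $app
done
# Tear everything down once the last application finishes
scancel $SLURM_JOB_ID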
Running Spark on top of SLURM might be useful for researchers who’ve already got lots of data stored on a shared filesystem on the cluster and don’t really want to move it to HDFS and back again. It’s easy to imagine a number of one-off jobs that a researcher could construct to analyze their data, and Spark provides a convenient way of composing those jobs quickly.