Introduction

Jupyter notebooks are a popular way of executing code with an in-browser GUI. In addition to code execution, they support plotting, markdown, and much more, with the aim of tightly integrating code and documentation. The simplest way to view a Jupyter notebook running remotely is to use port forwarding via ssh, for example:

ssh -L 9999:<remote_host>:8889 <remote_host>

This will forward the notebook running on the remote host at port 8889 to your local port 9999. To see what’s happening on your local port 9999, just type localhost:9999 into the address bar of your favorite browser. Since we haven’t started the notebook, you shouldn’t see anything yet.
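
To check the tunnel from a script rather than a browser, a minimal sketch along these lines may help. The helper name `port_is_open` is hypothetical. It only tests whether the local end of the tunnel accepts TCP connections, so a positive result shows that ssh is listening, not that the notebook itself is up.

```python
import socket

def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the ssh command above running, `port_is_open("localhost", 9999)` would be the call to try.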

Use Lmod to load the required software

The Big Data cluster now uses Lmod to load software packages, just like the main ACCRE cluster. The Big Data cluster has its own set of packages specific to big data workflows, including Spark. So, the first step in getting our Jupyter notebook up and running is to load Spark and Anaconda:

ml purge
ml Spark/2.2.0 Anaconda3/4.2.0

Specify PySpark configuration

The module Anaconda3 contains the pertinent commands that we need to run PySpark, namely python3 and jupyter. To use these commands, we need to tell Spark which version of Python we need to use; this happens in a few places.

Spark executors are launched on the worker nodes in the cluster, and the PYSPARK_PYTHON environment variable tells Spark workers which version of Python to use.

export PYSPARK_PYTHON=$(which python3)

This works because all Lmod software is installed in the /opt directory, an NFS mounted directory that is shared across the entire big data cluster. So even though Lmod is only installed on the gateway node, the python3 command is present on each machine in the cluster at the path returned by $(which python3).

For PySpark applications, the Spark driver should execute on the client node. In this case, we need to tell the driver to execute the jupyter command rather than a python command:

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8889 --ip=*"

Setting PYSPARK_DRIVER_PYTHON to jupyter is, admittedly, a bit of a hack, since jupyter doesn’t actually function the same way as python. As you may have already guessed, PYSPARK_DRIVER_PYTHON_OPTS specifies everything passed to the jupyter command.
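
Before launching anything, it can be worth reading these settings back from the environment. The following is a minimal sketch, meant to be run with python3 in the same shell after the exports. The helper name `pyspark_env` is made up for illustration, and it only reports the three variables discussed above.

```python
import os

# The three variables set in this section.
PYSPARK_VARS = (
    "PYSPARK_PYTHON",
    "PYSPARK_DRIVER_PYTHON",
    "PYSPARK_DRIVER_PYTHON_OPTS",
)

def pyspark_env(environ=None):
    """Map each PySpark variable to its value, or None if it is unset."""
    environ = os.environ if environ is None else environ
    return {name: environ.get(name) for name in PYSPARK_VARS}

for name, value in pyspark_env().items():
    print("{0} = {1}".format(name, value if value is not None else "<unset>"))
```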

Launch the PySpark job

The Spark/2.2.0 module makes available the pyspark command, which is what we’ll use to kick off our interactive job:

pyspark --master yarn

The --master yarn option tells PySpark that we want to use YARN to schedule our job across the Big Data cluster. If we omitted it, PySpark would simply run on the gateway node, without access to the HDFS filesystem or the computational power of the worker nodes. Passing --master yarn is therefore usually what you want, unless you’re doing small-scale testing or debugging. The magic of distributing your job across the cluster with this single option is made possible by the environment variable HADOOP_CONF_DIR, which is automatically set when you load Spark/2.2.0.

After running the above command, you should see some normal logging output from the jupyter notebook command. Now open localhost:9999 in your browser, and you should see the Jupyter file browser launched from your directory on the cluster. Click on New > Python [default] and voilà! You now have a new Jupyter notebook instance.
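
A first cell along the following lines can confirm that the notebook is attached to the cluster. This is a sketch that assumes the notebook exposes a SparkContext named `sc`, as the later cells in this post do. `describe_context` is a hypothetical helper that reads three standard SparkContext attributes.

```python
def describe_context(sc):
    """Collect a few identifying attributes of a SparkContext-like object."""
    return {
        "master": sc.master,                         # e.g. the value passed to --master
        "version": sc.version,                       # Spark version string
        "default_parallelism": sc.defaultParallelism,
    }

# `sc` is expected to exist in the notebook already; the guard lets the
# cell run without error in a plain Python session as well.
if "sc" in globals():
    print(describe_context(sc))
```

If the --master yarn option took effect, the reported master should reflect it.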

A simple demonstration of PySpark

The content in this section comes from the pyspark-notebook-example in our GitHub repo.

Pro Tip: The command jupyter nbconvert --to markdown <notebook>.ipynb converts a Jupyter notebook to pure markdown, making it easy to incorporate into blog posts, for example!

%matplotlib inline
import matplotlib.pyplot as plt

import pyspark
import random
import time

Estimate \(\pi\)

As a simple example, let’s approximate \(\pi\) with brute force.

Define a function to decide whether a randomly chosen point in the \(XY\) plane lies inside the unit circle.

def inside(p):
    # p is the sample index supplied by Spark; it is ignored and a fresh random point is drawn
    x, y = random.random(), random.random()
    return x*x + y*y < 1

Define a function that uses the SparkContext, sc, to distribute the work across partitions:

def approx_pi(num_samples, num_partitions):
    num_inside = sc.parallelize(range(0, num_samples), num_partitions).filter(inside).count()
    pi = 4 * num_inside / num_samples
    return pi
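
To see why the factor of 4 appears, it may help to look at a single-machine version of the same estimate. Points drawn uniformly from the unit square fall inside the quarter circle with probability \(\pi/4\), so four times the observed fraction approximates \(\pi\). The sketch below uses no Spark at all. The name `approx_pi_local` and the fixed seed are choices made for this illustration.

```python
import random

def approx_pi_local(num_samples, seed=0):
    """Estimate pi on one machine by sampling points in the unit square."""
    rng = random.Random(seed)  # seeded so that repeated runs agree
    num_inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            num_inside += 1
    # The quarter circle covers pi/4 of the unit square.
    return 4 * num_inside / num_samples

print(approx_pi_local(100000))
```

The Spark version above performs the same counting, with the loop replaced by a filter and a count spread over partitions.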

Explore how the number of partitions affects the speed of the computation.

samples = 100000000
num_partitions = 7
errors = []
times = []
partitions = []

for _ in range(5):
    t0 = time.time()
    pi_approx = approx_pi(samples, num_partitions)
    t1 = time.time()

    dt = t1 - t0
    errors.append(pi_approx - 3.1415926536)
    times.append(dt)

    partitions.append(num_partitions)
    num_partitions *= 3

plt.plot(partitions, times)
[<matplotlib.lines.Line2D at 0x7f59449b80f0>]

[Figure: computation time versus number of partitions]
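
The timing loop can also be packaged as a small reusable helper. The sketch below is one possible arrangement rather than the code used to produce the plot: `benchmark` is a hypothetical name, and it accepts any function with the same `(num_samples, num_partitions)` signature as `approx_pi`.

```python
import time

def benchmark(estimator, num_samples, partition_counts):
    """Time estimator(num_samples, n) for each n.

    Returns a list of (n, seconds, estimate) tuples.
    """
    results = []
    for n in partition_counts:
        t0 = time.perf_counter()
        estimate = estimator(num_samples, n)
        results.append((n, time.perf_counter() - t0, estimate))
    return results

# The same schedule as the loop above: start at 7 and triple four times.
partition_counts = [7 * 3 ** k for k in range(5)]
print(partition_counts)  # [7, 21, 63, 189, 567]
```

In the notebook, `benchmark(approx_pi, samples, partition_counts)` would then collect the timings in one call.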

Test for Benford’s law

Let’s count the occurrence of numeric digits within a body of text and decide if the distribution of digits follows Benford’s law. The data we’ll explore are bikeshare records in DC, currently available in the shared /data directory on the Big Data cluster. For example:

$ hadoop fs -cat /data/capitalbikeshare-data/2016-Q3-Trips-History-Data-1.csv | head -20
Duration (ms),Start date,End date,Start station number,Start station,End station number,End station,Bike number,Member Type
840866,8/31/2016 23:59,9/1/2016 0:13,31117,15th & Euclid St  NW,31228,8th & H St NW,W20409,Registered
656098,8/31/2016 23:58,9/1/2016 0:09,31279,19th & G St NW,31600,5th & K St NW,W20756,Registered
353159,8/31/2016 23:58,9/1/2016 0:04,31107,Lamont & Mt Pleasant NW,31101,14th & V St NW,W22626,Registered
219234,8/31/2016 23:58,9/1/2016 0:02,31200,Massachusetts Ave & Dupont Circle NW,31212,21st & M St NW,W00980,Casual
213473,8/31/2016 23:56,8/31/2016 23:59,31281,8th & O St NW,31280,11th & S St NW,W21338,Registered
637695,8/31/2016 23:56,9/1/2016 0:07,31624,North Capitol St & F St NW,31241,Thomas Circle,W21422,Registered
356455,8/31/2016 23:54,9/1/2016 0:00,31034,N Randolph St & Fairfax Dr,31023,Fairfax Dr & Wilson Blvd,W00748,Casual
924793,8/31/2016 23:53,9/1/2016 0:08,31124,14th & Irving St NW,31267,17th St & Massachusetts Ave NW,W21480,Registered
309433,8/31/2016 23:53,8/31/2016 23:58,31242,18th St & Pennsylvania Ave NW,31285,22nd & P ST NW,W20113,Registered
447572,8/31/2016 23:52,8/31/2016 23:59,31007,Crystal City Metro / 18th & Bell St,31011,23rd & Crystal Dr,W00943,Registered
438823,8/31/2016 23:52,8/31/2016 23:59,31122,16th & Irving St NW,31214,17th & Corcoran St NW,W00833,Registered
350341,8/31/2016 23:51,8/31/2016 23:57,31264,6th St & Indiana Ave NW,31244,4th & E St SW,W20404,Registered
568722,8/31/2016 23:51,9/1/2016 0:01,31238,14th & G St NW,31201,15th & P St NW,W22750,Registered
229685,8/31/2016 23:50,8/31/2016 23:54,31304,36th & Calvert St NW / Glover Park,31226,34th St & Wisconsin Ave NW,W21185,Registered
405098,8/31/2016 23:50,8/31/2016 23:57,31212,21st & M St NW,31114,18th St & Wyoming Ave NW,W22401,Registered
2592652,8/31/2016 23:47,9/1/2016 0:30,31067,Columbia Pike & S Walter Reed Dr,31260,23rd & E St NW ,W21736,Registered
570531,8/31/2016 23:46,8/31/2016 23:56,31111,10th & U St NW,31122,16th & Irving St NW,W21005,Registered
981529,8/31/2016 23:46,9/1/2016 0:03,31235,19th St & Constitution Ave NW,31258,Lincoln Memorial,W22953,Registered
1138485,8/31/2016 23:45,9/1/2016 0:04,31000,Eads St & 15th St S,31000,Eads St & 15th St S,W21251,Casual

Define our mapping function, which counts how often each digit occurs in a string.

def count_digits(string):
    counter = [0] * 10
    for c in string:
        if c in '0123456789':
            counter[int(c)] += 1
    return counter

dummy_counter = count_digits('abcd111123234')
print(dummy_counter)
[0, 4, 2, 2, 1, 0, 0, 0, 0, 0]

Define a reduce function that combines two counters into one.

def combine_digits(a, b):
    # assert(len(a) == len(b) == 10)
    return [a[i] + b[i] for i in range(10)]

print(combine_digits(dummy_counter, dummy_counter))
[0, 8, 4, 4, 2, 0, 0, 0, 0, 0]
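
Before handing these two functions to Spark, the same map-then-reduce pattern can be tried locally with Python's built-in `map` and `functools.reduce`. The sketch below repeats the two definitions so that it runs on its own. The sample strings are the first three fields of the first two records in the listing above.

```python
from functools import reduce

def count_digits(string):
    counter = [0] * 10
    for c in string:
        if c in '0123456789':
            counter[int(c)] += 1
    return counter

def combine_digits(a, b):
    return [a[i] + b[i] for i in range(10)]

sample_lines = [
    "840866,8/31/2016 23:59,9/1/2016 0:13",
    "656098,8/31/2016 23:58,9/1/2016 0:09",
]

# map: one counter per line; reduce: fold the counters into a single one
local_counts = reduce(combine_digits, map(count_digits, sample_lines))
print(local_counts)
```

Because `combine_digits` is associative and commutative, the per-line counters can be merged in any order, which is the property a distributed reduce relies on.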

Apply the map and reduce functions.

lines = sc.textFile('/data/capital*/*.csv', 60) # notice the globbing functionality in the file names
counts = lines.map(count_digits).reduce(combine_digits)
print(counts)
[89309440, 133682530, 96328002, 50280321, 38989407, 37620303, 30666030, 27789765, 26541711, 23779326]

Plot the counts as a histogram.

fig, ax = plt.subplots()
ax.bar(range(1,10), counts[1:], align='center')
ax.set_ylabel('Counts')
ax.set_xlabel('Digit')
ax.set_xlim(0.5, 9.5)
ax.set_xticks(range(1, 10))
# ax.set_xticklabels(range(1,10))
[<matplotlib.axis.XTick at 0x7f5946d62908>,
 <matplotlib.axis.XTick at 0x7f5944399550>,
 <matplotlib.axis.XTick at 0x7f59443a5e48>,
 <matplotlib.axis.XTick at 0x7f5944935f98>,
 <matplotlib.axis.XTick at 0x7f594493a9e8>,
 <matplotlib.axis.XTick at 0x7f594493c438>,
 <matplotlib.axis.XTick at 0x7f594493ce48>,
 <matplotlib.axis.XTick at 0x7f5944940898>,
 <matplotlib.axis.XTick at 0x7f59449442e8>]

[Figure: bar chart of counts for digits 1 through 9]

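A set of numbers is said to satisfy Benford’s law if the leading digit \(d\), with \(d \in \{1, \ldots, 9\}\), occurs with probability

\[P(d) = \log_{10}\left(1 + \frac{1}{d}\right)\]

The following cell compares this ideal distribution with the observed digit frequencies.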

from math import log10

total_counts = sum(counts[1:])
probs = [cts / total_counts for cts in counts]

for d, prob in enumerate(probs):
    if d == 0:
        continue
    ideal = log10(1 + 1 / d)
    print("Ideal: {0} --- Actual {1}".format(ideal, prob))
Ideal: 0.3010299956639812 --- Actual 0.28707111711961025
Ideal: 0.17609125905568124 --- Actual 0.20685565379440418
Ideal: 0.12493873660829992 --- Actual 0.10797243228866628
Ideal: 0.09691001300805642 --- Actual 0.08372621780363636
Ideal: 0.07918124604762482 --- Actual 0.08078619104970727
Ideal: 0.06694678963061322 --- Actual 0.06585252006917794
Ideal: 0.05799194697768673 --- Actual 0.05967600166634672
Ideal: 0.05115252244738129 --- Actual 0.05699591881628697
Ideal: 0.04575749056067514 --- Actual 0.05106394739216406
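
Note that the law is stated for leading digits, whereas `count_digits` tallies every digit in each line. As a possible refinement, the sketch below counts only the first nonzero digit of each record, which for these CSV lines is the leading digit of the Duration field. The function names are hypothetical and are not part of the original notebook.

```python
from math import log10

def leading_digit(record):
    """Return the first nonzero digit in a string, or None if there is none."""
    for c in record:
        if c in "123456789":
            return int(c)
    return None

def leading_digit_counts(records):
    """Tally leading digits; index d holds the count for digit d (index 0 is unused)."""
    counter = [0] * 10
    for record in records:
        d = leading_digit(record)
        if d is not None:  # e.g. the CSV header line contains no digits
            counter[d] += 1
    return counter

def benford_probability(d):
    """Ideal Benford probability of leading digit d, for d in 1..9."""
    return log10(1 + 1 / d)

print(leading_digit("840866,8/31/2016 23:59"))  # 8
```

In the notebook, something like `lines.map(leading_digit).countByValue()` should give the corresponding tallies across the cluster, although that variant was not run for this post.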