Spark Dataframes with Plotly


Apache Spark's meteoric rise has been incredible. It is one of the fastest growing open source projects and is a perfect fit for the graphing tools that Plotly provides. Plotly's ability to graph and share images from Spark DataFrames quickly and easily make it a great tool for any data scientist and Plotly Enterprise make it easy to securely host and share those Plotly graphs.

This notebook will go over the details of getting set up with IPython Notebooks for graphing Spark data with Plotly.


First you'll have to create an ipython profile for pyspark, you can do this locally or you can do it on the cluster that you're running Spark.

Start off by creating a new ipython profile. (Spark should have ipython install but you may need to install ipython notebook yourself).

ipython profile create pyspark

Next you'll have to edit some configurations. Spark/Hadoop have plenty of ports that they open up so you'll have to change the below file to avoid any conflicts that might come up.

~/.ipython/profile_pyspark/ipython_notebook_config.py

If you're not running Spark locally, you'll have to add some other configurations. Cloudera's blog has a great post about some of the other things you can add, like passwords.

IPython's documentation also has some excellent recommendations for settings that you can find on the "Securing a Notebook Server" post on ipython.org.

You'll likely want to set a port, and an IP address to be able to access the notebook.

Next you'll need to set a couple of environmental variables. You can do this at the command line or you can set it up in your computer's/master node's bash_rc/bash_profile files.

export SPARK_HOME="$HOME/Downloads/spark-1.3.1"

Now we'll need to add a file to make sure that we boot up with the Spark Context. Basically when we start the IPython Notebook, we need to be bring in the Spark Context. We need to set up a startup script that runs everytime we start a notebook from this profile.

Setting startup scripts are actually extremely easy - you just put them in the IPython Notebook directory under the "startup" folder. You can learn more about IPython configurations on the IPython site.

We'll create a file called pyspark_setup.py

in it we'll put

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)

# check if it exists
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# check if it is a directory
if not os.path.isdir(spark_home):
    raise ValueError('SPARK_HOME environment variable is not a directory')

#check if we can find the python sub-directory
if not os.path.isdir(os.path.join(spark_home, 'python')):
    raise ValueError('SPARK_HOME directory does not contain python')

sys.path.insert(0, os.path.join(spark_home, 'python'))

#check if we can find the py4j zip file
if not os.path.exists(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')):
    raise ValueError('Could not find the py4j library - \
            maybe your version number is different?(Looking for 0.8.2.1)')

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as f:
    code = compile(f.read(), os.path.join(spark_home, 'python/pyspark/shell.py'), 'exec')
    exec(code)

And now we're all set! When we start up an ipython notebook, we'll have the Spark Context available in our IPython notebooks. This is one time set up! So now we're ready to run things normally! We just have to start a specific pyspark profile.

ipython notebook --profile=pyspark

We can test for the Spark Context's existence with print sc.

In [1]:
from __future__ import print_function #python 3 support
print(sc)
<pyspark.context.SparkContext object at 0x10e797950>

Now that we've got the SparkContext, let's pull in some other useful Spark tools that we'll need. We'll be using pandas for some downstream analysis as well as Plotly for our graphing.

We'll also need the SQLContext to be able to do some nice Spark SQL transformations.

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

import plotly.plotly as py
from plotly.graph_objs import *
import pandas as pd
import requests
requests.packages.urllib3.disable_warnings()

The data we'll be working with is a sample of the open bike rental data. Essentially people can rent bikes and ride them from one station to another. This data provides that information. You can snag the sample I am using in JSON format here..

Now we can import it.

In [3]:
btd = sqlContext.jsonFile("btd2.json")

Now we can see that it's a DataFrame by printing its type.

In [4]:
print(type(btd))
<class 'pyspark.sql.dataframe.DataFrame'>

Now RDD is the base abstraction of Apache Spark, it's the Resilient Distributed Dataset. It is an immutable, partitioned collection of elements that can be operated on in a distributed manner. The DataFrame builds on that but is also immutable - meaning you've got to think in terms of transformations - not just manipulations.

Because we've got a json file, we've loaded it up as a DataFrame - a new introduction in Spark 1.3. The DataFrame interface which is similar to pandas style DataFrames except for that immutability described above.

We can print the schema easily, which gives us the layout of the data. Everything that I'm describing can be found in the Pyspark SQL documentation.

In [5]:
btd.printSchema()
root
 |-- Bike #: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- Subscription Type: string (nullable = true)
 |-- Trip ID: string (nullable = true)
 |-- Zip Code: string (nullable = true)

We can grab a couple, to see what the layout looks like.

In [6]:
btd.take(3)
Out[6]:
[Row(Bike #=u'520', Duration=u'63', End Date=u'8/29/13 14:14', End Station=u'South Van Ness at Market', End Terminal=u'66', Start Date=u'8/29/13 14:13', Start Station=u'South Van Ness at Market', Start Terminal=u'66', Subscription Type=u'Subscriber', Trip ID=u'4576', Zip Code=u'94127'),
 Row(Bike #=u'661', Duration=u'70', End Date=u'8/29/13 14:43', End Station=u'San Jose City Hall', End Terminal=u'10', Start Date=u'8/29/13 14:42', Start Station=u'San Jose City Hall', Start Terminal=u'10', Subscription Type=u'Subscriber', Trip ID=u'4607', Zip Code=u'95138'),
 Row(Bike #=u'48', Duration=u'71', End Date=u'8/29/13 10:17', End Station=u'Mountain View City Hall', End Terminal=u'27', Start Date=u'8/29/13 10:16', Start Station=u'Mountain View City Hall', Start Terminal=u'27', Subscription Type=u'Subscriber', Trip ID=u'4130', Zip Code=u'97214')]

Now one thing I'd like to look at is the duration distribution - can we see how common certain ride times are?

To answer that we'll get the durations and the way we'll be doing it is through the Spark SQL Interface. To do so we'll register it as a table.

In [7]:
sqlCtx.registerDataFrameAsTable(btd, "bay_area_bike")

Now as you may have noted above, the durations are in seconds. Let's start off by looking at all rides under 2 hours.

In [8]:
60 * 60 * 2 # 2 hours in seconds
Out[8]:
7200
In [9]:
df2 = sqlCtx.sql("SELECT Duration as d1 from bay_area_bike where Duration < 7200")

We've created a new DataFrame from the transformation and query - now we're ready to plot it. One of the great things about plotly is that you can throw very large datasets at it and it will do just fine. It's certainly a much more scalable solution than matplotlib.

Below I create a histogram of the data.

In [10]:
data = Data([Histogram(x=df2.toPandas()['d1'])])
In [11]:
py.iplot(data, filename="spark/less_2_hour_rides")
/Users/bill_chambers/.virtualenvs/plotly-notebook/lib/python2.7/site-packages/plotly/plotly/plotly.py:187: UserWarning:

Woah there! Look at all those points! Due to browser limitations, Plotly has a hard time graphing more than 500k data points for line charts, or 40k points for other types of charts. Here are some suggestions:
(1) Trying using the image API to return an image instead of a graph URL
(2) Use matplotlib
(3) See if you can create your visualization with fewer data points

If the visualization you're using aggregates points (e.g., box plot, histogram, etc.) you can disregard this warning.

Out[11]:

That was simple and we can see that plotly was able to handle the data without issue. We can see that big uptick in rides that last less than ~30 minutes (2000 seconds) - so let's look at that distribution.

In [12]:
df3 = sqlCtx.sql("SELECT Duration as d1 from bay_area_bike where Duration < 2000")

A great thing about Apache Spark is that you can sample easily from large datasets, you just set the amount you would like to sample and you're all set.

In [13]:
s1 = df2.sample(False, 0.05, 20)
s2 = df3.sample(False, 0.05, 2500)
In [14]:
data = Data([
        Histogram(x=s1.toPandas()['d1'], name="Large Sample"),
        Histogram(x=s2.toPandas()['d1'], name="Small Sample")
    ])

Plotly converts those samples into beautifully overlayed histograms. This is a great way to eyeball different distributions.

In [15]:
py.iplot(data, filename="spark/sample_rides")
Out[15]:

What's really powerful about Plotly is sharing this data is simple. I can take the above graph and change the styling or bins visually. A common workflow is to make a rough sketch of the graph in code, then make a more refined version with notes to share with management like the one below. Plotly's online interface allows you to edit graphs in other languages as well.

In [16]:
import plotly.tools as tls
tls.embed("https://plot.ly/~bill_chambers/101")
Out[16]:

Now let's check out bike rentals from individual stations. We can do a groupby with Spark DataFrames just as we might in Pandas. We've also seen at this point how easy it is to convert a Spark DataFrame to a pandas DataFrame.

In [17]:
dep_stations = btd.groupBy(btd['Start Station']).count().toPandas().sort('count', ascending=False)
dep_stations
Out[17]:
Start Station count
34 San Francisco Caltrain (Townsend at 4th) 9838
47 Harry Bridges Plaza (Ferry Building) 7343
0 Embarcadero at Sansome 6545
52 Market at Sansome 5922
62 Temporary Transbay Terminal (Howard at Beale) 5113
32 Market at 4th 5030
66 2nd at Townsend 4987
61 San Francisco Caltrain 2 (330 Townsend) 4976
25 Steuart at Market 4913
21 Townsend at 7th 4493
44 2nd at South Park 4458
57 Grant Avenue at Columbus Avenue 4004
38 Powell Street BART 3836
54 2nd at Folsom 3776
27 South Van Ness at Market 3521
49 Market at 10th 3511
67 Embarcadero at Bryant 3497
4 Spear at Folsom 3423
5 Howard at 2nd 3263
10 Civic Center BART (7th at Market) 3074
18 Beale at Market 3057
23 Embarcadero at Folsom 2931
59 Mechanics Plaza (Market at Battery) 2868
9 Commercial at Montgomery 2834
37 Powell at Post (Union Square) 2824
24 Embarcadero at Vallejo 2785
2 5th at Howard 2635
16 Post at Kearney 2503
45 Yerba Buena Center of the Arts (3rd @ Howard) 2487
36 Clay at Battery 2419
... ... ...
40 San Pedro Square 715
31 Mountain View City Hall 630
51 San Salvador at 1st 597
35 MLK Library 528
63 Japantown 496
60 SJSU - San Salvador at 9th 489
28 University and Emerson 434
30 Palo Alto Caltrain Station 431
15 SJSU 4th at San Carlos 389
53 Redwood City Caltrain Station 378
42 St James Park 366
26 Cowper at University 355
55 San Jose Civic Center 346
3 Arena Green / SAP Center 339
65 Adobe on Almaden 335
14 California Ave Caltrain Station 297
58 Rengstorff Avenue / California Street 248
41 San Antonio Caltrain Station 238
29 Evelyn Park and Ride 218
56 Broadway St at Battery St 201
11 Park at Olive 189
12 Castro Street and El Camino Real 132
20 Redwood City Medical Center 123
22 San Antonio Shopping Center 108
50 San Mateo County Center 101
1 Franklin at Maple 99
19 Broadway at Main 45
33 Redwood City Public Library 44
7 San Jose Government Center 23
48 Mezes Park 3

69 rows × 2 columns

Now that we've got a better sense of which stations might be interesting to look at, let's graph out, the number of trips leaving from the top two stations over time.

In [18]:
dep_stations['Start Station'][:3] # top 3 stations
Out[18]:
34    San Francisco Caltrain (Townsend at 4th)
47        Harry Bridges Plaza (Ferry Building)
0                       Embarcadero at Sansome
Name: Start Station, dtype: object

we'll add a handy function to help us convert all of these into appropriate count data. We're just using pandas resampling function to turn this into day count data.

In [19]:
def transform_df(df):
    df['counts'] = 1
    df['Start Date'] = df['Start Date'].apply(pd.to_datetime)
    return df.set_index('Start Date').resample('D', how='sum')
In [20]:
pop_stations = [] # being popular stations - we could easily extend this to more stations
for station in dep_stations['Start Station'][:3]:
    temp = transform_df(btd.where(btd['Start Station'] == station).select("Start Date").toPandas())
    pop_stations.append(
        Scatter(
        x=temp.index,
        y=temp.counts,
        name=station
        )
    )
In [21]:
data = Data(pop_stations)
py.iplot(data, filename="spark/over_time")
Out[21]:

Interestingly we can see similar patterns for the Embarcadero and Ferry Buildings. We also get a consistent break between work weeks and work days. There also seems to be an interesting pattern between fall and winter usage for the downtown stations that doesn't seem to affect the Caltrain station.

You can learn more about Plotly Enterprise and collaboration tools with the links below: