Plot Data from Apache Spark in Python/v3

A tutorial showing how to plot Apache Spark DataFrames with Plotly


Note: this page is part of the documentation for version 3 of Plotly.py, which is not the most recent version.
See our Version 4 Migration Guide for information about how to upgrade.

New to Plotly?

Plotly's Python library is free and open source! Get started by downloading the client and reading the primer.
You can set up Plotly to work in online or offline mode, or in jupyter notebooks.
We also have a quick-reference cheatsheet (new!) to help you get started!

Version Check

Plotly's python package is updated frequently. Run pip install plotly --upgrade to use the latest version.

In [1]:
import plotly
plotly.__version__
Out[1]:
'2.0.1'

About Apache Spark

Apache Spark's meteoric rise has been incredible. It is one of the fastest growing open source projects and is a perfect fit for the graphing tools that Plotly provides. Plotly's ability to graph and share images from Spark DataFrames quickly and easily makes it a great tool for any data scientist, and Chart Studio Enterprise makes it easy to securely host and share those Plotly graphs.

This notebook will go over the details of getting set up with IPython Notebooks for graphing Spark data with Plotly.

Create a Profile

First you'll have to create an IPython profile for pyspark. You can do this locally or on the cluster that you're running Spark on.

Start off by creating a new IPython profile. (Your Spark environment should already have IPython installed, but you may need to install the IPython notebook package yourself.)

ipython profile create pyspark

Next you'll have to edit some configuration. Spark and Hadoop open up plenty of ports, so you'll have to change the file below to avoid any conflicts that might come up.

~/.ipython/profile_pyspark/ipython_notebook_config.py

If you're not running Spark locally, you'll have to add some other configurations. Cloudera's blog has a great post about some of the other things you can add, like passwords.

IPython's documentation also has some excellent recommendations for settings that you can find on the "Securing a Notebook Server" post on ipython.org.

You'll likely want to set a port and an IP address to be able to access the notebook.
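For example, a minimal sketch of those settings might look like the following (the IP, port, and password hash shown here are placeholder values, not recommendations - adjust them for your environment):

# ~/.ipython/profile_pyspark/ipython_notebook_config.py
c = get_config()

c.NotebookApp.ip = '*'              # listen on all interfaces; restrict this if the machine is exposed
c.NotebookApp.open_browser = False  # don't try to open a browser on a headless cluster node
c.NotebookApp.port = 8880           # pick a port that doesn't clash with Spark/Hadoop services
# To require a password, generate a hash (see the "Securing a Notebook Server" docs) and set:
# c.NotebookApp.password = u'sha1:<your generated hash>'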

Next you'll need to set a couple of environment variables. You can do this at the command line or set them in your computer's (or master node's) .bashrc/.bash_profile file.

export SPARK_HOME="$HOME/Downloads/spark-1.3.1"

Setup

Now we'll need to add a file to make sure that we boot up with the Spark Context. Basically, when we start the IPython Notebook, we need to bring in the Spark Context, so we'll set up a startup script that runs every time we start a notebook from this profile.

Setting up startup scripts is actually extremely easy - you just put them in the "startup" folder inside the profile directory (~/.ipython/profile_pyspark/startup/). You can learn more about IPython configurations on the IPython site.

We'll create a file called pyspark_setup.py in that startup folder, and in it we'll put:

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)

# check if it exists
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# check if it is a directory
if not os.path.isdir(spark_home):
    raise ValueError('SPARK_HOME environment variable is not a directory')

# check if we can find the python sub-directory
if not os.path.isdir(os.path.join(spark_home, 'python')):
    raise ValueError('SPARK_HOME directory does not contain python')

sys.path.insert(0, os.path.join(spark_home, 'python'))

# check if we can find the py4j zip file
if not os.path.exists(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')):
    raise ValueError('Could not find the py4j library - '
                     'maybe your version number is different? (Looking for 0.8.2.1)')

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as f:
    code = compile(f.read(), os.path.join(spark_home, 'python/pyspark/shell.py'), 'exec')
    exec(code)

And now we're all set! When we start up an IPython notebook, we'll have the Spark Context available right away. This is a one-time setup, so from here on we can run things normally - we just have to start the notebook with the pyspark profile.

ipython notebook --profile=pyspark

We can test that the Spark Context exists by printing sc.

In [1]:
from __future__ import print_function #python 3 support
print(sc)
<pyspark.context.SparkContext object at 0x10e797950>

Spark Tools

Now that we've got the SparkContext, let's pull in some other useful Spark tools that we'll need. We'll be using pandas for some downstream analysis as well as Plotly for our graphing.

We'll also need the SQLContext to be able to do some nice Spark SQL transformations.

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

import plotly.plotly as py
import plotly.graph_objs as go
import pandas as pd
import requests
requests.packages.urllib3.disable_warnings()

The data we'll be working with is a sample of open bike rental data. Essentially, people can rent bikes and ride them from one station to another, and this data records those trips. You can snag the sample I am using in JSON format here.

Now we can import it.

In [3]:
btd = sqlContext.jsonFile("btd2.json")

Now we can see that it's a DataFrame by printing its type.

In [4]:
print(type(btd))
<class 'pyspark.sql.dataframe.DataFrame'>

Now the RDD is the base abstraction of Apache Spark; it's the Resilient Distributed Dataset, an immutable, partitioned collection of elements that can be operated on in a distributed manner. The DataFrame builds on that and is also immutable, meaning you've got to think in terms of transformations, not just manipulations.

Because we've got a JSON file, we've loaded it up as a DataFrame - a new introduction in Spark 1.3. The DataFrame interface is similar to pandas-style DataFrames, except for the immutability described above.
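To make that concrete, here's a minimal sketch using the btd DataFrame loaded above: a transformation such as select returns a new DataFrame and leaves the original untouched, and nothing is actually computed until an action such as count is called.

durations = btd.select('Duration')  # transformation: builds a new, immutable DataFrame; btd is unchanged
print(durations.count())            # action: triggers the distributed computation and returns a result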

We can print the schema easily, which gives us the layout of the data. Everything that I'm describing can be found in the Pyspark SQL documentation.

In [5]:
btd.printSchema()
root
 |-- Bike #: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- Subscription Type: string (nullable = true)
 |-- Trip ID: string (nullable = true)
 |-- Zip Code: string (nullable = true)

We can grab a couple of rows to see what the layout looks like.

In [6]:
btd.take(3)
Out[6]:
[Row(Bike #=u'520', Duration=u'63', End Date=u'8/29/13 14:14', End Station=u'South Van Ness at Market', End Terminal=u'66', Start Date=u'8/29/13 14:13', Start Station=u'South Van Ness at Market', Start Terminal=u'66', Subscription Type=u'Subscriber', Trip ID=u'4576', Zip Code=u'94127'),
 Row(Bike #=u'661', Duration=u'70', End Date=u'8/29/13 14:43', End Station=u'San Jose City Hall', End Terminal=u'10', Start Date=u'8/29/13 14:42', Start Station=u'San Jose City Hall', Start Terminal=u'10', Subscription Type=u'Subscriber', Trip ID=u'4607', Zip Code=u'95138'),
 Row(Bike #=u'48', Duration=u'71', End Date=u'8/29/13 10:17', End Station=u'Mountain View City Hall', End Terminal=u'27', Start Date=u'8/29/13 10:16', Start Station=u'Mountain View City Hall', Start Terminal=u'27', Subscription Type=u'Subscriber', Trip ID=u'4130', Zip Code=u'97214')]

Now one thing I'd like to look at is the duration distribution - can we see how common certain ride times are?

To answer that, we'll get the durations through the Spark SQL interface. To do so, we'll register the DataFrame as a table.

In [7]:
sqlContext.registerDataFrameAsTable(btd, "bay_area_bike")

Now as you may have noted above, the durations are in seconds. Let's start off by looking at all rides under 2 hours.

In [8]:
60 * 60 * 2 # 2 hours in seconds
Out[8]:
7200
In [9]:
df2 = sqlContext.sql("SELECT Duration as d1 from bay_area_bike where Duration < 7200")

We've created a new DataFrame from the transformation and query - now we're ready to plot it. One of the great things about plotly is that you can throw very large datasets at it and it will do just fine. It's certainly a much more scalable solution than matplotlib.

Below I create a histogram of the data.

In [10]:
data = [go.Histogram(x=df2.toPandas()['d1'])]
In [11]:
py.iplot(data, filename="spark/less_2_hour_rides")
/Users/bill_chambers/.virtualenvs/plotly-notebook/lib/python2.7/site-packages/plotly/plotly/plotly.py:187: UserWarning:

Woah there! Look at all those points! Due to browser limitations, Plotly has a hard time graphing more than 500k data points for line charts, or 40k points for other types of charts. Here are some suggestions:
(1) Trying using the image API to return an image instead of a graph URL
(2) Use matplotlib
(3) See if you can create your visualization with fewer data points

If the visualization you're using aggregates points (e.g., box plot, histogram, etc.) you can disregard this warning.

Out[11]:

That was simple, and we can see that Plotly handled the data without issue. There's a big uptick in rides that last less than about 30 minutes (~2000 seconds), so let's look at that distribution.

In [12]:
df3 = sqlContext.sql("SELECT Duration as d1 from bay_area_bike where Duration < 2000")

A great thing about Apache Spark is that you can sample easily from large datasets: you just specify the fraction you'd like to sample and you're all set. Plotly renders those samples as nicely overlaid histograms, which is a great way to eyeball different distributions.

In [15]:
s1 = df2.sample(False, 0.05, 20)
s2 = df3.sample(False, 0.05, 2500)

data = [
        go.Histogram(x=s1.toPandas()['d1'], name="Large Sample"),
        go.Histogram(x=s2.toPandas()['d1'], name="Small Sample")
    ]

py.iplot(data, filename="spark/sample_rides")
Out[15]:

What's really powerful about Plotly is that sharing this data is simple. I can take the above graph and change the styling or bins visually. A common workflow is to make a rough sketch of the graph in code, then make a more refined version with notes to share with management, like the one below. Plotly's online interface allows you to edit graphs in other languages as well.

In [16]:
import plotly.tools as tls
tls.embed("https://plotly.com/~bill_chambers/101")
Out[16]:

Now let's check out bike rentals from individual stations. We can do a groupby with Spark DataFrames just as we might in Pandas. We've also seen at this point how easy it is to convert a Spark DataFrame to a pandas DataFrame.

In [18]:
dep_stations = btd.groupBy(btd['Start Station']).count().toPandas().sort('count', ascending=False)
dep_stations['Start Station'][:3] # top 3 stations
Out[18]:
34    San Francisco Caltrain (Townsend at 4th)
47        Harry Bridges Plaza (Ferry Building)
0                       Embarcadero at Sansome
Name: Start Station, dtype: object

We'll add a handy function to help us convert all of these into appropriate count data. We're just using pandas' resample function to turn this into daily count data.

In [21]:
def transform_df(df):
    df['counts'] = 1
    df['Start Date'] = df['Start Date'].apply(pd.to_datetime)
    return df.set_index('Start Date').resample('D', how='sum')

pop_stations = []  # the most popular departure stations - we could easily extend this to more stations
for station in dep_stations['Start Station'][:3]:
    temp = transform_df(btd.where(btd['Start Station'] == station).select("Start Date").toPandas())
    pop_stations.append(
        go.Scatter(
            x=temp.index,
            y=temp.counts,
            name=station
        )
    )

data = pop_stations
py.iplot(data, filename="spark/over_time")
Out[21]:

Interestingly, we can see similar patterns for the Embarcadero and Ferry Building stations. We also get a consistent break between work weeks and weekends. There also seems to be an interesting pattern between fall and winter usage for the downtown stations that doesn't seem to affect the Caltrain station.

References

You can learn more about Chart Studio Enterprise and collaboration tools with the links below: