Apache Spark's meteoric rise has been incredible. It is one of the fastest growing open source projects and is a perfect fit for the graphing tools that Plotly provides. Plotly's ability to graph and share images from Spark DataFrames quickly and easily makes it a great tool for any data scientist, and Plotly Enterprise makes it easy to securely host and share those Plotly graphs.
This notebook will go over the details of getting set up with IPython Notebooks for graphing Spark data with Plotly.
First you'll have to create an IPython profile for PySpark. You can do this locally or on the cluster where you're running Spark.
Start off by creating a new IPython profile. (Spark should have IPython installed, but you may need to install IPython Notebook yourself.)
ipython profile create pyspark
Next you'll have to edit some configurations. Spark/Hadoop have plenty of ports that they open up so you'll have to change the below file to avoid any conflicts that might come up.
If you're not running Spark locally, you'll have to add some other configurations. Cloudera's blog has a great post about some of the other things you can add, like passwords.
IPython's documentation also has some excellent recommendations for settings that you can find on the "Securing a Notebook Server" post on ipython.org.
You'll likely want to set a port and an IP address to be able to access the notebook.
Next you'll need to set a couple of environment variables. You can do this at the command line or you can set them in the .bashrc/.bash_profile file on your computer or master node.
Now we'll need to add a file to make sure that we boot up with the Spark Context. Basically, when we start the IPython Notebook, we need to bring in the Spark Context. We need to set up a startup script that runs every time we start a notebook from this profile.
Setting up startup scripts is actually extremely easy - you just put them in the IPython Notebook directory under the "startup" folder. You can learn more about IPython configurations on the IPython site.
We'll create a file called
in it we'll put
import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)

# check if it exists
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# check if it is a directory
if not os.path.isdir(spark_home):
    raise ValueError('SPARK_HOME environment variable is not a directory')

# check if we can find the python sub-directory
if not os.path.isdir(os.path.join(spark_home, 'python')):
    raise ValueError('SPARK_HOME directory does not contain python')

sys.path.insert(0, os.path.join(spark_home, 'python'))

# check if we can find the py4j zip file
if not os.path.exists(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')):
    raise ValueError('Could not find the py4j library - '
                     'maybe your version number is different? (Looking for 0.8.2.1)')

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# run PySpark's own shell bootstrap so that `sc` is defined in the notebook
with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as f:
    code = compile(f.read(), os.path.join(spark_home, 'python/pyspark/shell.py'), 'exec')
    exec(code)
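The startup script hard-codes the py4j version (0.8.2.1), so it will fail on a Spark release that ships a different one. As a rough sketch, and not part of the original setup, the zip could be located with a glob pattern instead. The helper name `find_py4j_zip` is hypothetical, and the directory layout is assumed to match the paths used in the startup script.

```python
import glob
import os


def find_py4j_zip(spark_home):
    """Return the path of a py4j source zip under spark_home/python/lib.

    Raises ValueError if no file matching py4j-*-src.zip is found.
    """
    pattern = os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise ValueError('Could not find a py4j zip matching ' + pattern)
    # If several zips are present, take the last one in sorted (string) order.
    return matches[-1]
```

In the startup script this would replace the two hard-coded py4j paths, for example `sys.path.insert(0, find_py4j_zip(spark_home))`.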
And now we're all set! When we start up an IPython notebook, we'll have the Spark Context available in it. This is a one-time setup, so now we're ready to run things normally. We just have to start the notebook with the pyspark profile.
ipython notebook --profile=pyspark
We can test for the Spark Context's existence with
from __future__ import print_function  # python 3 support
print(sc)
<pyspark.context.SparkContext object at 0x10e797950>
Now that we've got the SparkContext, let's pull in some other useful Spark tools that we'll need. We'll be using pandas for some downstream analysis as well as Plotly for our graphing.
We'll also need the SQLContext to be able to do some nice Spark SQL transformations.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

import plotly.plotly as py
from plotly.graph_objs import *
import pandas as pd
import requests
requests.packages.urllib3.disable_warnings()
The data we'll be working with is a sample of the open bike rental data. Essentially people can rent bikes and ride them from one station to another, and this data records those trips. You can snag the sample I am using in JSON format here.
Now we can import it.
btd = sqlContext.jsonFile("btd2.json")
Now we can see that it's a DataFrame by printing its type.
The RDD (Resilient Distributed Dataset) is the base abstraction of Apache Spark. It is an immutable, partitioned collection of elements that can be operated on in a distributed manner. The DataFrame builds on that and is also immutable - meaning you've got to think in terms of transformations, not just manipulations.
Because we've got a JSON file, we've loaded it up as a DataFrame - a new introduction in Spark 1.3. The DataFrame interface is similar to pandas-style DataFrames, except for the immutability described above.
We can print the schema easily, which gives us the layout of the data. Everything that I'm describing can be found in the PySpark SQL documentation.
root
 |-- Bike #: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- Subscription Type: string (nullable = true)
 |-- Trip ID: string (nullable = true)
 |-- Zip Code: string (nullable = true)
We can grab a few rows to see what the layout looks like.
[Row(Bike #=u'520', Duration=u'63', End Date=u'8/29/13 14:14', End Station=u'South Van Ness at Market', End Terminal=u'66', Start Date=u'8/29/13 14:13', Start Station=u'South Van Ness at Market', Start Terminal=u'66', Subscription Type=u'Subscriber', Trip ID=u'4576', Zip Code=u'94127'),
 Row(Bike #=u'661', Duration=u'70', End Date=u'8/29/13 14:43', End Station=u'San Jose City Hall', End Terminal=u'10', Start Date=u'8/29/13 14:42', Start Station=u'San Jose City Hall', Start Terminal=u'10', Subscription Type=u'Subscriber', Trip ID=u'4607', Zip Code=u'95138'),
 Row(Bike #=u'48', Duration=u'71', End Date=u'8/29/13 10:17', End Station=u'Mountain View City Hall', End Terminal=u'27', Start Date=u'8/29/13 10:16', Start Station=u'Mountain View City Hall', Start Terminal=u'27', Subscription Type=u'Subscriber', Trip ID=u'4130', Zip Code=u'97214')]
Now one thing I'd like to look at is the duration distribution - can we see how common certain ride times are?
To answer that we'll pull the durations through the Spark SQL interface. To do so we'll register the DataFrame as a table.
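The registration step itself is not shown here, so the following is only a sketch of one way it could look, not necessarily what was run. It assumes a Spark 1.3-style `SQLContext`, whose `registerDataFrameAsTable(df, tableName)` method makes a DataFrame queryable by name. The helper names `build_duration_query` and `register_and_query` are hypothetical.

```python
def build_duration_query(table_name, max_seconds):
    """Build the SQL text that selects durations below max_seconds."""
    return "SELECT Duration as d1 from {0} where Duration < {1}".format(
        table_name, int(max_seconds))


def register_and_query(sql_context, df, table_name, max_seconds):
    """Register df under table_name, then query it through Spark SQL.

    sql_context is assumed to be a pyspark.sql.SQLContext and df a Spark
    DataFrame; the same context is used to register and to query.
    """
    sql_context.registerDataFrameAsTable(df, table_name)
    return sql_context.sql(build_duration_query(table_name, max_seconds))
```

A call such as `register_and_query(sqlCtx, btd, "bay_area_bike", 60 * 60 * 2)` would register the bike data and return the rides under two hours. Registering and querying through the same context object avoids depending on whether temporary tables are shared between contexts.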
Now as you may have noted above, the durations are in seconds. Let's start off by looking at all rides under 2 hours.
60 * 60 * 2 # 2 hours in seconds
df2 = sqlCtx.sql("SELECT Duration as d1 from bay_area_bike where Duration < 7200")
We've created a new DataFrame from the transformation and query - now we're ready to plot it. One of the great things about plotly is that you can throw very large datasets at it and it will do just fine. It's certainly a much more scalable solution than matplotlib.
Below I create a histogram of the data.
data = Data([Histogram(x=df2.toPandas()['d1'])])
/Users/bill_chambers/.virtualenvs/plotly-notebook/lib/python2.7/site-packages/plotly/plotly/plotly.py:187: UserWarning: Woah there! Look at all those points! Due to browser limitations, Plotly has a hard time graphing more than 500k data points for line charts, or 40k points for other types of charts. Here are some suggestions: (1) Trying using the image API to return an image instead of a graph URL (2) Use matplotlib (3) See if you can create your visualization with fewer data points If the visualization you're using aggregates points (e.g., box plot, histogram, etc.) you can disregard this warning.
That was simple and we can see that plotly was able to handle the data without issue. We can see that big uptick in rides that last less than ~30 minutes (2000 seconds) - so let's look at that distribution.
df3 = sqlCtx.sql("SELECT Duration as d1 from bay_area_bike where Duration < 2000")
A great thing about Apache Spark is that you can sample easily from large datasets: you just set the fraction you would like to sample and you're all set.
s1 = df2.sample(False, 0.05, 20)
s2 = df3.sample(False, 0.05, 2500)
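The three positional arguments to `sample` are `withReplacement`, `fraction`, and `seed`, so both calls draw roughly 5% of the rows, and the last number is a random seed rather than a sample size. The sketch below illustrates the idea in plain Python under the assumption that each row is kept independently with probability `fraction`. It is not Spark's implementation, and `bernoulli_sample` and the stand-in data are made up for illustration.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`.

    Sampling is without replacement, and the same seed always selects
    the same rows.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


durations = list(range(10000))  # made-up stand-in data
sample_a = bernoulli_sample(durations, 0.05, 20)
sample_b = bernoulli_sample(durations, 0.05, 20)
```

Here `sample_a` and `sample_b` are identical because they share a seed, and the sample size is only approximately 5% of the input, since the fraction is a per-row probability rather than an exact quota.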
data = Data([
    Histogram(x=s1.toPandas()['d1'], name="Large Sample"),
    Histogram(x=s2.toPandas()['d1'], name="Small Sample")
])
Plotly converts those samples into beautifully overlaid histograms. This is a great way to eyeball different distributions.
What's really powerful about Plotly is that sharing this data is simple. I can take the above graph and change the styling or bins visually. A common workflow is to make a rough sketch of the graph in code, then make a more refined version with notes to share with management, like the one below. Plotly's online interface allows you to edit graphs in other languages as well.
import plotly.tools as tls
tls.embed("https://plot.ly/~bill_chambers/101")
Now let's check out bike rentals from individual stations. We can do a groupby with Spark DataFrames just as we might in pandas. We've also seen at this point how easy it is to convert a Spark DataFrame to a pandas DataFrame.
dep_stations = btd.groupBy(btd['Start Station']).count().toPandas().sort('count', ascending=False)
dep_stations
| | Start Station | count |
|---|---|---|
| 34 | San Francisco Caltrain (Townsend at 4th) | 9838 |
| 47 | Harry Bridges Plaza (Ferry Building) | 7343 |
| 0 | Embarcadero at Sansome | 6545 |
| 52 | Market at Sansome | 5922 |
| 62 | Temporary Transbay Terminal (Howard at Beale) | 5113 |
| 32 | Market at 4th | 5030 |
| 66 | 2nd at Townsend | 4987 |
| 61 | San Francisco Caltrain 2 (330 Townsend) | 4976 |
| 25 | Steuart at Market | 4913 |
| 21 | Townsend at 7th | 4493 |
| 44 | 2nd at South Park | 4458 |
| 57 | Grant Avenue at Columbus Avenue | 4004 |
| 38 | Powell Street BART | 3836 |
| 54 | 2nd at Folsom | 3776 |
| 27 | South Van Ness at Market | 3521 |
| 49 | Market at 10th | 3511 |
| 67 | Embarcadero at Bryant | 3497 |
| 4 | Spear at Folsom | 3423 |
| 5 | Howard at 2nd | 3263 |
| 10 | Civic Center BART (7th at Market) | 3074 |
| 18 | Beale at Market | 3057 |
| 23 | Embarcadero at Folsom | 2931 |
| 59 | Mechanics Plaza (Market at Battery) | 2868 |
| 9 | Commercial at Montgomery | 2834 |
| 37 | Powell at Post (Union Square) | 2824 |
| 24 | Embarcadero at Vallejo | 2785 |
| 2 | 5th at Howard | 2635 |
| 16 | Post at Kearney | 2503 |
| 45 | Yerba Buena Center of the Arts (3rd @ Howard) | 2487 |
| 36 | Clay at Battery | 2419 |
| 40 | San Pedro Square | 715 |
| 31 | Mountain View City Hall | 630 |
| 51 | San Salvador at 1st | 597 |
| 60 | SJSU - San Salvador at 9th | 489 |
| 28 | University and Emerson | 434 |
| 30 | Palo Alto Caltrain Station | 431 |
| 15 | SJSU 4th at San Carlos | 389 |
| 53 | Redwood City Caltrain Station | 378 |
| 42 | St James Park | 366 |
| 26 | Cowper at University | 355 |
| 55 | San Jose Civic Center | 346 |
| 3 | Arena Green / SAP Center | 339 |
| 65 | Adobe on Almaden | 335 |
| 14 | California Ave Caltrain Station | 297 |
| 58 | Rengstorff Avenue / California Street | 248 |
| 41 | San Antonio Caltrain Station | 238 |
| 29 | Evelyn Park and Ride | 218 |
| 56 | Broadway St at Battery St | 201 |
| 11 | Park at Olive | 189 |
| 12 | Castro Street and El Camino Real | 132 |
| 20 | Redwood City Medical Center | 123 |
| 22 | San Antonio Shopping Center | 108 |
| 50 | San Mateo County Center | 101 |
| 1 | Franklin at Maple | 99 |
| 19 | Broadway at Main | 45 |
| 33 | Redwood City Public Library | 44 |
| 7 | San Jose Government Center | 23 |
69 rows × 2 columns
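For readers new to the group-and-count pattern, here is the same idea on a tiny, made-up list of start stations in plain Python. It is only an illustration of what the `groupBy(...).count()` and descending sort compute, not a replacement for the Spark call, and the trip data is invented.

```python
from collections import Counter

# Made-up start stations, one entry per trip.
trips = [
    "Market at 4th",
    "2nd at Townsend",
    "Market at 4th",
    "Steuart at Market",
    "Market at 4th",
    "2nd at Townsend",
]

# Count trips per station, then list stations from most to least used.
station_counts = Counter(trips).most_common()
print(station_counts)
```

Each tuple pairs a station with its number of departures, which is the same shape as the `Start Station` and `count` columns in the table.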
Now that we've got a better sense of which stations might be interesting to look at, let's graph the number of trips leaving from the top three stations over time.
dep_stations['Start Station'][:3] # top 3 stations
34    San Francisco Caltrain (Townsend at 4th)
47    Harry Bridges Plaza (Ferry Building)
0     Embarcadero at Sansome
Name: Start Station, dtype: object
We'll add a handy function to help us convert all of these into appropriate count data. We're just using pandas' resampling function to turn this into daily count data.
def transform_df(df):
    df['counts'] = 1
    df['Start Date'] = df['Start Date'].apply(pd.to_datetime)
    return df.set_index('Start Date').resample('D', how='sum')
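To make the daily aggregation concrete, here is a small standard-library sketch of the same counting idea. It assumes start dates formatted like the `8/29/13 14:13` values shown in the sample rows. The function name `daily_counts` and the three sample strings are made up for illustration.

```python
from collections import Counter
from datetime import datetime


def daily_counts(start_dates, fmt="%m/%d/%y %H:%M"):
    """Count trips per calendar day from date strings like '8/29/13 14:13'."""
    days = [datetime.strptime(s, fmt).date() for s in start_dates]
    # Sort by date so the result reads as a time series.
    return sorted(Counter(days).items())


sample = ["8/29/13 14:13", "8/29/13 14:42", "8/30/13 10:16"]
print(daily_counts(sample))
```

One difference from the pandas version: days with no trips are simply absent from this result, whereas resampling by day emits a row for every day in the range.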
pop_stations = []  # being popular stations - we could easily extend this to more stations
for station in dep_stations['Start Station'][:3]:
    temp = transform_df(btd.where(btd['Start Station'] == station).select("Start Date").toPandas())
    pop_stations.append(
        Scatter(
            x=temp.index,
            y=temp.counts,
            name=station
        )
    )
data = Data(pop_stations)
py.iplot(data, filename="spark/over_time")
Interestingly, we can see similar patterns for the Embarcadero and Ferry Building stations. We also get a consistent break between work weeks and weekends. There also seems to be an interesting pattern between fall and winter usage for the downtown stations that doesn't seem to affect the Caltrain station.