Tuesday 1 December 2015

webinar notes "Apache Spark Release 1.6"

General notes :
What's coming in Apache Spark 1.6 :
  •  Key themes
    • Out of the box performance
    • Previews of key new APIs
  •  Two separate memory managers ( Spark 1.5 )
    • Execution memory : Computation of shuffles,
    • Storage memory
  • Goal : Allow memory regions to shrink / grow dynamically
  • Unified Memory Management in Spark 1.6
    • Can cross between execution and storage memory
      • Borrowing can happen from both sides
  • History of Spark APIs
    • RDD API
      • Distributed collections of JVM objects
      • Functional operators (map, filter, etc.)
    • Data Frame API ( 2013 )
      • Distributed collection of Row objects
      • Expression-based operations and UDFs
      • Logical Plans and optimizer
      • Fast / Efficient internal representations
    • DataSet API ( 2015 ) 
      • Internally Rows, externally JVM objects
      • "Best of both worlds : type safe + fast "
  • Encoder : Converts from JVM object into a DataSet Row
  • High Level APIs -> Data Frame ( & DataSet ) -> Tungsten Execution
  • SQL directly over files
    • select * from text.`fileName` where value != ''
  • Advanced JSON parsing
  • Better instrumentation for SQL operators
    • Tracking memory usage ( How much used on each machine )
  • Display the failed output op in streaming
  • Persist ML pipelines to
  • R-like statistics for GLMs
    • Provide R-like summary statistics
  • New algos added to MLlib
    • Bisecting K-Means
    • Online Hypothesis testing : A/B testing in Spark Streaming
    • Survival analysis
    • etc ..

Monday 2 November 2015

Understanding fold() and aggregate() functions in Apache Spark

Note: I was trying to understand the fold() and aggregate() functions in Apache Spark, and it took me a while to work out what these two functions really do and how they are evaluated. I traced by hand how they are evaluated in a couple of scenarios, and thought of sharing this hard-earned experience with fellow folks.
 

An extract from "Learning Spark"

"Similar to reduce() is fold(), which also takes a function with the same signature as needed for reduce(), but in addition takes a "Zero Value" to be used for the initial call on each partition. The Zero Value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value ( e.g. 0 for +, 1 for *, or an empty list for concatenation )"

Let's understand this with some examples

nums = sc.parallelize([1, 2, 3, 4])
print(nums.fold(0, lambda a, b: a + b))  # Which evaluates to 10

The first parameter to fold() is the Zero Value, which is used as the starting value for the function that follows. Here that function is addition (+), so the Zero Value is 0.

So why does it evaluate to 10?

nums.getNumPartitions() # Which evaluates to 2

As we can see above, our RDD is distributed across two partitions, say [1,2] in partition1 and [3,4] in partition2. The addition (+) function is evaluated on each partition's data independently, with an initial value of 0. So accumulator1 has a value of 3 ( 0 + 1 + 2 ) and accumulator2 has a value of 7 ( 0 + 3 + 4 ). These two accumulators are then combined with addition, again starting from an initial value of 0, which evaluates to 10 ( 0 + 3 + 7 ).

Now let's try a Zero Value that is not the identity element

print(nums.fold(5, lambda a, b: a + b))  # Which evaluates to 25

So why does it evaluate to 25?

As we already know, our RDD is distributed across two partitions, [1,2] in partition1 and [3,4] in partition2. The addition (+) function is evaluated on each partition's data independently, with an initial value of 5. So accumulator1 has a value of 8 ( 5 + 1 + 2 ) and accumulator2 has a value of 12 ( 5 + 3 + 4 ). These two accumulators are then combined with addition (+), again starting from an initial value of 5, which evaluates to 25 ( 5 + 8 + 12 ).

Let's try with multiplication ( * )

print(nums.fold(1, lambda a, b: a * b))  # Which evaluates to 24

So why does it evaluate to 24?

The multiplication (*) function is evaluated on each partition's data independently, with an initial value of 1. So accumulator1 has a value of 2 ( 1 * 1 * 2 ) and accumulator2 has a value of 12 ( 1 * 3 * 4 ). These two accumulators are then combined with multiplication (*), again starting from an initial value of 1, which evaluates to 24 ( 1 * 2 * 12 ).
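
The hand traces above can be reproduced without a cluster. The following is a minimal sketch in plain Python, not Spark's actual implementation: simulate_fold is a hypothetical helper name, and the partition layout [[1, 2], [3, 4]] is the one assumed in the walkthrough.

```python
from functools import reduce


def simulate_fold(partitions, zero, op):
    """Mimic fold(): fold each partition from `zero`, then fold the partial results from `zero` again."""
    # Step 1: every partition is folded on its own, starting from the zero value
    partials = [reduce(op, part, zero) for part in partitions]
    # Step 2: the per-partition results are folded together, starting from the zero value once more
    return reduce(op, partials, zero)


# Assumed layout: the same two partitions used in the walkthrough
partitions = [[1, 2], [3, 4]]

print(simulate_fold(partitions, 0, lambda a, b: a + b))  # 10
print(simulate_fold(partitions, 5, lambda a, b: a + b))  # 25
print(simulate_fold(partitions, 1, lambda a, b: a * b))  # 24
```

Because the Zero Value is applied once per partition and once more in the final step, a non-identity value makes the result depend on the partition layout. In this sketch, the same data split into four single-element partitions gives 35 for fold(5, +) instead of 25.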

Question: So what does nums.fold(5, lambda a, b: a * b) evaluate to?


Caution: As mentioned previously, "The Zero Value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value ( e.g. 0 for +, 1 for *, or an empty list for concatenation )". The non-identity values in the scenarios above were used only for the sake of understanding; with such a value, the result depends on the number of partitions.

This article is getting longer than expected, so let's look at the aggregate() function in the following article.


Thanks & Regards
Viswanath G.
Senior Data Scientist
    





 

 

Sunday 1 November 2015

Four Short Links

  1. Beginners guide: Apache Spark Machine Learning Scenario with A Large Input DataSet
  2. How to find Simple and Interesting Multi-Gigabytes Dataset
  3. Supervised Term Weighting Schemes for Text Classification
  4. Apache Zeppelin Overview
    • Zeppelin focuses on providing an analytical environment on top of the Hadoop ecosystem. It is an analytical tool that supports multi-language backends, displays data directly with many different charts, supports parameterized queries, and offers screen sharing through WebSocket.





Saturday 31 October 2015

Standalone Apache Spark application

The best way to learn Apache Spark is through the interactive shells it supports. As of now, Apache Spark supports the following interactive shells: Python/IPython, Scala and R.

But the best way to deploy Spark programs in production systems is as standalone applications. The main difference from using Spark in the shell is that we need to initialize our own SparkContext (whereas the interactive shells give one to us).

Note: SparkContext represents a connection to a computing cluster.

Python standalone application

In Python, Spark standalone applications are simple Python scripts, but we need to run these scripts through the bin/spark-submit script, which is included in the Spark installation. The spark-submit script includes the Spark dependencies for us in Python and sets up the environment for Spark's Python API to function.

"HelloWorld.py" :


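from pyspark import SparkConf, SparkContext

# Run locally under the application name "HelloWorld"
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
lines = sc.textFile("/home/viswanath/spark/README.md")
# A script has no shell to echo results, so print the line count explicitly
print(lines.count())
sc.stop()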

To run the script
spark-submit HelloWorld.py




Finally, to shut down Spark, we can either call the stop() method on SparkContext, or simply exit the application ( e.g. with System.exit(0) or sys.exit()).

Friday 30 October 2015

How to configure IPython Notebook server with Apache Spark

IPython provides a rich architecture for interactive computing with a powerful interactive shell, a kernel for Jupyter, and others.

Apache Spark  is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.


Steps for setting up IPython notebook server with Apache Spark:

  • Install Spark
  • Create a PySpark profile for IPython
  • WordCount example

Spark installation:

Installing Spark is pretty much a straightforward task. Get the latest version from the Apache Spark downloads page; to keep things simple, download a pre-built version to your working directory. At the time of writing this article, it is version 1.5.1 (spark-1.5.1-bin-hadoop2.6.tgz). Extract the compressed file (.tgz) with tar -xvzf spark-1.5.1-bin-hadoop2.6.tgz. That's it, now you have a working Spark installation on your machine.

Some configurations :
To make it easy to switch between Spark versions, let's create a symbolic link to the Spark installation directory
ln -s spark-1.5.1-bin-hadoop2.6 spark 

Now let's set up some environment variables:

export JAVA_HOME=/usr
export SPARK_HOME=/home/viswanath/spark
export PATH=$SPARK_HOME/bin:$PATH

# Where you specify options you would normally add after bin/pyspark
export PYSPARK_SUBMIT_ARGS="--master local[4]"
Note: On Linux, add the above variables to your .bashrc file so that they are set in every new shell session.
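
Before moving on, it can help to confirm that the variables above are actually visible to Python. The following is a small sketch, not part of the original setup: missing_spark_vars and REQUIRED_VARS are hypothetical names introduced here, and the list of variables is simply the one exported above.

```python
import os

# Assumption: these are the variables exported in the step above
REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "PYSPARK_SUBMIT_ARGS")


def missing_spark_vars(env=None):
    """Return the names of the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# An empty list means every variable is set in the current environment
print(missing_spark_vars())
```

If the printed list is not empty, open a new terminal (so that .bashrc is read again) and re-run the check.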

Create a PySpark profile for IPython:

Let's create an IPython profile for PySpark:
ipython profile create pyspark

The above command creates the profile directory ~/.ipython/profile_pyspark.

Now open the ipython_notebook_config.py file in the profile_pyspark directory, add the following lines to it, and save:


# Kernel config

c.IPKernelApp.pylab = 'inline'  # if you want plotting support always


# Notebook config
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
# It is a good idea to put it on a known, fixed port
c.NotebookApp.port = 9999


Now create a file named "00-pyspark-setup.py" in the startup directory (~/.ipython/profile_pyspark/startup) and add the following code snippet to it:

# Configure the necessary Spark environment
import os
import sys

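spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make sure PYSPARK_SUBMIT_ARGS ends with "pyspark-shell"
pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if "pyspark-shell" not in pyspark_submit_args:
    pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

sys.path.insert(0, os.path.join(spark_home, "python"))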

# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

# Initialize PySpark to predefine the SparkContext variable 'sc'
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
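
The py4j version in the startup script is hard-coded, so it has to be edited whenever the Spark installation changes. One possible alternative, shown here only as a sketch, is to look the archive up by pattern. find_py4j_zip is a hypothetical helper, and the sketch assumes the archive lives under python/lib and is named py4j-<version>-src.zip, as in the path used above.

```python
import glob
import os


def find_py4j_zip(spark_home):
    """Return the path of the py4j source zip under spark_home, or None if there is none."""
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    matches = sorted(glob.glob(pattern))
    # If several archives match, take the last one in sorted (lexicographic) order
    return matches[-1] if matches else None


# Prints None when SPARK_HOME is unset or no archive is found
print(find_py4j_zip(os.environ.get("SPARK_HOME", "")))
```

The returned path could then be passed to sys.path.insert(0, ...) in place of the hard-coded file name.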



Now you are ready to launch your IPython notebook server with PySpark support:

ipython notebook --profile=pyspark

Your IPython notebook is now available at http://server_ip:9999/


WordCount example :


# Use an absolute path: "~" is not expanded to the home directory here
inputRDD = sc.textFile("/home/viswanath/spark/README.md")

inputRDD.flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
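
To see what each stage of the WordCount chain contributes, here is a rough plain-Python equivalent that runs without Spark. It is an illustration of the three steps only, not how Spark executes them; word_count and the sample lines are made up for this sketch.

```python
from collections import defaultdict


def word_count(lines):
    """Count words the way the flatMap / map / reduceByKey chain does, on a plain list of lines."""
    # flatMap: split every line into words and flatten the results into one list
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)


print(word_count(["to be or", "not to be"]))
```

Note that splitting on a single space yields empty strings when a line contains consecutive spaces, so an empty "word" can show up in the counts, both here and in the Spark version.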


Tip: To keep your notebook server alive after you leave the current command-line session, use screen.


Thanks & Regards
Viswanath G.
Senior Data Scientist