Monday 2 November 2015

Understanding fold() and aggregate() functions in Apache Spark

Note: I was trying to understand the fold() and aggregate() functions in Apache Spark, and it took me a while to work out what these two functions really do and how they are executed. I manually traced how they are evaluated in a couple of scenarios and thought I would share this hard-earned experience with fellow folks.
 

An extract from "Learning Spark"

"Similar to reduce() is fold(), which also takes a function with the same signature as needed for reduce(), but in addition takes a "Zero Value" to be used for the initial call on each partition. The Zero value you should be the identity element for your operation; that is applying it multiple times your function should not change the value ( e.g. 0 for +, 1 for *, or an empty list for concatenation )"

Let's understand this with some examples

nums = sc.parallelize([1, 2, 3, 4], 2)  # explicitly ask for 2 partitions so the results below are reproducible
print(nums.fold(0, lambda a, b: a + b))  # Which evaluates to 10

The first parameter to fold() is the Zero Value, the initial value used by the function that follows. Here that function is addition (+), so the Zero Value is 0.

So why does it evaluate to 10?

nums.getNumPartitions() # Which evaluates to 2

As we can see from above, our RDD is distributed across two partitions, say [1,2] in partition1 and [3,4] in partition2. The addition (+) function is evaluated on each partition's data independently, starting from the initial value 0. So accumulator1 ends up with 3 (0 + 1 + 2) and accumulator2 with 7 (0 + 3 + 4). These two accumulators are then combined with addition, again starting from the initial value 0, which gives 10 (0 + 3 + 7).
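The two-stage evaluation just described can be traced in plain Python, without a cluster. This is a minimal sketch under stated assumptions: `split_into_partitions` and `simulate_fold` are hypothetical helpers (not Spark APIs), and the split assumes contiguous, evenly sized slices for a small local list. On a real RDD, `nums.glom().collect()` returns one list per partition if you want to check the actual layout.

```python
from functools import reduce
from operator import add

def split_into_partitions(data, num_slices):
    # Hypothetical helper: contiguous, evenly sized slices, approximating how
    # parallelize() lays out a small local list.
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

def simulate_fold(partitions, zero_value, op):
    # Stage 1: fold each partition independently, starting from zero_value.
    accumulators = [reduce(op, part, zero_value) for part in partitions]
    # Stage 2: combine the accumulators, again starting from zero_value.
    return reduce(op, accumulators, zero_value), accumulators

partitions = split_into_partitions([1, 2, 3, 4], 2)
total, accumulators = simulate_fold(partitions, 0, add)
print(partitions)    # [[1, 2], [3, 4]]
print(accumulators)  # [3, 7]
print(total)         # 10
```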

Now let's try a Zero Value that is not the identity for addition.

print(nums.fold(5, lambda a, b: a + b))  # Which evaluates to 25
 

So why does it evaluate to 25?

As we already know, our RDD is distributed across two partitions, [1,2] in partition1 and [3,4] in partition2. The addition (+) function is evaluated on each partition's data independently, starting from the initial value 5. So accumulator1 ends up with 8 (5 + 1 + 2) and accumulator2 with 12 (5 + 3 + 4). These two accumulators are then combined with addition, again starting from the initial value 5, which gives 25 (5 + 8 + 12).
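Because 5 is not the identity for +, it is added once per partition and once more when the accumulators are combined, so the answer depends on how many partitions the RDD has: for [1, 2, 3, 4] the result is 10 + 5 × (partitions + 1). The sketch below checks that arithmetic for three layouts. The layouts are assumptions chosen for illustration, and `simulate_fold` is a hypothetical plain-Python helper, not part of Spark.

```python
from functools import reduce
from operator import add

def simulate_fold(partitions, zero_value, op):
    # Hypothetical helper: fold each partition from zero_value, then fold
    # the per-partition results, again starting from zero_value.
    accumulators = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, accumulators, zero_value)

layouts = {
    1: [[1, 2, 3, 4]],
    2: [[1, 2], [3, 4]],
    4: [[1], [2], [3], [4]],
}
for num_partitions, parts in layouts.items():
    result = simulate_fold(parts, 5, add)
    # The zero value 5 is added once per partition plus once in the final merge.
    assert result == 10 + 5 * (num_partitions + 1)
    print(num_partitions, result)
# 1 20
# 2 25
# 4 35
```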

Let's try multiplication (*).

print(nums.fold(1, lambda a, b: a * b))  # Which evaluates to 24

So why does it evaluate to 24?

The multiplication (*) function is evaluated on each partition's data independently, starting from the initial value 1. So accumulator1 ends up with 2 (1 * 1 * 2) and accumulator2 with 12 (1 * 3 * 4). These two accumulators are then combined with multiplication, again starting from the initial value 1, which gives 24 (1 * 2 * 12).
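Since 1 is the identity for *, multiplying by it any number of times changes nothing, so the result should not depend on how the data is partitioned. A small plain-Python sketch makes that concrete; the three layouts are assumed for illustration, and `simulate_fold` is a hypothetical helper that only mirrors the two stages described above.

```python
from functools import reduce
from operator import mul

def simulate_fold(partitions, zero_value, op):
    # Hypothetical helper mirroring the two fold stages described above.
    accumulators = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, accumulators, zero_value)

layouts = [
    [[1, 2, 3, 4]],          # 1 partition
    [[1, 2], [3, 4]],        # 2 partitions, as in the example
    [[1], [2], [3], [4]],    # 4 partitions
]
# 1 is the identity for *, so every layout gives the same product.
results = [simulate_fold(parts, 1, mul) for parts in layouts]
print(results)  # [24, 24, 24]
```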

Question: what does nums.fold(5, lambda a, b: a * b) evaluate to?


Caution: As quoted above, "The Zero Value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation)". A non-identity value was used in the scenarios above only to show how fold() is evaluated; avoid it in real code, because the result then depends on the number of partitions.
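The quote also names the empty list as the identity for concatenation. The sketch below contrasts it with a non-identity zero value such as [0], which gets inserted once per partition and once more in the final merge. It is plain Python under stated assumptions: the RDD is assumed to hold one-element lists split into two partitions, and `simulate_fold` is a hypothetical helper, not Spark code.

```python
from functools import reduce
from operator import add

def simulate_fold(partitions, zero_value, op):
    # Hypothetical helper mirroring the two fold stages described above.
    accumulators = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, accumulators, zero_value)

# An RDD of one-element lists, assumed to be split into two partitions.
partitions = [[[1], [2]], [[3], [4]]]

# [] is the identity for list concatenation, so nothing extra appears.
print(simulate_fold(partitions, [], add))   # [1, 2, 3, 4]

# [0] is not an identity: it shows up once per partition and once more
# when the partition results are combined.
print(simulate_fold(partitions, [0], add))  # [0, 0, 1, 2, 0, 3, 4]
```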

This article is getting longer than expected, so let's look at the aggregate() function in a following article.


Thanks & Regards
Viswanath G.
Senior Data Scientist
    





 

 

Sunday 1 November 2015

Four Short Links

  1. Beginners guide: Apache Spark Machine Learning Scenario with A Large Input DataSet
  2. How to find Simple and Interesting Multi-Gigabytes Dataset
  3. Supervised Term Weighting Schemes for Text Classification
  4. Apache Zeppelin Overview
    • Zeppelin is focused on providing an analytical environment on top of the Hadoop ecosystem. Zeppelin is an analytical tool that supports multiple language backends. It can display data directly with many different charts, supports parameterized queries, and allows screen sharing through WebSocket.