Explain the aggregate functionality in Spark (with Python and Scala)
I wasn't fully convinced by the accepted answer, and JohnKnight's answer helped, so here's my point of view:
First, let's explain aggregate() in my own words:
Prototype:
aggregate(zeroValue, seqOp, combOp)
Description:
aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.
Parameters:
zeroValue: The initial value for your result, in the desired format.
seqOp: The operation you want to apply to RDD records. Runs once for every record in a partition.
combOp: Defines how the resulting objects (one for every partition) get combined.
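To make the three parameters concrete without a cluster, here is a minimal pure-Python model of the contract described above. The helper model_aggregate is hypothetical and is not Spark's implementation: partitions are plain lists, zeroValue seeds every partition, and combOp merges the per-partition results (this model never feeds zeroValue into combOp, which real Spark does not promise).

```python
from functools import reduce

def model_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical, simplified model of RDD.aggregate(); not Spark's code.

    partitions is a list of lists, one inner list per partition.
    """
    # seqOp folds each partition's records into one local result,
    # starting from zero_value in every partition.
    local_results = [reduce(seq_op, part, zero_value) for part in partitions]
    # combOp merges the local results (one per partition) into one value.
    return reduce(comb_op, local_results)

# seqOp: (U, T) -> U, where U is a (sum, length) pair and T is an int.
seqOp = lambda local_result, list_element: (local_result[0] + list_element,
                                            local_result[1] + 1)
# combOp: (U, U) -> U, merging two (sum, length) pairs.
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

result = model_aggregate([[1, 2], [3, 4]], (0, 0), seqOp, combOp)
print(result)  # (10, 4)
```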
Example:
Compute the sum of a list and the length of that list. Return the result as a (sum, length) pair.
In a PySpark shell, I first created an RDD from a list of 4 elements, split into 2 partitions:
listRDD = sc.parallelize([1,2,3,4], 2)
then I defined my seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
and my combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
and then I aggregated:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
As you can see, I gave descriptive names to my variables, but let me explain it further:
The first partition has the sublist [1, 2]. We will apply the seqOp to each element of that list, and this will produce a local result, a (sum, length) pair, that reflects the result locally, only in that first partition.
So, let's start: local_result gets initialized to the zeroValue parameter we provided to aggregate(), i.e. (0, 0), and list_element is the first element of the list, i.e. 1. As a result, this is what happens:
0 + 1 = 1
0 + 1 = 1
Now the local result is (1, 1), which means that so far, for the 1st partition, after processing only the first element, the sum is 1 and the length is 1. Notice that local_result gets updated from (0, 0) to (1, 1). Next, seqOp processes the second element, 2:
1 + 2 = 3
1 + 1 = 2
and now the local result is (3, 2), which will be the final result from the 1st partition, since there are no other elements in the sublist of the 1st partition.
Doing the same for 2nd partition, we get (7, 2).
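The per-partition steps can be replayed in plain Python (no Spark needed), assuming the [1, 2] / [3, 4] split described above. itertools.accumulate with initial= (Python 3.8+) returns the zeroValue followed by every intermediate local_result, so each line of the walk-through can be checked.

```python
from itertools import accumulate

# Same seqOp as in the example: update (running sum, running count) by one element.
seqOp = lambda local_result, list_element: (local_result[0] + list_element,
                                            local_result[1] + 1)

partitions = [[1, 2], [3, 4]]  # the 2-partition split assumed in the walk-through

# One trace per partition: zeroValue first, then each intermediate local_result.
traces = [list(accumulate(part, seqOp, initial=(0, 0))) for part in partitions]
for i, steps in enumerate(traces, start=1):
    print(f"partition {i}: {steps}")
# partition 1: [(0, 0), (1, 1), (3, 2)]
# partition 2: [(0, 0), (3, 1), (7, 2)]
```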
Now we apply the combOp to the local results, so that we can form the final, global result, like this: (3, 2) + (7, 2) = (10, 4)
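As a quick sanity check, the combine step is just a reduction of the per-partition pairs with combOp. This plain-Python sketch takes the two local results computed above as given.

```python
from functools import reduce

# combOp from the example: add the sums together and the lengths together.
combOp = lambda some_local_result, another_local_result: (
    some_local_result[0] + another_local_result[0],
    some_local_result[1] + another_local_result[1],
)

local_results = [(3, 2), (7, 2)]  # one local result per partition
global_result = reduce(combOp, local_results)
print(global_result)  # (10, 4)
```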
Example described in figure:

                (0, 0) <-- zeroValue

     [1, 2]                  [3, 4]

   0 + 1 = 1               0 + 3 = 3
   0 + 1 = 1               0 + 1 = 1

   1 + 2 = 3               3 + 4 = 7
   1 + 1 = 2               1 + 1 = 2
       |                       |
       v                       v
     (3, 2)                  (7, 2)
         \                   /
          \                 /
           \               /
            +-------------+
            |   combOp    |
            +-------------+
                   |
                   v
                (10, 4)
Inspired by this great example.
So now, if the zeroValue is not (0, 0) but (1, 0), one would expect to get (8 + 4, 2 + 2) = (12, 4), which doesn't explain what you experience. Even if I alter the number of partitions in my example, I won't be able to reproduce that result.
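That naive expectation can be written out in plain Python. This is a hypothetical model in which zeroValue is used exactly once per partition and never in the combine step.

```python
from functools import reduce

seqOp = lambda acc, x: (acc[0] + x, acc[1] + 1)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

partitions = [[1, 2], [3, 4]]
zero = (1, 0)  # not neutral: the sum starts at 1 instead of 0

# Naive model: zero seeds each partition's seqOp once and is never used by combOp.
local_results = [reduce(seqOp, part, zero) for part in partitions]
naive_total = reduce(combOp, local_results)
print(local_results)  # [(4, 2), (8, 2)]
print(naive_total)    # (12, 4)
```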
The key here is JohnKnight's answer, which states that the zeroValue is not simply applied once per partition, as my example might suggest; it may be applied more times than you expect.
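One way this can happen is sketched below, as an assumption about PySpark internals rather than a definitive description: as far as I can tell from the PySpark source, recent versions build aggregate() from a per-partition seqOp pass followed by fold(zeroValue, combOp), and fold reuses the zero once per partition and once more in the final driver-side merge. The hypothetical fold_style_aggregate below models that; the exact count depends on the Spark version and language API, so check the source for yours.

```python
from functools import reduce

seqOp = lambda acc, x: (acc[0] + x, acc[1] + 1)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

def fold_style_aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical model: zero is used by seqOp in every partition, by combOp
    in every partition, and once more in the final merge (2 * N + 1 uses)."""
    # Step 1: per-partition seqOp, each partition starting from zero.
    local = [reduce(seq_op, part, zero) for part in partitions]
    # Step 2: fold each partition's single partial result with zero again.
    folded = [comb_op(zero, r) for r in local]
    # Step 3: the final merge of the folded results also starts from zero.
    return reduce(comb_op, folded, zero)

print(fold_style_aggregate([[1, 2], [3, 4]], (0, 0), seqOp, combOp))  # (10, 4)
print(fold_style_aggregate([[1, 2], [3, 4]], (1, 0), seqOp, combOp))  # (15, 4)
```

With the neutral (0, 0) the extra uses are harmless; with (1, 0) each extra use adds 1 to the sum, so the result grows with the number of partitions.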
Explanation using Scala
Aggregate lets you transform and combine the values of the RDD at will.
It uses two functions:
The first one transforms and adds the elements of the original collection [T] into a local aggregate [U] and takes the form: (U,T) => U. You can see it as a fold, and therefore it also requires a zero for that operation. This operation is applied locally to each partition in parallel.
Here is where the key of the question lies: The only value that should be used here is the ZERO value for the reduction operation.
This operation is executed locally on each partition, so anything added to that zero value ends up in the result multiplied by the number of partitions of the RDD.
The second operation takes 2 values of the result type of the previous operation [U] and combines them into one value. This operation will reduce the partial results of each partition and produce the actual total.
For example:
Given an RDD of Strings:
val rdd:RDD[String] = ???
Let's say you want to aggregate the lengths of the strings in that RDD, so you would do:

The first operation will transform strings into size (int) and accumulate the values for size.
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.length

Provide the ZERO for the addition operation (0):
val ZERO = 0

An operation to add two integers together:
val add: (Int, Int) => Int = _ + _
Putting it all together (the Scala RDD API takes the zero value in its own, curried parameter list):
rdd.aggregate(ZERO)(stringSizeCummulator, add)
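For readers following along in Python, here is a plain-Python translation of the three Scala pieces (the snake_case names are my own, chosen for illustration), run over the two partitions used in the example below.

```python
from functools import reduce

ZERO = 0                                               # neutral element for +
string_size_accumulator = lambda total, string: total + len(string)  # (Int, String) => Int
add = lambda a, b: a + b                               # (Int, Int) => Int

# Assumed partitioning, matching the "Jump over the wall" example.
partitions = [["Jump", "over"], ["the", "wall"]]

per_partition = [reduce(string_size_accumulator, p, ZERO) for p in partitions]
total = reduce(add, per_partition)
print(per_partition, total)  # [8, 7] 15
```

In PySpark itself the equivalent call would be rdd.aggregate(0, string_size_accumulator, add), because the Python API takes all three arguments in a single parameter list.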
So, why is the ZERO needed?
When the accumulator function (stringSizeCummulator) is applied to the first element of a partition, there's no running total yet, so ZERO is used as the starting value.
E.g., say my RDD is:
 Partition 1: [Jump, over]
 Partition 2: [the, wall]
This will result in:
P1:
 stringSizeCummulator(ZERO, Jump) = 4
 stringSizeCummulator(4, over) = 8
P2:
 stringSizeCummulator(ZERO, the) = 3
 stringSizeCummulator(3, wall) = 7
Reduce: add(P1, P2) = 15
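The role of ZERO shows up clearly in plain Python: without a starting value, functools.reduce uses the first element ("Jump", a string) as the running total, which cannot work when the result type U (int) differs from the element type T (str).

```python
from functools import reduce

partition_1 = ["Jump", "over"]
accumulate_size = lambda total, string: total + len(string)

# With ZERO the running total starts as an int, so every step is int + int.
with_zero = reduce(accumulate_size, partition_1, 0)
print(with_zero)  # 8

# Without it, the first step computes "Jump" + 4, which raises TypeError.
try:
    reduce(accumulate_size, partition_1)
    failed_without_zero = False
except TypeError as err:
    failed_without_zero = True
    print("TypeError:", err)
```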
I don't have enough reputation points to comment on the previous answer by Maasg.
Actually, the zero value should be neutral towards the seqOp, meaning it doesn't interfere with the seqOp result, like 0 for addition or 1 for multiplication.
You should NEVER try non-neutral values, as the zero value might be applied an arbitrary number of times.
This behavior is not tied only to the number of partitions.
I tried the same experiment as stated in the question:
With 1 partition, the zero value was applied 3 times.
With 2 partitions, 6 times.
With 3 partitions, 9 times, and so on.
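The advice about neutral zero values can be illustrated with a small plain-Python experiment. Under a simplified model (hypothetical helpers split and model_aggregate, where the zero seeds only each partition's seqOp), a neutral zero gives the same answer for every partitioning, while a non-neutral one drifts with the number of partitions. Real Spark may reuse the zero more often than this model does.

```python
from functools import reduce

def split(data, n):
    """Hypothetical helper: split data into n contiguous, near-equal chunks."""
    k, m = divmod(len(data), n)
    return [data[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

def model_aggregate(partitions, zero, seq_op, comb_op):
    """Simplified model: zero seeds each partition's seqOp only."""
    return reduce(comb_op, [reduce(seq_op, p, zero) for p in partitions])

add = lambda a, b: a + b
mul = lambda a, b: a * b
data = [1, 2, 3, 4]

results = {}
for n in (1, 2, 4):
    parts = split(data, n)
    results[n] = (
        model_aggregate(parts, 0, add, add),  # 0 is neutral for +
        model_aggregate(parts, 1, mul, mul),  # 1 is neutral for *
        model_aggregate(parts, 1, add, add),  # 1 is NOT neutral for +
    )
    print(n, results[n])
# 1 (10, 24, 11)
# 2 (10, 24, 12)
# 4 (10, 24, 14)
```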