grouping – Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey

groupByKey:

Syntax:

sparkContext.textFile("hdfs://")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1))
            .groupByKey()
            .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk problems, because every key-value pair is sent over the network and collected on the reducer workers.

reduceByKey:

Syntax:

sparkContext.textFile("hdfs://")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1))
            .reduceByKey((x, y) => x + y)

Data is combined at each partition, so only one output per key leaves each partition to be sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
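As a plain-Scala sketch (ordinary collections, no cluster needed; `pairs` is an illustrative stand-in for the (word, 1) RDD above), reduceByKey behaves like grouping followed by a per-key reduce, and the reduce function must map two values of the value type back to that same type:

```scala
// Plain-Scala sketch of reduceByKey semantics. The reduce function is
// (Int, Int) => Int: the output type must match the value type.
val pairs = Seq(("hello", 1), ("world", 1), ("hello", 1))
val counts = pairs
  .groupBy(_._1)                                               // group by word
  .map { case (word, ps) => (word, ps.map(_._2).reduce((x, y) => x + y)) }
// counts == Map("hello" -> 2, "world" -> 1)
```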

aggregateByKey:

Same as reduceByKey, but it takes an initial (zero) value.

3 parameters as input

  1. initial (zero) value
  2. sequence op logic (combines a value into the accumulator within a partition)
  3. combiner logic (merges accumulators across partitions)

Example:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A",
                               "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key-value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1         // sequence op: count each value
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 // combiner: merge partition counts
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output:
Aggregate By Key sum Results
bar -> 3
foo -> 5
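The result above can be checked with plain Scala collections (a sketch, no SparkContext involved): the sequence op ignores the value and just increments, so each key ends up with its occurrence count:

```scala
// Simulates the aggregateByKey call above on ordinary collections.
val pairs = Seq("foo" -> "A", "foo" -> "A", "foo" -> "A", "foo" -> "A",
                "foo" -> "B", "bar" -> "C", "bar" -> "D", "bar" -> "D")
val countByKey = pairs.groupBy(_._1).map { case (k, vs) =>
  (k, vs.foldLeft(0)((n, _) => n + 1))   // addToCounts: ignore the value, add 1
}
// countByKey == Map("foo" -> 5, "bar" -> 3)
```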

combineByKey:

3 parameters as input

  1. Initial value (createCombiner): unlike aggregateByKey, it need not be a constant; we pass a function that turns the first value seen for a key into the initial accumulator.
  2. Merging function (mergeValue): merges a new value into an existing accumulator within a partition.
  3. Combine function (mergeCombiners): merges accumulators across partitions.
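To make the three parameters concrete, here is a plain-Scala simulation of combineByKey's two phases (a hypothetical helper, not the Spark API): within each partition the first value for a key goes through createCombiner and later values through mergeValue; across partitions the accumulators are merged with mergeCombiners.

```scala
// Hypothetical stand-in for RDD.combineByKey over simulated partitions.
def combineByKeySim[K, V, C](partitions: Seq[Seq[(K, V)]],
                             createCombiner: V => C,
                             mergeValue: (C, V) => C,
                             mergeCombiners: (C, C) => C): Map[K, C] = {
  // phase 1: per-partition combine (runs before the shuffle in Spark)
  val perPartition = partitions.map(_.foldLeft(Map.empty[K, C]) {
    case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).fold(createCombiner(v))(mergeValue(_, v)))
  })
  // phase 2: merge the per-partition accumulators (the shuffle step)
  perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
    (k, kcs.map(_._2).reduce(mergeCombiners))
  }
}

// Per-key average with a (sum, count) accumulator, over two fake partitions.
val avg = combineByKeySim[String, Int, (Int, Int)](
  Seq(Seq("a" -> 1, "a" -> 3), Seq("a" -> 5, "b" -> 2)),
  v => (v, 1),
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
).map { case (k, (s, c)) => (k, s / c.toDouble) }
// avg == Map("a" -> 3.0, "b" -> 2.0)
```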

Example:

val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                    // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) =>                // mergeCombiners
    (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (k, v) => (k, v._1 / v._2.toDouble) }       // per-key average
result.collect.foreach(println)

reduceByKey, aggregateByKey, and combineByKey are preferred over groupByKey.

Reference:
Avoid groupByKey

  • groupByKey() simply groups your dataset by a key. It causes a data shuffle when the RDD is not already partitioned.
  • reduceByKey() is something like grouping plus aggregation; we can say reduceByKey() is equivalent to dataset.group(…).reduce(…). It shuffles less data than groupByKey().
  • aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result in a different type. In other words, it lets you have input of type x and an aggregated result of type y. For example, (1,2),(1,4) as input and (1,"six") as output. It also takes a zero value that is applied at the beginning of each key.
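The type change in that last bullet can be sketched with plain Scala collections (no cluster; the corresponding Spark call would be along the lines of rdd.aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)). Here Int values are aggregated into a Set[Int]:

```scala
// Input values are Int (type x); each key's result is a Set[Int] (type y).
val input = Seq((1, 2), (1, 4), (2, 3))
val zero = Set.empty[Int]
val seqOp = (acc: Set[Int], v: Int) => acc + v    // within a partition
val combOp = (a: Set[Int], b: Set[Int]) => a ++ b // across partitions (unused
                                                  // here: single "partition")
val asSets = input.groupBy(_._1).map { case (k, vs) =>
  (k, vs.map(_._2).foldLeft(zero)(seqOp))
}
// asSets == Map(1 -> Set(2, 4), 2 -> Set(3))
```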

Note: One similarity is they all are wide operations.

While both reduceByKey and groupByKey will produce the same answer, the
reduceByKey example works much better on a large dataset. That's
because Spark knows it can combine output with a common key on each
partition before shuffling the data.

On the other hand, when calling groupByKey, all the key-value pairs
are shuffled around. That is a lot of unnecessary data to transfer
over the network.

For more detail, check the link below:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
