06 December 2015

cogroup() Example

Example

Multiple pair RDDs can be combined using cogroup(), which groups the values for each key across both RDDs into a pair of Iterables.

scala> val rdd1 = sc.parallelize(Seq(
     |                ("key1", 1),
     |                ("key2", 2),
     |                ("key1", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at :21

scala> val rdd2 = sc.parallelize(Seq(
     |                ("key1", 5),
     |                ("key2", 4)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at :21

//cogroup() Example
scala> val grouped = rdd1.cogroup(rdd2)
grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[28] at cogroup at :25

//Result
scala> grouped.collect()
res10: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(CompactBuffer(1, 3),CompactBuffer(5))), (key2,(CompactBuffer(2),CompactBuffer(4))))

// Iterate through every value of each key
// and increment each value by 1
scala> val updated = grouped.map{x =>
     |    {
     |       val key = x._1
     |       //println("Key -> " + key)
     |       val value = x._2
     |       val itl1 = value._1
     |       val itl2 = value._2
     |       val res1 = itl1.map{ x =>
     |          {
     |             //println("It1 : Key -> " + key + ", Val -> " + (x + 1))
     |             x + 1
     |          }
     |       }
     |       val res2 = itl2.map{ x =>
     |          {
     |             //println("It2 : Key -> " + key + ", Val -> " + (x + 1))
     |             x + 1
     |          }
     |       }
     |       //println("End")
     |       (key,  (res1, res2))
     |    }
     | }
updated: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[18] at map at :33

scala> updated.collect()
res17: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((key1,(List(2, 4),List(6))), (key2,(List(3),List(5))))