pyspark combineByKey explained

This post explains how to use the combineByKey function, since the pyspark documentation is not very clear on it.

When to use combineByKey?

  • You have an RDD of (key, value) pairs – a paired RDD
    • In Python, such an RDD is built from elements that are (key, value) tuples (a short sketch follows this list)
  • You want to aggregate the values based on Key
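
Here is a minimal sketch of constructing such a paired RDD; the local SparkContext setup and the sample data are just assumptions for illustration.

from pyspark import SparkContext

sc = SparkContext("local", "pairs-example")  # assumed local setup, for illustration only

# each element is a (key, value) tuple, which makes this a paired RDD
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
print(pairs.collect())  # [('a', 1), ('b', 1), ('a', 2)]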

Why use combineByKey, instead of aggregateByKey or groupByKey, or reduceByKey?

  • groupByKey is computationally expensive, since it shuffles every value for a key across the cluster. I once had a dataset of 50,000+ elements, and the sheer resource usage made my program un-runnable.
  • aggregateByKey – the docs are not very clear, and I did not find good resources on how to use it.
  • reduceByKey is implemented on top of combineByKey, so the latter is the more generic operation (see the sketch below).
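
To make that relationship concrete, here is a sketch of a per-key sum written both ways; the sample data is made up for the example, and it reuses the sc from the sketch above.

x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

# per-key sum with reduceByKey
print(sorted(x.reduceByKey(lambda a, b: a + b).collect()))  # [('a', 3), ('b', 1)]

# the same sum with combineByKey; the "combined type" is just an int here
sums = x.combineByKey(
    lambda v: v,             # createCombiner: a single value is already a running sum
    lambda c, v: c + v,      # mergeValue: fold a new value into the running sum
    lambda c1, c2: c1 + c2,  # mergeCombiners: add two partial sums together
)
print(sorted(sums.collect()))  # [('a', 3), ('b', 1)]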

How to use combineByKey?

The example in the documentation goes like this:

# create an RDD of (key, value) pairs
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

def to_list(a):
    # createCombiner: wrap a single value in a one-element list
    return [a]

def append(a, b):
    # mergeValue: add a new value to an existing list
    a.append(b)
    return a

def extend(a, b):
    # mergeCombiners: concatenate two lists
    a.extend(b)
    return a

sorted(x.combineByKey(to_list, append, extend).collect())
# [('a', [1, 2]), ('b', [1])]

Here is what the documentation says about combineByKey:

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)
Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.
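
As a small illustration of that last point, the number of output partitions can be passed straight into combineByKey; the figure of 4 partitions below is an arbitrary assumption, and the sketch reuses x, to_list, append, and extend from the example above.

combined = x.combineByKey(to_list, append, extend, numPartitions=4)
print(combined.getNumPartitions())  # 4
print(sorted(combined.collect()))   # [('a', [1, 2]), ('b', [1])]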

What does the quoted documentation mean? Basically, combineByKey takes three arguments, each of which is a function, or an operation, on the elements of an RDD.

In the example above, the keys are ‘a’ and ‘b’ and the values are ints. The task is to combine the ints into a list for each key.

  1. The first function is createCombiner, which turns a value from the original key-value pair into the combined type. In this example the combined type is a list, so the function takes an int and returns a one-element list; that is what to_list does.
  2. The second function is mergeValue, which merges a new value into an existing combiner. Here that means adding a new int to the list, which is why it uses append.
  3. The third function is mergeCombiners, which merges two combiners into one. Here that means merging two lists, which is why it uses extend. A second sketch with a different combined type follows below.
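
To show these three roles with a combined type that is not a list, here is a sketch that computes a per-key average by carrying (sum, count) pairs; the sample data is an assumption made up for the example.

marks = sc.parallelize([("math", 80), ("math", 90), ("english", 70)])

averages = marks.combineByKey(
    lambda v: (v, 1),                               # createCombiner: first value becomes (sum, count)
    lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue: fold a new value into (sum, count)
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # mergeCombiners: merge two partial (sum, count) pairs
).mapValues(lambda c: c[0] / c[1])                  # turn each (sum, count) into an average

print(sorted(averages.collect()))  # [('english', 70.0), ('math', 85.0)]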


Reference:

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
