This post explains how to use the function “combineByKey”, since the PySpark documentation is not very clear on it.
When to use combineByKey?
- You have an RDD of (key, value) pairs – a paired RDD
- In Python, such an RDD is built from elements that are 2-tuples, e.g. ("a", 1)
- You want to aggregate the values by key
Why use combineByKey instead of aggregateByKey, groupByKey, or reduceByKey?
- groupByKey is computationally expensive, because it shuffles every value without combining anything within a partition first. I once had a document of 50000+ elements, and the resources it needed made my program un-runnable.
- aggregateByKey: the docs are not very clear, and I did not find resources on how to use it.
- reduceByKey is implemented on top of combineByKey, so the latter is the more general of the two.
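The relationship between reduceByKey and combineByKey can be sketched without a cluster. The snippet below is a plain-Python stand-in, not Spark's own code: combine_by_key_local and reduce_by_key_local are hypothetical helpers, and the two partitions are made-up data. Under those assumptions, a reduceByKey-style sum is the special case where the combiner is the value itself and both merge steps use the same function.

```python
from operator import add


def combine_by_key_local(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical single-process model of combineByKey (not Spark's implementation)."""
    # Step 1: combine values inside each partition.
    per_partition = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                combined[key] = merge_value(combined[key], value)
            else:
                combined[key] = create_combiner(value)
        per_partition.append(combined)

    # Step 2: merge the per-partition combiners for each key.
    result = {}
    for combined in per_partition:
        for key, combiner in combined.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result


def reduce_by_key_local(partitions, func):
    """reduceByKey as a special case: the combined type C is the same as V."""
    return combine_by_key_local(partitions, lambda v: v, func, func)


# Two made-up partitions holding the pairs ("a", 1), ("b", 1), ("a", 2).
partitions = [[("a", 1), ("b", 1)], [("a", 2)]]
sums = reduce_by_key_local(partitions, add)
print(sorted(sums.items()))
```

The point of the sketch is that reduceByKey can only produce a result of the same type as the values, while combineByKey lets the combined type differ (an int going into a list, for instance).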
How to use combineByKey?
The example in the documentation goes like this:

    # create an RDD (sc is an existing SparkContext, e.g. in the pyspark shell)
    x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

    # the three functions passed to combineByKey
    def to_list(a):
        return [a]

    def append(a, b):
        a.append(b)
        return a

    def extend(a, b):
        a.extend(b)
        return a

    sorted(x.combineByKey(to_list, append, extend).collect())
    >>> [('a', [1, 2]), ('b', [1])]

Here is what the documentation says about combineByKey:
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=)
Generic function to combine the elements for each key using a custom set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.
Users provide three functions:
- createCombiner, which turns a V into a C (e.g., creates a one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)
To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.
In addition, users can control the partitioning of the output RDD.
What does that mean? Basically, combineByKey takes three arguments, each of which is a function (an operation) on the elements of an RDD.
In this example, the keys are ‘a’ and ‘b’. The values are ints. The task is to combine the ints into a list for each key.
- The first function is createCombiner, which turns a value from the original key-value pair into the combined type. In this example the combined type is a list, so this function takes an int and returns a one-element list (to_list).
- The second function is mergeValue, which merges a new value into an existing combiner. Here it adds an int to the list, so it uses append.
- The third function is mergeCombiners, which merges two combiners. Here it merges two lists, so it uses extend.
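To see when each of the three functions would run, here is a hand trace in plain Python with no Spark involved. The split into two partitions and the extra pair ("a", 3) are assumptions made for illustration, so that all three functions get called; in a real job Spark decides the partitioning, and the order of values within each list is not guaranteed.

```python
def to_list(a):
    return [a]


def append(a, b):
    a.append(b)
    return a


def extend(a, b):
    a.extend(b)
    return a


# Assumed layout (made up for this trace):
#   partition 0: ("a", 1), ("a", 2)
#   partition 1: ("b", 1), ("a", 3)

# Partition 0
a0 = to_list(1)      # first "a" seen in this partition -> createCombiner
a0 = append(a0, 2)   # another "a" in the same partition -> mergeValue

# Partition 1
b1 = to_list(1)      # first "b" seen in this partition -> createCombiner
a1 = to_list(3)      # first "a" seen in *this* partition -> createCombiner again

# After the shuffle, combiners for the same key are merged across partitions
a = extend(a0, a1)   # mergeCombiners

result = sorted([("a", a), ("b", b1)])
print(result)
```

Note that createCombiner runs once per key per partition, not once per key overall, which is why mergeCombiners is needed at all.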