Understanding Aggregation Method in Spark

Ben Chen
5 min read · Oct 3, 2019

This blog is mainly for people who are coming from a pandas/scikit-learn background to the PySpark API.

Recently, I was asked to explain the usage of the aggregate method in PySpark, and it reminded me that I was also a bit confused when I first saw this method, for both the RDD and the PySpark DataFrame.

If you only care about how to use it, you can simply jump to the How it works in Spark section.

As a quick takeaway, you can think of the reason we apply aggregate as twofold: it is faster than map + reduce, and it makes it easier to apply multiple abstract functions in the reduce step.

Why the aggregate function

Before we explain how it works, we might need to understand why we need it.

Curried functions

Firstly, we need to know that the aggregate function in Spark is a curried function, thanks to mathematics and Scala. The following is an excerpt from Wikipedia about curried functions.

In mathematics and computer science, currying is the technique of translating the evaluation of a function that takes multiple arguments into evaluating a sequence of functions, each with a single argument. For example, a function that takes two arguments, one from X and one from Y, and produces outputs in Z, by currying is translated into a function that takes a single argument from X and produces as outputs functions from Y to Z. Currying is related to, but not the same as, partial application.

Currying is useful in both practical and theoretical settings. In functional programming languages, and many others, it provides a way of automatically managing how arguments are passed to functions and exceptions. In theoretical computer science, it provides a way to study functions with multiple arguments in simpler theoretical models which provide only one argument.

Furthermore, there is a bunch of mathematical theory behind it. If you’re interested, you can visit the Wikipedia page first to get more of a taste of it.

Advantages?

Currying allows us to apply functions in many ways using a lightweight syntax, and then pass these partially applied functions around to higher-order functions such as map or filter. Higher-order functions, which take functions as parameters or yield them as results, are the bread and butter of functional programming.

In short, with currying we can easily reuse lots of abstract functions in a single process, since it makes creating anonymous functions much easier. That’s one of the important things functional programming aims to provide. Currying and partially applied functions enable higher-order functions to be used much more effectively and concisely.
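As a toy illustration in plain Python (not Spark code, and the names are made up), a curried add can be partially applied and then handed to a higher-order function like map:

def add(x):
    def add_x(y):
        return x + y
    return add_x

add_five = add(5)                      # a partially applied function
print(add_five(3))                     # 8
print(list(map(add(10), [1, 2, 3])))   # [11, 12, 13]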

That’s also why, in PySpark, we keep seeing people use lambdas within the aggregate method.

Moreover, aggregate is a faster computation than a combination of map and reduce.

Here’s a simple benchmark example:

The experiment uses the KDD Cup 99 full data set, which contains about 5 million rows.

map / reduce vs. aggregate
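A rough sketch of the two approaches being compared (the file path is hypothetical; the first CSV column of the KDD Cup 99 data is the duration field):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Hypothetical local path to the KDD Cup 99 full data set.
raw_data = sc.textFile("kddcup.data")
durations = raw_data.map(lambda line: float(line.split(",")[0]))

# map + reduce: build (value, 1) pairs first, then reduce them.
sum_count_mr = durations.map(lambda x: (x, 1)) \
                        .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# aggregate: the same (sum, count) result in a single call.
sum_count_agg = durations.aggregate(
    (0.0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)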

How it works in Spark

I will mainly use Python notation in the examples.

Firstly, the definition of the aggregate function is as follows:

In Scala, aggregate is a curried method on RDD[T]: aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U. In PySpark, the equivalent method takes all three arguments at once: rdd.aggregate(zeroValue, seqOp, combOp).
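A trivial sketch of a call, reusing the SparkContext sc from the snippet above:

rdd = sc.parallelize([1, 2, 3])

total = rdd.aggregate(
    0,                       # zeroValue: the neutral starting value
    lambda acc, x: acc + x,  # seqOp: fold elements within a partition
    lambda a, b: a + b,      # combOp: merge the per-partition results
)
print(total)  # 6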

Briefly, seqOp aggregates the elements within each partition, and combOp merges the seqOp results from all the partitions. Both operations share the same initial value, which is called zeroValue.

zeroValue is the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for combining the results from different partitions for the combOp operator. This will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation). In short, it is used in both operations, and people tend to forget that it is used in the second step!
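Here is a small sketch of that pitfall, reusing sc: with a non-neutral zeroValue, the result depends on how many partitions there are, because zeroValue is folded in once per partition by seqOp and once more by combOp.

rdd = sc.parallelize([1, 2, 3, 4], 2)  # two partitions

# Neutral zeroValue (0 for summation): correct regardless of partitioning.
print(rdd.aggregate(0, lambda acc, x: acc + x, lambda a, b: a + b))  # 10

# Non-neutral zeroValue: 1 is added once per partition (seqOp) and once
# more when merging (combOp), so we get 10 + 2 + 1 = 13 rather than 11.
print(rdd.aggregate(1, lambda acc, x: acc + x, lambda a, b: a + b))  # 13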

seqOp is an operator used to accumulate results within a partition. This operation walks through all the elements (T) in a partition: T[0] is merged with zeroValue, the result is merged with T[1], and so on until it has looped over the whole partition. In other words, this sequential operation is just like rolling a given function over each partition of the RDD.

combOp is an associative operator used to combine results from different partitions. Its result type (U) can differ from the element type of the original RDD (T). As a result, we use seqOp to merge all the T in each partition into a U, and then we use combOp to combine all the U together.
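To make the T-versus-U distinction concrete, here is a sketch that folds an RDD of strings (T = str) into a (total_length, count) tuple (U):

words = sc.parallelize(["spark", "aggregate", "example"], 2)

length_count = words.aggregate(
    (0, 0),                                        # zeroValue: U = (int, int)
    lambda acc, w: (acc[0] + len(w), acc[1] + 1),  # seqOp: (U, str) -> U
    lambda a, b: (a[0] + b[0], a[1] + b[1]),       # combOp: (U, U) -> U
)
print(length_count)  # (21, 3)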

These operations traverse the data only once (in Scala terms, they work on a TraversableOnce). Both functions are allowed to modify and return their first argument instead of creating a new U, to avoid memory allocation.

i.e. a function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
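For example (a sketch, reusing sc): PySpark serializes zeroValue into each task, so every partition works on its own copy, and both functions may safely mutate and return their first argument.

def seq_op(acc, x):
    acc.append(x)      # mutate the accumulator (first argument) in place
    return acc

def comb_op(acc1, acc2):
    acc1.extend(acc2)  # merge by mutating acc1; acc2 is left untouched
    return acc1

nums = sc.parallelize([1, 2, 3, 4, 5], 2)
print(sorted(nums.aggregate([], seq_op, comb_op)))  # [1, 2, 3, 4, 5]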

Example

Now suppose we have the numbers 1 through 9 in an RDD, and we want to use aggregate to calculate their average.
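A minimal sketch of the call (reusing sc; the variable names are illustrative):

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

sum_count = rdd.aggregate(
    (0, 0),                                   # zeroValue: (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # combOp
)
print(sum_count)  # (45, 9)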

So the process is like this:

  • At the beginning, we have an initial value (0, 0).
  • (acc, number) => (acc[0] + number, acc[1] + 1)
    - where number is the T in our previous definition, also known as an element of the list in one partition.
    - On the RHS, the first element is the accumulated sum of the list in one partition, and the second element simply counts the elements.

What’s going to happen? Let’s split the operation into multiple steps:

Assume that we put the whole list in one single partition. Then seqOp just rolls (initial value[0] + element, initial value[1] + 1) over every element, which amounts to:
=> (0 + 45, 0 + 9)
=> (45, 9)

What if Spark splits the list into multiple partitions, say 3 partitions?

  • Suppose we have:
    1) Partition 1: [1, 2, 3, 4]
    2) Partition 2: [5, 6, 7, 8]
    3) Partition 3: [9]
  • After the operation of seqOp, we will get:
    1) Partition 1: (10, 4)
    2) Partition 2: (26, 4)
    3) Partition 3: (9, 1)
  • Then, as we mentioned, combOp combines all the results from all the partitions:
    => (45, 9)

Et voilà, we get the same result as in the single-partition case.
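We can sanity-check this by forcing 3 partitions (the exact split Spark chooses may differ from the hand-drawn one above, but the result is the same):

rdd3 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)  # 3 partitions

print(rdd3.aggregate(
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
))  # (45, 9), independent of the partitioning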

Now, if we want to calculate the average, we can simply compute 45 / 9 = 5.
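In code, using the sum_count tuple from the sketch above:

total, count = sum_count
print(total / count)  # 5.0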

Exercise

Try to figure out the result of the following process; you might need a calculator 😅:

Results:
