This blog is mainly for people who have just come from the pandas/scikit-learn flavour to the PySpark API. Recently, I was asked to explain the usage of the `aggregate` method in PySpark, and it reminded me that I was also a bit confused when I first saw this method, for both RDDs and PySpark DataFrames. If you only care about how to use it, you can simply jump to the How it works in Spark section. As a takeaway on why we want to use it, you can just think of it this way: we apply `aggregate` because it is faster than `map` + `reduce`, and it also makes it easier to apply multiple abstract functions in the reduce operations.
Why the aggregate function?
Before we explain how it works, we might need to understand why we need it.
Curried function

Firstly, we need to know that the `aggregate` function in Spark is a curried function, thanks to mathematics and Java. The following is an excerpt from Wikipedia about curried functions.
In mathematics and computer science, currying is the technique of translating the evaluation of a function that takes multiple arguments into evaluating a sequence of functions, each with a single argument. For example, a function that takes two arguments, one from X and one from Y, and produces outputs in Z, by currying is translated into a function that takes a single argument from X and produces as outputs functions from Y to Z. Currying is related to, but not the same as, partial application.
Currying is useful in both practical and theoretical settings. In functional programming languages, and many others, it provides a way of automatically managing how arguments are passed to functions and exceptions. In theoretical computer science, it provides a way to study functions with multiple arguments in simpler theoretical models which provide only one argument.
Furthermore, currying has a bunch of mathematical theory behind it. If you're interested, you can visit the Wikipedia page first to get more of a taste of it.
Advantages?
Currying allows us to apply functions in many ways using a lightweight syntax, and then pass these partially applied functions around to higher-order functions such as `map` or `filter`. Higher-order functions, which take functions as parameters or yield them as results, are the bread and butter of functional programming.

In short, with currying we can easily reuse lots of abstract functions in one single process, since it makes creating anonymous functions much easier. That's one of the important things functional programming aims to offer. Currying and partially applied functions enable higher-order functions to be used much more effectively and concisely.
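As a quick illustration in plain Python (using `functools.partial` for the partial application; the function names here are my own, just for the example):

```python
from functools import partial

def scale(factor, x):
    # A two-argument function...
    return factor * x

double = partial(scale, 2)   # ...partially applied to its first argument

# The partially applied function is handed to a higher-order function:
print(list(map(double, [1, 2, 3])))   # [2, 4, 6]
```

The partially applied `double` can be passed around and reused anywhere a one-argument function is expected, which is exactly the pattern `lambda` expressions serve inside `aggregate`.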
That's also why, in PySpark, we keep seeing people use `lambda` within the `aggregate` method.

Moreover, `aggregate` is a faster computation than a combination of `map` and `reduce`.
Here's a simple example of the benchmarks: the experiment uses the KDD Cup 99 full data set, which contains about 5 million rows of data.
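Since the benchmark code itself is not reproduced here, the sketch below shows the two formulations being compared, run on a small local list so the arithmetic is checkable without a cluster (with Spark you would pass the same lambdas to `rdd.map(...).reduce(...)` and `rdd.aggregate(...)`):

```python
from functools import reduce

values = [3.0, 1.0, 4.0, 1.0, 5.0]

# map + reduce: first emit (value, 1) pairs, then pairwise-merge them
mapped = [(x, 1) for x in values]
mr_result = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), mapped)

# aggregate: fold values directly into the (sum, count) accumulator
agg_result = reduce(lambda acc, x: (acc[0] + x, acc[1] + 1), values, (0.0, 0))

print(mr_result, agg_result)   # both (14.0, 5)
```

The `map` + `reduce` version materialises an intermediate pair per element, whereas `aggregate` folds elements straight into the accumulator, which is where its speed advantage comes from.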
How it works in Spark
I will mainly use Python notation in the examples.

Firstly, the definition of the `aggregate` function is as follows:
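PySpark's RDD API defines it as `RDD.aggregate(zeroValue, seqOp, combOp)`. The pure-Python model below is my own sketch of the semantics, not Spark source code:

```python
from functools import reduce

def aggregate(partitions, zeroValue, seqOp, combOp):
    """Pure-Python model of RDD.aggregate(zeroValue, seqOp, combOp).

    seqOp : (U, T) -> U, folds the elements within one partition
    combOp: (U, U) -> U, merges the per-partition results
    """
    # Each partition is folded independently, starting from zeroValue...
    per_partition = [reduce(seqOp, part, zeroValue) for part in partitions]
    # ...and the per-partition results are merged, again seeded by zeroValue.
    return reduce(combOp, per_partition, zeroValue)
```

For example, `aggregate([[1, 2], [3]], (0, 0), lambda acc, x: (acc[0] + x, acc[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1]))` returns `(6, 3)`.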
Briefly, `seqOp` aggregates the elements within each partition, and `combOp` merges the results of `seqOp` from all the partitions. Both operations share the same initial value, which is called `zeroValue`.

`zeroValue` is the initial value for the accumulated result of each partition in the `seqOp` operator, and also the initial value for combining the results from different partitions in the `combOp` operator. It will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation). In short, it is used in both operations, and people tend to forget that it is used in the second step!
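A small sketch of that pitfall, in plain Python (modelling one fold per partition plus the final combine, not actual Spark code): with three partitions, a non-neutral `zeroValue` of 1 gets added once per partition by `seqOp` and once more by `combOp`.

```python
from functools import reduce

partitions = [[1, 2, 3], [4, 5], [6]]          # true total: 21

def spark_like_sum(zero):
    # zeroValue seeds every per-partition fold AND the final combine
    per_part = [reduce(lambda a, x: a + x, p, zero) for p in partitions]
    return reduce(lambda a, b: a + b, per_part, zero)

print(spark_like_sum(0))   # 21  (neutral element: safe)
print(spark_like_sum(1))   # 25  (= 21 + 3 partitions + 1 final combine)
```

This is also why the result of a non-neutral `zeroValue` depends on how many partitions Spark happens to use.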
`seqOp` is an operator used to accumulate results within a partition. This operation walks through all the elements `(T)` of a partition: `T[0]` is merged with `zeroValue`, the result is merged with `T[1]`, and so on until it has looped over the whole partition. In other words, this sequential operation is just like rolling a given function over the RDD.
`combOp` is an associative operator used to combine the results from different partitions. It may return a result of a different type than the elements of the original RDD. As a result, we use `seqOp` to merge all the `T` in each partition into a `U`, and then we use `combOp` to combine all the `U` together.
These operations are TraversableOnce. Both functions are allowed to modify and return their first argument instead of creating a new `U`, to avoid memory allocation. That is, the function `op(t1, t2)` is allowed to modify `t1` and return it as its result value to avoid object allocation; however, it should not modify `t2`.
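For example (a plain-Python sketch), a `seqOp` that builds a list can append to its accumulator in place instead of copying it for every element:

```python
from functools import reduce

def seq_op(acc, x):
    # Mutates and returns the first argument (the accumulator);
    # the second argument x is left untouched.
    acc.append(x * x)
    return acc   # no new list is allocated per element

print(reduce(seq_op, [1, 2, 3], []))   # [1, 4, 9]
```

Returning the mutated accumulator keeps the fold O(n) in allocations instead of creating a fresh list at every step.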
Example
Now we want to use `aggregate` to calculate the average of the list [1, 2, 3, 4, 5, 6, 7, 8, 9].
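A sketch of the call (the `sc.parallelize` line assumes an active `SparkContext` named `sc`, so it is shown commented out; the plain-Python fold underneath reproduces the same arithmetic):

```python
from functools import reduce

data = list(range(1, 10))   # [1, ..., 9]
seq_op = lambda acc, number: (acc[0] + number, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# With a running SparkContext `sc` this would be:
# sum_count = sc.parallelize(data).aggregate((0, 0), seq_op, comb_op)

# Single-partition equivalent in plain Python:
sum_count = reduce(seq_op, data, (0, 0))
print(sum_count)                      # (45, 9)
print(sum_count[0] / sum_count[1])    # 5.0
```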
So the process is like this:

- At the beginning, we have an initial value `(0, 0)`.
- `(acc, number) => (acc[0] + number, acc[1] + 1)`
- Here `number` is the `T` in our previous definition, also known as an element of the list in one partition.
- On the RHS, the first element is the accumulated sum of a list in one partition, and the second element simply counts the numbers.
What's going to happen? Let's split the operations into multiple steps.

Assume for now that Spark puts the whole list in one single partition; then rolling `seqOp` over it is just:

(initial value[0] + `T`, initial value[1] + 1) in `seqOp`
=> (0 + 45, 0 + 9)
=> (45, 9)
What if Spark splits the list into multiple partitions, say 3 partitions?

- Suppose we have:
  1) Partition 1: [1, 2, 3, 4]
  2) Partition 2: [5, 6, 7, 8]
  3) Partition 3: [9]
- After the `seqOp` operation, we get:
  1) Partition 1: (10, 4)
  2) Partition 2: (26, 4)
  3) Partition 3: (9, 1)
- Then, as we mentioned, what `combOp` does is combine all the results from all the partitions:
  => (45, 9)
Et voilà, we get the same result as in the linear case. Now if we want to calculate the average, we can simply compute 45 / 9.
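The three-partition walkthrough above can be replayed in plain Python (modelling the partition split by hand; real Spark decides the split itself):

```python
from functools import reduce

partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9]]
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# seqOp folds each partition independently, starting from (0, 0)
per_partition = [reduce(seq_op, p, (0, 0)) for p in partitions]
print(per_partition)          # [(10, 4), (26, 4), (9, 1)]

# combOp merges the per-partition results
total = reduce(comb_op, per_partition, (0, 0))
print(total)                  # (45, 9)
print(total[0] / total[1])    # 5.0
```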
Exercise
Try to figure out the result of the following process; you might need a calculator 😅:
Results: