
groupBy 101

This article extends the discussion of groupBy() from the earlier article; if you have not read it yet, I kindly request you to do so first. As covered there, the groupBy() method of a dataframe returns an instance of the GroupedData class. This class comes with many methods that apply a calculation to the grouped data and return a dataframe. This article explores the basic methods of GroupedData; the advanced methods will be covered in another article, after UDFs have been explored. Most of the examples in this article use the data set below:


from pyspark.sql import SparkSession

# Reuse the active SparkSession (e.g. in a notebook) or start a new one
spark = SparkSession.builder.getOrCreate()

data = [('Raj','A','B',95,15,20,20,20,20),
        ('Hema','B','G',67,9,18,3,19,18),
        ('Joju','B','B',44,7,8,13,6,10),
        ('Priya','A','G',45,7,5,10,19,4),
        ('Sanjana','B','G',61,18,17,5,3,18),
        ('Ashok','A','B',70,7,18,14,11,20),
        ('Chitra','A','G',66,20,17,17,7,5),
        ('Prem','B','B',29,6,3,10,3,7),
        ('Pavan','B','B',53,3,18,5,16,11)]

columns = ['Name','Section','Gender','Total',
           'English','Maths','Physics',
           'Chemistry','Biology']

df = spark.createDataFrame(data,columns)
df.show()



i.) avg() and mean()

The avg() method computes the average of numeric columns within each group. Its column-name arguments are optional: when none are passed, every numeric column is averaged for each group defined by the groupBy() columns; when column names are passed, only those columns are averaged and shown alongside the grouping columns. avg() returns a dataframe.


df.groupBy('Section').avg().show() # no arguments provided



df.groupBy('Section').avg('Total','English').show()  # only two numeric columns provided
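To make the arithmetic concrete, here is a plain-Python sketch (no Spark needed) of what df.groupBy('Section').avg('Total','English') computes. It only illustrates the per-group averaging, not how Spark actually executes it; `COLUMNS`, `ROWS` and `group_avg` are hypothetical names introduced for this sketch.

```python
from collections import defaultdict

# The article's data set as plain tuples, in the same column order as `columns`.
COLUMNS = ['Name', 'Section', 'Gender', 'Total', 'English',
           'Maths', 'Physics', 'Chemistry', 'Biology']
ROWS = [('Raj', 'A', 'B', 95, 15, 20, 20, 20, 20),
        ('Hema', 'B', 'G', 67, 9, 18, 3, 19, 18),
        ('Joju', 'B', 'B', 44, 7, 8, 13, 6, 10),
        ('Priya', 'A', 'G', 45, 7, 5, 10, 19, 4),
        ('Sanjana', 'B', 'G', 61, 18, 17, 5, 3, 18),
        ('Ashok', 'A', 'B', 70, 7, 18, 14, 11, 20),
        ('Chitra', 'A', 'G', 66, 20, 17, 17, 7, 5),
        ('Prem', 'B', 'B', 29, 6, 3, 10, 3, 7),
        ('Pavan', 'B', 'B', 53, 3, 18, 5, 16, 11)]


def group_avg(rows, key_col, value_cols):
    """Hypothetical helper: average of each value column within each group."""
    key_idx = COLUMNS.index(key_col)
    groups = defaultdict(list)  # group key -> rows belonging to that group
    for row in rows:
        groups[row[key_idx]].append(row)
    result = {}
    for key, members in groups.items():
        result[key] = {}
        for col in value_cols:
            values = [m[COLUMNS.index(col)] for m in members]
            result[key][col] = sum(values) / len(values)  # mean as a float
    return result


averages = group_avg(ROWS, 'Section', ['Total', 'English'])
print(averages)
# {'A': {'Total': 69.0, 'English': 12.25}, 'B': {'Total': 50.8, 'English': 8.6}}
```

Spark's avg() likewise returns a double, which is why the sketch divides with `/` rather than `//`.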


For the curious, this is the error that is thrown when a non-numeric column is passed to avg():


df.groupBy('Section').avg('Name').show()


AnalysisException: "Name" is not a numeric column. Aggregation function can only be applied on a numeric column.;


GroupedData also provides mean(), an alias for avg(); both methods work the same way.


df.groupBy('Section').mean('Total','English').show()


ii.) count()

The count() method is perhaps the simplest method offered by the GroupedData class. It takes no arguments and returns a dataframe with the grouping columns plus one extra column, count, which holds the number of rows for each combination of grouping-column values.


df.groupBy('Section','Gender').count().show()
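The row counting behind this call can be sketched in plain Python with collections.Counter, assuming we only keep the (Section, Gender) pair of each student from the article's data set; `pairs` and `counts` are hypothetical names used just for this illustration.

```python
from collections import Counter

# (Section, Gender) for each student in the article's data set, in row order.
pairs = [('A', 'B'), ('B', 'G'), ('B', 'B'), ('A', 'G'), ('B', 'G'),
         ('A', 'B'), ('A', 'G'), ('B', 'B'), ('B', 'B')]

# Counter tallies how many rows share each (Section, Gender) combination,
# which is what the extra `count` column holds.
counts = Counter(pairs)
for (section, gender), n in sorted(counts.items()):
    print(section, gender, n)
```

Because every row belongs to exactly one combination, the counts add up to the number of rows in the dataframe (9 here).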



iii.) max() and min()


The max() method calculates the maximum value for each group and the min() method calculates the minimum value for each group. Each returns a dataframe, and both take column names as arguments. If no arguments are provided, max and min are calculated for every numeric column within each group.


df.groupBy('Section').max().show()



df.groupBy('Section').max('English','Maths').show()



df.groupBy('Section').min().show()


df.groupBy('Section').min('English','Maths').show()

Both methods work only with numeric columns; when given a non-numeric column, they raise the same error that avg() raised in the similar case.


df.groupBy('Section').min('Name').show()

AnalysisException: "Name" is not a numeric column. Aggregation function can only be applied on a numeric column.;
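The no-argument form and the numeric-only restriction can be illustrated together with a plain-Python sketch of df.groupBy('Section').max() and .min(). Spark picks the numeric columns from the dataframe schema; the sketch only approximates that by checking Python value types. `COLUMNS`, `ROWS`, `numeric_cols` and `per_group` are hypothetical names for this illustration.

```python
from collections import defaultdict

COLUMNS = ['Name', 'Section', 'Gender', 'Total', 'English',
           'Maths', 'Physics', 'Chemistry', 'Biology']
ROWS = [('Raj', 'A', 'B', 95, 15, 20, 20, 20, 20),
        ('Hema', 'B', 'G', 67, 9, 18, 3, 19, 18),
        ('Joju', 'B', 'B', 44, 7, 8, 13, 6, 10),
        ('Priya', 'A', 'G', 45, 7, 5, 10, 19, 4),
        ('Sanjana', 'B', 'G', 61, 18, 17, 5, 3, 18),
        ('Ashok', 'A', 'B', 70, 7, 18, 14, 11, 20),
        ('Chitra', 'A', 'G', 66, 20, 17, 17, 7, 5),
        ('Prem', 'B', 'B', 29, 6, 3, 10, 3, 7),
        ('Pavan', 'B', 'B', 53, 3, 18, 5, 16, 11)]

# Approximate Spark's schema check: a column counts as numeric only if
# every value in it is an int or a float. Name, Section and Gender are skipped.
numeric_cols = [name for i, name in enumerate(COLUMNS)
                if all(isinstance(row[i], (int, float)) for row in ROWS)]

# Bucket the rows by Section, like groupBy('Section').
section_idx = COLUMNS.index('Section')
groups = defaultdict(list)
for row in ROWS:
    groups[row[section_idx]].append(row)


def per_group(reduce_fn):
    """Apply reduce_fn (built-in max or min) to every numeric column of every group."""
    return {key: {col: reduce_fn(m[COLUMNS.index(col)] for m in members)
                  for col in numeric_cols}
            for key, members in groups.items()}


maxima = per_group(max)  # like groupBy('Section').max()
minima = per_group(min)  # like groupBy('Section').min()
print(maxima['A'])
print(minima['B'])
```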


iv.) sum()


The sum() method calculates the arithmetic sum of numeric columns within each group. It takes column names as arguments and returns a dataframe. If no arguments are provided, the sum is calculated for every numeric column.


df.groupBy('Gender').sum().show()



df.groupBy('Gender').sum('Total').show()
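As a quick sanity check on what df.groupBy('Gender').sum('Total') returns, the same totals can be computed in plain Python from the (Gender, Total) pairs of the article's data set. The names `gender_totals` and `sums` are hypothetical, chosen for this sketch only.

```python
from collections import defaultdict

# (Gender, Total) for each student in the article's data set.
gender_totals = [('B', 95), ('G', 67), ('B', 44), ('G', 45), ('G', 61),
                 ('B', 70), ('G', 66), ('B', 29), ('B', 53)]

# Add each row's Total to the running sum of its Gender group.
sums = defaultdict(int)
for gender, total in gender_totals:
    sums[gender] += total

print(dict(sums))  # {'B': 291, 'G': 239}

# Every row lands in exactly one group, so the group sums add up to the
# column's overall total.
print(sum(sums.values()) == sum(t for _, t in gender_totals))  # True
```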



Like the other methods above, sum() raises the same AnalysisException when given the name of a non-numeric column.


v.) pivot()

pivot() transforms rows into columns, much like the PIVOT clause offered by several SQL dialects. It takes two arguments and returns a GroupedData object on which further aggregations can be performed. The first argument, pivot_col, is the name (as a string) of the column whose values are to become columns. The second, values, is an optional list of values from the pivot column, each of which will become a column in the final dataframe. If values is not provided, every distinct value in the pivot column becomes a column; Spark then has to compute that list of distinct values first, so passing values explicitly is more efficient. Because pivot() returns a GroupedData object, an aggregation must follow to get back a dataframe.


To illustrate the usage of pivot(), I have created a new dataframe with the code below:


data = [['Raj', 'A', 'B', 'English', 15],
        ['Raj', 'A', 'B', 'Maths', 20],
        ['Raj', 'A', 'B', 'Physics', 20],
        ['Raj', 'A', 'B', 'Chemistry', 20],
        ['Raj', 'A', 'B', 'Biology', 20],
        ['Hema', 'B', 'G', 'English', 9],
        ['Hema', 'B', 'G', 'Maths', 18],
        ['Hema', 'B', 'G', 'Physics', 3],
        ['Hema', 'B', 'G', 'Chemistry', 19],
        ['Hema', 'B', 'G', 'Biology', 18]]

schema = ['Name','Section','Gender','Subject','Marks']

df = spark.createDataFrame(data,schema)
df.show()



The first example does not pass the values argument, so every subject becomes a column:


(df.groupBy('Name','Section','Gender')
   .pivot('Subject')
   .sum('Marks')
   .show())



The following transformation is identical except that the values argument is passed, so the resulting dataframe contains only the selected subject columns:


(df.groupBy('Name','Section','Gender')
   .pivot('Subject', ['English','Maths','Physics'])
   .sum('Marks')
   .show())
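The reshaping that pivot() plus sum() performs can be sketched in plain Python: group the long-format rows by (Name, Section, Gender) and spread each Subject into its own cell. This is only an illustration of the result, not Spark's implementation; `LONG_ROWS` and `pivot_sum` are hypothetical names, and sorting the discovered values imitates the column order Spark produces when values is omitted.

```python
# Long-format rows from the pivot example: (Name, Section, Gender, Subject, Marks).
LONG_ROWS = [('Raj', 'A', 'B', 'English', 15), ('Raj', 'A', 'B', 'Maths', 20),
             ('Raj', 'A', 'B', 'Physics', 20), ('Raj', 'A', 'B', 'Chemistry', 20),
             ('Raj', 'A', 'B', 'Biology', 20), ('Hema', 'B', 'G', 'English', 9),
             ('Hema', 'B', 'G', 'Maths', 18), ('Hema', 'B', 'G', 'Physics', 3),
             ('Hema', 'B', 'G', 'Chemistry', 19), ('Hema', 'B', 'G', 'Biology', 18)]


def pivot_sum(rows, values=None):
    """Hypothetical helper mimicking
    groupBy('Name','Section','Gender').pivot('Subject', values).sum('Marks')."""
    if values is None:
        # No values given: an extra pass collects the distinct subjects (sorted).
        values = sorted({row[3] for row in rows})
    table = {}
    for name, section, gender, subject, marks in rows:
        # Every group gets a row; cells with no data stay None (null in Spark).
        cells = table.setdefault((name, section, gender), dict.fromkeys(values))
        if subject in values:
            cells[subject] = (cells[subject] or 0) + marks
    return values, table


all_subjects, full = pivot_sum(LONG_ROWS)
chosen, partial = pivot_sum(LONG_ROWS, ['English', 'Maths', 'Physics'])
print(all_subjects)
print(partial[('Hema', 'B', 'G')])
```

The `values is None` branch is the extra pass over the data that passing values explicitly avoids.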



vi.) agg()

Each of the methods above applies one type of aggregation, possibly to several columns, per call; none of them can apply different aggregations to the same groups at once. The agg() method fills this gap. It takes aggregation expressions as arguments, passed either as column expressions or as a single dict mapping column names to aggregation names. The aggregation functions that we can apply include:


  1. avg

  2. max

  3. min

  4. sum

  5. count

  6. group aggregate pandas UDF


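The first five functions are available in Spark's pyspark.sql.functions module:


from pyspark.sql.functions import avg, col, count, expr, max, min, sum
# note: max, min and sum shadow Python's built-in functions of the same name


The last option is a custom aggregation function that we can build using Pandas, which will be covered in the Advanced groupBy article. Each of the built-in functions above computes one value per group, and together they populate the columns of the new dataframe. Each takes a single argument: the column to aggregate, given as a column name or a Column expression (such as col(), expr(), df.English or df['Total']). The examples below use the student dataframe created at the start of this article, so re-run that code if df currently holds the pivot example's data.


(df.groupBy('Section','Gender')
   .agg(count(expr('Chemistry')), sum(col('Physics')),
        min(df.English), avg(df['Total']), max('Biology'))
   .show())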

(or)

df.groupBy('Section','Gender').agg({'Biology':'max','English':'min',
                                    'Total':'avg','Physics':'sum',
                                    'Chemistry':'count'}).show()
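To see what the dict form asks for, here is a plain-Python sketch that applies a {column: aggregation} mapping to every (Section, Gender) group. It is an illustration only: `COLUMNS`, `ROWS`, `AGGREGATES` and `agg_dict` are hypothetical names, the functions are plain-Python stand-ins for Spark's built-ins, and the output keys imitate Spark's default column names such as max(Biology).

```python
from collections import defaultdict

COLUMNS = ['Name', 'Section', 'Gender', 'Total', 'English',
           'Maths', 'Physics', 'Chemistry', 'Biology']
ROWS = [('Raj', 'A', 'B', 95, 15, 20, 20, 20, 20),
        ('Hema', 'B', 'G', 67, 9, 18, 3, 19, 18),
        ('Joju', 'B', 'B', 44, 7, 8, 13, 6, 10),
        ('Priya', 'A', 'G', 45, 7, 5, 10, 19, 4),
        ('Sanjana', 'B', 'G', 61, 18, 17, 5, 3, 18),
        ('Ashok', 'A', 'B', 70, 7, 18, 14, 11, 20),
        ('Chitra', 'A', 'G', 66, 20, 17, 17, 7, 5),
        ('Prem', 'B', 'B', 29, 6, 3, 10, 3, 7),
        ('Pavan', 'B', 'B', 53, 3, 18, 5, 16, 11)]

# Plain-Python stand-ins for the built-in aggregation names used in the dict.
AGGREGATES = {
    'max': max,
    'min': min,
    'sum': sum,
    'avg': lambda values: sum(values) / len(values),
    'count': lambda values: sum(1 for v in values if v is not None),  # non-null values
}


def agg_dict(rows, group_cols, spec):
    """Hypothetical helper mirroring groupBy(*group_cols).agg(spec) for a dict spec."""
    group_idx = [COLUMNS.index(c) for c in group_cols]
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[i] for i in group_idx)].append(row)
    result = {}
    for key, members in groups.items():
        # One output value per (column, aggregation) pair in the mapping.
        result[key] = {
            f'{fn}({col})': AGGREGATES[fn]([m[COLUMNS.index(col)] for m in members])
            for col, fn in spec.items()
        }
    return result


summary = agg_dict(ROWS, ['Section', 'Gender'],
                   {'Biology': 'max', 'English': 'min', 'Total': 'avg',
                    'Physics': 'sum', 'Chemistry': 'count'})
print(summary[('B', 'B')])
```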



Passing a dictionary that maps columns to aggregations might seem the neater option, but it cannot apply several aggregations to the same column. A dictionary cannot hold duplicate keys: if the same column is repeated as a key, only the last key-value pair takes effect, so the result contains only the last aggregation defined for that column. Column expressions, as in the first form above, do not have this limitation.


df.groupBy('Section','Gender').agg({'Total':'max','Total':'min',

          'Total':'avg','Total':'sum','Total':'count'}).show()
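The collapse happens in Python itself, before Spark ever sees the argument, so it can be checked without a SparkSession. The variable name `spec` is just for this sketch.

```python
# The same mapping as in the call above, written as a dict literal.
# Python keeps only the last value for a repeated key (linters flag this).
spec = {'Total': 'max', 'Total': 'min', 'Total': 'avg',
        'Total': 'sum', 'Total': 'count'}

print(spec)       # {'Total': 'count'}
print(len(spec))  # 1
```

So agg() receives a single Total-to-count mapping, and the resulting dataframe has only the count(Total) column besides the grouping columns.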


One important point to note is that group aggregate pandas UDFs and built-in aggregation functions cannot be mixed in a single call to agg(). This caution will become relevant once we have covered group aggregate UDFs.

Conclusion:


Most of the groupBy topics were discussed in this article; a few advanced topics will be covered in another article. If you have any suggestions or questions, please post them in the comment box. This article, like every article in this blog, will be updated based on comments and as I find better ways of explaining things, so kindly bookmark this page and check back whenever you need a reference.


Happy Learning! 😀

