
Dataframes 101 - Part II

 Manipulating DataFrames:


DataFrames come with many methods that enable us to code the transformations or manipulations we might want to perform. These methods usually work with the col() and expr() functions that were covered in the Columns 101 article. By themselves, col() and expr() are not very useful: unless they are used inside DataFrame methods, they do not know which dataframe their columns belong to, so they remain generic.
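The examples in this article assume a running SparkSession named spark and the column helpers imported from pyspark.sql.functions; a minimal setup sketch would look something like this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, asc, desc

# A local session; all the examples below refer to it simply as `spark`.
# The app name here is an arbitrary placeholder.
spark = SparkSession.builder.appName('dataframes-101-part-2').getOrCreate()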


i.) select()


You might be familiar by now with the select() method that comes with DataFrames. It is used to select columns or evaluated expressions from an existing dataframe and returns a new dataframe. As you might have seen in one of the previous articles, there are many ways to denote columns and expressions. The following example illustrates the different ways in which expressions can be selected; selecting plain columns will not be discussed here, as I have illustrated it in many articles in the past:


data = [['X',360],['XXXXX',182],['XXXX',249],['XXX',350],['XX',500]]

schema = 'NAME STRING, TOTAL_MARKS INTEGER'

df = spark.createDataFrame(data,schema=schema)

df.select((df.TOTAL_MARKS/100).alias('Type1'),

          (col('TOTAL_MARKS')/100).alias('Type2'),

          expr('TOTAL_MARKS/100').alias('Type3')).show()


ii.) count()


count() is a pretty straightforward method: it returns the total number of rows in the dataframe. One important property of count() is that it is an action, so the result is evaluated immediately.


# the df used here is the same as one above

print(df.count()) 

O/P:

5


iii.) distinct()


This method returns the distinct records of the dataframe. When used on a dataframe that consists of only one column, it effectively filters the column down to its unique values. This method is a transformation, so to display the results use show() on the resulting dataframe.


data = [['X'],['X'],['X'],['XX'],['XX']]

schema = 'NAME STRING'

df = spark.createDataFrame(data,schema=schema)

df.distinct().show()
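With the sample data above, only the two distinct names should remain; a sketch of the expected output:

# Expected output (row order may vary):
# +----+
# |NAME|
# +----+
# |   X|
# |  XX|
# +----+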



iv.) drop()


The drop() method helps us drop columns of a dataframe and returns a new dataframe. This functionality can be considered the opposite of what the select() method does. Multiple columns can be dropped at the same time; the method takes the columns to drop as arguments. I will be using the dataframe below for further examples in this article:


data = [('Raj','A','B',95,15,20,20,20,20),

        ('Hema','B','G',67,9, 18, 3, 19, 18),

        ('Joju','B','B',44,7,8,13,6,10),

        ('Priya','A','G',45,7,5,10,19,4),

        ('Sanjana','B','G',61,18,17,5,3,18),

        ('Ashok','A','B',70,7,18,14,11,20),

        ('Chitra','A','G',66,20,17,17,7,5),

        ('Prem','B','B',29,6,3,10,3,7),

        ('Pavan','B','B',53,3,18,5,16,11)]

columns = ['Name','Section','Gender','Total',

           'English','Maths','Physics','Chemistry',

           'Biology']

df = spark.createDataFrame(data,columns)

df.show()



Now, if I wanted to drop the section column, I can call the drop method as follows:


df_new = df.drop(col('Section'))

df_new.show()



This method is especially helpful for dropping a few columns from dataframes that have a lot of columns. The following example shows a scenario where more than one column is dropped in the same call:


df_new = df.drop('Section','Gender','Total')

df_new.show()



In the above example, I have used only strings to represent the columns to be dropped, because when specifying multiple columns the drop() method accepts only column names, which in turn are always strings. To illustrate that using col(), expr() or df.col will not work, try the following; you will end up with a TypeError:


df_new = df.drop(df.Section,col('Gender'),expr('Total'))



v.) filter() and where()


I have grouped these two methods together because they are the same: where() is just an alias for filter(), so both methods work the same way and take the same argument, which is a condition. This condition can consist of many sub-conditions separated by the logical operators & (and), | (or) and ~ (not). The sub-conditions should be enclosed in parentheses for clarity and to avoid operator-precedence errors.


df.where(~(df.Gender == 'B')).show()


df.filter((df.Gender == 'B') & (df.Total > 60)).show()
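The | (or) operator mentioned above works the same way; a quick sketch using the same dataframe:

df.where((df.Gender == 'G') | (df.Total > 60)).show()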

vi.) dropDuplicates()

By now you might have noticed that most DataFrame method names are quite intuitive. That applies to this method as well: as you might have guessed, it helps in dropping duplicate rows. If used without any arguments, all the columns are considered when deciding which rows are duplicates. The method also accepts a collection of column names to restrict that comparison. To demonstrate how it works, I have created a slightly modified dataframe:


data = [('Raj','A','B',95,15,20,20,20,20),

        ('Radha','A','G',95,15,20,20,20,20),

        ('Hema','B','G',67,9, 18, 3, 19, 18),

        ('Hema','B','G',67,9, 18, 3, 19, 18),

        ('Joju','B','B',44,7,8,13,6,10),

        ('Joju','B','B',44,7,8,13,6,10),

        ('Priya','A','G',45,7,5,10,19,4),

        ('Sanjana','B','G',61,18,17,5,3,18),

        ('Ashok','A','B',70,7,18,14,11,20),

        ('Chitra','A','G',66,20,17,17,7,5),

        ('Prem','B','B',29,6,3,10,3,7),

        ('Pavan','B','B',53,3,18,5,16,11)]

columns = ['Name','Section','Gender',

           'Total','English','Maths',

           'Physics','Chemistry','Biology']

df = spark.createDataFrame(data,columns)

df.show()



As you can see, duplicate rows have been added for the students Hema and Joju, along with one new record for Radha. The first example drops duplicate rows considering all column values:


df.dropDuplicates().show()


The following example specifically instructs spark to only consider the Total column:


df.dropDuplicates(['Total']).show()



As you can see, the row added for Radha is now dropped as well, since it has the same Total as Raj's row. The dropDuplicates() method also has an alias that goes by the name drop_duplicates().
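As a quick sketch, the alias is called exactly like dropDuplicates(), and the subset can also name more than one column; for example, the following keeps one row per Section and Gender combination:

df.drop_duplicates(['Section','Gender']).show()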


vii.) orderBy()


The orderBy() method does exactly what the ORDER BY clause in SQL does: the dataframe is ordered by the given columns, and the order can be configured to be either ascending or descending. The method accepts column names or Column objects, or a list of them, along with a keyword argument ascending, which can be a boolean or a list of booleans. A list of booleans can be passed only when we are ordering by multiple columns and need control over the ordering used for each column.


In the following example, the dataframe is ordered by Name (in ascending order) and English (in descending order). The value for the ascending parameter is a list of 1s and 0s in this case, as they evaluate to True and False internally.


df.orderBy(['Name',col('English')],ascending=[1,0]).show()

(or)

df.orderBy('Name',col('English'),ascending=[1,0]).show()



If you try the following, you will get an error, as mixing and matching a list with other column identifiers is not allowed.


df.orderBy('Name',[col('English')],ascending=[1,0]).show() 


When ordering by multiple columns, it is often more intuitive to specify the ordering strategy right next to each column. Spark provides the asc() and desc() functions, which bind a column and its ordering strategy together. These two functions each take one argument, the column name as a string. The working example above looks like the following when asc() and desc() are used:


df.orderBy([asc('Name'),desc('English')]).show()


I personally find this more intuitive, and I would suggest using this particular way of ordering due to its clarity. Spark also provides a DataFrame method called sort(), which is an alias for orderBy().
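As a small sketch, the same ordering expressed through sort() would look like this:

df.sort([asc('Name'),desc('English')]).show()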


viii.) groupBy()


groupBy() is one of the methods that does not return a dataframe as a result; it returns an instance of GroupedData. This GroupedData instance comes with many aggregation functions that can be applied to the grouped dataframe. groupBy() is a crucial method for analysis, so there are many concepts to cover, which I will do in another article.


groupBy() accepts columns or column names. In this example I have demonstrated how to calculate the average Total marks for each section of students.


grouped_data = df.groupBy(df.Section)

print(grouped_data)

df_avg  = grouped_data.avg('Total')

df_avg.show()



The following example takes the analysis a level deeper and finds the average marks scored by the boys and girls of the two sections.

grouped_data = df.groupBy(df.Section,'Gender')

print(grouped_data)

df_avg  = grouped_data.avg('Total')

df_avg.show()
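The GroupedData instance also exposes an agg() method, which lets several aggregations be computed in a single pass. A small sketch, assuming avg and max are imported from pyspark.sql.functions (max is aliased here to avoid shadowing Python's built-in max):

from pyspark.sql.functions import avg, max as max_

# Average total and best Maths score per Section and Gender combination.
df.groupBy('Section','Gender') \
  .agg(avg('Total').alias('Avg_Total'),
       max_('Maths').alias('Best_Maths')) \
  .show()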



ix.) limit()


The limit() method functions the same way the LIMIT clause in SQL does: it limits the result set to a predefined number of rows, which is the only parameter the method accepts. This helps in decreasing the number of rows in a dataframe and is a good precursor to the collect() method. Because collect() returns Row objects to the driver program, we might not want to overload the driver, and as a precaution limit() can act as a good safeguard.


rows = df.limit(3).collect()

for row in rows:

    print(row)


x.) join() and alias()


One of the most essential methods for analysis is join(). The join method allows us to join one dataframe with another. The method takes three arguments:

  1. other

  2. on

  3. how

other should be an object of class DataFrame, and on should be either a condition or a list of column(s) or column name(s). The how argument determines which join strategy is to be used. An in-depth explanation of the join() method will come in a separate article; in this article the example will be a simple inner join.


The alias() method is a common precursor to join() usage. It serves the purpose of aliasing, which is a common operation in SQL as well: with an alias, we can reference the columns of different dataframes using a concise prefix. The alias() method takes one string argument, which becomes the alias, and returns a dataframe with that alias set. We can only use the alias on the returned dataframe, not on the dataframe on which we called the alias() method.

For the purpose of demonstrating the join, I have created two new dataframes. The source code to create these dataframes is as follows:


data_sections = [[1,'A',2,4],[2,'B',3,5]]

data_teachers = [[1,'XXXX','M',34,'Maths'],

                 [2,'YYYY','F',45,'English'],

                 [3,'YYXX','F',42,'Science'],

                 [4,'YYYX','F',39,'Math']]

columns_teachers = ['Teacher_ID','Name','Gender','Age','Subject']

columns_sections = ['Section_ID','Section_Name',

                    'Class_Teacher_ID','Class_Strength']

df_teachers = spark.createDataFrame(data_teachers,columns_teachers)

df_sections = spark.createDataFrame(data_sections,columns_sections)

df_teachers.show()

df_sections.show()



Let's take a case where we want to display the class teacher's name for each section to the students. As you can see, the teacher's name and the section's name are in two different dataframes. In order to display the Section and Teacher names together, we have to join these dataframes, which can be done as follows:


df_sections.join(df_teachers, df_sections.Class_Teacher_ID == df_teachers.Teacher_ID) \
    .select(col('Section_Name'),
            col('Name').alias('Teacher_Name')).show()



In the above example, the on condition is written as a normal Python comparison between two columns, which Spark evaluates into a column expression. Here I have only demonstrated an equality condition, but the conditions can be of many types, which I will cover in the join article.


The following example depicts the same join as above; the only difference is the usage of aliases, which is made possible by the alias() method.


df_a = df_sections.alias('a')

df_b = df_teachers.alias('b')

df_a.join(df_b,expr('a.Class_Teacher_ID = b.Teacher_ID')) \

    .select(col('a.Section_Name'),col('b.Name').alias('Teacher_Name')) \

    .show()
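Since the how argument defaults to an inner join, the same join can also be written with the strategy spelled out explicitly; a small sketch:

df_a.join(df_b,expr('a.Class_Teacher_ID = b.Teacher_ID'),how='inner') \
    .select(col('a.Section_Name'),col('b.Name').alias('Teacher_Name')) \
    .show()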


xi.) union() and unionAll()


union() and unionAll() are both equivalent to UNION ALL in standard SQL. In standard SQL, UNION drops duplicate rows when combining two result sets, whereas UNION ALL keeps the duplicates in the final result. In the Spark DataFrame API, however, both union() and unionAll() keep duplicate rows. To imitate UNION behaviour we have to follow the union() or unionAll() call with a distinct() call. Both methods return a new dataframe, and both take one argument, which should be an object of class DataFrame. The same requirements that apply to UNION and UNION ALL apply to these Spark equivalents as well (for example, both dataframes must have the same number of columns). To show how union() and unionAll() work, I have created two new dataframes; the source code is as follows:


data_x = [['X',1],['Y',2]]

data_y = [['X',1],['Y',3]]

columns = ['Name','Age']

df_x = spark.createDataFrame(data_x,columns)

df_y = spark.createDataFrame(data_y,columns)

df_x.show()

df_y.show()




df_x.union(df_y).show()

df_y.unionAll(df_x).show()


As you can see, there are duplicate rows in both results. Now we can use the union() method in conjunction with the distinct() method to achieve behaviour identical to UNION.


df_x.union(df_y).distinct().show()



xii.) intersect() and intersectAll()

intersect() and intersectAll() are the equivalents of INTERSECT and INTERSECT ALL in standard SQL. intersect() returns the distinct rows common to two dataframes, whereas intersectAll() preserves the duplicates among the common rows and returns them. Both methods return a new dataframe, and each takes one argument, which should be an object of class DataFrame.


data_x = [['X',1],['Y',2],['X',1],['X',1]]

data_y = [['X',1],['X',1],['Y',3]]

columns = ['Name','Age']

df_x = spark.createDataFrame(data_x,columns)

df_y = spark.createDataFrame(data_y,columns)

df_x.intersect(df_y).show()

df_x.intersectAll(df_y).show()
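Based on the sample data above, intersect() should return the common row ('X', 1) only once, while intersectAll() should return it twice, since it appears three times in df_x but only twice in df_y; a sketch of the expected results:

# Expected results (row order may vary):
# intersect()    -> one row : ('X', 1)
# intersectAll() -> two rows: ('X', 1), ('X', 1)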


Conclusion:


Part two of the dataframes series covered the manipulation of dataframes in various ways. In the next article I will delve into the utility methods and attributes of dataframes. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it whenever you need a reference.


Happy Learning! 😀

