Manipulating DataFrames:
DataFrames come with many methods that enable us to code the transformations or manipulations we might want to perform. These methods usually work with col and expr, which were covered in the Columns 101 article. By themselves, col and expr are not useful: unless they are used inside a DataFrame method, they do not know which dataframe their columns belong to and hence remain generic.
i.) select()
You might be familiar with the select() method by now. It is used to select columns or evaluated expressions from an existing dataframe and returns a new dataframe. As you might have seen in one of the previous articles, there are many ways to denote columns and expressions. The following example illustrates different ways in which expressions can be selected; selecting plain columns will not be discussed in this article as I have illustrated it many times in the past:
from pyspark.sql.functions import col, expr

data = [['X',360],['XXXXX',182],['XXXX',249],['XXX',350],['XX',500]]
schema = 'NAME STRING, TOTAL_MARKS INTEGER'
df = spark.createDataFrame(data, schema=schema)
df.select((df.TOTAL_MARKS/100).alias('Type1'),
          (col('TOTAL_MARKS')/100).alias('Type2'),
          expr('TOTAL_MARKS/100').alias('Type3')).show()
ii.) count()
count() is a pretty straightforward method: it returns the total number of rows in the dataframe. One important property of count() is that it is an action, so the result is evaluated immediately.
# the df used here is the same as one above
print(df.count())
O/P:
5
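To underline that count() is an action, here is a minimal sketch using the same dataframe: the call returns a plain Python integer to the driver rather than another dataframe, so it can be used directly in ordinary Python code.
# count() is an action: it returns a plain Python int, not a DataFrame
n = df.count()
print(type(n))  # <class 'int'>
print(n)        # 5 for the dataframe above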
iii.) distinct()
This method returns the distinct rows of the dataframe. When used on a dataframe that consists of only one column, it effectively filters down to the unique values of that column. This method is a transformation, so call show() on the resulting dataframe to display the results.
data = [['X'],['X'],['X'],['XX'],['XX']]
schema = 'NAME STRING'
df = spark.createDataFrame(data,schema=schema)
df.distinct().show()
O/P:
+----+
|NAME|
+----+
|   X|
|  XX|
+----+
iv.) drop()
The drop() method lets us drop columns from a dataframe and returns a new dataframe. This functionality can be considered the opposite of what select() does. Multiple columns can be dropped in the same call, as the method takes columns as arguments. I will be using the dataframe below for the remaining examples in this article:
data = [('Raj','A','B',95,15,20,20,20,20),
('Hema','B','G',67,9, 18, 3, 19, 18),
('Joju','B','B',44,7,8,13,6,10),
('Priya','A','G',45,7,5,10,19,4),
('Sanjana','B','G',61,18,17,5,3,18),
('Ashok','A','B',70,7,18,14,11,20),
('Chitra','A','G',66,20,17,17,7,5),
('Prem','B','B',29,6,3,10,3,7),
('Pavan','B','B',53,3,18,5,16,11)]
columns = ['Name','Section','Gender','Total',
'English','Maths','Physics','Chemistry',
'Biology']
df = spark.createDataFrame(data,columns)
df.show()
Now, if I wanted to drop the Section column, I can call the drop method as follows:
df_new = df.drop(col('Section'))
df_new.show()
This method is especially helpful when only a few columns need to be dropped from a dataframe that has many columns. The following example shows a scenario where more than one column is dropped in the same call:
df_new = df.drop('Section','Gender','Total')
df_new.show()
In the above example, I have used only strings to represent the columns to be dropped, because when dropping multiple columns at once drop() accepts only column names, which are always strings. To see for yourself that col(), expr() or df.col will not work here, try the following; you will end up with a TypeError:
df_new = df.drop(df.Section,col('Gender'),expr('Total'))
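If the columns to drop are already collected in a Python list, the list can simply be unpacked into the call, since drop() takes column names as positional string arguments. A minimal sketch (cols_to_drop is just a hypothetical list for illustration):
# cols_to_drop is a hypothetical list of the column names we no longer need
cols_to_drop = ['Section', 'Gender', 'Total']

# unpack the list so each name is passed as a separate string argument
df_new = df.drop(*cols_to_drop)
df_new.show()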
v.) filter() and where()
I have grouped these two methods together because they are the same: where() is just an alias for filter(), so both methods work identically and take the same argument, a condition. This condition can consist of many sub-conditions combined with the logical operators & (and), | (or) and ~ (not). Each sub-condition should be enclosed in parentheses for clarity and to avoid operator-precedence errors.
df.where(~(df.Gender == 'B')).show()
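As a minimal sketch of combining sub-conditions on the same dataframe, the following keeps only the girls who scored more than 60 in Total, written once with where() and once with the equivalent filter():
# two sub-conditions joined with &, each wrapped in parentheses
df.where((df.Gender == 'G') & (df.Total > 60)).show()

# filter() accepts exactly the same condition
df.filter((df.Gender == 'G') & (df.Total > 60)).show()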
vi.) dropDuplicates()
By now you might have noticed that most DataFrame method names are quite intuitive, and this one is no exception: as you might have guessed, it drops duplicate rows. If used without any arguments, all columns are considered when deciding which rows are duplicates. The method also accepts a collection of column names to restrict that comparison. To demonstrate how it works, I have created a slightly modified dataframe:
data = [('Raj','A','B',95,15,20,20,20,20),
('Radha','A','G',95,15,20,20,20,20),
('Hema','B','G',67,9, 18, 3, 19, 18),
('Hema','B','G',67,9, 18, 3, 19, 18),
('Joju','B','B',44,7,8,13,6,10),
('Joju','B','B',44,7,8,13,6,10),
('Priya','A','G',45,7,5,10,19,4),
('Sanjana','B','G',61,18,17,5,3,18),
('Ashok','A','B',70,7,18,14,11,20),
('Chitra','A','G',66,20,17,17,7,5),
('Prem','B','B',29,6,3,10,3,7),
('Pavan','B','B',53,3,18,5,16,11)]
columns = ['Name','Section','Gender',
'Total','English','Maths',
'Physics','Chemistry','Biology']
df = spark.createDataFrame(data,columns)
df.show()
As you can see, duplicate rows have been added for the students Hema and Joju, and one new record has been added for Radha. The first example drops duplicate rows considering all column values:
df.dropDuplicates().show()
The following example specifically instructs spark to only consider the Total column:
df.dropDuplicates(('Total',)).show()
As you can see, the row added for Radha is now dropped, since it has the same Total as Raj's row. The dropDuplicates() method has an alias named drop_duplicates().
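Since dropDuplicates() accepts a collection of column names, more than one column can be passed. In the sketch below (on the same dataframe), two rows are treated as duplicates whenever both their Section and Gender match, so only one student per Section-Gender combination survives:
# only Section and Gender are compared when deciding which rows are duplicates
df.dropDuplicates(['Section', 'Gender']).show()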
vii.) orderBy()
The orderBy() method does exactly what the ORDER BY clause in SQL does: the dataframe is ordered by one or more columns, and the order can be configured to be either ascending or descending. The method accepts column names or column objects, either individually or as a list, plus a keyword argument ascending, which can be a boolean or a list of booleans. A list of booleans can be passed only when we are ordering by multiple columns and need control over the ordering used for each column.
In the following example, the dataframe is ordered by Name (in ascending order) and English (in descending order). The value for the ascending parameter is a list of 1's and 0's in this case, as they evaluate to True and False internally.
df.orderBy(['Name',col('English')],ascending=[1,0]).show()
(or)
df.orderBy('Name',col('English'),ascending=[1,0]).show()
If you try the following, you will get an error, as mixing a list with other column identifiers is not allowed:
df.orderBy('Name',[col('English')],ascending=[1,0]).show()
When ordering by multiple columns, it is more intuitive to specify the ordering strategy (ascending or descending) together with each column. Spark provides the asc() and desc() functions, which tie a column and its ordering strategy together; each takes a single column name as its argument. The working example above looks like the following once asc() and desc() are used:
from pyspark.sql.functions import asc, desc

df.orderBy([asc('Name'), desc('English')]).show()
I personally find this more intuitive and would suggest using this particular way of ordering because of its clarity. DataFrames also provide a sort() method, which is an alias for orderBy().
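As a quick illustration on the same dataframe, sort() takes the same arguments and produces the same ordering as the orderBy() call above:
# sort() is an alias for orderBy(), so this matches the previous result
df.sort(asc('Name'), desc('English')).show()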
viii.) groupBy()
groupBy() is one of the methods that does not return a dataframe as its result; it returns an instance of GroupedData instead. This GroupedData instance comes with many aggregation methods that can be applied to the grouped data. groupBy() is a crucial method for analysis, so there are many concepts to cover, which I will do in another article.
groupBy() accepts columns or column names. In this example I demonstrate how to calculate the average Total marks for each section of students:
grouped_data = df.groupBy(df.Section)
print(grouped_data)
df_avg = grouped_data.avg('Total')
df_avg.show()
The following example takes the analysis a level deeper and finds the average marks scored by the boys and girls of the two sections:
grouped_data = df.groupBy(df.Section,'Gender')
print(grouped_data)
df_avg = grouped_data.avg('Total')
df_avg.show()
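GroupedData exposes several aggregation methods besides avg(); as a small sketch, max() and count() can be called on the same grouping in exactly the same way:
# reusing the Section/Gender grouping created above
grouped_data.max('Total').show()   # highest Total in each group
grouped_data.count().show()        # number of students in each group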
ix.) limit()
The limit() method works the same way as the LIMIT clause in SQL: it restricts the result set to a given number of rows, which is the method's only parameter. This helps in decreasing the number of rows in a dataframe and is a good precursor to the collect() method. Because collect() returns Row objects to the driver program, we might not want to overload the driver, and as a precaution limit() acts as a good safeguard.
rows = df.limit(3).collect()
for row in rows:
    print(row)
x.) join() and alias()
One of the most essential methods for analysis is join(). The join() method allows us to join one dataframe with another. The method takes three arguments:
other
on
how
other should be an object of class DataFrame; on should be either a condition or a list of column(s) or column name(s); how determines which join strategy is to be used. An in-depth explanation of join() will come in a separate article; in this article the example will be a simple inner join.
The alias() method is a common companion to join(). It serves the same purpose as aliasing in SQL: we can reference the columns of different dataframes through a concise alias. alias() takes one string argument, which becomes the alias, and returns a new dataframe with that alias set. The alias can only be used on the returned dataframe, not on the dataframe on which alias() was called.
For the purpose of demonstrating the join, I have created two new dataframes. The source code to create these dataframes is as follows:
data_sections = [[1,'A',2,4],[2,'B',3,5]]
data_teachers = [[1,'XXXX','M',34,'Maths'],
[2,'YYYY','F',45,'English'],
[3,'YYXX','F',42,'Science'],
[4,'YYYX','F',39,'Math']]
columns_teachers = ['Teacher_ID','Name','Gender','Age','Subject']
columns_sections = ['Section_ID','Section_Name',
'Class_Teacher_ID','Class_Strength']
df_teachers = spark.createDataFrame(data_teachers,columns_teachers)
df_sections = spark.createDataFrame(data_sections,columns_sections)
df_teachers.show()
df_sections.show()
Let's take a case where we want to display the class teacher's name for each section to the students. As you can see, the teacher's name and the section's name are in two different dataframes. In order to display the Section and Teacher names we have to join these dataframes, which can be done as follows:
df_sections.join(df_teachers, df_sections.Class_Teacher_ID == df_teachers.Teacher_ID) \
    .select(col('Section_Name'),
            col('Name').alias('Teacher_Name')).show()
In the above example, the condition is a normal Python comparison expression and nothing more; Spark accepts such an expression as the value for the on argument. Here I have only demonstrated an equality condition, but the conditions can be of many types, which I will cover in the join article.
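To make the third argument explicit, the same join can be written with how passed as well. This is just a sketch; 'inner' is the default, so the result is identical to the one above:
# same inner join, with the join strategy spelled out explicitly
df_sections.join(df_teachers,
                 df_sections.Class_Teacher_ID == df_teachers.Teacher_ID,
                 how='inner') \
    .select(col('Section_Name'),
            col('Name').alias('Teacher_Name')).show()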
The following example depicts the same join as above; the only difference is the use of aliases, made possible by the alias() method:
df_a = df_sections.alias('a')
df_b = df_teachers.alias('b')
df_a.join(df_b,expr('a.Class_Teacher_ID = b.Teacher_ID')) \
.select(col('a.Section_Name'),col('b.Name').alias('Teacher_Name')) \
.show()
xi.) union() and unionAll()
union() and unionAll() are both equivalent to UNION ALL in standard SQL. In standard SQL, UNION drops duplicate rows when combining two result sets, whereas UNION ALL keeps the duplicates in the final result. In the Spark DataFrame API, however, both union() and unionAll() keep duplicate rows; to imitate UNION behavior we have to follow the union() or unionAll() call with a distinct() call. Both methods return a new dataframe and take one argument each, an object of class DataFrame. As with UNION and UNION ALL in SQL, both dataframes must have the same number of columns, and the columns are matched by position. To show how union() and unionAll() behave, I have created two new dataframes; the source code is as follows:
data_x = [['X',1],['Y',2]]
data_y = [['X',1],['Y',3]]
columns = ['Name','Age']
df_x = spark.createDataFrame(data_x,columns)
df_y = spark.createDataFrame(data_y,columns)
df_x.show()
df_y.show()
df_x.union(df_y).show()
df_y.unionAll(df_x).show()
As you can see, there are duplicate rows in both results. Now we can use union() in conjunction with distinct() to achieve the same behavior as UNION:
df_x.union(df_y).distinct().show()
xii.) intersect() and intersectAll()
intersect() and intersectAll() are the equivalents of INTERSECT and INTERSECT ALL in standard SQL. intersect() returns the distinct rows common to both dataframes, whereas intersectAll() preserves duplicates in the result. Both methods return a new dataframe and take one argument each, which should be an object of class DataFrame.
data_x = [['X',1],['Y',2],['X',1],['X',1]]
data_y = [['X',1],['X',1],['Y',3]]
columns = ['Name','Age']
df_x = spark.createDataFrame(data_x,columns)
df_y = spark.createDataFrame(data_y,columns)
df_x.intersect(df_y).show()
df_x.intersectAll(df_y).show()
Conclusion:
This second part on DataFrames covered manipulating dataframes in various ways. In the next article I will delve into utility methods and attributes of dataframes. If you have any suggestions or questions, please post them in the comment box. This article, like every article in this blog, will be updated based on comments and as I find better ways of explaining things, so kindly bookmark this page and check back whenever you need a reference.
Happy Learning! 😀