
Dataframes 101 - Part I

As dataframes are the only Structured API offering in Python, most of our data manipulation will be performed with their help. Dataframes can be created in a number of ways, and the manipulations possible with them are endless as well. In this article I will be covering the basic and necessary aspects of dataframes.


Creation of DataFrames: 

Dataframes can be created in PySpark in four distinct ways:

  1. As a result of read operation

  2. Using createDataFrame() method

  3. Using the SparkSession object’s inbuilt methods

  4. From other dataframes


Read:

The read operation will not be extensively covered here as it was covered in earlier articles; please visit Data Sources API and Reading Delimited Files. The load() method of DataFrameReader returns a DataFrame, so whenever any data source is read the returned object is a dataframe. The following examples depict reading two data sources:


csv_path='...'

df_csv = (spark.read.format('csv')
            .option('inferSchema','true')
            .option('header','true')
            .load(csv_path))

df_csv.show()



parquet_path= '...'

df_parquet = spark.read.format('parquet').load(parquet_path)

df_parquet.show()


createDataFrame():

The createDataFrame() method can be used to create dataframes in many ways. It is usually used to create small dataframes for testing purposes, but it does not impose any restriction on the size of the dataframes created. The method takes four arguments, as can be seen from its signature below:


createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)



data: Can be any iterable (list, tuple, etc.) or an RDD.

schema: Optional argument. Can be an object of type pyspark.sql.types.DataType (ex: StructType, StringType, etc.), a schema string, or a list of column name strings.

samplingRatio: If a schema is not provided, the ratio of rows that should be used for inferring the schema.

verifySchema: Verify the datatypes of every row against the schema. This argument is enabled by default.


Since the data argument accepts iterables in general, we have a lot of possibilities and few restrictions. The following examples cover a good number of the different cases.


Creating a dataframe by passing data as nested lists, where each nested list represents a row, and the schema as a standard StructType() object:


from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('Name',StringType(),True)])

df = spark.createDataFrame([['x'],['y'],['z'],['a']],schema)

df.show()



schema = StructType([StructField('Name',StringType(),True),

                     StructField('Gender',StringType(),True)])

data = [['x','M'],['y','M'],['z','M'],['a','M']] #elements inside the 

df = spark.createDataFrame(data,schema)          #nested list 

df.show()                                        #represent columns

Creating a dataframe by passing data as nested tuples, where each nested tuple represents a row, and the schema as a string:


schema = 'Name STRING, Gender STRING'

data = (('x','M'),('y','M'),('z','M'),('a','M'))

df = spark.createDataFrame(data,schema)

df.show()




Creating a dataframe by passing data as a list of dictionaries: even though the dictionaries contain the column name details, the first example explicitly passes the schema and we get the expected result. The second example does not pass the schema, so Spark automatically tries to infer it from the given data structure and, while doing so, throws a warning that inferring schema from dict is deprecated. So when the rows are dictionary objects we should not omit the schema, or better, pass Row objects in their place (a Row-based sketch follows the two examples below).


data = [{'Name': 'x', 'Gender': 'M'},

        {'Name': 'y', 'Gender': 'M'},

        {'Name': 'z', 'Gender': 'M'},

        {'Name': 'a', 'Gender': 'M'}]

schema = 'Name STRING, Gender STRING'

df = spark.createDataFrame(data,schema)

df.show()


df1 = spark.createDataFrame(data) #same data as the previous example

df1.show()
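
To avoid the deprecation warning altogether, the same data can be expressed with Row objects. The following is only a small sketch of that alternative, using the same names and values as above (Row lives in pyspark.sql):

from pyspark.sql import Row

# Same data as above, expressed as Row objects instead of dictionaries,
# so the column names are still carried by the data and no deprecation
# warning is raised.
row_data = [Row(Name='x', Gender='M'), Row(Name='y', Gender='M'),
            Row(Name='z', Gender='M'), Row(Name='a', Gender='M')]
df_rows = spark.createDataFrame(row_data)
df_rows.show()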



It is not necessary to provide the datatypes of the columns, or even the column names. The first example below illustrates the case where only the column names are provided; the type information is then inferred by Spark. In the second example we do not provide any schema at all, so Spark checks whether the given data carries any column name information (Row and dict objects embed the column names); if not, column names starting from _1 are assigned incrementally. It goes without saying that the type information is inferred in both cases.


data = (('x','M'),('y','M'),('z','M'),('a','M'))

schema = ['Name','Gender']

df = spark.createDataFrame(data,schema)

df.show()



data = (('x','M'),('y','M'),('z','M'),('a','M'))

df = spark.createDataFrame(data)

df.show()


In the next example, multiple Row objects are created, stored in a list and passed to the method. As shown in the output of printSchema(), the datatype of Gender is correctly inferred. The example that follows passes a schema as well, but with a different name and type for the Gender column. The column name specified in the schema simply wins, but the type mismatch raises an error: the Gender data is clearly a Python int object and Spark does not cast that column to decimal. The error is raised because of the verifySchema argument explained above, which is enabled by default: Spark checks whether the type of the data and the type given in the schema are compatible and raises an error if they are not, much like the FAILFAST option of the readers. If we are okay with the data type incompatibility, we can set verifySchema to False, which is what the third example does. We can see that the GEN column is then filled with nulls because of the incompatibility.


from pyspark.sql import Row

data = [Row(Name='x', Gender=1), Row(Name='y', Gender=1),

  Row(Name='z', Gender=0), Row(Name='a', Gender=0)]

df = spark.createDataFrame(data)

df.printSchema()

df.show()


schema = 'Name STRING, GEN DECIMAL(32,8)'

df = spark.createDataFrame(data,schema)

df.show()


schema = 'Name STRING, GEN DECIMAL(32,8)'

data = [Row(Name='x', Gender=1), Row(Name='y', Gender=1), Row(Name='z', Gender=0), Row(Name='a', Gender=0)]

df = spark.createDataFrame(data,schema,verifySchema=False)

df.printSchema()

df.show()



The final parameter available with the createDataFrame() method is the sampling ratio. The ratio determines how much of the provided data is used for inferring the schema. If the data is large and we are sure that the schema is consistent across rows, it is wise to keep the sampling ratio low (below 0.5, say), because a higher ratio forces Spark to scan a large chunk of the data just to determine the schema, which is a considerable overhead.


data = [Row(Name='x', Gender=None), Row(Name='y', Gender=None),

        Row(Name='z', Gender=1), Row(Name='a', Gender=None)]

df = spark.createDataFrame(data,samplingRatio=0.25)

df.printSchema()
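
As a hedged sketch (assuming the sampling ratio is applied when the input arrives as an RDD rather than a local list), the same rows can be parallelized first so that the ratio controls how many rows are scanned during inference; with a ratio close to 1.0 the single non-null Gender value is far more likely to be seen and its type inferred:

# Hedged sketch: same data as the previous example, but distributed as an RDD
# so that samplingRatio governs how many rows are sampled for schema inference.
rdd = spark.sparkContext.parallelize(data)
df_sampled = spark.createDataFrame(rdd, samplingRatio=1.0)
df_sampled.printSchema()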


Creation using inbuilt methods:


i. range()


One of the basic methods provided for creating dataframes of integer ranges is range(). This method creates a dataframe with a single column called id. The function definition looks something like this:


range(start, end=None, step=1, numPartitions=None)

start: the lower bound of the range we wish to create

end: the upper bound (exclusive) of the range we wish to create

step: the difference between two consecutive numbers in the range (default is 1)

numPartitions: the number of partitions created for the dataframe (by default Spark determines the value based on the range generated)


We can pass float values to the start, end and step parameters, but internally the floats will be cast to integers, and hence the resulting range will be an integer range.


range_df = spark.range(1,100000, step = 1)

range_df.show(5)

print(f'number of partitions for range_df {range_df.rdd.getNumPartitions()}')

o/p: number of partitions for range_df 8


range_df = spark.range(1.5,5.5, step = 1.5)

range_df.show(5)

range_df = spark.range(0,-20, step = -1)

range_df.show(5)

ii. sql()


The sql() method of SparkSession is used to run Spark SQL queries and always returns a new dataframe object. The query can reference views and tables, and there is no concept of DUAL in Spark SQL, so literals can be selected directly without a FROM clause. I will cover working with Spark SQL in another article; for now, all you need to know is that the sql() method of SparkSession always returns a dataframe. A simple example can be seen below:


sql = """

select 'x' as Name, 'G' as Gender

UNION ALL

select 'y' as Name, 'B' as Gender

UNION ALL

select 'xx' as Name, 'G' as Gender

UNION ALL

select 'yy' as Name, 'G' as Gender"""

df = spark.sql(sql)

df.show()


iii. table()


Before diving into this method, I would like to give a brief introduction to Spark tables. Spark allows us to create and maintain Hive tables, and a table is logically equivalent to a dataframe. Tables are written and stored in the location specified by the configuration spark.sql.warehouse.dir, which defaults to /user/hive/warehouse.


The table() method always returns a dataframe object. We pass in the name of the table we wish to retrieve as a dataframe and Spark takes care of the retrieval. For the purpose of this article I have created a table called x. The tables available in the catalog can be listed with:

spark.catalog.listTables() # command to print out available tables
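
For reference, a table like x could be created by writing any dataframe with saveAsTable(); the snippet below is only a hedged sketch of how such a table might be set up, with hypothetical column values:

# Hedged sketch: writing a small dataframe as a managed table named x.
# The table files land in the warehouse directory mentioned above.
spark.createDataFrame([['x'],['y']], 'Name STRING') \
     .write.mode('overwrite').saveAsTable('x')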

The following example calls the table method with the ‘x’ table as argument:


df_table = spark.table('x')

df_table.show()


Creation from other dataframes:


The fact that dataframes can be created from other dataframes is at the heart of working with Spark. As dataframes are immutable, any transformation that we perform results in a new dataframe, and this can be any transformation that Spark allows us to do. One simple example is the following, where I first create a dataframe using the createDataFrame() method and then apply a numerical calculation, which results in another dataframe.


from pyspark.sql.functions import col

data = [['X',360],['XXXXX',182],['XXXX',249],['XXX',350],['XX',500]]

schema = 'NAME STRING, TOTAL_MARKS INTEGER'

df = spark.createDataFrame(data,schema=schema)

df2  = df.select(col('NAME'),(col('TOTAL_MARKS')/5)

         .alias('Average_Percentage')) 

df2.show()


Conclusion:


Part one of the dataframes series covered creating dataframes in various ways. In the next article I will delve into manipulating dataframes with the inbuilt methods provided. If you have any suggestions or questions please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check back whenever you need some reference.


Happy Learning! 😀

