DataFrames are the only structured API offering in Python, so most of our data manipulation in PySpark will be performed with their help. These dataframes can be created in a number of ways, and the manipulations that are possible with them are endless. In this article I will be covering the basic and necessary aspects of dataframes.
Creation of DataFrames:
Dataframes can be created in PySpark in four distinct ways:
As a result of read operation
Using createDataFrame() method
Using Spark Session object’s inbuilt methods
From other dataframes
Read:
The read operation will not be extensively covered here as it was covered in a few earlier articles; please visit Data Sources API and Reading Delimited Files. The load() method of DataFrameReader returns a DataFrame, so whenever any data source is read the returned object is a dataframe. The following examples depict reading two data sources:
csv_path = '...'
df_csv = (spark.read.format('csv')
          .option('inferSchema', 'true')
          .option('header', 'true')
          .load(csv_path))
df_csv.show()
parquet_path = '...'
df_parquet = spark.read.format('parquet').load(parquet_path)
df_parquet.show()
createDataFrame():
The createDataFrame() method can be used to create dataframes in many ways. It is usually used to create small dataframes for testing purposes, but the method does not impose any restrictions on the size of the dataframes created. It takes four arguments, as can be seen from its signature below:
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
Since the data argument accepts iterables in general, we have a lot of possibilities and few restrictions. The following examples cover a good number of different cases.
Creating a dataframe by passing data as nested lists, where each nested list represents a row, and schema as a standard StructType() object:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('Name', StringType(), True)])
df = spark.createDataFrame([['x'], ['y'], ['z'], ['a']], schema)
df.show()
schema = StructType([StructField('Name', StringType(), True),
                     StructField('Gender', StringType(), True)])
data = [['x','M'],['y','M'],['z','M'],['a','M']]  # elements inside each nested list represent the column values of a row
df = spark.createDataFrame(data, schema)
df.show()
Creating a dataframe by passing data as nested tuples, where each nested tuple represents a row, and schema as a string object:
schema = 'Name STRING, Gender STRING'
data = (('x','M'),('y','M'),('z','M'),('a','M'))
df = spark.createDataFrame(data,schema)
df.show()
Creating a dataframe by passing data as a list of dictionaries. Even though the dictionaries contain the column name details, the first example explicitly passes the schema and we get the expected results. The second example does not pass the schema, so Spark automatically tries to infer it from the given data structure. When it does this, it throws a warning that inferring schema from dict is deprecated. So when the rows are dictionary objects we should either pass the schema explicitly or pass Row objects in their place (a small sketch of the latter follows the second example below).
data = [{'Name': 'x', 'Gender': 'M'},
{'Name': 'y', 'Gender': 'M'},
{'Name': 'z', 'Gender': 'M'},
{'Name': 'a', 'Gender': 'M'}]
schema = 'Name STRING, Gender STRING'
df = spark.createDataFrame(data,schema)
df.show()
df1 = spark.createDataFrame(data) #same data as the previous example
df1.show()
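To avoid the deprecation warning mentioned above, the same data can be expressed as Row objects instead of dictionaries; the column names are then carried by the rows themselves and no schema needs to be passed. A minimal sketch:
from pyspark.sql import Row

# Row objects embed the column names, so no schema (and no warning) is needed
data_rows = [Row(Name='x', Gender='M'), Row(Name='y', Gender='M'),
             Row(Name='z', Gender='M'), Row(Name='a', Gender='M')]
df_rows = spark.createDataFrame(data_rows)
df_rows.show()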
It is not necessary to provide the datatypes of the columns or even the column names. The first example illustrates the case where only the column names are provided; in this case the type information is inferred by Spark. In the next example we do not provide any schema at all, so Spark checks whether the given data carries any column name information (in the case of Row or dict objects the column names are embedded); if not, column names starting from _1 are assigned incrementally. It goes without saying that the type information is inferred in both cases.
data = (('x','M'),('y','M'),('z','M'),('a','M'))
schema = ['Name','Gender']
df = spark.createDataFrame(data,schema)
df.show()
data = (('x','M'),('y','M'),('z','M'),('a','M'))
df = spark.createDataFrame(data)
df.show()
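If the auto-assigned names _1 and _2 are not wanted, the columns can be renamed afterwards; a small sketch using toDF(), which simply replaces the column names of the dataframe created above:
# Rename the auto-generated columns (_1, _2) to meaningful names
df_named = df.toDF('Name', 'Gender')
df_named.show()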
In the next example, multiple Row objects are created and stored in a list, which is passed as an argument to the method. As shown in the output of printSchema(), the datatype of Gender is clearly inferred. The example that follows passes a schema as well, but the name and type of the Gender column are different. The column name specified in the schema is the only one the dataframe considers, but when it comes to the type an error is thrown, because the data for Gender is clearly a Python int object and Spark does not cast that column to decimal. The error is raised because of the default argument verifySchema shown in the signature above: Spark checks whether the data's type and the given schema type are compatible, and if they are not, verifySchema makes sure an error is raised. This option is very similar to the FAILFAST option. If we are okay with the data type incompatibility, we can set verifySchema to False, which is the case in the third example. We can see that the GEN column is now filled with nulls because of the incompatibility.
from pyspark.sql import Row

data = [Row(Name='x', Gender=1), Row(Name='y', Gender=1),
        Row(Name='z', Gender=0), Row(Name='a', Gender=0)]
df = spark.createDataFrame(data)
df.printSchema()
df.show()
schema = 'Name STRING, GEN DECIMAL(32,8)'
df = spark.createDataFrame(data,schema)
df.show()
schema = 'Name STRING, GEN DECIMAL(32,8)'
data = [Row(Name='x', Gender=1), Row(Name='y', Gender=1), Row(Name='z', Gender=0), Row(Name='a', Gender=0)]
df = spark.createDataFrame(data,schema,verifySchema=False)
df.printSchema()
df.show()
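If we actually want the integer Gender values represented as decimals instead of nulls, one option (a sketch, using the same Row data as above) is to let Spark infer the schema first and then cast the column explicitly:
from pyspark.sql.functions import col

# Infer the schema from the Row objects, then cast the integer column to DECIMAL
df_cast = (spark.createDataFrame(data)
           .withColumn('GEN', col('Gender').cast('decimal(32,8)'))
           .drop('Gender'))
df_cast.printSchema()
df_cast.show()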
The final parameter available with the createDataFrame() method is the sampling ratio. The ratio determines how much of the provided data is used for inferring the schema. If the size of the data is large and we are sure that the schema is consistent across rows, it is wise to keep the sampling ratio low (below 0.5), because a higher value forces Spark to scan a huge chunk of the data just to determine the schema, which is a considerable overhead.
data = [Row(Name='x', Gender=None), Row(Name='y', Gender=None),
Row(Name='z', Gender=1), Row(Name='a', Gender=None)]
df = spark.createDataFrame(data,samplingRatio=0.25)
df.printSchema()
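One more note on the data argument: besides local lists, tuples and dictionaries, createDataFrame() also accepts an RDD. A minimal sketch:
# Parallelize a small collection into an RDD and hand it to createDataFrame()
rdd = spark.sparkContext.parallelize([('x', 'M'), ('y', 'F')])
df_from_rdd = spark.createDataFrame(rdd, 'Name STRING, Gender STRING')
df_from_rdd.show()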
Creation using inbuilt methods:
i. range()
One of the basic methods provided that can be used to create dataframes of integer ranges is range(). This method creates a dataframe with a single column called id. The function signature looks like this:
range(start, end=None, step=1, numPartitions=None)
start: the lower bound of the range we wish to create.
end: the upper bound (exclusive) of the range we wish to create.
step: the difference by which two consecutive numbers in the range are separated (default is 1).
numPartitions: the number of partitions created for the dataframe (by default Spark uses its default parallelism).
We can pass float values to the start, end and step parameters, but internally the floats will be cast to integers and hence the resulting range will be an integer range.
range_df = spark.range(1,100000, step = 1)
range_df.show(5)
print(f"number of partitions for range_df {range_df.rdd.getNumPartitions()}")
o/p: number of partitions for range_df 8
range_df = spark.range(1.5,5.5, step = 1.5)
range_df.show(5)
range_df = spark.range(0,-20, step = -1)
range_df.show(5)
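The numPartitions parameter, which has not been used in the examples above, can also be set explicitly instead of letting Spark decide; a small sketch:
# Ask for exactly 4 partitions instead of the default parallelism
range_df = spark.range(0, 1000, step=1, numPartitions=4)
print(f"number of partitions for range_df {range_df.rdd.getNumPartitions()}")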
ii. sql()
The sql() method of SparkSession is used to run Spark SQL queries and always returns a new dataframe object. The query can reference views and tables, and there is no concept of dual in Spark SQL, so literals can be selected directly. I will cover working with Spark SQL in another article; all you need to know for now is that the sql() method of SparkSession always returns a dataframe. A simple example can be seen below:
sql = """
select 'x' as Name, 'G' as Gender
UNION ALL
select 'y' as Name, 'B' as Gender
UNION ALL
select 'xx' as Name, 'G' as Gender
UNION ALL
select 'yy' as Name, 'G' as Gender"""
df = spark.sql(sql)
df.show()
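Since the query can also reference views, an existing dataframe can be registered as a temporary view and queried back through sql(); a minimal sketch using the dataframe created above (the view name people is just illustrative):
# Register the dataframe as a temporary view and filter it with SQL
df.createOrReplaceTempView('people')
df_girls = spark.sql("select Name from people where Gender = 'G'")
df_girls.show()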
iii. table()
Before diving into this method, I would like to give a brief introduction to Spark tables. Spark allows us to create and maintain Hive tables, where a table is logically equivalent to a DataFrame. The tables are written and stored in the location specified by the configuration spark.sql.warehouse.dir, which defaults to the spark-warehouse directory (Hive deployments traditionally use /user/hive/warehouse).
The table() method always returns a dataframe object. We pass in the name of the table we wish to retrieve as a dataframe and Spark takes care of the retrieval. For the purpose of this article I have created a table called x.
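One possible way such a table could be created (shown here only as an illustrative sketch, not necessarily how the table x was built) is by writing an existing dataframe with saveAsTable():
# Persist a dataframe as a managed table in the warehouse directory
df.write.mode('overwrite').saveAsTable('x')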
The following example calls the table method with the ‘x’ table as argument:
df_table = spark.table('x')
df_table.show()
From other dataframes:
The fact that dataframes can be created from other dataframes is at the core of Spark. As dataframes are immutable, any transformation that one might perform results in a new dataframe. This can be any transformation that Spark allows us to do. One simple example is the following, where I first create a dataframe using the createDataFrame() method and then apply a numerical calculation, which results in another dataframe.
from pyspark.sql.functions import col

data = [['X',360],['XXXXX',182],['XXXX',249],['XXX',350],['XX',500]]
schema = 'NAME STRING, TOTAL_MARKS INTEGER'
df = spark.createDataFrame(data, schema=schema)
df2 = df.select(col('NAME'),
                (col('TOTAL_MARKS') / 5).alias('Average_Percentage'))
df2.show()
Conclusion:
Part one of the dataframe series covered creating dataframes in various ways. In the next article I will delve into manipulating dataframes with the inbuilt methods provided. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need some reference.
Happy Learning! 😀