
Reading and Writing Delimited Files


Reading Delimited Files:


To read delimited files with Spark, call the format method with the argument "csv", even if the file we are dealing with is not delimited by commas. This tells Spark that it is going to read delimited text files; the actual delimiter is configured separately with the sep option. Following are some examples of reading delimited files:


The following code reads comma-delimited file(s) inside path, specifies that the first line of each file is a header, and instructs Spark to infer the schema. One important thing to keep in mind before using inferSchema: when Spark is left to infer the schema, it has to make an extra pass over the data, running a separate job that reads the input just to work out the column types. For large files this can be really expensive in terms of resources.


# assumes an existing SparkSession named spark
path = "..."

df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(path))
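To see why inferSchema costs an extra pass, here is a toy model in plain Python, not Spark's actual inference algorithm: it only knows the types int, double and string, and a column's type can only be settled after every row has been looked at, because a single non-numeric value late in the file widens the whole column. The sample data is hypothetical.

```python
import csv
import io

# Toy model of schema inference, NOT Spark's implementation: values are
# classified as int, double or string, and each column keeps the widest
# type seen across all rows.
TYPE_ORDER = ["int", "double", "string"]


def classify(value):
    """Return the narrowest toy type that can represent one CSV value."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_column_types(text, sep=","):
    """Infer a type per column by scanning every data row once."""
    reader = csv.reader(io.StringIO(text), delimiter=sep)
    header = next(reader)
    types = [None] * len(header)  # None means "no value seen yet"
    for row in reader:  # the whole input is read: this is the extra pass
        for i, value in enumerate(row):
            if value == "":
                continue  # empty fields are treated as nulls that fit any type
            seen = classify(value)
            if types[i] is None:
                types[i] = seen
            else:
                widest = max(TYPE_ORDER.index(types[i]), TYPE_ORDER.index(seen))
                types[i] = TYPE_ORDER[widest]
    # columns with no non-empty values fall back to string in this sketch
    return {name: t or "string" for name, t in zip(header, types)}


sample = "Name,Total,Average\nAsha,450,90.5\nRavi,400,80\n"
print(infer_column_types(sample))
# {'Name': 'string', 'Total': 'int', 'Average': 'double'}
```

Supplying the schema yourself, as in the later examples, removes the need for this pass entirely.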


The following examples read pipe-delimited file(s) inside path, with the delimiter configured using the sep option; as before, the first line is treated as a header and Spark is instructed to infer the schema. Both examples produce the same result and differ only syntactically: the first chains option() calls, the second passes all settings at once through options() with a dictionary.


df = (spark.read.format("csv")
      .option("header", "true")
      .option("sep", "|")
      .option("inferSchema", "true")
      .load(path))

or


options_dict = {"header": "true", "sep": "|", "inferSchema": "true", "path": path}

# the path is passed as an option here, so load() is called without arguments
df = spark.read.format("csv").options(**options_dict).load()
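Why is the format still "csv" for pipe-delimited data? A rough analogy in plain Python's csv module, offered as an illustration rather than a description of Spark's parser: switching the delimiter leaves the rest of the parsing rules (such as double-quoted fields that may contain the delimiter) unchanged, so comma and pipe versions of the same hypothetical data parse to identical records.

```python
import csv
import io


def read_delimited(text, sep=",", header=True):
    """Parse delimited text into a list of dicts (or raw lists if header=False).

    Only the delimiter differs between the inputs; quoting rules stay the same.
    """
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    if not header:
        return rows
    names = rows[0]
    return [dict(zip(names, row)) for row in rows[1:]]


comma = 'Name,Note\nAsha,"likes a|b"\n'
pipe = 'Name|Note\nAsha|"likes a|b"\n'
print(read_delimited(comma) == read_delimited(pipe, sep="|"))  # True
```

Note that in the pipe version the quoted field protects the embedded "|" from being treated as a separator.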



The next examples read CSV files with an explicitly given schema, first as a DDL-formatted string and then as a StructType.


schema = """Name STRING, Section STRING, Total INT,
    English INT, Maths INT, Physics INT,
    Chemistry INT, Biology INT"""

df = (spark.read.format("csv")
      .schema(schema)
      .option("header", "true")
      .load(path))


or 


from pyspark.sql.types import *

schema = StructType([StructField("Name", StringType()),
                     StructField("Section", StringType()),
                     StructField("Total", IntegerType()),
                     StructField("English", IntegerType()),
                     StructField("Maths", IntegerType()),
                     StructField("Physics", IntegerType()),
                     StructField("Chemistry", IntegerType()),
                     StructField("Biology", IntegerType())])

df = (spark.read.format("csv")
      .schema(schema)
      .option("header", "true")
      .load(path))
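Both schema definitions describe the same ordered list of (column, type) pairs. The toy parser below (illustration only; it does not handle types such as DECIMAL(10,2) that contain commas) makes that explicit. The order matters: as far as I understand Spark's behavior, while the enforceSchema option is left at its default of true, a user-supplied schema is applied to the CSV columns by position and the header names are not used to match them.

```python
def parse_ddl(ddl):
    """Split a simple DDL schema string into (column, TYPE) pairs.

    Toy parser for illustration only: it assumes types without commas
    inside them, so something like DECIMAL(10,2) is not supported.
    """
    fields = []
    for part in ddl.split(","):
        part = part.strip()
        if part:
            name, type_name = part.split(None, 1)
            fields.append((name, type_name.strip().upper()))
    return fields


ddl = """Name STRING,Section STRING,Total INT,
    English INT,Maths INT,Physics INT,
    Chemistry INT,Biology INT"""
fields = parse_ddl(ddl)
print(len(fields), fields[0], fields[-1])  # 8 ('Name', 'STRING') ('Biology', 'INT')
```

Each pair corresponds one-to-one, in the same position, to a StructField in the StructType version.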



We can verify that the schema has been applied while reading the files using:


df.printSchema()

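Output:

root
 |-- Name: string (nullable = true)
 |-- Section: string (nullable = true)
 |-- Total: integer (nullable = true)
 |-- English: integer (nullable = true)
 |-- Maths: integer (nullable = true)
 |-- Physics: integer (nullable = true)
 |-- Chemistry: integer (nullable = true)
 |-- Biology: integer (nullable = true)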


An interesting characteristic of an explicit schema is that Spark enforces it while reading the data: each value is parsed as the type given in the schema. To see what happens when we give a wrong type, let's define timestamp as the type of the Biology column, which actually holds integers:


schema = """Name STRING, Section STRING, Total INT, English INT, Maths INT,
          Physics INT, Chemistry INT, Biology TIMESTAMP"""

df = (spark.read.format("csv")
      .schema(schema)
      .option("header", "true")
      .load(path))

df.printSchema()

Spark does not throw any error, and printSchema() shows Biology as a timestamp column. But when we look at the column's data:


df.select("Biology").show()


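The values are null because the integer values in the file cannot be parsed as timestamps, and in the default read mode, PERMISSIVE, Spark sets fields it cannot parse to null instead of failing. To see the offending records, we can add a string column named _corrupt_record to the schema; Spark then stores the entire record, exactly as it appears in the file, in that column: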


schema = StructType([StructField("Name", StringType()),
                     StructField("Section", StringType()),
                     StructField("Total", IntegerType()),
                     StructField("English", IntegerType()),
                     StructField("Maths", IntegerType()),
                     StructField("Physics", IntegerType()),
                     StructField("Chemistry", IntegerType()),
                     StructField("Biology", TimestampType()),
                     StructField("_corrupt_record", StringType())])

df = (spark.read.format("csv")
      .schema(schema)
      .option("header", "true")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .load(path))

df.show(truncate=False)



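The columnNameOfCorruptRecord option names the column that should hold the malformed records; _corrupt_record is also Spark's default name, so here the option is set mainly for clarity. If we would rather have Spark fail than quietly turn unparsable values into nulls, we can use the FAILFAST mode, as in the following example: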


df = (spark.read.format("csv")
      .schema(schema)
      .option("header", "true")
      .option("mode", "FAILFAST")
      .load(path))

df.show()  # the first malformed record raises an error here

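Now, instead of producing nulls, the query fails with an exception as soon as Spark hits the first malformed record. Because reading is lazy, the error is raised when an action such as show() runs, not when load() is called.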
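The two read modes can be sketched as a toy model in plain Python. Everything here is an assumption for illustration: the converters, the timestamp pattern (Spark's own parsing is controlled by its timestampFormat option), the MalformedRecordError type and the sample data are hypothetical. It mimics the behavior described above: PERMISSIVE turns unparsable fields into None and keeps the raw line in the corrupt-record column, while FAILFAST raises on the first malformed record. It is written as a generator so that, like a Spark read, nothing is parsed until the records are consumed.

```python
import csv
from datetime import datetime

# Hypothetical converters for this sketch; the timestamp pattern is an assumption.
CONVERTERS = {
    "STRING": str,
    "INT": int,
    "TIMESTAMP": lambda v: datetime.strptime(v, "%Y-%m-%d %H:%M:%S"),
}


class MalformedRecordError(Exception):
    """Hypothetical error type; Spark raises its own exception in FAILFAST mode."""


def read_csv_toy(text, schema, mode="PERMISSIVE", corrupt_col=None):
    """Toy model of CSV parse modes (not Spark's implementation).

    PERMISSIVE: fields that do not fit the declared type become None, and the
    raw line is kept in corrupt_col (if given). FAILFAST: the first malformed
    record raises MalformedRecordError.
    """
    mode = mode.upper()  # the mode name is treated case-insensitively here
    lines = text.splitlines()
    for lineno, line in enumerate(lines[1:], start=2):  # line 1 is the header
        if not line:
            continue
        row = next(csv.reader([line]))
        record, malformed = {}, False
        for (name, type_name), raw in zip(schema, row):
            try:
                record[name] = CONVERTERS[type_name](raw)
            except ValueError:
                record[name] = None  # the value does not fit the declared type
                malformed = True
        if malformed and mode == "FAILFAST":
            raise MalformedRecordError(f"malformed record at line {lineno}: {line!r}")
        if corrupt_col is not None:
            record[corrupt_col] = line if malformed else None
        yield record


schema = [("Name", "STRING"), ("Biology", "TIMESTAMP")]
data = "Name,Biology\nAsha,85\nRavi,2021-01-02 03:04:05\n"

for record in read_csv_toy(data, schema, corrupt_col="_corrupt_record"):
    print(record)  # Asha's Biology is None and her raw line is kept

records = read_csv_toy(data, schema, mode="FAILFAST")  # no error yet: nothing read
try:
    list(records)
except MalformedRecordError as err:
    print(err)  # malformed record at line 2: 'Asha,85'
```

Calling read_csv_toy() in FAILFAST mode does not fail by itself; the error only appears once the records are consumed, which mirrors the lazy behavior of df.show() described above.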

Writing Delimited Files:


To write delimited files with Spark, the format method is again called with the argument "csv", even if the output should not be delimited by commas; the delimiter is set with the sep option. The header option is set in all of the following examples because I want the header to be written as the first line of the files; without it, the files are written without a header line. Following are some examples of writing delimited files:


df.write.format("csv").option("header", "true").save(path)


This writes the DataFrame's content as CSV file(s) into the directory at path. The number of files depends on the number of Spark partitions in memory, since each partition is typically written to its own file, and we can always change the number of partitions before writing. Repartitioning will be covered in another article.
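To picture why the file count follows the partition count, here is a toy model in plain Python, not Spark's writer: each hypothetical "partition" is a list of rows and is written to its own part file, with the header repeated in every file, as happens when the header option is true. The simplified file names are an assumption; Spark's real part-file names also contain a unique id. The sep parameter mirrors the sep option used for pipe-delimited output.

```python
import csv
import os
import tempfile


def write_partitions(partitions, header, out_dir, sep=","):
    """Toy model of a CSV write: one part file per in-memory partition,
    each starting with its own header line when a header is given."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, rows in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{i:05d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f, delimiter=sep, lineterminator="\n")
            if header:
                writer.writerow(header)
            writer.writerows(rows)
        paths.append(path)
    return paths


partitions = [[["Asha", "A"]], [["Ravi", "B"], ["Mia", "B"]]]  # two partitions
out_dir = os.path.join(tempfile.mkdtemp(), "students")
for p in write_partitions(partitions, ["Name", "Section"], out_dir):
    print(os.path.basename(p))  # part-00000.csv, then part-00001.csv
```

Two partitions give two files here; with repartitioning you would control that number before the write.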


# the same write, run a second time against the same path
df.write.format("csv").option("header", "true").save(path)


Since the default write mode is errorIfExists, this second write fails: the target directory already exists because of the previous write.


The following example writes the data into the same directory; the only difference is that the write mode is overwrite, so the existing data is replaced instead of the write failing.


(df.write.format("csv")
   .option("header", "true")
   .mode("overwrite")
   .save(path))
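The save modes used in this article (errorIfExists, overwrite and, further down, append) differ only in how they treat an output path that already exists. A toy model in plain Python, not Spark's implementation, makes the three behaviors concrete; the directory and file names are hypothetical.

```python
import os
import shutil
import tempfile


def prepare_output_dir(path, mode="errorifexists"):
    """Toy model of how a save mode treats an existing output directory
    before any new part files are written (not Spark's implementation)."""
    mode = mode.lower()
    if mode in ("error", "errorifexists"):
        if os.path.exists(path):
            raise FileExistsError(f"path {path} already exists")
    elif mode == "overwrite":
        if os.path.exists(path):
            shutil.rmtree(path)  # the old contents are removed first
    elif mode != "append":  # append: existing files stay, new ones are added
        raise ValueError(f"unsupported save mode in this sketch: {mode}")
    os.makedirs(path, exist_ok=True)


out = os.path.join(tempfile.mkdtemp(), "students")
prepare_output_dir(out)  # first write: the path does not exist yet
open(os.path.join(out, "part-00000.csv"), "w").close()
try:
    prepare_output_dir(out)  # second write with the default mode fails
except FileExistsError as err:
    print("failed:", err)
prepare_output_dir(out, "append")
print(os.listdir(out))  # ['part-00000.csv']
prepare_output_dir(out, "overwrite")
print(os.listdir(out))  # []
```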


The following example shows how to write the data as pipe-delimited files:



(df.write.format("csv")
   .mode("overwrite")
   .option("header", "true")
   .option("sep", "|")
   .save(path))


A common step when writing to a filesystem is partitioning the output by one or more columns. Partitioning by column(s) creates a directory for each unique value (or combination of values) of the column(s), and the files inside a directory contain only the rows with the corresponding value(s). Partitioning not only makes the data easier for us to navigate; it also lets Spark skip entire directories when a query filters on a partition column (partition pruning). This works for CSV data as well, although, unlike columnar formats such as Parquet, CSV files carry no statistics that would let Spark skip data inside a file. This example partitions by the Section column and writes the CSV files:


(df.write.format("csv")
   .option("header", "true")
   .partitionBy("Section")
   .mode("overwrite")
   .save(path))


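This results in one sub-directory per distinct value of Section, named in the form Section=<value>, each holding the CSV files for that value. The Section column itself is not written into the files; its value is encoded in the directory name.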
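The layout that partitionBy produces can be modeled in a few lines of plain Python. This is a toy model with hypothetical rows, not Spark's writer: rows are grouped by their partition values, each group is keyed by a Hive-style relative directory (column=value, nested in the order the columns are given), and the partition columns are dropped from the stored rows because their values live in the directory names.

```python
from collections import defaultdict


def partition_layout(rows, partition_cols):
    """Toy model of partitionBy: map each Hive-style relative directory
    (e.g. "Section=A" or "Section=A/Gender=F") to the rows stored there,
    with the partition columns removed from those rows."""
    layout = defaultdict(list)
    for row in rows:
        directory = "/".join(f"{col}={row[col]}" for col in partition_cols)
        layout[directory].append({k: v for k, v in row.items() if k not in partition_cols})
    return dict(layout)


students = [  # hypothetical sample rows
    {"Name": "Asha", "Section": "A", "Gender": "F"},
    {"Name": "Ravi", "Section": "B", "Gender": "M"},
    {"Name": "Mia", "Section": "A", "Gender": "F"},
]
for directory, stored in partition_layout(students, ["Section"]).items():
    print(directory, stored)
```

Passing two columns, as in partition_layout(students, ["Section", "Gender"]), yields nested keys such as "Section=A/Gender=F", matching the multi-column example in this article.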



In the following example the path is unchanged and the only difference is the write mode, append: new files are added to the respective Section directories, and the files that are already present are left untouched.


(df.write.format("csv")
   .partitionBy("Section")
   .option("header", "true")
   .mode("append")
   .save(path))


One more example, partitioning by multiple columns (a Gender column has been added to the data):



(df.write.format("csv")
   .partitionBy("Section", "Gender")
   .option("header", "true")
   .mode("overwrite")
   .save(path))


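This results in nested directories: one Section=<value> directory per section, each containing one Gender=<value> sub-directory for every gender present in that section.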
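Partition pruning, mentioned earlier as a benefit of partitioning, can be illustrated with a toy model in plain Python rather than Spark's planner: when the partitioned data is read back and filtered on a partition column (for example spark.read.format("csv").option("header", "true").load(path).filter("Gender = 'F'")), only the directories whose name matches the filter need to be read at all. The directory list below is hypothetical.

```python
def prune_partitions(directories, column, wanted):
    """Toy model of partition pruning: keep only the Hive-style partition
    directories whose value for `column` equals `wanted`; all other
    directories can be skipped without opening a single file."""
    selected = []
    for directory in directories:
        values = dict(segment.split("=", 1) for segment in directory.split("/"))
        if values.get(column) == wanted:
            selected.append(directory)
    return selected


dirs = ["Section=A/Gender=F", "Section=A/Gender=M", "Section=B/Gender=F"]  # hypothetical layout
print(prune_partitions(dirs, "Gender", "F"))  # ['Section=A/Gender=F', 'Section=B/Gender=F']
```

The decision is made from directory names alone, which is why pruning works for CSV output just as it does for other file formats.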



Conclusion:


This article covered basic examples using the options that we have seen in the previous article. Not all of the options available in Spark are covered, since there are far too many to go through here. I hope this was helpful; if you have any suggestions or questions, please post them in the comment box. This article, like every article in this blog, will be updated based on comments and as I find better ways of explaining things, so kindly bookmark this page and check back whenever you need a reference.


Happy Learning! 😀

