
Data Sources API

 

Since Spark is used to process data, it supports both reading from and writing to many different data sources. A data source denotes where the data being used comes from. Some example data sources are:


  • HBase

  • Hive

  • MapR-DB

  • File system


The Data Sources API is an interface to a variety of data sources and file formats. Out of these, there are six core data sources and file formats that Spark supports natively:


  1. CSV

  2. JSON

  3. Parquet

  4. ORC

  5. JDBC/ODBC connections

  6. Plain-text files


These are just the core data sources; thanks to the open source community, Spark is definitely not limited to interacting with only the above. As you might have guessed, Spark provides two interfaces for each of these data sources and file formats: one for reading (loading) and another for writing (saving). The two interfaces are:


  1. DataFrameReader

  2. DataFrameWriter


DataFrameReader:


DataFrameReader is the interface for reading data from a data source. It provides methods to configure the details of the source and file format. DataFrameReader has a recommended usage pattern:


DataFrameReader.format(args)
               .option("key", "value")
               .options(**options_dict)
               .schema(args)
               .load()


As you might have guessed, the format method should be called with the format/data source that you want to read. The option and options methods pass configuration for the read; these can differ for every data source/file format, and they are not mandatory. The schema method should be called with schema-related information; this too is unnecessary in certain cases. The final load method can be called with the path(s) to be read, if they are not already specified as part of the options.
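As a minimal sketch, reading a CSV file with this pattern might look like the following; the file path, column names, and option values here are hypothetical and used purely for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("data-sources-demo").getOrCreate()

# Hypothetical schema for the file being read.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])

df = (spark.read                        # DataFrameReader
      .format("csv")                    # data source / file format
      .option("header", "true")         # a single option as key/value
      .options(sep=",", nullValue="")   # multiple options as keyword arguments
      .schema(schema)                   # explicit schema (optional for some sources)
      .load("/path/to/people.csv"))     # hypothetical path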


One important option that applies to every data source and file format is mode, which determines the read mode. Read modes come in particularly handy when we are dealing with sources whose quality might not be the best. Malformed data is a common occurrence when dealing with semi-structured data sources. The read mode determines how Spark deals with malformed data when it is encountered.



Read Mode               Description

permissive (default)    When bad data is encountered, every column/field of that record is set to null, and the raw corrupted data is placed in a string pseudo-column (by default, _corrupt_record).

dropMalformed           Drops the malformed records.

failFast                Fails the read as soon as malformed data is encountered.
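
As a sketch of how the mode option is set, reusing the spark session from the earlier sketch (the JSON path is hypothetical):

# Fail the read as soon as a malformed record is encountered.
df_strict = (spark.read
             .format("json")
             .option("mode", "failFast")
             .load("/path/to/events.json"))

# Keep bad records, routing their raw text into a string column.
# (When a schema is supplied, the corrupt-record column must be part of it.)
df_lenient = (spark.read
              .format("json")
              .option("mode", "permissive")
              .option("columnNameOfCorruptRecord", "_corrupt_record")
              .load("/path/to/events.json"))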


Detailed articles for several data sources will be published later. 


DataFrameReader cannot be instantiated directly in Spark. The instantiation occurs behind the scenes when we create a SparkSession, and the DataFrameReader can then be accessed as an attribute of the SparkSession object:
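Assuming an active SparkSession named spark, the reader is simply:

reader = spark.read    # a DataFrameReader created by the SparkSession
print(type(reader))    # <class 'pyspark.sql.readwriter.DataFrameReader'>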



DataFrameWriter:


DataFrameWriter is the interface for writing to data sources/file formats. It provides methods to configure the details of the target data source and file format. DataFrameWriter has two recommended usage patterns:


  1. DataFrameWriter.format(args)

               .options(args)

               .bucketBy(args)

               .partitionBy(args)

               .sortBy(args)

               .save(path)


  2. DataFrameWriter.format(args)

               .options(args)

               .bucketBy(args)

               .partitionBy(args)

               .sortBy(args)

               .saveAsTable(tableName)


The first pattern applies to file-based data sources: the dataframe is written out in one of the supported file formats. The bucketBy method buckets the data based on the given columns, the partitionBy method partitions the data based on the given columns, and the options method specifies any necessary options for the write. sortBy sorts the data within each bucket based on the given columns; as the description suggests, sortBy cannot be used without bucketBy. The second pattern is used when writing to Spark tables, and all the methods discussed for the first pattern apply to it as well. Both patterns are sketched below.
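Here is a sketch of both patterns, assuming a dataframe named df and hypothetical column, path, and table names:

# Pattern 1: file-based write, partitioned by a column.
(df.write
   .format("parquet")
   .option("compression", "snappy")
   .partitionBy("country")
   .save("/path/to/output"))

# Pattern 2: write to a Spark table; bucketBy/sortBy only work with saveAsTable.
(df.write
   .format("parquet")
   .bucketBy(8, "id")
   .sortBy("id")
   .saveAsTable("people_bucketed"))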


Spark accepts a configuration that determines how the write behaves when data is already present at the target location. This configuration is called the write (save) mode. There are four write modes.



Save Mode                Description

errorIfExists (default)  Throws an error if any data is already present at the location and fails the write.

overwrite                Deletes the files/data already present and writes the dataframe’s contents.

append                   New files are added alongside the files already present; Spark does not append data to existing files. When a dataframe is written to a table, the data is appended to the table.

ignore                   The dataframe’s contents are not written; the write operation is skipped.
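
For example, a small sketch of setting the save mode (the path is hypothetical):

# Replace whatever already exists at the target location.
df.write.format("parquet").mode("overwrite").save("/path/to/output")

# Add new files alongside the ones already present.
df.write.format("parquet").mode("append").save("/path/to/output")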



option and options methods:


The option method accepts two positional arguments: the first is the configuration/option name and the second is the value for that configuration. The options method, on the other hand, accepts keyword arguments, each key being a configuration/option name and each value being the value for that configuration.
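A quick sketch of the two styles side by side (the options shown are real CSV options; the path is hypothetical):

# option: one configuration at a time, passed positionally.
df1 = spark.read.format("csv").option("header", "true").load("/path/to/data.csv")

# options: several configurations at once, passed as keyword arguments.
df2 = spark.read.format("csv").options(header="true", sep="|").load("/path/to/data.csv")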


Conclusion:


This article should have given you an idea of Spark's Data Sources API. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need a reference.


Happy Learning! 😀

