
Data Sources API

 

Since Spark is built to work with data, it supports both reading from and writing to many different data sources. A data source denotes where the data being processed comes from. Some example data sources are:


  • HBase

  • Hive

  • MapR-DB

  • File system


The Data Sources API is an interface to a variety of data sources and file formats. Six core data sources and file formats are supported by Spark out of the box:


  1. CSV

  2. JSON

  3. Parquet

  4. ORC

  5. JDBC/ODBC connections

  6. Plain-text files


These are just the core data sources; thanks to the open source community, Spark is definitely not limited to interacting with only the ones listed above. As you might have guessed, Spark provides two interfaces for working with these data sources and file formats: one for reading and another for writing. The two interfaces are:


  1. DataFrameReader

  2. DataFrameWriter


DataFrameReader:


DataFrameReader is the interface for reading data from a data source. It provides methods to configure the details of the source and file format. DataFrameReader has a recommended usage pattern:


DataFrameReader.format(args)

               .option("key", "value")

               .options(**options_dict)

               .schema(args)

               .load()


As you might have guessed, the format method is called with the format/data source that you want to read. The option and options methods pass configuration for the read; the available options differ for each data source/file format, and these methods are not mandatory. The schema method is called with the schema information, which is also optional in certain cases (for example, when the schema is inferred or embedded in the file format). The final load method can be called with the path(s) to be read, if they are not already specified as part of the options.
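
As a minimal sketch of this pattern in PySpark, here is a CSV read. The schema, column names and file path are assumptions made up for illustration, not part of any real dataset:


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("data-sources-demo").getOrCreate()

# Hypothetical schema and path, used only for illustration
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = (spark.read.format("csv")
          .option("header", "true")
          .options(sep=",", nullValue="NA")
          .schema(schema)
          .load("/path/to/people.csv"))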


One important option that applies to every data source or file format is mode, which determines the read mode. Read modes come in particularly handy when we are dealing with sources whose quality might not be the best; malformed data is a common occurrence with semi-structured data sources. The read mode determines how Spark deals with malformed records when they are encountered.



  • permissive (default): When a malformed record is encountered, every column/field of that record is set to null and the corrupted data is kept in a string pseudo-column.

  • dropMalformed: Drops malformed records.

  • failFast: Fails the read process as soon as malformed data is encountered.
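
For example, the mode is passed like any other option. This is a sketch against a hypothetical JSON path:


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# dropMalformed: silently drops records that do not match the expected structure
clean_df = (spark.read.format("json")
                .option("mode", "dropMalformed")
                .load("/path/to/events.json"))      # hypothetical path

# failFast: the read raises an error as soon as a malformed record is encountered
strict_df = (spark.read.format("json")
                 .option("mode", "failFast")
                 .load("/path/to/events.json"))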


Detailed articles for several data sources will be published later. 


DataFrameReader cannot be instantiated directly in Spark. The instantiation occurs behind the scenes when we create a SparkSession, and the DataFrameReader can be accessed as the read attribute of the SparkSession object:
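
For example (the Parquet path below is just a placeholder):


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The DataFrameReader is exposed as the read attribute of the SparkSession
reader = spark.read

# The same reader is then used with the pattern discussed above
df = reader.format("parquet").load("/path/to/data.parquet")   # hypothetical path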



DataFrameWriter:


DataFrameWriter is the interface for writing to data sources/file formats. It provides methods to configure the details of the target data source and file format. DataFrameWriter has two recommended usage patterns:


  1. DataFrameWriter.format(args)

               .options(args)

               .bucketBy(args)

               .partitionBy(args)

               .sortBy(args)

               .save(path)


  2. DataFrameWriter.format(args)

               .options(args)

               .bucketBy(args)

               .partitionBy(args)

               .sortBy(args)

               .saveAsTable(tableName)


The first pattern applies to file based data sources: the dataframe is written out in one of the supported file formats. The bucketBy method buckets the data based on the given columns, the partitionBy method partitions the data based on the given columns, and the options method specifies any options needed while writing. The sortBy method sorts the data within each bucket based on the given columns; as the description suggests, sortBy cannot be used without bucketBy. The second pattern is used when writing to Spark tables, and all the methods discussed for the first pattern apply to it as well.
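
As a rough sketch of both patterns in PySpark (the input path, output path, table name, bucket count and column names are all assumptions for illustration):


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")        # hypothetical input data

# Pattern 1: write to files, partitioned by a column
(df.write.format("parquet")
   .partitionBy("country")
   .save("/path/to/output"))

# Pattern 2: write to a Spark table, bucketing and sorting within each bucket
(df.write.format("parquet")
   .bucketBy(8, "customer_id")
   .sortBy("order_date")
   .saveAsTable("orders_bucketed"))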


Spark accepts a configuration that determines how the write behaves if data is already present at the target location. This configuration is called the write (or save) mode, and there are four write modes:



  • errorIfExists (default): Throws an error if any data is found in the location and fails the write process.

  • overwrite: The files/data that are already present are deleted and the contents of the dataframe are written.

  • append: New files are written alongside the files that are already present; Spark does not append data to existing files. If the dataframe is written to a table, the data is appended to the table.

  • ignore: The dataframe’s contents are not written and the write operation is skipped.
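
The save mode is set with the mode method of DataFrameWriter. A short sketch, with the same hypothetical paths as before:


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")        # hypothetical input data

# overwrite: replace whatever already exists at the target path
df.write.mode("overwrite").parquet("/path/to/output")

# append: add new files alongside the existing ones
df.write.mode("append").parquet("/path/to/output")

# ignore: skip the write entirely if data is already present
df.write.mode("ignore").parquet("/path/to/output")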



option and options methods:


The option method accepts two positional arguments: the first is the configuration/option name and the second is the value for that configuration. The options method, on the other hand, accepts keyword arguments, where each key is a configuration/option name and each value is the value for that configuration.
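
A small illustration of the difference (the CSV options used here are just examples):


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# option: one configuration at a time, passed positionally
reader = spark.read.format("csv").option("header", "true").option("sep", ";")

# options: several configurations at once, passed as keyword arguments
reader = spark.read.format("csv").options(header="true", sep=";", inferSchema="true")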


Conclusion:


This article should have given you an idea of the Data Sources API of Spark. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need a reference.


Happy Learning! 😀

