Join strategies in Spark

 

The join is a very common operation in Spark, performed between DataFrames. Spark provides different types of joins, as seen in a previous article. Since datasets nowadays are huge, the performance of a join can hamper the entire pipeline's performance, because joins usually trigger a large amount of data movement across Spark executors. There are at least a couple of ways by which we can optimize the join operation. But before one can think about optimizing, it helps to have a basic understanding of how Spark performs joins. Spark has different strategies when it comes to joins. Two of the most common join strategies are: Broadcast Hash Join and Shuffle Sort Merge Join.

Broadcast Hash Join


Before getting our hands dirty with the details of this strategy, it helps to know about a feature called broadcasting that Spark provides. Broadcasting allows us to send variables or objects (even DataFrames) to each executor. For simplicity, we assume only two tables are involved, and that they sit at the extremes in terms of size. When one of the tables is small enough to fit into the memory of a worker node, of course with some memory left free, Spark will broadcast the smaller table to the executors. In simple words, Spark replicates our small DataFrame entirely on every worker node in the cluster. Spark performs the broadcast once at the beginning, and then lets each individual worker node perform its work without having to wait for or communicate with any other worker node. This means that at the beginning there will be a large burst of communication, but after this initial communication every executor works on its own. When this is the case, the CPU becomes the biggest bottleneck. When shuffling is the main consideration, this is regarded as the fastest and easiest join strategy that Spark offers, because in this join strategy there is no shuffling of data. One should not confuse broadcasting with shuffling; they are different operations.


The larger table in the join, which we consider to be far larger than the smaller table, is spread across the executors. Since a copy of the smaller table is present inside every executor, we need no relocation of data, and the two tables can be easily joined. By default, Spark will use a broadcast join if the smaller dataset is less than 10MB; this threshold is configurable via the Spark config spark.sql.autoBroadcastJoinThreshold. When the value of the flag is set to -1, this join strategy will not be considered for any join in the current SparkSession. We have the responsibility to tune it depending on how much memory is available on each executor and on the driver. If we try to broadcast something too large, it can crash our entire job.


We can explicitly give the optimizer a hint that we would like to use a broadcast join by using the broadcast() function around the small dataframe when defining the join. The following example illustrates this, and the physical plan printed confirms that the join strategy chosen here is Broadcast Hash Join.


from pyspark.sql.functions import broadcast

df_teachers.join(broadcast(df_sections),
                 df_teachers.Teacher_ID == df_sections.Class_Teacher_ID,
                 'inner').explain()



We are allowed to provide similar hints in Spark SQL as well, but there the optimizer can choose to ignore them. So the hints are not enforced, unlike in the case of the DataFrame API. The hints in Spark SQL are supposed to be provided in the following format: /*+ hint [ , ... ] */. The hint for a broadcast join can be one of the following: MAPJOIN, BROADCAST, and BROADCASTJOIN. Each of these hints should be followed by (table), naming the table that should be broadcast. The following example uses the MAPJOIN hint. The physical plan of the resulting DataFrame is printed, and it is evident that the hint we provided was ignored when creating the physical plan.


It is important to know in what situations we can enforce Broadcast Hash Join to efficiently join our datasets. One main consideration is the size of the smaller DataFrame: depending on the executor configuration, we can decide whether to broadcast the smaller DataFrame to the executors. The next situation is when we want to perform an equi-join, combining two datasets based on matching unsorted keys. This strategy is also worth experimenting with when we are not worried about excessive network bandwidth usage or Out-Of-Memory errors. We can prevent the Out-Of-Memory error by increasing the executor memory configuration, but this might not always be feasible, as that much physical memory has to be available.



Shuffle Sort Merge Join


This join strategy is usually chosen when two large tables are to be joined. In a shuffle join, every node talks to every other node by sharing data. The data is shared based on keys, and the ultimate goal of this data sharing is to have matching keys from both datasets on the same node. Since this strategy involves sharing data, which is huge in this case, across nodes, a join performed with this strategy is considered costly. There is a heavy dependency on the network between the cluster nodes in order to share data, so if not managed properly there will be heavy network traffic. The "management" here is nothing but partitioning, not just by any key but by the keys on which the join will be performed. Partitioning decreases the need for shuffling, and hence possibly decreases the chances of job failure related to shuffling. Every worker node, and potentially every partition, will need to communicate with the others during the join process, especially if the existing partitioning is not compatible.


Shuffle sort merge join is efficient for merging two large datasets over a common key under certain conditions. The key(s) should be sortable, unique, and have the ability to be assigned to or stored in the same partition. The last condition means that two datasets with a common hashable key should be able to end up in the same partition; if this is the case, the join becomes easy for obvious reasons. There is a term for this scenario where rows with the same key value reside on the same node: it is called colocation. If this is not the case by default, a shuffle of data between executors is required. Once the shuffle has occurred, there is a sort phase that orders the datasets by the join key(s). Then comes the merge phase, where the rows are iterated over and merged when the keys match. By default, shuffle sort merge join is enabled via the Spark config spark.sql.join.preferSortMergeJoin.


Since we will mostly be dealing with big datasets, shuffle sort merge join is almost unavoidable, so learning to optimize this join will surely be of great help. One major optimization is to eliminate the shuffle step. We can do that by creating partitioned buckets on keys that are frequent candidates for equi-joins: we create an explicit number of buckets to store specific sorted columns, with each bucket hosting one key per bucket. Pre-sorting and pre-organizing the data helps performance, especially if the dataset(s) will be involved in many such joins.


Conclusion


If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need a reference.


Happy Learning! 😀

