The join is a very common operation performed between Spark DataFrames. Spark provides several types of joins, as seen in a previous article. Since datasets nowadays are huge, a poorly performing join can hamper the entire pipeline, because joins usually trigger a large amount of data movement across Spark executors. There are at least a couple of ways to optimize a join, but before thinking about optimization it helps to have a basic understanding of how Spark performs joins. Spark has several join strategies; two of the most common are the Broadcast Hash Join and the Shuffle Sort Merge Join.
Broadcast Hash Join
Before getting our hands dirty with the details of this strategy, it helps to know about a Spark feature called broadcasting. Broadcasting allows us to ship variables or objects (even dataframes) to each executor. For simplicity, assume only two tables are involved, and that they sit at opposite extremes in terms of size. When one of the tables is small enough to fit into the memory of a worker, with some memory left free, Spark broadcasts the smaller table to the executors. In simple words, Spark replicates the small dataframe entirely on every worker node in the cluster. Spark performs the broadcast once at the beginning, and then lets each worker node do its share of the work without having to wait for or communicate with any other worker node. This means there is a burst of communication at the start, but after that every executor works on its own, and the CPU becomes the biggest bottleneck. This is considered the fastest and simplest join strategy Spark offers when shuffling is the main concern, because in this join strategy there is no shuffling of data. One should not confuse broadcasting with shuffling; they are different operations.
The larger table in the join, which we consider to be far larger than the smaller one, stays spread across the executors. Since a copy of the smaller table is present inside every executor, no relocation of data is needed and the two tables can be joined locally. By default, Spark will use a broadcast join if the smaller dataset is less than 10 MB; this threshold is configurable via the Spark config spark.sql.autoBroadcastJoinThreshold. When the value is set to -1, this join strategy will not be considered for any join in the current SparkSession. It is our responsibility to tune the threshold depending on how much memory is available on each executor and on the driver: if we try to broadcast something too large, we can crash the entire job.
We can explicitly give the optimizer a hint that we would like a broadcast join by wrapping the small dataframe in the broadcast() function when defining the join. The following example illustrates this, and the printed physical plan confirms that the chosen strategy is Broadcast Hash Join.
from pyspark.sql.functions import broadcast

df_teachers.join(
    broadcast(df_sections),
    df_teachers.Teacher_ID == df_sections.Class_Teacher_ID,
    'inner'
).explain()
It is important to know in what situations we can rely on a Broadcast Hash Join to join our datasets efficiently. The main consideration is the size of the smaller dataframe: depending on the executor configuration, we can decide whether it is safe to broadcast. The next situation is an equi-join, where two datasets are combined on matching (possibly unsorted) keys. The strategy is also worth trying when we are not worried about excessive network bandwidth usage or Out-Of-Memory errors. We can avoid Out-Of-Memory errors by increasing the executor memory configuration, but this is not always feasible, since that much physical memory has to be available.
Shuffle Sort Merge Join
This join strategy is usually chosen when two large tables have to be joined. In a shuffle join, every node talks to every other node by exchanging data. The data is exchanged based on the join keys, and the ultimate goal is to land matching keys from both datasets on the same node. Since this strategy requires the data, which is huge in this case, to be moved across nodes, a join performed this way is considered costly. It depends heavily on the cluster network, so if not managed properly it generates heavy network traffic. The "management" here is partitioning, not by just any key but by the keys on which the join will be performed. Partitioning decreases the need for shuffling, and hence possibly the chance of job failures related to shuffling. Every worker node, and potentially every partition, may need to communicate with the others during the join, especially if the existing partitioning is not compatible with the join keys.
Shuffle sort merge join is efficient for merging two large datasets over a common key under certain conditions: the key(s) should be sortable, and it should be possible to assign rows with the same key to the same partition. The last condition means that rows of the two datasets sharing a common hashable key value can end up on the same partition, which makes the join straightforward for obvious reasons. There is a term for the scenario where rows with the same key value already reside on the same node: colocation. If the data is not colocated, a shuffle between executors is required. Once the shuffle has occurred, a sort phase orders both datasets by the join key(s). Then comes the merge phase, where the sorted rows are iterated and merged whenever the keys match. Shuffle sort merge join is enabled by default via the Spark config spark.sql.join.preferSortMergeJoin.
Since we will mostly be dealing with big datasets, shuffle sort merge join is almost unavoidable, so learning to optimize it will surely be of great help. One major optimization is to eliminate the shuffle step altogether. We can do that by bucketing the data on keys that are frequent candidates for equi-joins: we write each dataset into an explicit number of buckets, sorted on the join column, so that rows with the same key always land in the same bucket. Pre-sorting and pre-organizing the data this way helps performance, especially if the dataset(s) will be involved in many such joins.
Conclusion
If you have any suggestions or questions, please post them in the comment box. This article, like every article on this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check back whenever you need a reference.
Happy Learning! 😀