As dataframes and views are lazily evaluated in Spark, Spark has to compute the dataframes and views from scratch(start from reading the sources). This might not be an issue when working with data of smaller sizes. The definition of small depends on the backend in which we are running our code. If we are running our code in our PC, the definition of small datasets might be in even hundreds of megabytes. Usually in production applications, it is more likely that the data that our jobs use might be in hundreds of gigabytes. If we are creating multiple dataframes after transformation and we choose to write all these dataframes, spark will be reading the huge sources every time there is a write to be performed. This is not performant because of the size of data that Spark has to deal with. So in this case there is a room for improvement of performance and one of Spark’s answers is to cache the dataframes/views.
Caching allows us to persistently store data in memory for faster access. Spark tries to fit as many partitions as possible based on the available memory. The remaining partitions that could not fit in memory are either spilled into the disk or recomputed depending upon the caching method used. Not every dataframe/view/table should be cached. Available memory is a restriction, but since there is an option to spill the remaining data into disk, we have to be mindful about how many entities that we cache. The recommended candidates for caching are highly accessed data,which are preferably large. Spark also lets us decide where and how to cache. The three distinct storage levels that we can cache are memory, disk and off-heap memory. There can be a mix and match of the above storage levels while caching. So we have three possibilities:
Fully on memory
Fully on disk
Memory and disk
Off heap memory (experimental)
Caching on the machine’s disk does not come without disadvantages. If a dataframe is cached on disk, the speed of retrieval of the partitions from disk are relatively slower when compared to caching in memory. But that does not mean that caching in disk is worse than staying with the lazy evaluation. The performance boost offered by caching on disk also depends on the amount of data that is involved.
Spark also allows us to decide on how the cached data is to be stored in memory, we cannot choose how it is stored in disk because only serialized data can be stored in disk. In memory we can choose to store the data either in serialized or deserialized form. While storing in serialized form, we should keep in mind that the data will be deserialized whenever it is referenced. When combining the possibilities of where and how the data is to be stored, we get a list of different combinations. Depending upon the job, we can choose the caching option.
To cache dataframes, Spark provides us two similar methods. The methods are as follows:
cache()
persist()
The difference between the methods is the flexibility that they provide us. The methods are explained in depth in the following individual sections. Irrespective of the method we use, caching is a lazy operation. Meaning that data will be cached only after they are accessed.
i.) cache()
The cache method will try to store as many partitions as possible inside memory across Spark Executors. If the memory is not sufficient, some partitions are not temporarily stored in memory. One more thing to note is that a partition cannot be fractionally cached. While the entire dataframe can be partially cached, the same does not hold true for partitions. For instance, if we tried caching a dataframe with eight partitions and there is only sufficient memory to store 4.5 of the partitions. In that case, only four partitions are cached. The remaining four partitions will either be spilled to disk or recomputed depending on the method of caching that was used.
As mentioned earlier, caching is lazily evaluated in Spark. But one more interesting aspect is that only records that are accessed immediately after caching are cached. So after caching, if only four records are accessed by using df.take(4) then only the partition containing all of these records are cached. So if we wish to fully utilize the power of caching, we should access all the records post caching using an operation like count().
The cache method has a very basic syntax, the method can be called on all dataframe, dataset, RDD objects. The cache method caches data with a default storage level of MEMORY_AND_DISK. With this storage level, data that is spilled to disk is only read when there is a need. The data in memory is not serialized, but data in disk as usual is serialized. The method does not take any argument. The method returns a cached dataframe, which is the same as the dataframe on which we called the cache method. We can either use that dataframe or the original dataframe. The method can be called like the following
df = spark.range(1 * 20000000).toDF("id")
.withColumn("squared", col('id')*col("id"))
df.cache() # caching the dataframe
%timeit df.count() # checking run time, before caching
%timeit df.count() # checking run, time post caching
O/P: 180 ms ± 17.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
88.7 ms ± 19.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
As shown, the count operation post caching is relatively faster because Spark fetches the dataframe that is cached and does not repeat the range function and withColumn function. The return value of cache can be really useful when we want to cache a result of a chain of operations, instead of retrieving the result and then performing the cache. The latter looks something like this, same as the above example:
df = spark.range(1 * 20000000).toDF("id")
.withColumn("squared", col('id')*col("id"))
df.cache()
In the above example, the result is stored into the variable df first and then the dataframe is cached by calling the method on the dataframe. But the same can be done in one-line like the following:
df = spark.range(1 * 20000000).toDF("id")
.withColumn("squared", col('id')*col("id")).cache()
In the above case, the only thing left to do is to perform a count, in order to actually cache the dataframe.
ii.) persist()
The persist method provides us more flexibility and control over how our data is cached. The method takes one argument, which should be a storage level. The storage level that we pass to this method should be a flag from the class called StorageLevel. This class provides us many flags representing different levels of caching storage. The levels are as follows:
For those who are not aware of what off-heap means, it is one segment of memory from the JVM memory. JVM memory consists of three segments:
Heap Memory
Non-Heap Memory
JVM code, internal structures
In this article, I will only be covering the first two segments.
Heap
The JVM has a heap that is where objects instantiated by our applications running on JVM are stored. The heap can increase or decrease in size while the application runs. When the heap becomes full, garbage is collected. Here garbage means objects that have not been used in a while.
Off-Heap
The off-heap memory is outside the scope of Garbage Collection, hence the application developer has more control over the memory. Spark uses off-heap memory for two purposes:
Internal Java usage
To store data
Usage of persist()
The usages of persist with the available storage levels are very similar. The only difference is the storage level argument that is passed.
df_new = spark.range(1 * 20000000).toDF("id")
.withColumn("squared", col('id')*col("id"))
df_new.persist(StorageLevel.MEMORY_ONLY)
(OR)
df_new.persist(StorageLevel(useDisk=False, useMemory=True, useOffHeap=False, deserialized=False, replication=1))
Both of the above method calls result in the dataframe being cached in the memory. Similarly other available StorageLevels can be used to cache the dataframe within the corresponding StorageLevel.
Caching of Tables or Views
As mentioned above, tables and views derived from DataFrames can be cached. This makes sure that the view is not recomputed and the table is not re-read every time an action is performed. The SparkSQL syntax for the caching operation is as follows:
CACHE [ LAZY ] TABLE table_identifier
[ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
Elements enclosed in [ ] are not mandatory. This syntax is common for both tables and views. Without the use of the LAZY keyword in the CACHE TABLE statement, the table or view is immediately cached. When the LAZY keyword is included, the table or view will be cached only when it is first used. Similar to the persist method, we are allowed to specify the storage level using the cache statement. The storage level can be any of the values described in the table above. If the storage level is not explicitly specified, then the default is MEMORY_AND_DISK.
CACHE TABLE students_cached OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM Students
(or)
CACHE TABLE students_cached OPTIONS ('storageLevel'='DISK_ONLY') AS SELECT * FROM Students
The only difference between the queries is the usage of equal to sign and the AS keyword, both of which are not mandatory. It is important to remember that the Students table referenced is searched in the current database and if not present an error is thrown. To safeguard from this error, it is better to reference the database name. Also, the name for the resulting cached table should not have any database name as a prefix, if tried with the prefix an error is thrown. The cached table can now be accessed without any database prefix, even if the current database is not the same as the database of the source table of the cached table.
spark.sql('CACHE TABLE sec_c_student as select * from test_db.student_secc')
spark.sql('select * from sec_c_student').show() # no database prefix
spark.sql('CACHE TABLE test_db.sec_c_student_2 as select * from test_db.student_secc')
O/P:
ParseException:
It is not allowed to add catalog/namespace prefix test_db to the table name in CACHE TABLE AS SELECT(line 1, pos 0)
No two cached tables can have the same name, but the cached table can have the same name as the source table.
spark.sql('CACHE TABLE student as select * from student')
spark.sql('select * from student').show()
The from statement above reads the table that is stored in-memory. All the above methods can be used to cache views, irrespective of whether it is a temporary or global temporary, as view is just a type of table.
Uncaching/Unpersisting:
It is important to know that whatever data we are caching can be either removed by us or by Spark. The data that we cache are physically cached in the executors. A single executor can run multiple tasks, hence it can hold multiple cached data. As we go caching dataframe, at a point in time the memory gets full. During this journey of caching data, spark maintains a ranking of recency of data usage. Partition that was recently used in a task has the highest rank and the partition that was never used or least recently used will have the lowest rank. Once that memory is almost full, the cache with the least rank is dropped to make space for the newest cache. But it might be helpful to clear cache manually, once the cached data has served its purpose. Consider that we have a long running job and we have cached a dataframe at the beginning. If all the transformed data that was sourced from the cached data has been written somewhere and if there is no need for the cache it is better to uncache the dataframe. This will free up resources, which is highly recommended for long running jobs. As I mentioned above, Spark does this automatically as well.
Spark provides us multiple ways to uncache or unpersist the data. Dataframes come with the method unpersist(), which helps us unpersist the dataframe. unpersist() method takes one argument, which is blocking and takes boolean as an input value. The argument is by default False, meaning that the call to the method initiates unpersisting and returns the control to our script even before completing the entire operation. If this is not what you want in your application, pass True as an argument. This method can be used irrespective of what method you used on the dataframe to cache or what storage level you used to cache, be it cache() or persist() and any of the storage levels available. The unpersist() can be used like the following:
df.unpersist(True)
(or)
df.unpersist()
To uncache tables or views, Spark provides us UNCACHE and CLEAR CACHE statements. UNCACHE statement has the following syntax:
UNCACHE TABLE [ IF EXISTS ] table_name
IF EXISTS clause can be ignored if the table or that is being uncached is valid. To be on the safer side IF EXISTS’s usage is recommended. Otherwise, the usage of UNCACHE should be straightforward. UNCACHE can be used to uncache individual tables and views, but what if there is a need to uncache all the cached tables and views? CLEAR CACHE statement comes hand for that requirement. The syntax of the statement is very simple:
CLEAR CACHE
The above methods are sufficient if we want to uncache dataframes or tables. But if we want to uncache multiple dataframes and tables at once, the Catalog’s clearCache() method can be used. The clearCache() method can be used to clear all cached data, irrespective of whether it is dataframe or a table. The method does not take any arguments and returns None. This method is not like the unpersist() method, which comes with a blocking and unblocking option. Issuing this method will uncache all the data and the flow will be blocked until all the cache is cleared.
print(spark.catalog.clearCache())
O/p: None
Before concluding this article, it is important to mention when to cache and when not to. It is better to cache dataframes or tables that will be accessed multiple times in our application. It is better to avoid caching, when the data is too big to fit in memory. Also, avoid caching when the transformations that are applied are very less, irrespective of the size of the data.
Conclusion;
Caching and Uncaching becomes handy when building large and long running data pipelines. If you have any suggestions or questions please post it in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and checkout whenever you need some reference.
Happy Learning! 😀
Comments
Post a Comment