
Caching in Spark 101

As dataframes and views are lazily evaluated, Spark has to compute them from scratch every time they are used, starting from reading the sources. This might not be an issue when working with smaller datasets, and the definition of small depends on where we run our code: on a PC, small might mean a few hundred megabytes, while production jobs routinely deal with hundreds of gigabytes. If we create multiple dataframes through transformations and choose to write all of them out, Spark will re-read the large sources for every write. That is not performant given the volume of data involved, so there is room for improvement, and one of Spark's answers is to cache the dataframes/views.


Caching allows us to keep data in memory for faster access. Spark tries to fit as many partitions as possible into the available memory; the partitions that do not fit are either spilled to disk or recomputed, depending on the caching method used. Not every dataframe/view/table should be cached. Available memory is a constraint, and even though the remainder can spill to disk, we have to be mindful about how many entities we cache. The recommended candidates for caching are frequently accessed, preferably large, datasets. Spark also lets us decide where and how to cache. The three distinct storage locations we can cache to are memory, disk and off-heap memory, and they can be mixed and matched. That gives us the following possibilities, mapped to PySpark StorageLevel constants in the sketch after this list:


  1. Fully on memory

  2. Fully on disk

  3. Memory and disk

  4. Off-heap memory (experimental)
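
As a rough sketch in PySpark, these possibilities map onto the built-in StorageLevel constants shown below (persist() and these constants are covered in detail later in this article):

from pyspark import StorageLevel

StorageLevel.MEMORY_ONLY       # fully in memory
StorageLevel.DISK_ONLY         # fully on disk
StorageLevel.MEMORY_AND_DISK   # memory first, the remainder spilled to disk
StorageLevel.OFF_HEAP          # off-heap memory (experimental)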


Caching on the machine’s disk does not come without disadvantages. If a dataframe is cached on disk, retrieving its partitions is relatively slower than retrieving them from memory. But that does not mean caching on disk is worse than plain lazy evaluation; the performance boost it offers also depends on the amount of data involved.


Spark also allows us to decide how the cached data is stored in memory. We cannot choose how it is stored on disk, because only serialized data can be written to disk, but in memory we can store the data in either serialized or deserialized form. When storing in serialized form, we should keep in mind that the data will be deserialized whenever it is referenced. Combining the choices of where and how the data is stored gives us a list of combinations, and we can pick the one that suits the job.
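
As an illustrative sketch, assuming PySpark, the serialized/deserialized choice is expressed through the deserialized flag of the StorageLevel constructor:

from pyspark import StorageLevel

# In memory, as deserialized objects, no disk spill, single replica
memory_deserialized = StorageLevel(useDisk=False, useMemory=True,
                                   useOffHeap=False, deserialized=True,
                                   replication=1)

# The serialized variant only flips the deserialized flag
memory_serialized = StorageLevel(useDisk=False, useMemory=True,
                                 useOffHeap=False, deserialized=False,
                                 replication=1)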


To cache dataframes, Spark provides us with two similar methods:

  1. cache()

  2. persist()

The difference between the two methods is the flexibility they give us, and each is explained in depth in its own section below. Irrespective of the method we use, caching is a lazy operation, meaning that data is cached only when it is first accessed.


i.) cache()


The cache method will try to store as many partitions as possible in memory across the Spark executors. If memory is not sufficient, some partitions are temporarily left uncached. One more thing to note is that a partition cannot be fractionally cached: while an entire dataframe can be partially cached, the same does not hold for individual partitions. For instance, if we cache a dataframe with eight partitions and there is only enough memory for 4.5 of them, only four partitions are cached. The remaining four will either be spilled to disk or recomputed, depending on the caching method used.


As mentioned earlier, caching is lazily evaluated in Spark. One more interesting aspect is that only the records accessed after caching are actually cached. So if, after calling cache, we access only four records with df.take(4), then only the partitions containing those records are cached. To fully utilize the power of caching, we should access all the records post caching using an operation like count().
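
A minimal sketch of this behaviour, assuming a Spark session named spark (the fraction of partitions actually cached can be checked in the Storage tab of the Spark UI):

df = spark.range(20000000).toDF("id")
df.cache()    # only marks the dataframe for caching, nothing is materialized yet
df.take(4)    # caches only the partition(s) needed to serve these four records
df.count()    # touches every partition, so the whole dataframe is now cached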


The cache method has a very basic syntax and can be called on any dataframe, dataset or RDD object. It caches data with a default storage level of MEMORY_AND_DISK; with this level, data spilled to disk is read only when needed, and while the data in memory is not serialized, the data on disk, as usual, is. The method takes no arguments and returns a cached dataframe, which is the same dataframe on which we called it, so we can use either the return value or the original. The method can be called like the following:



from pyspark.sql.functions import col

df = (spark.range(1 * 20000000).toDF("id")
           .withColumn("squared", col("id") * col("id")))

df.cache() # caching the dataframe

%timeit df.count() # run time of the first count, which materializes the cache

%timeit df.count() # run time post caching

O/P: 180 ms ± 17.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

88.7 ms ± 19.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


As shown, the count operation post caching is relatively faster because Spark fetches the cached dataframe and does not repeat the range and withColumn operations. The return value of cache is really useful when we want to cache the result of a chain of operations directly, instead of first assigning the result to a variable and then calling cache on it. The latter approach looks like this, same as the example above:


df = (spark.range(1 * 20000000).toDF("id")
           .withColumn("squared", col("id") * col("id")))

df.cache()


In the above example, the result is stored in the variable df first and then the dataframe is cached by calling the method on it. But the same can be done in one line, like the following:


df = (spark.range(1 * 20000000).toDF("id")
           .withColumn("squared", col("id") * col("id"))
           .cache())


In the above case, the only thing left to do is to perform a count in order to actually cache the dataframe.
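
Continuing the sketch above, that final step is simply:

df.count() # touches every partition, so the dataframe is now fully cached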


ii.) persist()

The persist method gives us more flexibility and control over how our data is cached. It takes one argument, a storage level, which should be one of the flags defined on the StorageLevel class. This class provides many flags representing different levels of caching storage. The levels are as follows:


StorageLevel | Description
MEMORY_ONLY (or) StorageLevel(useDisk=False, useMemory=True, useOffHeap=False, deserialized=False, replication=1) | Data is stored directly as objects and only in memory
MEMORY_ONLY_SER | Data is stored in serialized form and in memory; it has to be deserialized before use
MEMORY_AND_DISK (or) StorageLevel(useDisk=True, useMemory=True, useOffHeap=False, deserialized=False, replication=1) | Spark tries to fit the data as objects in memory; if memory is insufficient, the rest of the data is serialized and stored on disk
DISK_ONLY (or) StorageLevel(useDisk=True, useMemory=False, useOffHeap=False, deserialized=False, replication=1) | Data is stored in serialized form and on disk
OFF_HEAP (or) StorageLevel(useDisk=True, useMemory=True, useOffHeap=True, deserialized=False, replication=1) | Similar to MEMORY_ONLY_SER, but the data is stored in off-heap memory
MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but data that does not fit in memory is spilled to disk
MEMORY_AND_DISK_SER_2 | Similar to MEMORY_AND_DISK_SER, but each partition is replicated on two cluster nodes
MEMORY_ONLY_2 (or) StorageLevel(useDisk=False, useMemory=True, useOffHeap=False, deserialized=False, replication=2) | Similar to MEMORY_ONLY, but each partition is replicated on two cluster nodes
MEMORY_AND_DISK_2 (or) StorageLevel(useDisk=True, useMemory=True, useOffHeap=False, deserialized=False, replication=2) | Similar to MEMORY_AND_DISK, but each partition is replicated on two cluster nodes
DISK_ONLY_2 (or) StorageLevel(useDisk=True, useMemory=False, useOffHeap=False, deserialized=False, replication=2) | Similar to DISK_ONLY, but each partition is replicated on two cluster nodes
MEMORY_ONLY_SER_2 | Similar to MEMORY_ONLY_SER, but each partition is replicated on two cluster nodes


For those who are not aware of what off-heap means, it is memory used by the JVM that lives outside the heap. Broadly, JVM memory consists of three segments:


  1. Heap Memory

  2. Non-Heap Memory

  3. JVM code, internal structures


In this article, I will only be covering the first two segments.


Heap

The JVM heap is where objects instantiated by applications running on the JVM are stored. The heap can grow or shrink while the application runs, and when it fills up, garbage collection kicks in. Here, garbage means objects that are no longer referenced by the application.


Off-Heap

The off-heap memory is outside the scope of garbage collection, so the application developer has more control over it. Spark uses off-heap memory for two purposes (a configuration sketch follows the list):

  1. Internal Java usage

  2. To store data
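
For the OFF_HEAP storage level to be usable, off-heap memory has to be enabled for the application. A minimal sketch, with an illustrative size value:

from pyspark.sql import SparkSession

# spark.memory.offHeap.enabled and spark.memory.offHeap.size are standard Spark
# configuration keys; the 2g size here is only an example value.
spark = (SparkSession.builder
         .config("spark.memory.offHeap.enabled", "true")
         .config("spark.memory.offHeap.size", "2g")
         .getOrCreate())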


Usage of persist()

Using persist with the different storage levels looks very similar; the only difference is the storage level argument that is passed.


from pyspark import StorageLevel
from pyspark.sql.functions import col

df_new = (spark.range(1 * 20000000).toDF("id")
               .withColumn("squared", col("id") * col("id")))

df_new.persist(StorageLevel.MEMORY_ONLY)

(OR)

df_new.persist(StorageLevel(useDisk=False, useMemory=True, useOffHeap=False, deserialized=False, replication=1))


Both of the above calls result in the dataframe being cached in memory. The other available storage levels can be used in the same way to cache the dataframe at the corresponding level.
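
To double-check which level was applied, the dataframe exposes a storageLevel property; a small sketch:

df_new.persist(StorageLevel.MEMORY_ONLY)
print(df_new.storageLevel)  # prints the flags of the assigned level, e.g. StorageLevel(False, True, False, False, 1)
df_new.count()              # the action that actually materializes the cache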


Caching of Tables or Views

As mentioned above, tables and views derived from DataFrames can be cached. This makes sure that the view is not recomputed and the table is not re-read every time an action is performed. The SparkSQL syntax for the caching operation is as follows:


CACHE [ LAZY ] TABLE table_identifier

    [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]

Elements enclosed in [ ] are not mandatory. This syntax is common for both tables and views. Without the use of the LAZY keyword in the CACHE TABLE statement, the table or view is immediately cached. When the LAZY keyword is included, the table or view will be cached only when it is first used. Similar to the persist method, we are allowed to specify the storage level using the cache statement. The storage level can be any of the values described in the table above. If the storage level is not explicitly specified, then the default is MEMORY_AND_DISK.


CACHE TABLE students_cached OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM Students

(or)

CACHE TABLE students_cached OPTIONS ('storageLevel'='DISK_ONLY') AS SELECT * FROM Students


The only difference between the two queries is the use of the equals sign and the AS keyword, neither of which is mandatory. It is important to remember that the referenced Students table is looked up in the current database, and an error is thrown if it is not present; to safeguard against this, it is better to qualify the table with its database name. Also, the name of the resulting cached table must not carry a database prefix: if we try one, an error is thrown. The cached table can then be accessed without any database prefix, even if the current database is not the same as the database of its source table.


spark.sql('CACHE TABLE sec_c_student as select * from test_db.student_secc')

spark.sql('select * from sec_c_student').show() # no database prefix



spark.sql('CACHE TABLE test_db.sec_c_student_2 as select * from test_db.student_secc')


O/P:

ParseException

It is not allowed to add catalog/namespace prefix test_db to the table name in CACHE TABLE AS SELECT(line 1, pos 0)


No two cached tables can have the same name, but the cached table can have the same name as the source table.


spark.sql('CACHE TABLE student as select * from student')

spark.sql('select * from student').show()


The SELECT above reads the table that is stored in memory. All of the above methods can be used to cache views as well, whether temporary or global temporary, since a view is just a type of table.
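
As a hedged sketch, with a hypothetical students_df dataframe, caching a temporary view could look like this:

students_df.createOrReplaceTempView("students_view")    # register the dataframe as a temp view
spark.sql("CACHE TABLE students_view")                   # eagerly caches the view
spark.sql("SELECT count(*) FROM students_view").show()   # served from the cache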


Uncaching/Unpersisting:


It is important to know that cached data can be removed either by us or by Spark. The data we cache is physically held on the executors; a single executor can run multiple tasks, hence it can hold multiple pieces of cached data. As we keep caching dataframes, at some point the memory fills up. Spark tracks how recently each cached partition was used: a partition recently used in a task ranks highest, and one that was never used or least recently used ranks lowest. Once memory is nearly full, the lowest-ranked cache entries are dropped to make room for the newest ones. Still, it can be helpful to clear a cache manually once the cached data has served its purpose. Consider a long running job in which we cached a dataframe at the beginning: if all the transformed data sourced from that cache has been written out and the cache is no longer needed, it is better to uncache the dataframe. This frees up resources, which is highly recommended for long running jobs. As I mentioned above, Spark does this automatically as well.


Spark provides us multiple ways to uncache or unpersist data. Dataframes come with the unpersist() method, which takes one boolean argument called blocking. It defaults to False, meaning the call starts unpersisting and returns control to our script before the operation completes; if that is not what you want in your application, pass True. The method works regardless of whether the dataframe was cached with cache() or persist(), and regardless of the storage level used. It can be used like the following:


df.unpersist(True)

(or)

df.unpersist()

To uncache tables or views, Spark provides us UNCACHE and CLEAR CACHE statements. UNCACHE statement has the following syntax: 


UNCACHE TABLE [ IF EXISTS ] table_name


The IF EXISTS clause can be omitted if the table or view being uncached is known to exist, but to be on the safer side its usage is recommended. Otherwise, the usage of UNCACHE is straightforward. UNCACHE removes individual tables and views from the cache, but what if there is a need to uncache all the cached tables and views? The CLEAR CACHE statement comes in handy for that requirement. Its syntax is very simple:


CLEAR CACHE
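
A short usage sketch, reusing the students_cached table from the earlier example:

UNCACHE TABLE IF EXISTS students_cached  -- removes a single cached table or view
CLEAR CACHE                              -- removes every cached table and view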


The above statements are sufficient if we want to uncache dataframes, tables or views. If we want to clear everything from code instead, the Catalog's clearCache() method can be used: it clears all cached data, irrespective of whether it came from a dataframe or a table. The method takes no arguments and returns None. Unlike the unpersist() method, which comes with a blocking option, issuing this method uncaches all the data and the flow is blocked until the entire cache is cleared.


print(spark.catalog.clearCache())

O/p: None


Before concluding this article, it is important to mention when to cache and when not to. It is better to cache dataframes or tables that will be accessed multiple times in our application. It is better to avoid caching when the data is too big to fit in memory, and also when the transformations applied are few or cheap, irrespective of the size of the data.



Conclusion


Caching and uncaching come in handy when building large, long running data pipelines. If you have any suggestions or questions, please post them in the comment box. This article, very much like every article in this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need a reference.



Happy Learning! 😀

