
User Defined Functions in Spark

It is possible that the transformations required for a use case are not available in the Structured API. To deal with those situations, Spark allows us to define user-defined functions (UDFs), that is, custom transformations written in Python, Scala or Java. These UDFs can make use of external libraries as well, so the possibilities are practically endless and the transformation toolkit is extensible.


UDFs in Spark can take one or more columns as input and return one or more columns as output. They can be written in one supported language and used from the others, so a UDF written in Scala can be used in transformations in PySpark. UDFs are functions that operate on the data record by record. To use a UDF within Spark, one should follow these steps:


  1. We write a function that takes the necessary input column(s), performs the operation(s) and returns the output column(s)

  2. To use this function with dataframes, tables or views, we register it with Spark.


Registering a function makes it available on all of the worker machines: Spark serializes the function on the driver and transfers it over the network to all executor processes. This happens regardless of the language we used to define the function. However, irrespective of that language, a UDF cannot take advantage of the code generation capabilities that Spark has for its built-in functions.


Based on the language used to define the function, there are two different scenarios. The first is when the function is written in Scala or Java: it can run inside the JVM that already exists on each executor, so there is little performance penalty beyond the missing code generation mentioned above, although the degradation does become evident as the number of UDFs in an application grows. The second is when the function is written in Python. Since Python code does not run in the JVM, Spark starts Python processes on the workers and serializes all of the data into a format that Python can understand. The UDF is then executed row by row on the data in the Python process, after which the results are converted back into Spark's internal format and returned to the JVM.


In the latter scenario there are two costs involved: Spark has to spawn the Python processes, and it has to serialize the data to Python. Of the two, serializing the data to Python is the more expensive, for two reasons:


  1. Serialization itself is costly

  2. Spark cannot manage the memory of the worker after the data enters Python


Since Spark loses some control over memory management here, the chances of worker failures are high when there are resource constraints, because the JVM and Python end up competing for memory on the same machine. For these reasons, the recommendation is to write UDFs in Scala or Java; UDFs written in Scala or Java can still be used from Python. This involves a little extra effort but brings significant improvements in performance.
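
To make this concrete, the following is a minimal sketch of how a UDF compiled in Scala or Java could be exposed to PySpark. The package and class name (com.example.udfs.SquareUDF) and the JAR are assumptions here; such a class would implement one of the org.apache.spark.sql.api.java.UDF* interfaces and the JAR would be shipped with the application, for example via spark-submit --jars. In recent Spark versions (2.3+), the registration could look like this:

# Sketch only: the class name and JAR are hypothetical.
from pyspark.sql.types import LongType

# Register the JVM UDF under a name usable from SQL.
spark.udf.registerJavaFunction('square_jvm', 'com.example.udfs.SquareUDF', LongType())

spark.sql('select square_jvm(4)').show()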


As the first step in the process of creating a UDF, the following snippet defines the function which will later be registered as a UDF:


def square(col):

    return col**2


As the function works on the column row-by-row, we can test the function by passing a sample Python numeric object:


print(square(2))

O/P: 4


As you can see, the function works as expected. Now, following the second step, we should register the function with Spark. This can be done in one of two ways. The first way is to use the udf method provided in PySpark SQL's functions module. The following snippet illustrates this:


from pyspark.sql.functions import udf, col

udf_square = udf(square)


The udf method returns a Spark user-defined function that accepts column(s) as input arguments and returns column(s). One important thing to note is that, by default, the return type of the UDF when applied to any column is StringType. The returned function can then be used in computations on dataframes.


df = spark.range(1,10)

df.select(udf_square(col('id'))).show()


When the returned dataframe is inspected further, we can see that the returned column is of type string:

df.select(udf_square(col('id'))).printSchema()


We can enforce the return type of the function by passing the returnType argument to the udf method. The value passed as returnType should be an object of pyspark.sql.types.DataType or a DDL-formatted type string. To enforce the appropriate return type, we could do the following:


from pyspark.sql.types import IntegerType

udf_square_int = udf(square, IntegerType()) 

(or)

udf_square_int = udf(square, 'INTEGER')

df1 = df.select(udf_square_int(col('id')))

df1.show(5)

df1.printSchema()


The returnType should be compatible with the object that the function actually returns. For the same transformation, if we were to change the return type to, say, Float, we get the following:


from pyspark.sql.types import FloatType

udf_square_float = udf(square, FloatType())

df1 = df.select(udf_square_float(col('id')))

df1.show(5)

df1.printSchema()



Here the returnType given during registration is not compatible with the object that the function actually returns. The function returns a Python int, but the UDF was registered with FloatType(), and Spark will not perform any casting to the declared type. Faced with this incompatibility, Spark does not even throw an error; it simply returns null to indicate that there was a failure. To be more specific about the above use case: the range function creates a dataframe whose id column is of a numeric (long) type, so when udf_square_float is called on each row, the equivalent type of the input value in Python is int, and the result of the square operation is also an int. For the registered FloatType, the result should have been its Python equivalent, float. Since this is not the case, the types are incompatible and the result is a column of nulls.

There might also be cases where we do not want to return a value for every row that is passed as input. In those cases, we can simply return None. For example, if we do not want to return values for even input numbers, we can write the square function like the following, then register and use it:


def square(col):

    if col%2 == 0:

        return None

    return col**2

udf_square = udf(square, IntegerType())

df.select(udf_square(col('id'))).show()


As one can see, Spark does not throw any error and the missing values are correctly represented as nulls.
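
As a side note, if we actually wanted the earlier FloatType registration to produce values instead of nulls, the Python function itself would have to return a float. A minimal sketch, reusing the df and the imports from above:

def square_as_float(col):
    # cast the Python int result to float so it matches the declared FloatType
    return float(col**2)

udf_square_as_float = udf(square_as_float, FloatType())

df.select(udf_square_as_float(col('id'))).show()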


UDFs are registered and scoped to a specific SparkSession or context; at the time of writing we cannot register a UDF to be used across different sessions. Also, with the registration method used above, the UDF cannot be used in Spark SQL, expr() or selectExpr():


spark.sql('select udf_square(1)').show()

df.select(expr('udf_square(id)')).show()

df.selectExpr('udf_square(id)').show()


Usages like the above will all result in the same AnalysisException, stating that the function is undefined.


In order to use the UDF within expressions and Spark SQL, we have to perform a different registration. This is done using the spark.udf.register method, whose syntax is similar to that of the udf method. This method takes three arguments:


  1. name - the name under which the function will be available in SQL or expressions

  2. f - the Python function to register

  3. returnType - the type of the object returned by the function passed as f


The one additional argument compared to the udf method is the name under which the function will be available in expressions and SQL; this name can be different from the Python function name. The function can be registered and used as follows:


spark.udf.register('square', square, IntegerType())

spark.sql('select square(1)').show()

df.select(expr('square(id)')).show()

df.selectExpr('square(id)').show()




The returnType constraints specified for the udf method apply to the register method as well.
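
As with the udf method, the return type for register can also be given as a DDL-formatted string, and in recent Spark versions register returns the user-defined function it created, so the same object can be reused in the DataFrame API. A short sketch, using the square function and df defined above:

# The registered name can differ from the Python function name,
# and the return type can be a DDL string such as 'int'.
square_sql = spark.udf.register('square_ddl', square, 'int')

spark.sql('select square_ddl(id) from range(1, 5)').show()

# The object returned by register can also be used directly in the DataFrame API.
df.select(square_sql(col('id'))).show()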


Conclusion


I hope this article was a good place to start. If you have any suggestions or questions, please post them in the comment box. This article, like every article on this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check it out whenever you need a reference.



Happy Learning! 😀

