There may be situations where the Structured APIs do not provide a transformation that our use case requires. To deal with such situations, Spark lets us define user-defined functions (UDFs). Spark allows us to write our own custom transformations in Python, Scala or Java, and these UDFs can make use of external libraries as well. With these capabilities the possibilities are endless and the transformation toolkit becomes extensible.
UDFs in Spark take one or more columns as input and return one or more columns as output. They can be written in one supported language and used from the others, so a UDF written in Scala can be used in transformations in PySpark. UDFs operate on the data record by record. To use a UDF within Spark, we follow two steps:
Write a function which takes the necessary input columns, performs the operations and returns the output column(s).
Register the function with Spark so that it can be used with dataframes, tables or views.
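To make these two steps concrete before going into the details, here is a minimal end-to-end sketch (the names add_one and add_one_udf are just for illustration; the rest of this article builds a fuller example step by step):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType

spark = SparkSession.builder.getOrCreate()

# Step 1: a plain Python function that works on one value at a time
def add_one(n):
    return n + 1

# Step 2: register it with Spark, then use it on a dataframe column
add_one_udf = udf(add_one, LongType())
spark.range(1, 5).select(add_one_udf(col('id'))).show()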
Registering a function makes it available on all of the worker machines: Spark serializes the function on the driver and ships it over the network to all executor processes. This happens regardless of the language used to define the function. Also irrespective of that language, the UDF will not benefit from the code generation capabilities that Spark has for built-in functions.
Based on the language used to define the function, there are two scenarios. The first applies when the function is written in Scala or Java: it runs inside the JVM that already exists on each executor, so there is little performance penalty beyond the loss of code generation noted above (although the penalty grows as the number of UDFs in an application increases). The second applies when the function is written in Python. Since Python code does not run in the JVM, Spark starts a Python process on each worker and serializes all of the data into a format that Python can understand. The UDF is then executed row by row on the data in the Python process, after which the results are serialized back to Spark's internal format and returned to the JVM.
In the latter scenario there are two costs: Spark has to spawn the Python processes, and the data has to be serialized to Python. The serialization is the more expensive of the two, for two reasons:
Serialization itself is costly
Spark cannot manage the memory of the worker after the data enters Python
Since Spark loses some control over memory management here, the chance of worker failures increases under resource constraints, because the JVM and Python are competing for memory on the same machine. For these reasons, the recommendation is to write UDFs in Scala or Java; such UDFs can still be used from Python. This takes a little extra effort but brings significant performance improvements.
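For instance, assuming we have a Scala or Java class implementing Spark's UDF1 interface (the class name com.example.udfs.SquareUDF below is purely hypothetical) packaged in a jar and shipped to the cluster via --jars, it could be registered and called from PySpark roughly like this:
from pyspark.sql.types import LongType

# Hypothetical JVM UDF class implementing org.apache.spark.sql.api.java.UDF1,
# assumed to be on the classpath (e.g. submitted with --jars).
spark.udf.registerJavaFunction('square_jvm', 'com.example.udfs.SquareUDF', LongType())

# Once registered, it can be used from SQL just like any other function.
spark.sql('SELECT square_jvm(id) FROM range(1, 10)').show()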
As the first step in creating our own UDF, the following snippet defines the Python function that will later be registered:
def square(col):
    return col**2
As the function works on the column row-by-row, we can test the function by passing a sample Python numeric object:
print(square(2))
O/P: 4
As you can see, the function works as expected. Now, following the second step, we register the function with Spark. This can be done in two ways. The first is the udf method provided in PySpark SQL's functions module, as the following snippet illustrates:
from pyspark.sql.functions import col, udf
udf_square = udf(square)
The udf method returns a Spark user-defined function which accepts column(s) as input arguments and returns column(s). One important thing to note is that the return type of the UDF, when applied to any column, is StringType by default. The returned function can then be used in computations on dataframes:
df = spark.range(1,10)
df.select(udf_square(col('id'))).show()
If we inspect the returned dataframe, we can see that the resulting column is of type string:
df.select(udf_square(col('id'))).printSchema()
We can enforce the return type of the function by passing the returnType argument to the udf method. The value of returnType should be a pyspark.sql.types.DataType object or a DDL-formatted type string. To enforce the appropriate return type, we could do the following:
from pyspark.sql.types import IntegerType
udf_square_int = udf(square, IntegerType())
(or)
udf_square_int = udf(square, 'INTEGER')
df1 = df.select(udf_square_int(col('id')))
df1.show(5)
df1.printSchema()
The returnType should be compatible with the object that the function returns. For the same transformation, if we were to change the return type to Double or Float, we get the following:
from pyspark.sql.types import FloatType
udf_square_float = udf(square, FloatType())
df1 = df.select(udf_square_float(col('id')))
df1.show(5)
df1.printSchema()
Here the returnType given during registration is not compatible with the object the function actually returns. The object returned by the function is a Python int (whose Spark equivalent is IntegerType()), and Spark will not cast it to the registered type FloatType(). Faced with this incompatibility, Spark does not even throw an error; it simply returns null to indicate the failure. To be specific about the case above: the range function creates a dataframe with an id column of type LongType, and when udf_square_float is called on each row, the value of that column arrives in Python as an int. The result of the square operation is also an int, but for the registered FloatType the result should have been its Python equivalent, float. Since it is not, the types are incompatible and we get a column of nulls.
There may also be cases where we do not want to return a value for every input row. In those cases, we can simply return None. For example, if we do not want to return values for even input numbers, we can write the square function like the following, then register and use it:
def square(col):
    if col % 2 == 0:
        return None
    return col**2
udf_square = udf(square, IntegerType())
df.select(udf_square(col('id'))).show()
As one can see, Spark does not throw any error and the missing values are correctly represented as nulls.
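Returning to the FloatType example above, if we actually want a float result instead of nulls, one minimal fix (a variant of the function, shown here only as a sketch) is to make the function itself return a Python float so that it matches the registered type:
def square_as_float(col):
    # Returning a Python float matches the FloatType registered below
    return float(col ** 2)

udf_square_as_float = udf(square_as_float, FloatType())
df.select(udf_square_as_float(col('id'))).show(5)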
UDFs are registered and scoped to a specific SparkSession or context; at the time of writing, we cannot register a UDF to be used across different sessions. Also, with the registration method used above, the UDF cannot be used in Spark SQL, expr() or selectExpr():
from pyspark.sql.functions import expr

spark.sql('select udf_square(1)').show()
df.select(expr('udf_square(id)')).show()
df.selectExpr('udf_square(id)').show()
Usages like the above will all fail with an AnalysisException, because the function has not been registered for use in SQL and expressions.
In order to use the UDF within expressions and Spark SQL, we have to perform a different registration, using the spark.udf.register method, whose syntax is similar to that of the udf method. It takes three arguments:
name - name of the function that will be used in the SQLs or expressions
f - python function
returnType - the type of the object returned by the Python function passed as f
The one additional argument that we pass to this method, when compared to the udf method, is the name of the function to be used in the expressions or SQL. This name can be different from the Python function name. The function can be registered and be used like the following:
spark.udf.register('square', square, IntegerType())
spark.sql('select square(1)').show()
df.select(expr('square(id)')).show()
df.selectExpr('square(id)').show()
The returnType constraints specified for the udf method apply for the register method as well.
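For example, the returnType for spark.udf.register can also be given as a DDL-formatted string, and an incompatible type again silently yields nulls rather than an error (a quick sketch reusing the square function defined above; the names square_ddl and square_float_sql are just for illustration):
# DDL-formatted string instead of a DataType object
spark.udf.register('square_ddl', square, 'INTEGER')
df.selectExpr('square_ddl(id)').show(5)

# An incompatible returnType behaves just like with udf(): nulls, no error
spark.udf.register('square_float_sql', square, 'FLOAT')
df.selectExpr('square_float_sql(id)').show(5)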
Conclusion
I hope this article was a good place to start. If you have any suggestions or questions, please post them in the comment box. This article, like every article on this blog, will be updated based on comments and as I find better ways of explaining things. So kindly bookmark this page and check back whenever you need a reference.
Happy Learning! 😀