Hello and welcome back to Learning Journal. In this video, we are going to talk about Spark
User-defined functions.
What is a user-defined function? I guess you already know it. User-defined functions, or UDFs, are a crucial feature of an SQL-based environment. They allow you to extend the system’s built-in functionality. When you do not have an SQL function for doing something, you can create a UDF and use it in your SQL. In Apache Spark, you can achieve this functionality in two steps.
- Create a function
- Register the function in the Spark session as a UDF
The first thing is to create a function. These functions are not unusual; they are like standard functions. But once you register them as a UDF, you can call them in your SQL expressions. That’s the only difference between a standard function and a user-defined function. However, you must note one essential thing. When you are creating a function that you want to use in an SQL expression, you must create a pure function. You don’t want your UDF to have side effects, right? If you do not understand pure functions and side effects, I recommend that you watch our Scala tutorial videos.
I have seen people trying different ways to create a Spark UDF. If you search Google on this topic, you will see a variety of code and might get thoroughly confused.
In this tutorial, I will show you the simplest and most straightforward method to create and use a Spark UDF. Then we will go to the next level, and I will show you the technique for creating your own UDF library. You can create and package all your Spark UDFs in a separate JAR, and then you will be able to use those UDFs in any Spark application by merely including the JAR in your classpath. We will not stop there. We will go one step further, and I will show you a UDF that you can define in Scala and use in your PySpark code. We already talked about PySpark performance limitations in the earlier video, and hence the ability to create your UDFs in Scala and use them in PySpark is critical for UDF performance.
Let’s start with the most straightforward method for creating and using a Spark UDF.
What is a Spark UDF?
I already talked about it. An Apache Spark UDF is nothing more than a pure Scala function value that you register in the Spark session. Once registered, you can use the UDF in your SQL statements in the given session. It is as simple as that. Let me demonstrate the idea with an easy example.
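A minimal sketch of the registration step might look like this; the exact strings matched here are assumptions, but the shape of the call follows the description below.

```scala
// Register a Scala function value as a UDF named pgender in a single step.
// The matched values ("f", "female", "woman") are assumptions.
spark.udf.register("pgender", (g: String) =>
  if (List("f", "female", "woman").contains(g.toLowerCase)) "Female" else "Male"
)
```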
So, what are we doing in this simple example? We are creating a Scala function value and registering it as a UDF in a single step. The API spark.udf.register is the standard method for registering a Spark UDF. The first argument is the name of the UDF. I named it pgender. The second argument is a Scala function value. My Scala function takes one argument. If the argument value is f, female, or woman, we return Female; otherwise, we return Male. That’s it. It is a simple function, right? Let’s execute this API, and it registers the Scala function as a UDF. Then, we can use the UDF in a SQL expression. Let’s try it.
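In the Spark shell, the call might look like this; the literal input is just for illustration.

```scala
// The UDF returns Female for the input 'f'
spark.sql("SELECT pgender('f') AS gender").show()
```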
Let’s try one more.
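For example, with a different case for the function name:

```scala
// SQL function names are case insensitive, so PGENDER works just as well
spark.sql("SELECT PGENDER('woman') AS gender").show()
```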
Simple, isn’t it? The function pgender becomes a UDF. Like any other SQL function, it is case insensitive. I hope you got a first impression of a Spark UDF. It is nothing but a Scala function value. All we need to do is register it in the Spark session. Once registered, you can use it in the given session. The critical thing to remember is the session. We registered the UDF for a particular session, and you can use it in that same session only.
How to use a Spark UDF in a Scala Application?
The earlier example created a Scala UDF, and we used it in a Spark SQL statement. However, we did all that in a REPL. You might be wondering how to use it in a Spark application. The method for creating and using a Spark UDF in an application is as simple as what we did in the REPL. Let’s create a simple Spark application to show you the idea.
Create a project directory for your Spark application and then create a build.sbt file. My build file looks like the one below.
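Here is a minimal sketch of such a build file, consistent with the versions mentioned below; the project name is an assumption.

```scala
// build.sbt -- minimal sketch; the project name is assumed
name := "spark-udf-example"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
```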
I am using Spark 2.3.0 and Scala 2.11.8 because I am going to execute this example on a Google Dataproc cluster that is built on Spark 2.3 and Scala 2.11.8. Great! So, we have a build file. The next step is to create a simple Spark application. Create a Scala source file and copy/paste the code below.
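A sketch of the application is shown below. The object name, file path, and column name are assumptions; the overall flow follows the walkthrough that comes next.

```scala
import org.apache.spark.sql.SparkSession

object SurveyUDFApp {
  def main(args: Array[String]): Unit = {
    // Create a Spark session; Hive support lets us save a permanent table
    val spark = SparkSession.builder()
      .appName("SparkUDFExample")
      .enableHiveSupport()
      .getOrCreate()

    // Load data from a CSV file source and create a temporary view
    val surveyDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/path/to/survey.csv")           // assumed location
    surveyDF.createOrReplaceTempView("survey_tbl")

    // A pure function literal: takes a string and returns the gender
    val parseGender = (g: String) => {
      val gender = Option(g).getOrElse("").toLowerCase.trim
      if (List("f", "female", "woman").contains(gender)) "Female" else "Male"
    }

    // Register the function literal as a UDF in the current session
    spark.udf.register("parseGender", parseGender)

    // Apply the UDF to create a new data frame and write it as a new table
    val transformedDF = spark.sql(
      "SELECT *, parseGender(Gender) AS parsed_gender FROM survey_tbl")
    transformedDF.write.mode("overwrite").saveAsTable("transformed_survey")

    // Finally, stop the Spark session
    spark.stop()
  }
}
```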
Let me quickly walk you through the code. The example code is straightforward. As usual, we
create a Spark session. Then
we load data from a CSV file source and create a temporary view. You have already seen all this
in
the earlier videos.
Now let’s come to the UDF thing. We define a local function literal. It takes a string as an
argument and returns the gender. Right? Then we register the function literal as a Spark UDF.
That’s
it. Once we register the UDF in the current Spark session, we should be able to use it in any
subsequent
Spark SQL statements.
We apply the UDF to create a new data frame, and then we write it to the Spark SQL database as a new table. Finally, we stop the Spark session.
Once you compile and execute this application, you should have the transformed_survey table created in the default database. The transformed_survey table is a permanent table, and hence it should be available to you even after the Spark application stops.
Let’s package this application. All you need to do is execute the sbt package command from your project directory.
SBT will create the JAR file in the target/scala-2.11/ directory. Once your JAR file is ready, you can submit your application to the Spark cluster for execution. Use the following command to submit your application.
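A typical submit command would look something like this; the class and JAR names follow the assumed build file sketched above.

```
spark-submit \
  --class SurveyUDFApp \
  target/scala-2.11/spark-udf-example_2.11-0.1.jar
```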
Wait a few seconds for the application to complete its execution. Now, you can go to Spark SQL and check for your transformed_survey table. You can use a simple SQL statement.
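Something like this, assuming the column names from the sketch above:

```sql
SELECT Gender, parsed_gender FROM transformed_survey LIMIT 5;
```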
Amazing, isn’t it?
How to create a Spark UDF Library?
The earlier example is the simplest and most straightforward method for creating a Spark UDF in a Scala application. The example works best when you need to create a UDF and use it in the same program. However, you might have a requirement to reuse a UDF in various Spark applications. In that case, you don't want to copy and paste the UDF code into each Spark job. Instead, I want to define all my UDFs in one place and package them in a JAR file. That gives me two benefits.
- I will have a single copy of the source code for the UDF, and it will be easy to maintain.
- I can create a separate jar for all my UDFs and reuse it in various Spark applications.
Let’s create an example to demonstrate the idea. In the earlier example, we created the parse gender UDF. But now I want to take the parse gender UDF and package it in a UDF library. Once I create a JAR for the UDF library, I should be able to use the parse gender UDF in the following places.
- Scala REPL
- PySpark REPL
- Spark application written in Scala Language
- PySpark application written in Python Language
The idea is to define the UDF once in the Scala language and use it wherever you need it. You
can use it in your Scala code
as well as in your Python code.
Let’s create a new project directory. I call it the UDF Lib project. You will need a build file for this project. Copy and paste the build file content, and you are ready to write your UDF library.
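A sketch of the build file, assuming the same Spark and Scala versions as the earlier project; the project name is an assumption.

```scala
// build.sbt for the UDF library -- the name is assumed
name := "spark-udf-lib"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
```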
Create a Scala source file and copy/paste the code.
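A sketch of the UDF library source follows. The package name is hypothetical; the class name and structure follow the walkthrough below.

```scala
// The package name below is an assumption used throughout these examples
package org.learningjournal.sparkudf

import org.apache.spark.sql.api.java.UDF1

// Extend UDF1 because the UDF takes exactly one input argument.
// The type parameters are the input type and the return type.
class ParseGender extends UDF1[String, String] {
  override def call(gender: String): String = {
    val g = Option(gender).getOrElse("").toLowerCase.trim
    if (List("f", "female", "woman").contains(g)) "Female" else "Male"
  }
}
```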
Let me quickly walk you through the code. The first line is the package definition. Giving a proper package structure to your UDF library is essential. We will use this package name to import the UDFs when we want to use them from this library. We are going to use the Spark Java interface to define our UDF, and that is necessary to be able to use these UDFs from Scala as well as from Python. The Scala approach to creating a UDF is straightforward: create a function and register it as a UDF. However, the Java APIs follow a traditional approach. You must create a class by extending one of the UDF interfaces.
The Spark Java API offers a bunch of UDF interfaces, from UDF0 to UDF22. I am going to create a parse gender UDF. The parse gender UDF should take a single argument as input. So, I need to extend the UDF1 interface for the parse gender class. If you are creating a UDF that should take five input parameters, you should extend the UDF5 interface. As you can see in the documentation, you can have up to twenty-two arguments for your UDF.
Great! Let’s come back to the example. The second line of the code imports the UDF1 interface. Then we create a ParseGender class that extends UDF1. Then we define the types. The first one is the input type, and the second one is the output type. For our example, the input and output are both strings. The body of the class is straightforward. We define a method, and we name it call.
You might be surprised to see that the code is written in Scala even though we are extending a Java interface. However, there is nothing that restricts us from creating the UDF class in the Scala language, and that’s what the example is doing. The entire code for the UDF is written in Scala. Amazing, isn’t it?
Great! Let’s package the UDF in a JAR file. Just execute sbt package. That’s it. SBT will create the JAR file in the target directory. Now it’s time to use the Spark UDF library in Scala as well as in Python. Let’s do that.
How to use a Spark UDF Library in Scala?
I am going to demonstrate the method using the Spark shell. However, the process is the same for your Spark application as well. To use a UDF from a JAR file, you need to take care of two things.
- Make sure the JAR is available to all your worker nodes.
- Import and register the UDF in your Spark session.
So, how do you make a JAR available to your Spark worker nodes? You already know it, right? Supply the JAR using the --jars option. I am going to use the Spark shell. But if you have a Spark application and you are using spark-submit, you can supply your UDF library using the --jars option of spark-submit.
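For example, with the library JAR from the previous step (the path and file name are assumed):

```
spark-shell --jars /path/to/spark-udf-lib_2.11-0.1.jar
```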
Now let’s come to the next step. Import your UDF class. If you remember the UDF code, you can recall the package name. So we import all the classes in that UDF library package. Then we instantiate an object of the parse gender class and turn its call method into a function value. I hope you understand the underscore at the end of the line. That’s a Scala technique known as a partially applied function. If you don’t understand that underscore and what it does, I recommend you check out our Scala tutorials.
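In the Spark shell, assuming the hypothetical package name used in the library sketch above, that looks like this:

```scala
import org.learningjournal.sparkudf._

// Turn the call method of a ParseGender instance into a Scala function value.
// The trailing underscore is the partially applied function syntax.
val parseGenderFn = (new ParseGender()).call _
```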
Great! Once we have the function value, we are ready to register it in the current Spark session. Registering a UDF is as simple as making a call to the spark.udf.register API. That’s it. You are now ready to use this UDF in your Spark SQL. The rest of the code is straightforward, and I already explained it in the earlier example.
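Registration and a quick check might look like this:

```scala
// Register the function value as a UDF in the current session
spark.udf.register("parseGender", parseGenderFn)

// Quick check with a literal value
spark.sql("SELECT parseGender('f') AS gender").show()
```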
How to use a Spark Scala UDF Library in Python?
Now it’s time to use the same UDF in your Python application. The process for using the UDF from your JAR file is the same as what we did in Scala. There are two steps.
- Make sure the JAR is available to all your worker nodes.
- Register the UDF in your Spark session.
To make your UDF JAR available to the workers, we supply it using the --jars option. This is precisely the same as what we did with the Scala shell.
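For example, in the PySpark shell (again with an assumed path):

```
pyspark --jars /path/to/spark-udf-lib_2.11-0.1.jar
```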
The next step is to register the function in the current Spark session. In Python code, unlike Scala, you do not need to instantiate the function object and then register the UDF using that object. You can directly register it using the spark.udf.registerJavaFunction API. The first parameter is the UDF name, and the second parameter is the UDF class name.
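In the PySpark shell, that registration might look like this; the fully qualified class name follows the hypothetical package used in the library sketch above.

```python
# Register the Scala UDF class by its fully qualified name.
# An explicit return type can optionally be supplied as a third argument.
spark.udf.registerJavaFunction("parseGender",
                               "org.learningjournal.sparkudf.ParseGender")
```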
The spark.udf.registerJavaFunction API is available in Spark 2.3.0 and above. If you are using an older version, you might find this API in the SQL Context. The next line shows the code for Spark 2.2.
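A sketch for Spark 2.2, again using the assumed class name:

```python
# Spark 2.2 and earlier: register through the SQLContext instead
sqlContext.registerJavaFunction("parseGender",
                                "org.learningjournal.sparkudf.ParseGender")
```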
Once you register the UDF, the rest is simple. Load the data, create a temporary view, and start using your UDF in your Spark SQL code.
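A quick end-to-end check in PySpark might look like this; the file path and column name are assumptions.

```python
# Load the data, create a temporary view, and use the UDF in Spark SQL
df = spark.read.option("header", "true").csv("/path/to/survey.csv")
df.createOrReplaceTempView("survey_tbl")
spark.sql("SELECT Gender, parseGender(Gender) AS parsed_gender FROM survey_tbl").show()
```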
Great! I hope you enjoyed this video. Thank you for watching Learning Journal. Keep learning and keep growing.