In the earlier video, we started our discussion on Spark data frames. In this video, we will dive deeper and try to understand some internals of Apache Spark data frames. I am assuming that you have already watched the Architecture video. We covered some RDD basics in that lesson. You might find that half of this video is reiterating the RDD concepts. That happens because a data frame ultimately compiles down to an RDD. However, we will go beyond those notions and also cover things that are specific to data frames. So, let's start.
We already learned a method to load data from a source. What do you want to do next?
If you have a business requirement, you might want to do some data processing and generate an output that meets your requirement. Let me give you a simple requirement. We already loaded the mental health survey dataset. Can you generate the following bar chart using that dataset?
Well, we don't want to get into visualization, so let's reduce the requirement to an output dataset. The table represents the final output that we want to achieve. I created the bar chart in MS Excel using the above table.
Does it appear to be an easy requirement? Well, it is indeed an easy example. We are going to solve this problem, and while we develop the solution, you will also discover some important data frame concepts.
We already learned that RDDs are immutable. Once loaded, you cannot modify them. However, you can perform the following types of operations on RDDs.
- Transformations
- Actions
Spark data frames carry the same legacy from RDDs. So, Spark data frames are also immutable. But you can perform transformations on them to generate new data frames, and you can also perform actions on a Spark data frame. If you check the documentation, you will see two categories of transformations.
- Typed Transformations
- Untyped Transformations
Both of these are available to data frames. Some of these transformations might return you a dataset instead of a data frame. But you can convert a dataset to a data frame using the toDF function. However, you don't need to worry too much about it because Spark takes care of that automatically.
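Here is a minimal sketch of that conversion, assuming you are in the spark-shell (where spark.implicits._ is already in scope) and using a hypothetical case class just for illustration:

```scala
// Hypothetical case class, purely for illustration.
case class Response(gender: String, treatment: String)

// Build a typed Dataset and convert it back to an untyped DataFrame.
val ds = Seq(Response("Male", "Yes"), Response("Female", "No")).toDS()
val asDataFrame = ds.toDF()
```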
Transformations are the basic building blocks for Spark developers. They are the key tools for us because we use transformations to express our business logic.
Let's come back to our example. What do we want to do? If you check the dataset, you will see a field for treatment. This field records the response to the following question: Have you taken treatment for your mental illness?
Some people responded Yes and others No. All we need to do is count these Yes and No responses and then group them by gender. That's it. You will get the output table. We can meet this requirement by applying a set of transformations. Let's try that.
The first activity is to load the data into a DataFrame. You can use the below code to load the data.
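Here is a minimal sketch, assuming a SparkSession named spark and that the survey is available as a local CSV file (the file name and options are assumptions):

```scala
// Load the survey data into a DataFrame.
val df = spark.read
  .option("header", "true")        // the first row carries the column names
  .option("inferSchema", "true")   // let Spark guess the column types
  .csv("mental-health-survey.csv")
```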
There are many fields in this dataset. However, for my requirement, I am only concerned with two fields.
- Gender
- Treatment
So, let me apply a select transformation to pick only these two fields.
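A sketch of that select, assuming the column names Gender and treatment match the dataset:

```scala
// Keep only the two columns we care about.
val df1 = df.select("Gender", "treatment")
```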
So df1 is a new data frame that contains only two fields. But I need to count the Yes and No responses. So I might need to separate them into two different columns. Once I have all the Yes values in one column and all the No values in another column, I can easily count them.
So, let's apply another transformation.
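Here is a sketch of that second transformation, using the when and otherwise functions (column names as assumed above):

```scala
import org.apache.spark.sql.functions.{col, when}

// Encode the treatment responses as numeric flags so we can sum them later.
val df2 = df1.select(
  col("Gender"),
  when(col("treatment") === "Yes", 1).otherwise(0).alias("All-Yes"),
  when(col("treatment") === "No", 1).otherwise(0).alias("All-Nos")
)
```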
This time, I am taking df1 and applying another select transformation. I take the gender field as it was earlier; I am not applying any changes to that field. However, I am transforming the treatment field. When the treatment is Yes, I take it as a numeric one. Otherwise, I take it as a numeric zero. Now, I can simply take a sum of the All-Yes column to get the total number of Yes responses. Right? You might want to do it differently.
For example, when the treatment is Yes, take it as Yes; otherwise take it as null. Then you can apply a count function on that column.
However, I prefer to take it as a numeric value and apply the sum function instead of a count function. That helps me avoid dealing with nulls unnecessarily. You might also wonder why we apply two transformations for this. I mean, we could have done it in a single step. Let me show you.
Instead of applying this transformation on df1 to produce df2, we could have applied it directly to df.
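Something like this sketch, which collapses both selects into one:

```scala
// The same logic applied directly to df in a single step.
val dfSingleStep = df.select(
  col("Gender"),
  when(col("treatment") === "Yes", 1).otherwise(0).alias("All-Yes"),
  when(col("treatment") === "No", 1).otherwise(0).alias("All-Nos")
)
```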
I mean, we could have avoided the first transformation and done it in a single step. You might argue that if we can avoid one transformation and do it in a single step, Spark might perform better. Right? But that's not true in this case. There is no difference in performance, at least not in this example.
Why?
Let's try to understand that.
We already learned with RDDs that transformations are lazy. They don't execute until we fire an action. Right? Spark implements data frames to be lazy as well, because that design gives at least two clear benefits.
- The Spark engine can act as a compiler.
- It gets an opportunity to apply the necessary optimizations.
In our example, we applied two transformations. We haven't executed any action yet. So, Spark has executed nothing on the cluster so far. You might see a load operation in the Spark UI, but I am talking about those two select transformations. Neither of them has executed yet.
Let's apply an action.
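For example, a simple collect will do (fine for this small dataset, though you would avoid collecting large ones):

```scala
// collect is an action; it triggers the actual execution on the cluster.
df2.collect()
```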
Now you will see one job in the Spark UI.
If you jump to the SQL tab in the Spark UI and click on the collect job, you will get a lot of details.
You will see four plans.
- Parsed Logical Plan
- Analyzed Logical Plan
- Optimized Logical Plan
- and finally, the Physical Plan
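You can also print the same plans from your code using the explain method in extended mode:

```scala
// Prints the parsed, analyzed, and optimized logical plans, followed by the physical plan.
df2.explain(true)
```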
When we execute an action, Spark takes the user code. In our case, it takes those two select transformations. It will then parse the user code and generate a parsed logical plan.
The second step is to analyze the initial plan and resolve the column names and their data types. The output of the second step is an analyzed logical plan. Apache Spark maintains a catalog of all the table and data frame information. The analyzer makes a call to the catalog and resolves the initial plan. The analyzed plan clearly shows the column names and the data types.
The analyzed logical plan goes to an optimizer. As of now, the optimizer mainly performs two
types of optimizations.
- Pipelining
- Predicate pushdown
Pipelining is as simple as combining multiple transformations together. We created two transformations. Both were select operations. Spark realizes that it can combine them into a single transformation, so it simply does that.
You can cross-check it by looking at the optimized plan. The pipelining optimization doesn't apply only to select transformations. Spark will look for all such opportunities and apply pipelining wherever it is applicable.
The other type of optimization is predicate pushdown. That simply means pushing the filter conditions down to an early stage instead of applying them at the end.
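A quick sketch of the idea: even if you write the filter after other transformations, the optimizer tries to apply it as early as possible, often pushing it into the data source scan (whether it shows up as a pushed filter depends on the source format):

```scala
// The filter is written last, but the optimized plan applies it before the projection.
val treated = df
  .select("Gender", "treatment")
  .filter(col("treatment") === "Yes")

treated.explain(true)   // inspect the optimized logical plan to see where the filter landed
```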
The optimized logical plan goes to the Spark compiler, which generates a bunch of physical execution plans. A physical execution plan is nothing but a series of RDD transformations. The Spark engine generates multiple physical plans based on various considerations. Those considerations might be different approaches to perform a join operation, the physical attributes of the underlying data file, or something else. However, Spark settles on the single physical plan that it evaluates to be the best among them. Finally, the best plan goes for execution on the cluster.
Great. Let's come back to our example. We loaded the data. We applied one transformation. Then we applied another transformation. But we haven't reached the desired output. I think if we group by gender and compute a sum over the second and third columns, we should get the desired output.
Let's try that.
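A sketch of that step, reusing the column names from above:

```scala
import org.apache.spark.sql.functions.sum

// Group by gender and sum the two indicator columns.
val grouped = df2
  .groupBy("Gender")
  .agg(sum("All-Yes").alias("Yes"), sum("All-Nos").alias("No"))

grouped.show()
```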
Now we apply a group by on gender and then aggregate to calculate the sum of the other two columns. All we are doing here is simply chaining the data frame APIs. The syntax might look weird in the beginning, but you will be comfortable with this style in a few days. All the methods that I am using in this example are available in the Dataset documentation with easy-to-follow examples.
If you check the output of the above transformation, you will realize that the Gender field is not very well coded. We have a data quality problem. There are many ways to deal with that issue. You might want to use a tricky regular expression and translate each value to one of the genders. However, we have a small dataset, and I think the safest method is to make a list of all the unique values and handle them using a match case expression. I can quickly create a Scala function to handle it.
Here is the code. Well, this is plain Scala code.
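A sketch of such a function; the patterns below are only illustrative assumptions, and in practice you would build them from the distinct values you actually find in the dataset:

```scala
// Map the many free-form gender values to a small, clean set of labels.
def parseGender(gender: String): String = gender.toLowerCase.trim match {
  case "male" | "m" | "man" | "cis male"       => "Male"
  case "female" | "f" | "woman" | "cis female" => "Female"
  case _                                       => "Unknown"
}
```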
We want this function to be available to all the executors. You can do that by registering this function as a UDF. Registering a UDF is as simple as passing the function to the udf function.
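Something like this:

```scala
import org.apache.spark.sql.functions.udf

// Wrap the plain Scala function as a Spark UDF so it can be used in data frame expressions.
val parseGenderUDF = udf(parseGender _)
```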
Spark will serialize the function on the driver and transfer it over the network to all the executor processes. So, now we can use parseGenderUDF in our data frames.
Let's create another transformation to fix our data quality problem.
I am using the data frame df2 and applying another select transformation. This time, we apply the parseGenderUDF to the gender field. We also take the All-Yes and All-Nos fields that we created earlier. Now, we can do a group by on df3.
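Here is a sketch of that transformation:

```scala
// Clean up the gender values and keep the two indicator columns as they are.
val df3 = df2.select(
  parseGenderUDF(col("Gender")).alias("Gender"),
  col("All-Yes"),
  col("All-Nos")
)
```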
Great. You can check the output using the below statement.
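Assuming the names used in the sketches above:

```scala
// Group by the cleaned-up gender values and show the final result.
df3.groupBy("Gender")
  .agg(sum("All-Yes").alias("Yes"), sum("All-Nos").alias("No"))
  .show()
```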
You should get the desired output.
We did this in several steps, and you might have lost track somewhere in between. So, let me list all the code at once.
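Here is the consolidated sketch, under the same assumptions as before (the file name, column names, and gender patterns are illustrative):

```scala
import org.apache.spark.sql.functions.{col, when, sum, udf}

// Load the survey data.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("mental-health-survey.csv")

// Keep only the two columns we need.
val df1 = df.select("Gender", "treatment")

// Encode the treatment responses as numeric flags.
val df2 = df1.select(
  col("Gender"),
  when(col("treatment") === "Yes", 1).otherwise(0).alias("All-Yes"),
  when(col("treatment") === "No", 1).otherwise(0).alias("All-Nos")
)

// Clean up the free-form gender values.
def parseGender(gender: String): String = gender.toLowerCase.trim match {
  case "male" | "m" | "man" | "cis male"       => "Male"
  case "female" | "f" | "woman" | "cis female" => "Female"
  case _                                       => "Unknown"
}

val parseGenderUDF = udf(parseGender _)

val df3 = df2.select(
  parseGenderUDF(col("Gender")).alias("Gender"),
  col("All-Yes"),
  col("All-Nos")
)

// Produce the final output.
df3.groupBy("Gender")
  .agg(sum("All-Yes").alias("Yes"), sum("All-Nos").alias("No"))
  .show()
```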
There are a few more things covered in the video, so don't miss it.