This time I will describe a way to read files that were written using Apache Hudi and whose schema evolved over
time.
For your convenience, the whole code described in the following sections can be found in a single gist.
Preparation
Starting spark-shell
The fastest and easiest way to run this example is with spark-shell; make sure you have it installed and working
with version 2.4.4 of Apache Spark.
To start the spark-shell with Hudi’s support, you can run the following command:
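A command along these lines should work (the Hudi bundle version here is an assumption; adjust it to the release you want to use):

```bash
spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```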
It should download the mentioned dependencies if they don’t exist.
Now just import the following libs that will help with writing the Hudi files:
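Something like the following should cover what we need for this example (assuming the Hudi 0.5.x Spark bundle):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
```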
Creating the events
We will create two simple JSON events. One of them will be older and one newer with respect to their event time,
and the newer one will have one extra field.
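A sketch of what the two events might look like (the field names match the ones used below; the uuid and utc values are made up for illustration):

```scala
// Two illustrative events: the second one happened a day later and carries a direction field.
val oldEvent = """{"uuid": "e1", "event_date": "20200205", "utc": 1580893200}"""
val newEvent = """{"uuid": "e2", "event_date": "20200206", "utc": 1580979600, "direction": "up"}"""
```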
As we can see, the second event happened later in time ('event_date': '20200206') and also has an extra field called
direction.
Now let's read them into DataFrames and check how Spark perceives them.
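A minimal sketch of that step, assuming each event is read separately so the older DataFrame keeps its narrower schema:

```scala
import spark.implicits._

// Read each JSON string into its own DataFrame; only the newer one has a direction column.
val oldDf = spark.read.json(Seq(oldEvent).toDS)
val newDf = spark.read.json(Seq(newEvent).toDS)

oldDf.printSchema()
newDf.printSchema()
```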
Writing the events
Now we will write the events using Hudi, with some simple configuration.
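A minimal write sketch, assuming the events are written one DataFrame at a time; the table name and target path are illustrative, not taken from the original gist:

```scala
// Write a DataFrame into the Hudi table, partitioned by event_date and keyed by uuid.
def writeEvents(df: org.apache.spark.sql.DataFrame): Unit = {
  df.write
    .format("org.apache.hudi")
    .option(PRECOMBINE_FIELD_OPT_KEY, "utc")
    .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "event_date")
    .option(HoodieWriteConfig.TABLE_NAME, "events")
    .mode(SaveMode.Append)
    .save("/tmp/hudi/events")
}

// Write the older event first, then the newer one, so each commit keeps its own schema.
writeEvents(oldDf)
writeEvents(newDf)
```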
The PRECOMBINE_FIELD_OPT_KEY option determines which row to write when multiple rows share the same key: the
row with the largest value in the specified field wins. In the example we use the utc field, which represents the
epoch time of the event, meaning that the latest event with the same key will be written.
The RECORDKEY_FIELD_OPT_KEY option determines which field to use as the key of the row. In the example we use uuid,
which is just a unique id for the event.
The PARTITIONPATH_FIELD_OPT_KEY option determines by which field to partition the data when writing. In the example
we use the event_date field, which represents the date when the event happened, as I expect most queries to be
filtered at least by time.
The TABLE_NAME option marks which table to register in Hive. It is a
mandatory option, but in our example it has no real effect.
We use SaveMode.Append to append the events; if SaveMode.Overwrite were used instead, it would overwrite
the whole folder passed to the .save method.
Now that we have finished writing the events, we can try to read them.
Reading the data
Basic read
Reading Hudi files is as easy as reading Parquet files with Spark; we just need to specify the relevant format.
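For example (the path and glob depth follow the single-level event_date partitioning assumed in the write sketch above):

```scala
// The glob matches the event_date partition directories created by the write step.
val basicDf = spark.read
  .format("org.apache.hudi")
  .load("/tmp/hudi/events/*")

basicDf.show()
```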
As we can see in the result, we get some extra Hudi metadata columns, but we are missing the direction field, as Spark
uses the schema of the first partition it reads.
Merge Schema read
Instead of the basic read, we can ask Spark to merge the schemas of all the files it is processing. As it sounds, this is
a relatively expensive operation, so it is turned off by default. There are several ways to turn it on; in the next
example we are going to use it as an option when reading the files.
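A sketch of that read, with the same illustrative path as before:

```scala
// Ask Spark to merge the Parquet schemas of all the files it reads.
val mergedDf = spark.read
  .format("org.apache.hudi")
  .option("mergeSchema", "true")
  .load("/tmp/hudi/events/*")

mergedDf.show()
```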
Now the output is as we expected it to be, with the direction field included.
Reducing the performance penalty
To reduce the performance penalty, we used a workaround that fits our use case: instead of reading all the files with
mergeSchema, we read only the latest partition with it, extract its schema, and then read all the other partitions we
are interested in using that schema, as sketched below.
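A sketch of that workaround; the base path and the partition directory name ("20200206") are assumptions carried over from the earlier write example:

```scala
// Merge schemas only over the newest partition to obtain the full, evolved schema.
val latestSchema = spark.read
  .format("org.apache.hudi")
  .option("mergeSchema", "true")
  .load("/tmp/hudi/events/20200206")
  .schema

// Reuse that schema for the wider read, so Spark skips merging every file's footer.
val allEventsDf = spark.read
  .format("org.apache.hudi")
  .schema(latestSchema)
  .load("/tmp/hudi/events/*")

allEventsDf.show()
```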
As we can see, we got the direction field with a default value of null.