For some time now, Spark has offered a Pipeline API (available in the MLlib module) that facilitates building sequences of transformers and estimators to process data and build a model. Moreover, Spark MLlib ships with a plethora of built-in transformers that make data transformation easy and painless. But what happens if no transformer supports a particular use case?
Spark MLlib exposes a Pipeline API, which facilitates the steps a developer typically performs to prepare data for building a machine learning model (such as feature encoding, indexing, etc.) and to produce the model itself.
A Pipeline accepts an array of Transformer and Estimator instances. There is a profound difference between these two entities:
A Transformer either performs a feature transformation or represents a trained model. The most important method any Transformer exposes is transform, which takes a Dataset[_] and returns a DataFrame. The returned DataFrame usually contains additional columns that represent a transformed feature or a prediction result.
An Estimator is an algorithm that learns from data. It exposes a fit method, which accepts a DataFrame and returns a Transformer that hosts the fitted model.
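To make the distinction concrete, here is a minimal sketch assuming Spark 2.x and a local SparkSession; the object name and the toy color column are hypothetical. StringIndexer is an Estimator: its fit returns a StringIndexerModel, which is a Transformer.

```scala
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

object EstimatorVsTransformer {
  // Fits an Estimator (StringIndexer) and applies the Transformer it returns.
  def indexColors(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq("red", "green", "red", "blue").toDF("color")

    // Estimator: fit() scans the data and learns a label -> index mapping.
    val indexer = new StringIndexer().setInputCol("color").setOutputCol("colorIndex")
    val model: StringIndexerModel = indexer.fit(df)

    // Transformer: transform() returns a new DataFrame with an extra "colorIndex" column.
    model.transform(df)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("estimator-vs-transformer").getOrCreate()
    indexColors(spark).show()
    spark.stop()
  }
}
```

By default StringIndexer orders labels by descending frequency, so the most frequent value ("red") is mapped to index 0.0.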
A Pipeline wraps a series of Transformer and Estimator instances (its stages) and executes them in the given order.
The Spark documentation site has a great reference on the whole Pipeline process.
Let’s imagine we want to train a multiclass classification model based on Spark’s Random Forest implementation. We have a number of different features, most of them categorical and represented by strings.
In order to transform the data and feed it to a RandomForestClassifier instance, we first need to pass it through a number of other pipeline stages, e.g. a StringIndexer for each categorical feature.
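A minimal sketch of such a pipeline, assuming Spark 2.x. The column names (country, device, label) are hypothetical, and so is the VectorAssembler stage, which is added here because RandomForestClassifier reads its features from a single vector column.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

object RandomForestPipeline {
  // Hypothetical column names, used only for illustration.
  val categoricalCols: Seq[String] = Seq("country", "device")
  val labelCol: String = "label"

  def build(): Pipeline = {
    // One StringIndexer per categorical string feature.
    val featureIndexers: Seq[PipelineStage] = categoricalCols.map { c =>
      new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
    }
    // The classifier expects a numeric label, so the string target is indexed too.
    val labelIndexer = new StringIndexer().setInputCol(labelCol).setOutputCol("labelIdx")
    // Pack the indexed features into the single vector column the classifier reads.
    val assembler = new VectorAssembler()
      .setInputCols(categoricalCols.map(c => s"${c}_idx").toArray)
      .setOutputCol("features")
    val rf = new RandomForestClassifier()
      .setLabelCol("labelIdx")
      .setFeaturesCol("features")

    // Stages run in exactly this order when the pipeline is fitted.
    new Pipeline().setStages((featureIndexers ++ Seq(labelIndexer, assembler, rf)).toArray)
  }
}
```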
The problem that may emerge in this use case is that as the number of classes the data can be assigned to grows, the computational complexity rises and (if resources are scarce) we can quickly run into an OutOfMemoryError. There are multiple techniques and optimization tricks that can protect the application from OOM, but let’s assume that we want to subsample the data to reduce the computation time and minimize the risk of OOM.
RandomForestClassifier has a parameter that controls subsampling: subsamplingRate. We can set the subsampling rate to quite a small number, e.g. 0.3, if we feel comfortable with it.
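Setting the rate is a one-liner on the classifier. A sketch, assuming Spark 2.x; the object name and the label and feature column names are hypothetical:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

object SubsampledForest {
  // Each tree is trained on a random ~30% of the training rows;
  // the draw is made per row and does not look at the label distribution.
  val rf: RandomForestClassifier = new RandomForestClassifier()
    .setLabelCol("labelIdx")     // hypothetical column names
    .setFeaturesCol("features")
    .setSubsamplingRate(0.3)
}
```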
By doing so we significantly reduce the number of rows each tree is trained on, decrease the time needed to train the model and reduce the risk of OOM. Can we now fire up the application and forget about it?
Not at all. With a high number of classes there is a significant probability that the overall distribution of labels is skewed. In other words, we may discover a long tail of underrepresented classes.
There are techniques that reduce the skewness of data, e.g. resampling the underrepresented classes, but we are more interested in preserving the original distribution.
Can we be certain that subsamplingRate is aware of the target class distribution and prevents labels from being dropped entirely? Well, no.
Eventually, we decide to tackle the issue by introducing our own subsampling technique that takes the distribution of the target feature into account.
Every DataFrame exposes a method that reduces the number of rows: sample. So the easiest way to limit the size of each class would be to split the DataFrame into as many small DataFrames as there are classes, sample each of them, and combine the results with union.
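A minimal sketch of this naive approach, assuming Spark 2.x; the object name and its parameters are hypothetical. It collects the distinct labels to the driver, filters and samples one DataFrame per label, and reduces the pieces with union:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object NaiveStratifiedSample {
  // Splits `df` into one DataFrame per distinct label, samples each piece
  // with the same fraction and unions the pieces back together.
  def apply(df: DataFrame, labelCol: String, fraction: Double, seed: Long = 42L): DataFrame = {
    // Collect the distinct labels to the driver (assumed to be a manageable number).
    val labels = df.select(labelCol).distinct().collect().map(_.get(0))
    labels
      .map(label => df.filter(col(labelCol) === label).sample(withReplacement = false, fraction, seed))
      .reduce(_ union _)
  }
}
```

Each filtered DataFrame keeps the partitions of its source and union simply concatenates them, so with k classes and a source of p partitions the result has k * p partitions.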
There are some disadvantages to this approach.
Firstly, each DataFrame in the resulting sequence keeps its own set of partitions (typically as many as the source DataFrame has), and union simply concatenates them, so depending on the number of classes the overall number of partitions in the final dataset may increase significantly.
Secondly, if we set the sampling ratio to a very small number and the dataset contains classes represented by only a few rows, we may lose these classes entirely.
Thirdly, let’s imagine that the model is going to be passed somewhere else, with categorical features represented by indices produced by StringIndexer. If we run the application multiple times and sample the data randomly before indexing, the mapping between the indices and the labels they represent may differ from run to run, so it cannot be relied upon.
We may conclude that it would be best to sample the data after it has been indexed, inside the pipeline.
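In order to solve our issues we need to change two things: how the sampling itself is performed, and where it happens, i.e. inside the pipeline as a custom Transformer.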
Let’s address the first issue now.
If you have worked with SQL, either plain or any specific dialect, you may know that there is a set of window functions (sometimes called analytic functions) that perform calculations over a window of rows related to the current row. To tackle our first issue we will resort to a few of these functions as implemented in Spark (HiveContext).
The approach is fairly self-explanatory, but let’s dissect it into pieces. In the first step, one additional column is created that contains a random double drawn uniformly from [0, 1); with so many possible values, duplicates are extremely unlikely. The second step creates yet another column that holds the percent rank of each row within its class of the target feature, ordered by the random column generated in the previous step. Since percent rank is computed as (rank - 1) / (number of rows in the class - 1), or 0 for a single-row class, the first row of every class always gets a percent rank of 0. This guarantees that at least one row is kept even if both the number of observations for a class and the subsampling ratio are extremely low. Finally, the subsampling ratio is applied as a filter on the percent-rank column and both auxiliary columns are dropped.
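A minimal sketch of these three steps with the DataFrame API, assuming Spark 2.x; the object name, the seed parameter and the auxiliary column names _rnd and _pr are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}

object WindowStratifiedSample {
  // Keeps roughly `ratio` of the rows of every class, and always at least one.
  def apply(df: DataFrame, labelCol: String, ratio: Double, seed: Long = 42L): DataFrame = {
    // Rows are ranked in random order within each class.
    val byClass = Window.partitionBy(col(labelCol)).orderBy(col("_rnd"))
    df.withColumn("_rnd", rand(seed))                   // step 1: uniform random number in [0, 1)
      .withColumn("_pr", percent_rank().over(byClass))  // step 2: 0 for the first row of each class
      .filter(col("_pr") <= ratio)                      // step 3: apply the subsampling ratio
      .drop("_rnd", "_pr")                              // remove the auxiliary columns
  }
}
```

For example, with a ratio of 0.01 a class of three rows keeps exactly one row (the one with percent rank 0), while a class of 997 rows keeps 10, because (rank - 1) / 996 <= 0.01 holds only for ranks 1 to 10.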
With the SQL functionality available in Spark we have managed to subsample the rows in a class-aware way without unnecessary partition proliferation.
Now it’s time to implement it as a Transformer in order to inject it into a Pipeline.
There are two ways of creating a custom transformer: by extending either the UnaryTransformer or the Transformer abstract class. In this post I would like to discuss the latter.
I will use Transformer from Spark 2.0.x, as its implementation is a little different than in previous versions of Spark.
In order to build a transformer we need to override three methods required by its ancestors, transform, transformSchema and copy, and provide a uid string that identifies the instance.
In our use case the implementation is a bit simpler than usual, as we do not change the schema of the dataset.
Let’s start with the declaration. The class takes the column name of the target feature, a sampling ratio and a replacement flag in case we want to sample with replacement. Additionally, we implicitly pass a SparkSession, the common entry point for Spark applications available since Spark 2.0, in order to perform SQL-like operations.
Bear in mind that while instantiating the SparkSession you need to enable Hive support by invoking enableHiveSupport() on its builder.
Now, let’s implement the core method, transform. Its implementation is almost identical to the window-function approach described earlier, applied to the dataset that transform receives.
We won’t discuss the implementation of the transformSchema method in detail. In our case the method simply returns the schema it receives as input. Consider it a plug for a dent.
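The last method, copy, can be implemented by delegating to the defaultCopy function exposed by Spark’s Params trait and passing along the extra ParamMap it receives. Note that defaultCopy creates the new instance reflectively through a constructor that takes a single uid string, so the class needs such a constructor.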
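Putting the pieces together, the class below is a minimal sketch of such a Transformer, assuming Spark 2.x. The class name StratifiedSampler and its labelCol and ratio params are hypothetical, and the sketch deviates from the declaration described above in two ways: the settings are Spark ML Params rather than constructor arguments, because defaultCopy rebuilds the instance through a constructor taking only a uid string, and the replacement flag is left out, since the percent-rank filter can only keep or drop rows. No SparkSession parameter is needed either, as the DataFrame API calls run on the session the dataset already belongs to.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}
import org.apache.spark.sql.types.StructType

// Hypothetical transformer: keeps roughly `ratio` of the rows of every class in `labelCol`,
// and always at least one row per class.
class StratifiedSampler(override val uid: String) extends Transformer {

  // No-arg constructor that generates a random uid.
  def this() = this(Identifiable.randomUID("stratifiedSampler"))

  final val labelCol: Param[String] =
    new Param[String](this, "labelCol", "column holding the target class")
  final val ratio: DoubleParam =
    new DoubleParam(this, "ratio", "fraction of rows to keep per class", ParamValidators.inRange(0.0, 1.0))

  def setLabelCol(value: String): this.type = set(labelCol, value)
  def setRatio(value: Double): this.type = set(ratio, value)

  // Core logic: random order within each class, percent rank, then filter by the ratio.
  override def transform(dataset: Dataset[_]): DataFrame = {
    val byClass = Window.partitionBy(col($(labelCol))).orderBy(col("_rnd"))
    dataset
      .withColumn("_rnd", rand())
      .withColumn("_pr", percent_rank().over(byClass))
      .filter(col("_pr") <= $(ratio))
      .drop("_rnd", "_pr")
  }

  // Rows are only removed, so the schema passes through unchanged.
  override def transformSchema(schema: StructType): StructType = schema

  // defaultCopy creates a new instance via the (uid: String) constructor and copies the params.
  override def copy(extra: ParamMap): StratifiedSampler = defaultCopy(extra)
}
```

The sampler can then be placed in a pipeline after the indexers, e.g. new Pipeline().setStages(Array(labelIndexer, sampler, assembler, rf)). Bear in mind that the fitted PipelineModel keeps the sampler as one of its stages, so it would also drop rows at prediction time unless it is taken out of the fitted model.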
We should now have a fully functional Transformer that can be added to any pipeline when necessary.
I hope I have managed to convince you that building a custom Transformer is not difficult and that by doing so we can easily extend any pipeline that modifies and processes data.
I have omitted some crucial aspects of sampling that might come to your mind, as the purpose of this post was to present how to build a custom Transformer rather than to be statistically rigorous. I hope you will forgive me for that one day!