
Extending Spark pipeline

For some time now Spark has been offering a Pipeline API (available in the MLlib module) which facilitates building sequences of transformers and estimators in order to process the data and build a model. Moreover, the Spark MLlib module ships with a plethora of ready-made transformers that make the process of data transformation easy and painless. But what happens if there is no transformer that supports a particular use case?

What are Transformer and Estimator?

Spark MLlib exposes a Pipeline API, which chains together the steps an application typically performs to prepare data for a machine learning model (feature encoding, indexing, etc.) and to produce the model itself. The Pipeline accepts an array that consists of a number of Transformer and Estimator instances. There is a profound difference between these two entities, namely:

  • A Transformer transforms data: it either derives features or applies a learned model to produce predictions. The most important method any Transformer exposes is transform, which takes a Dataset[_] and returns a DataFrame. The returned DataFrame usually contains additional columns that represent a transformed feature or a prediction result.

  • An Estimator is responsible for learning a model from the input data. It exposes a fit method which accepts a DataFrame and returns a Transformer that hosts the trained model.

The Pipeline wraps a series of Transformer and Estimator instances and executes them in a given order.

There is a great reference to the whole Pipeline process on the Spark documentation site.

The use case

Let’s imagine we want to train a multiclass classification model based on the Random Forest implementation in Spark. We have a number of different features, most of them categorical and represented by strings. In order to transform the data and feed it to a RandomForestClassifier instance, we need to run it through a number of Transformer instances first, e.g. StringIndexer and VectorAssembler.
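Such a pipeline could be sketched as follows; the column names (country, device, label) are made up for illustration:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// Hypothetical categorical feature columns; adjust to your schema.
val categoricalCols = Seq("country", "device")

// One StringIndexer per categorical column.
val indexers = categoricalCols.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
}

// Index the string label as well, so RandomForestClassifier can consume it.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("label_idx")

// Assemble the indexed features into a single vector column.
val assembler = new VectorAssembler()
  .setInputCols(categoricalCols.map(_ + "_idx").toArray)
  .setOutputCol("features")

val rf = new RandomForestClassifier()
  .setLabelCol("label_idx")
  .setFeaturesCol("features")

val pipeline = new Pipeline()
  .setStages((indexers :+ labelIndexer :+ assembler :+ rf).toArray[PipelineStage])
```

Calling fit on this Pipeline runs the stages in order and returns a PipelineModel that can be used to transform new data.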

The problem that may emerge in this use case is that as we increase the number of classes the data can be assigned to, the computational complexity rises and (if resources are scarce) we can quickly run into an OutOfMemory exception. There are multiple techniques and optimization tricks that can prevent the application from going OOM, but let’s assume that we want to subsample the data so that it reduces the computation time and minimizes the risk of OOM.

Fortunately, RandomForestClassifier exposes a parameter that controls subsampling - subsamplingRate. We can set the subsampling rate to a fairly small value, e.g. 0.3, if we feel comfortable with it. By doing so we significantly reduce the number of rows used to grow each tree, decrease the time needed to train the model and reduce the risk of OOM. Can we now fire the application and forget about it?
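In code this is just a parameter setter on the estimator; 0.3 is an arbitrary example value:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

// subsamplingRate controls the fraction of the training data
// used for learning each tree in the forest.
val rf = new RandomForestClassifier()
  .setSubsamplingRate(0.3)
```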

Not at all. With a high number of classes in a dataset there is a significant probability that the overall distribution of labels is skewed. In other words, we may discover that there is a long tail of underrepresented classes. There are some techniques that minimize the skewness of data, e.g. resampling the underrepresented classes, but we are more interested in keeping the distribution as it is originally. Are we certain that subsamplingRate is aware of the target class distribution and prevents some labels from being dropped altogether? Well, no.

Eventually, we decide to tackle the issue ourselves and introduce our own subsampling technique that takes into account the distribution of the target feature.

Attempt I - subsampling outside the Pipeline

There is a method available on any DataFrame that reduces the number of rows - sample. So the easiest way to limit the size of each class would be to split the DataFrame into as many small data frames as there are classes, subsample each of them, and union the results. The code that performs the whole operation may look like this:
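One way the operation could look; the sampleByClass name and sampling without replacement are assumptions made for this sketch:

```scala
import org.apache.spark.sql.DataFrame

// Split the DataFrame by class, sample each subset, union the results.
def sampleByClass(df: DataFrame, labelCol: String, fraction: Double): DataFrame = {
  // Collect the distinct class values to the driver.
  val classes = df.select(labelCol).distinct().collect().map(_.get(0))
  classes
    .map(c => df.filter(df(labelCol) === c).sample(withReplacement = false, fraction))
    .reduce(_ union _)
}
```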

There are some disadvantages to this approach. Firstly, depending on the number of classes, each data frame in the resulting sequence is reshuffled into the desired number of partitions, which means that after merging all those data frames with union the overall number of partitions in the final dataset may increase significantly. Secondly, if we set the sampling ratio to a very small number and the dataset contains classes represented by only a few rows, we may lose those classes altogether. Thirdly, let’s imagine that the model is going to be passed somewhere else along with categorical features represented by the indices produced by StringIndexer. If we run the application multiple times and sample the data randomly before indexing, the mapping between the indices and the labels they represent will be lost. We may come to the conclusion that it would be best to sample the data after it has been indexed, inside the pipeline.

Attempt II - eating a cake and having it too

In order to solve our issues we need to change two things:

  • create a set of operations that consciously sample the data without increasing the number of partitions,
  • make this set perform all operations as a step in a pipeline.

Let’s address the first issue now.

Spark SQL to the rescue

If you have been working with SQL, either plain or any specific dialect, you may know that there is a set of window functions (sometimes called analytic functions) that perform operations on an assigned window over the data. In order to tackle our first issue we will resort to a handful of these functions as implemented in Spark (HiveContext). Let’s dive straight into the code.
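A sketch of how the code could look, with the function name and the auxiliary column names rnd and rank chosen for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}

// Class-aware subsampling via window functions.
def subsample(df: DataFrame, targetCol: String, ratio: Double): DataFrame = {
  // Window over each class, ordered by a per-row random number.
  val window = Window.partitionBy(targetCol).orderBy("rnd")
  df.withColumn("rnd", rand())                       // step 1: random double in [0, 1)
    .withColumn("rank", percent_rank().over(window)) // step 2: percent rank within the class
    .filter(col("rank") <= ratio)                    // step 3: keep the requested fraction
    .drop("rnd", "rank")                             // drop the auxiliary columns
}
```

Because percent_rank starts at 0 inside every window, at least one row per class always passes the filter, regardless of how small the ratio is.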

The example above is pretty much self-explanatory, yet let’s dissect it piece by piece. In the first step one additional column is created that contains a random double from the interval [0, 1). The numbers generated with this function are almost surely unique. The second step creates yet another column that holds the percent rank over each class of the target feature, ordered by the random column we generated in the previous step. This transformation ensures that there is always a row with a percent rank equal to 0, which facilitates keeping at least one row even if the number of observations for a certain class and the subsampling ratio are both extremely low. Finally, the subsampling ratio is applied as a filter on the percent rank column, and both auxiliary columns are dropped.

With the use of the SQL functionality available in Spark we have managed to consciously subsample the data without unnecessary partition proliferation.

Now it’s time to implement it as a Transformer in order to inject it into a Pipeline.

Would you like me to wrap it up for you?

There are two ways of creating a custom transformer: by extending either the UnaryTransformer or the Transformer abstract class. In this post I would like to discuss the latter. I will use Transformer from Spark 2.0.x, as its implementation is a little bit different from that in previous versions of Spark.

In order to build a transformer we need to override three methods required by its ancestors: transform, transformSchema and copy. In our use case the implementation is a bit simpler than usual, as we do not change the schema of the dataset.

Let’s start from declaration:
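A possible shape for such a declaration; the class name ClassDistributionSampler and the parameter names are illustrative:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types.StructType

// A transformer that subsamples each class of targetCol separately.
class ClassDistributionSampler(
    val targetCol: String,
    val samplingRatio: Double,
    val withReplacement: Boolean)(implicit spark: SparkSession)
  extends Transformer {

  override val uid: String = Identifiable.randomUID("classDistributionSampler")

  // The core logic; implemented in the next step.
  override def transform(dataset: Dataset[_]): DataFrame = ???

  // Sampling does not alter the schema, so this is an identity.
  override def transformSchema(schema: StructType): StructType = schema

  // Note: defaultCopy relies on a single-String-uid constructor via reflection,
  // so with extra constructor parameters a manual copy is safer here.
  override def copy(extra: ParamMap): Transformer =
    new ClassDistributionSampler(targetCol, samplingRatio, withReplacement)
}
```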

As you can see, we specify the column name of the target feature, the sampling ratio and a replacement flag in case we want to sample with replacement. Additionally, we implicitly pass a SparkSession, the common entry point for a Spark application available since Spark 2.0, in order to perform SQL-like operations. Bear in mind that while instantiating the SparkSession you need to enable Hive support by invoking the enableHiveSupport method.

Now, let’s implement the core method - transform:
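A possible transform implementation, mirroring the window-function approach dissected earlier; the class skeleton is repeated so the snippet stands on its own, and all names remain illustrative:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank, rand}
import org.apache.spark.sql.types.StructType

class ClassDistributionSampler(
    val targetCol: String,
    val samplingRatio: Double,
    val withReplacement: Boolean)(implicit spark: SparkSession)
  extends Transformer {

  override val uid: String = Identifiable.randomUID("classDistributionSampler")

  // Window-based, class-aware subsampling, now wrapped as a pipeline stage.
  // Note: this sketch always samples without replacement; the flag is kept
  // only to match the declaration above.
  override def transform(dataset: Dataset[_]): DataFrame = {
    val window = Window.partitionBy(targetCol).orderBy("rnd")
    dataset.toDF()
      .withColumn("rnd", rand())
      .withColumn("rank", percent_rank().over(window))
      .filter(col("rank") <= samplingRatio)
      .drop("rnd", "rank")
  }

  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): Transformer =
    new ClassDistributionSampler(targetCol, samplingRatio, withReplacement)
}
```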

Here you are! The implementation is almost identical to one of our previous snippets.

We won’t discuss the implementation of the transformSchema method. In our case the method just returns the schema it receives as input. Consider it a stopgap.

The last method, copy, can be implemented by passing its input attributes to the defaultCopy function exposed by the Spark ML module.

We should now have a fully functional Transformer that can extend any pipeline should it be necessary.


I hope I have managed to convince you that building a custom Transformer is nothing difficult and that by doing so we can easily extend any pipeline that modifies and processes data.

I have omitted some crucial things regarding sampling that might come to your mind as the purpose of this post was to present the way to build a custom Transformer rather than stay statistically correct. I hope you will forgive me that one day!

You like this post? Want to stay updated? Follow us on Twitter or subscribe to our Feed.

by Tomasz Sosiński
December 1, 2016
Tags : Scala Spark MLlib
