Custom Machine Learning Estimators at Scale on Dask & RAPIDS
How to build reusable components that integrate with Scikit-learn, Dask, and RAPIDS
When building reusable data science and machine learning code, developers often need to add custom business logic around existing open source libraries such as scikit-learn. These customizations may perform data preprocessing, segment the data in a specific way, or implement a proprietary algorithm. Custom logic results in more code to understand and maintain, which adds complexity and introduces risk. This blog post discusses how to leverage the scikit-learn API to add such customizations in a way that minimizes code, reduces maintenance, facilitates reuse, and provides the ability to scale with technologies such as Dask and RAPIDS.
Why is it difficult to follow the scikit-learn API in modeling code?
The end goal of any machine learning endeavor is usually to get the best model for a given dataset as quickly as possible. In machine learning, so much time is spent on data processing, model training, and validation that the design and maintainability of the code created in the process is sometimes overlooked. In a world of limited resources, perhaps this prioritization makes sense. After all, it is the model that will be put into production, not the code that produced it. In banks and other financial institutions, models must go through a validation process - where reproducibility is critical - before they are deployed into production. Once the model is deployed, the code that produced it will often sit in a git repository untouched for weeks, months, or even years. Then one day a model developer, who may or may not be the original author, comes back to the code to update the model. They will benefit greatly - in time and reduced cost - from a well tested and documented repository that is easy to understand, so they can get to work quickly.
In practice, following standards and applying them can be challenging because:
- Standards must be understood before they can be applied.
- Applying a standard to your problem takes effort.
Overcoming these challenges takes careful thought and requires precious time that may not be abundant on a data science project. The PyData ecosystem is composed of many evolving standards and APIs, making it difficult to keep up with the changes. All of this can slow down the model development process, so the path of least resistance is to wrap a library in your own design that best suits your application. However, this often leads to problems such as data leakage due to user error, or repeated code because the design was not modular enough to allow extensions. In the latter case, the maintenance overhead is high due to an increasingly large and complex code base where fixing bugs or scaling, even with Dask and RAPIDS, becomes challenging.
Scikit-learn example
We will use the following example from the scikit-learn documentation to illustrate the points in this post.
from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, train_test_split
import numpy as np
X_digits, y_digits = datasets.load_digits(return_X_y=True)
# Define a pipeline to search for the best combination of PCA truncation
# and classifier regularization.
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}
search = GridSearchCV(pipe, param_grid, n_jobs=-1)
X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)
search.fit(X_train, y_train)
best = search.best_estimator_
print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")
Training set score: 1.0
Test set score: 0.9666666666666667
Customizing the Scikit-learn Example
Suppose we want to modify this example to mutate data after the PCA step. Perhaps we have a set of features that we always want to include from another data source or need to tokenize the data. We could do this by removing the pipeline and adding a function to perform the mutation on X.
def mutate(X):
    """Mutates X"""
    # ... do something ...
    return X
pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(
    max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])
X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123)
X_train = pca.fit_transform(X_train, y_train)
X_train = mutate(X_train)
logistic = logistic.fit(X_train, y_train)
X_test = pca.transform(X_test) # <- Don't call fit again!
X_test = mutate(X_test) # <- Don't forget to call mutate on X_test!
print(f"Training set score: {logistic.score(X_train, y_train)}")
print(f"Test set score: {logistic.score(X_test, y_test)}")
Training set score: 1.0
Test set score: 0.9666666666666667
This allows for customizations without requiring the user to learn about scikit-learn pipelines. However, if you are collaborating with or developing a library for other teams, this pattern quickly becomes problematic. This code could be encapsulated into a class and then copied many times. Some of those classes could even be slightly modified to add extended functionality or scale. Additionally, if you discover a bug in one of the classes, you will potentially have many places to fix it. Happy hunting!
Consider an alternative that extends the scikit-learn API by adding a custom estimator that contains the mutation logic.
from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta
class Mutate(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """Mutates X"""
        # ... do something ...
        return X
pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])
pipe = Pipeline(steps=[('pca', pca), ('mutate', Mutate()), ('logistic', logistic)])
X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)
pipe.fit(X_train, y_train)
print(f"Training set score: {pipe.score(X_train, y_train)}")
print(f"Test set score: {pipe.score(X_test, y_test)}")
Training set score: 1.0
Test set score: 0.9666666666666667
By adding a custom estimator, we have encapsulated the mutation in a way that allows us to insert it into the exact step of the pipeline. An additional benefit to this approach is that we only have to maintain one class that can grow over time to contain more functionality and scale.
def transform(self, X):
    """Mutates X"""
    # ... do something ...
    if is_dask_collection(X):
        ...  # Do Dask things
    elif is_rapids_collection(X):
        ...  # Do RAPIDS things
    return X
The primary disadvantage of this approach, and probably why it is not more widely followed, is that the developer needs enough understanding of `scikit-learn` to fit the problem into its API.
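For orientation, that API is small. Below is a minimal sketch of the conventions a custom estimator follows; the class and parameter names are illustrative, not from any library: constructor arguments are stored as-is, fit returns self, and learned state lives in attributes with a trailing underscore.
from sklearn.base import BaseEstimator, TransformerMixin

class ScaleByFactor(TransformerMixin, BaseEstimator):
    """Illustrative transformer following scikit-learn conventions."""

    def __init__(self, factor=1.0):
        # Store constructor arguments verbatim so get_params/set_params
        # (inherited from BaseEstimator) work with clone and GridSearchCV.
        self.factor = factor

    def fit(self, X, y=None):
        # Learned state goes in attributes ending with an underscore.
        self.n_features_in_ = X.shape[1]
        # fit returns self so the estimator composes inside a Pipeline.
        return self

    def transform(self, X):
        return X * self.factor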
Existing scalable custom estimators in Dask & RAPIDS
This is not a new pattern; the PyData community already offers plenty of examples of custom scalable estimators. dask-ml is a library of scikit-learn extensions that scale data and perform parallel computations using Dask. Dask-ml provides many drop-in replacements for scikit-learn estimators.
Here is what the toy example pipeline looks like with `dask-ml`.
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
n_workers=1, memory_limit='3GB')
client
from dask_ml.datasets import make_classification
from dask_ml.decomposition import PCA
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline # <-- using the sklearn pipeline
import numpy as np
X, y = make_classification(n_samples=1000, n_features=20,
                           chunks=100, n_informative=4,
                           random_state=0)
# Define a pipeline to search for the best combination of PCA truncation
# and classifier regularization.
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(fit_intercept=False, max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}
search = GridSearchCV(pipe, param_grid, n_jobs=-1)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=123)
search.fit(X_train, y_train)
best = search.best_estimator_
print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")
Alternatively, one can use cuML's drop-in replacements to scale on NVIDIA GPUs with RAPIDS.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.datasets.classification import make_classification
cluster = LocalCUDACluster()
client = Client(cluster)
X, y = make_classification(n_samples=1000, n_features=20,
                           chunks=100, n_informative=4,
                           random_state=0)
from cuml.dask.decomposition import PCA
from cuml.linear_model import LogisticRegression
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
Designing your own scikit-learn estimators that scale with Dask & RAPIDS
At this point, we have shown how the same pipeline can be scaled in two ways, and you may see a pattern emerging. The data structures and modeling algorithms depend on the same underlying libraries, but are loosely coupled from one another. In other words, we have separated the data loading logic from the computation, which relies only on the array-like or dataframe API. Under the hood, the Dask estimators know how to deal with Dask collections and the cuML estimators know how to deal with RAPIDS collections. Everything works as long as we read the data using the library that matches the estimator. Can we build our own estimators following this pattern to encapsulate custom business logic in a way that scales on both Dask and RAPIDS?
First, let's look at the ways to read data for in-memory, distributed, and accelerated frameworks.
import pandas as pd
df = pd.read_csv(...)
import dask.dataframe as dd
df = dd.read_csv(...)
import cudf as cdf
df = cdf.read_csv(...)
Notice that all three of these libraries provide drop-in replacements that can serve as an abstraction layer for reading data into memory and operating on it. Once we have a dataframe-like structure, we can define an estimator that makes careful assumptions about the data API.
from mylib import CustomEstimator
est = CustomEstimator(**params)
est.fit(df)
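As a rough sketch (the class internals and column names below are hypothetical, not the real mylib implementation), such an estimator might restrict itself to operations shared by pandas, Dask, and cuDF dataframes, such as column selection and conversion to an array-like structure:
from sklearn.base import BaseEstimator

class CustomEstimator(BaseEstimator):
    """Hypothetical estimator that accepts a dataframe-like object."""

    def __init__(self, features=None, target="label"):
        self.features = features
        self.target = target

    def fit(self, df, y=None):
        # Rely only on the API shared by pandas, Dask, and cuDF:
        # column selection and .values to get an array-like structure.
        X = df[self.features].values
        y = df[self.target].values
        # ... fit the underlying model on X and y ...
        return self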
Alternatively, if we want to standardize on the array interface, our estimator's fit method can accept X's and y's. We have found that the array-like API is more consistent for scikit-learn tasks across the three frameworks, making our custom estimator code more generalizable for scaling. Below we show a slightly modified version of the CustomEstimator usage for the array-like interface.
from mylib import CustomEstimator
est = CustomEstimator(**params)
X, y = df[features].values, df[target].values
est.fit(X, y)
Custom scikit-learn estimator
With an understanding of a basic design for separating data reading and manipulation from the business logic, let's walk through an example of what this might look like in practice. Below, we define our own custom cross validation class `CustomSearchCV` that implements an in-memory version of the model training logic following the scikit-learn API.
from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import train_test_split
import copy
import pandas as pd
import numpy as np
class CustomSearchCV(BaseEstimator):
    def __init__(self, estimator, cv, logger):
        self.estimator = estimator
        self.cv = cv
        self.logger = logger

    def fit(self, X, y=None, **fit_kws):
        if isinstance(X, pd.DataFrame):
            X = X.values
        # Insert more guards here!
        X_base, X_holdout, y_base, y_holdout = train_test_split(
            X, y, random_state=123)
        self.split_scores_ = []
        self.holdout_scores_ = []
        self.estimators_ = []
        for train_idx, test_idx in self.cv.split(X_base, y_base):
            X_test, y_test = X_base[test_idx], y_base[test_idx]
            X_train, y_train = X_base[train_idx], y_base[train_idx]
            estimator_ = clone(self.estimator)
            estimator_.fit(X_train, y_train, **fit_kws)
            self.logger.info("... log things ...")
            self.estimators_.append(estimator_)
            self.split_scores_.append(estimator_.score(X_test, y_test))
            self.holdout_scores_.append(
                estimator_.score(X_holdout, y_holdout))
        self.best_estimator_ = \
            self.estimators_[np.argmax(self.holdout_scores_)]
        return self
`CustomSearchCV` works well with existing estimators, such as `sklearn.model_selection.RepeatedKFold` and `xgboost.XGBRegressor`. Users can even define their own folding class and inject it into our estimator. An example of usage is shown below.
from sklearn.model_selection import RepeatedKFold
import xgboost as xgb
from mylib import make_classifier_data, logger
X, y = make_classifier_data(
    n_samples=100_000,
    n_features=100,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
)
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
clf = CustomSearchCV(xgb.XGBClassifier(n_jobs=-1), cv, logger)
clf.fit(X, y)
clf.best_estimator_
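Since CustomSearchCV only relies on the cv object's split method, any class exposing that interface can be injected. A minimal, hypothetical folding class might look like this:
import numpy as np

class FirstHalfSecondHalfCV:
    """Hypothetical splitter: train on one half of the rows, test on the other, then swap."""

    def split(self, X, y=None):
        n = X.shape[0]
        idx = np.arange(n)
        yield idx[: n // 2], idx[n // 2 :]
        yield idx[n // 2 :], idx[: n // 2]

    def get_n_splits(self, X=None, y=None, groups=None):
        return 2

clf = CustomSearchCV(xgb.XGBClassifier(n_jobs=-1), FirstHalfSecondHalfCV(), logger)
clf.fit(X, y)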
Scale with Dask
`CustomSearchCV` can work with Dask collections with a few minor modifications to the fit method. First, create a Dask client to connect to your cluster.
from dask.distributed import Client, progress
client = Client()
client
Then add the following logic to check the input data to determine whether or not it is a Dask collection.
from dask.base import is_dask_collection
import dask.dataframe as dd
...
def fit(self, X, y=None, **fit_kws):
    if isinstance(X, dd.DataFrame):
        X = X.to_dask_array(lengths=True)
    elif isinstance(X, pd.DataFrame):
        X = X.values
    if is_dask_collection(X):
        from dask_ml.model_selection import train_test_split
    else:
        from sklearn.model_selection import train_test_split
    ...
Now, we can read (or generate) data using Dask and inject a Dask-enabled estimator into our `CustomSearchCV` object. In this case, we inject the `xgb.dask.DaskXGBClassifier`.
from sklearn.model_selection import RepeatedKFold
import xgboost as xgb
from mylib import logger, make_classifier_data
X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
    dask_ml=True,
    chunks=1000
)
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
clf = CustomSearchCV(xgb.dask.DaskXGBClassifier(), cv, logger)
clf.fit(X, y)
clf.best_estimator_
Scale with GPUs
Single GPU
After making similar modifications to `CustomSearchCV`, we can perform training on a single GPU by initializing `xgb.XGBClassifier` with `tree_method="gpu_hist"`. In this case, we don't need to modify the data reading (or generation) since XGBoost knows how to move data onto the GPU.
from sklearn.model_selection import RepeatedKFold
from mylib import make_classifier_data, logger
import xgboost as xgb
X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123
)
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
xgb_clf = xgb.XGBClassifier(n_jobs=-1, tree_method="gpu_hist")
clf = CustomSearchCV(xgb_clf, cv, logger)
clf.fit(X, y)
clf.best_estimator_
Single node, multiple GPUs
Many systems have multiple GPUs that can be combined to form a single host cluster using Dask and RAPIDS. Below, we initialize `xgb.dask.DaskXGBClassifier` with `tree_method="gpu_hist"` and connect it to a `dask_cuda.LocalCUDACluster`. By default, the `LocalCUDACluster` will add a cuda-worker (GPU worker) for each GPU on the host. If we run this code on a system with eight GPUs, we will have an eight worker cluster. NVLink and Apache Arrow allow for extremely efficient distributed data access among the GPUs.
Additionally, as GPU memory fills, the data will spill to system memory, which is typically much larger than what is available on the GPUs. This makes single node, multiple GPU computing well suited for many data science and machine learning problems. The example below shows the usage with no modifications to the `CustomSearchCV` class.
from sklearn.model_selection import RepeatedKFold
from mylib import make_classifier_data, logger
import xgboost as xgb
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
dclf = xgb.dask.DaskXGBClassifier(tree_method="gpu_hist")
clf = CustomSearchCV(dclf, cv, logger)
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)
client
X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
    dask_ml=True,
    chunks=1000
)
X = X.persist()
dclf.client = client
clf.fit(X, y)
clf.best_estimator_
Inspect the Dask dashboard and GPU utilization while running this code. Notice that the Dask cluster is busy doing work and then pauses while GPU utilization spikes. Here, we are splitting the data with Dask using system memory and CPUs, then training the XGBoost model on the GPUs. A future improvement would be to perform the training splits on the GPU.
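If you are not sure where the dashboard lives, the client exposes its URL; GPU utilization can be watched separately with a tool such as nvidia-smi.
# Print the URL of the Dask dashboard for the connected cluster
print(client.dashboard_link)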
The `CustomSearchCV` estimator can contain custom logic for scalable hyperparameter tuning or perform some of the pruning, regularization, and early stopping techniques discussed in a previous post on controlling XGBoost models.
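As a rough sketch of what such an extension might look like (the subclass and candidate grid below are hypothetical), the existing cross validation loop can be reused for a small parameter sweep using scikit-learn's clone and set_params:
from sklearn.base import clone

class CustomTunedSearchCV(CustomSearchCV):
    """Hypothetical extension that also sweeps a small list of candidate parameters."""

    def __init__(self, estimator, cv, logger, candidates=None):
        super().__init__(estimator, cv, logger)
        self.candidates = candidates or [{}]

    def fit(self, X, y=None, **fit_kws):
        results = []
        for params in self.candidates:
            # clone + set_params keeps each candidate independent of the others
            candidate = clone(self.estimator).set_params(**params)
            searcher = CustomSearchCV(candidate, self.cv, self.logger)
            searcher.fit(X, y, **fit_kws)
            results.append((max(searcher.holdout_scores_), searcher.best_estimator_))
        # Keep the estimator with the best holdout score across all candidates
        self.best_estimator_ = max(results, key=lambda r: r[0])[1]
        return self

clf = CustomTunedSearchCV(
    xgb.XGBClassifier(n_jobs=-1), cv, logger,
    candidates=[{"max_depth": 3}, {"max_depth": 6}],
)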
Notes
Here are a few additional notes to consider:
- Standardizing on the array-like interface for internal data structures early in the fit procedure allows for reduced complexity related to scaling with Dask and RAPIDS.
- Make sure X values pulled from dataframes contain only features for training, and separate labels as 1d arrays or pd.Series.
- Make sure X values do not contain columns used for segmentation, such as dates.
- Minimize dataframe to array conversions to avoid performance bottlenecks.
- Methods should return collections that match the input type.
- Developers should use the check_estimator helper function in sklearn.utils.estimator_checks to verify that their custom estimators adhere to the API (see the sketch below).
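For example, a minimal check of the Mutate transformer from earlier might look like the following; note that check_estimator runs a large battery of conformance tests, and very minimal estimators may need small adjustments to pass all of them.
from sklearn.utils.estimator_checks import check_estimator

# Run scikit-learn's API conformance checks against an estimator instance
check_estimator(Mutate())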
Conclusion
In this post, we have discussed patterns for adding custom functionality to `scikit-learn` modeling code. We have found that by following the scikit-learn API we can minimize customizations and encapsulate scaling logic in a single place. This reduces the cost of maintenance over time and provides developers with examples of how to integrate their code into the ecosystem. Following a standard API allows us to share estimators and combine them to serve many use cases.