When dealing with massive datasets, making predictions with a trained XGBoost model can be extremely time-consuming. Python's concurrent.futures.ProcessPoolExecutor allows you to distribute the prediction workload across multiple processes, potentially providing a significant speedup over sequential prediction.
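If you have not used the API before, here is a minimal, self-contained sketch of the ProcessPoolExecutor pattern the example below relies on: submit() schedules a function call in a worker process and returns a Future whose result() blocks until the call completes (the square function is purely illustrative).

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

# Protect the entry point so worker processes can import this module safely
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(square, i) for i in range(4)]
        print([f.result() for f in futures])  # prints [0, 1, 4, 9]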
NumPy and XGBoost automatically make use of BLAS (Basic Linear Algebra Subprograms) threads behind the scenes when making predictions. These BLAS threads must be disabled before using Python processes to make parallel predictions, in order to avoid contention (too many system threads competing with each other at the same time). This can be achieved by setting the 'OMP_NUM_THREADS' environment variable to "1", so that BLAS is single-threaded.
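Note that ordering matters: OMP_NUM_THREADS is typically read when the OpenMP runtime initializes, so the variable must be set before NumPy or XGBoost is imported. A minimal sketch:

import os
os.environ['OMP_NUM_THREADS'] = "1"  # must be set before the imports below

import numpy as np                   # NumPy's BLAS backend now runs single-threaded
from xgboost import XGBClassifier    # XGBoost's OpenMP threads are capped at 1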
This example demonstrates how to perform parallel predictions using a process pool and compares the execution time against sequential prediction.
It also shows how to save the dataset and model in the main process and have each child process load them from file to minimize the overhead of inter-process communication (IPC).
import os
# Fix the number of BLAS threads
os.environ['OMP_NUM_THREADS'] = "1"
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from concurrent.futures import ProcessPoolExecutor
import time
import joblib

# Function for sequential prediction
def predict_sequential():
    # Load the dataset and model from file
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    # Make predictions on the full test set
    predictions = model.predict(X_test)
    # Do something with the predictions...

# Function that defines the prediction task in each child process
def predict(ix, n_jobs):
    # Load the dataset and model in each child process
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    # Determine the chunk of input data for this task
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    ix_e = ix_s + chunk_size
    # Make a prediction on the chosen chunk
    predictions = model.predict(X_test[ix_s:ix_e])
    # Do something with the predictions...

# Function for parallel prediction
def predict_parallel(n_jobs):
    # Make predictions for each chunk in a separate process
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        _ = [executor.submit(predict, i, n_jobs) for i in range(n_jobs)]

# Protect the entry point
if __name__ == '__main__':
    # Generate a large synthetic dataset
    X, y = make_classification(n_samples=10000000, n_features=20, random_state=42)
    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.95, random_state=42)
    # Train an XGBoost model
    model = XGBClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    # Save the dataset and model so child processes can load them from file
    joblib.dump(X_test, 'X_test.joblib')
    joblib.dump(model, 'model.joblib')
    # Time the sequential prediction
    start_sequential = time.perf_counter()
    predict_sequential()
    end_sequential = time.perf_counter()
    # Print the execution time
    print(f"Sequential prediction time: {end_sequential - start_sequential:.2f} seconds")
    # Time the parallel prediction
    start_parallel = time.perf_counter()
    predict_parallel(4)
    end_parallel = time.perf_counter()
    # Print the execution time
    print(f"Parallel prediction time: {end_parallel - start_parallel:.2f} seconds")
    # Print the speedup
    speedup = (end_sequential - start_sequential) / (end_parallel - start_parallel)
    print(f"Parallel prediction is {speedup:.2f} times faster than sequential prediction")
You may see results that look something like the following:
Sequential prediction time: 17.91 seconds
Parallel prediction time: 6.59 seconds
Parallel prediction is 2.72 times faster than sequential prediction
Here’s a step-by-step breakdown:
- We configure BLAS to be single-threaded via the 'OMP_NUM_THREADS' environment variable.
- We generate a large synthetic dataset using sklearn.datasets.make_classification.
- We split the data into train and test sets, with 95% of the data used for testing.
- We train an XGBClassifier on the training data.
- We save the test dataset and trained model to file using joblib.dump.
- We define two functions: predict_sequential for sequential prediction and predict_parallel for parallel prediction using ProcessPoolExecutor. The predict_parallel function issues multiple calls to the predict function, one for each chunk of the test set to predict.
- In the predict function:
  - We load the test dataset and model in each child process using joblib.load.
  - We determine the chunk of the test dataset on which the task is to make a prediction (a variant that handles uneven chunk sizes is sketched after this list).
  - We make a prediction for just that chunk.
  - We do something locally (in the process) with the predictions, but do not transmit them back to the parent process, avoiding IPC.
- We time the execution of sequential prediction and parallel prediction.
- We print the execution times and the speedup achieved with parallel prediction.
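One caveat on the chunking arithmetic: predict silently drops the remainder rows when len(X_test) is not evenly divisible by n_jobs (here it is: 9,500,000 rows split four ways). A minimal sketch of a variant that assigns any leftover rows to the last chunk:

import joblib

def predict(ix, n_jobs):
    # Load the dataset and model in each child process
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    # Determine the chunk boundaries for this task
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    # The last worker takes any leftover rows
    ix_e = len(X_test) if ix == n_jobs - 1 else ix_s + chunk_size
    # Make a prediction on the chosen chunk
    predictions = model.predict(X_test[ix_s:ix_e])
    # Do something with the predictions...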
By using ProcessPoolExecutor, we can distribute the prediction workload across multiple processes.
To get the most benefit from parallelism with multiprocessing (Python processes), each child process should load the required model and input data, and store its predictions, independently. This minimizes the overhead of the inter-process communication that would otherwise be required to transmit the model, data, and results between parent and child processes.
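For example, rather than returning predictions to the parent (which would pickle and copy them back through IPC), each worker could persist its chunk of predictions to disk. A minimal sketch, where the per-worker file name predictions_<ix>.npy is an assumption of this example, not part of the code above:

import joblib
import numpy as np

def predict(ix, n_jobs):
    # Load the dataset and model in the child process
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    # Select this worker's chunk
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    ix_e = ix_s + chunk_size
    predictions = model.predict(X_test[ix_s:ix_e])
    # Store the result locally instead of returning it through IPC
    np.save(f'predictions_{ix}.npy', predictions)  # hypothetical naming scheme

The parent can later reassemble the chunks with np.load once all workers have finished.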
Generally, the exact speedup will depend on various factors such as the size of the dataset, the complexity of the model, the number of processes used, and the hardware specifications of the machine running the code.
It is likely that automatic parallelism during prediction with BLAS threads will be faster than manually splitting the prediction task among Python processes. Test and compare performance on your system.
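For that comparison, you can time a single multithreaded model.predict call by not pinning OMP_NUM_THREADS to 1 (or by pinning it to your core count), reusing the files saved by the example above. A minimal sketch:

import os
os.environ['OMP_NUM_THREADS'] = str(os.cpu_count())  # allow multithreaded prediction

import time
import joblib

if __name__ == '__main__':
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    start = time.perf_counter()
    predictions = model.predict(X_test)  # parallelized internally via OpenMP threads
    print(f"Multithreaded prediction time: {time.perf_counter() - start:.2f} seconds")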