When dealing with massive datasets, making predictions with your trained XGBoost model can be extremely time-consuming.
Python’s concurrent.futures.ProcessPoolExecutor allows you to distribute the prediction workload across multiple processes, potentially providing a significant speedup compared to sequential prediction.
The dataset can be stored in shared memory and accessed by each process, rather than transmitting the dataset to each child process or having each child process load the same dataset from file.
NumPy and XGBoost automatically make use of BLAS (Basic Linear Algebra Subprograms) threads behind the scenes when making predictions. BLAS threads must be disabled before using Python processes to make parallel predictions, to avoid contention (too many system threads competing with each other at the same time). This can be achieved by setting the 'OMP_NUM_THREADS' environment variable to "1" before NumPy and XGBoost are imported, so that BLAS is single-threaded.
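As an alternative to environment variables, the third-party threadpoolctl package (an assumption here; it is not used in the example below) can cap BLAS and OpenMP threads at runtime:

import numpy as np
from threadpoolctl import threadpool_limits

a = np.random.rand(2000, 2000)
# Limit all BLAS/OpenMP thread pools to one thread inside this block
with threadpool_limits(limits=1):
    b = a @ a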
This example demonstrates how to perform parallel predictions using a process pool and shared memory, and compares the execution time against sequential prediction.
import os
# Fix the number of BLAS threads to one, before NumPy and XGBoost are imported
os.environ['OMP_NUM_THREADS'] = "1"
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from concurrent.futures import ProcessPoolExecutor
import time
import joblib
from multiprocessing import shared_memory

# Function for sequential prediction
def predict_sequential(name, dim):
    # Access the shared memory
    existing_shm = shared_memory.SharedMemory(name=name)
    X_test = np.ndarray(dim, dtype=np.float64, buffer=existing_shm.buf)
    # Load the model
    model = joblib.load('model.joblib')
    # Make predictions on the entire test set
    predictions = model.predict(X_test)
    # do something with the predictions...
    # Close shared memory
    existing_shm.close()

# Function for predicting one chunk, run in a child process
def predict(ix, n_jobs, name, dim):
    # Load the model
    model = joblib.load('model.joblib')
    # Access the shared memory
    existing_shm = shared_memory.SharedMemory(name=name)
    X_test = np.ndarray(dim, dtype=np.float64, buffer=existing_shm.buf)
    # Determine input data for prediction
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    # The last chunk picks up any remainder rows
    ix_e = len(X_test) if ix == n_jobs - 1 else ix_s + chunk_size
    # Make a prediction on the chosen chunk
    predictions = model.predict(X_test[ix_s:ix_e])
    # do something with the predictions...
    # Close shared memory
    existing_shm.close()

# Function for parallel prediction
def predict_parallel(n_jobs, name, dim):
    # Make predictions for each chunk in a separate process
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        # The with-block waits for all submitted tasks to finish
        _ = [executor.submit(predict, i, n_jobs, name, dim) for i in range(n_jobs)]

# Protect the entry point
if __name__ == '__main__':
    # Generate a large synthetic dataset
    X, y = make_classification(n_samples=10000000, n_features=20, random_state=42)
    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.95, random_state=42)
    # Train an XGBoost model
    model = XGBClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    # Save the model to file for the child processes to load
    joblib.dump(model, 'model.joblib')
    # Create shared memory large enough to hold X_test
    shm = shared_memory.SharedMemory(create=True, size=X_test.nbytes)
    # Create an ndarray backed by the shared memory buffer
    shared_X_test = np.ndarray(X_test.shape, dtype=X_test.dtype, buffer=shm.buf)
    shared_X_test[:] = X_test[:]  # Copy the data into shared memory
    # Time the sequential prediction
    start_sequential = time.perf_counter()
    predict_sequential(shm.name, X_test.shape)
    end_sequential = time.perf_counter()
    # Print the execution time
    print(f"Sequential prediction time: {end_sequential - start_sequential:.2f} seconds")
    # Time the parallel prediction
    start_parallel = time.perf_counter()
    predict_parallel(4, shm.name, X_test.shape)
    end_parallel = time.perf_counter()
    # Print the execution time
    print(f"Parallel prediction time: {end_parallel - start_parallel:.2f} seconds")
    # Print the speedup
    speedup = (end_sequential - start_sequential) / (end_parallel - start_parallel)
    print(f"Parallel prediction is {speedup:.2f} times faster than sequential prediction")
    # Clean up shared memory
    shm.close()
    shm.unlink()
You may see results that look something like the following:
Sequential prediction time: 17.12 seconds
Parallel prediction time: 5.64 seconds
Parallel prediction is 3.04 times faster than sequential prediction
Here’s a step-by-step breakdown:
- We configure BLAS to be single-threaded via the 'OMP_NUM_THREADS' environment variable, set before NumPy and XGBoost are imported.
- We generate a large synthetic dataset using sklearn.datasets.make_classification.
- We split the data into train and test sets, with 95% of the data used for testing.
- We train an XGBClassifier on the training data.
- We save the trained model to file using joblib.dump and copy the test dataset into shared memory.
- We define two functions: predict_sequential for sequential prediction and predict_parallel for parallel prediction using ProcessPoolExecutor. The predict_parallel function issues multiple calls to the predict function, one for each chunk of the test set.
- In the predict function:
  - We load the trained model from file in each child process using joblib.load.
  - We connect to the shared memory to access the test dataset, specifying the memory name and dataset dimensions.
  - We determine the chunk of the test dataset on which the task is to make a prediction.
  - We make a prediction for just that chunk.
  - We do something locally (in the process) with the predictions, but do not transmit them back to the parent process to avoid inter-process communication (a variant that does return predictions is sketched after this list).
- We time the execution of sequential prediction and parallel prediction.
- We print the execution times and the speedup achieved with parallel prediction.
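For contrast, here is a minimal sketch of the approach the example deliberately avoids: returning each chunk's predictions to the parent via future.result(), which pickles the arrays and copies them across process boundaries. The worker predict_chunk below is a hypothetical stand-in, not part of the example above.

import numpy as np
from concurrent.futures import ProcessPoolExecutor

# Hypothetical worker: computes and RETURNS its chunk's predictions
def predict_chunk(ix):
    # ...load the model, attach shared memory, predict the ix-th chunk...
    return np.zeros(10)  # placeholder for model.predict(chunk)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(predict_chunk, i) for i in range(4)]
        # result() blocks, then copies each array back to the parent (IPC cost)
        all_predictions = np.concatenate([f.result() for f in futures])
        print(all_predictions.shape)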
By using ProcessPoolExecutor, we can distribute the prediction workload across multiple processes.
To get the most benefit from parallelism with multiprocessing (Python processes), each child process should access the required model and input data, then store its predictions independently. This minimizes the overhead of inter-process communication required to transmit the model and data between parent and child processes. One way to store predictions independently is sketched below.
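Each worker can write its slice of predictions into a second shared-memory array, so the parent can read all results without any copies over IPC. In this minimal sketch, the worker predict_to_shared and its placeholder predictions are assumptions for illustration, not part of the example above:

import numpy as np
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory

# Hypothetical worker: writes its chunk's predictions into a shared output array
def predict_to_shared(ix, n_jobs, out_name, n_rows):
    chunk_size = n_rows // n_jobs
    ix_s = ix * chunk_size
    ix_e = n_rows if ix == n_jobs - 1 else ix_s + chunk_size
    predictions = np.ones(ix_e - ix_s, dtype=np.float32)  # placeholder for model.predict(...)
    out_shm = shared_memory.SharedMemory(name=out_name)
    out = np.ndarray((n_rows,), dtype=np.float32, buffer=out_shm.buf)
    out[ix_s:ix_e] = predictions  # each worker writes only its own slice
    out_shm.close()

if __name__ == '__main__':
    n_rows, n_jobs = 1000, 4
    out_shm = shared_memory.SharedMemory(create=True, size=n_rows * 4)  # float32 is 4 bytes
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        for i in range(n_jobs):
            executor.submit(predict_to_shared, i, n_jobs, out_shm.name, n_rows)
    out = np.ndarray((n_rows,), dtype=np.float32, buffer=out_shm.buf)
    print(out[:5])  # parent reads the predictions without copying them over IPC
    out_shm.close()
    out_shm.unlink()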
By using shared_memory, we can keep a single copy of the dataset in memory for all processes: one process creates the shared block, other processes connect to it by name, and each defines a new NumPy array backed by the shared memory buffer.
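The lifecycle in miniature: one process creates a named block, any process attaches to it by name, each attachment is closed, and exactly one process unlinks the block at the end. A minimal single-process sketch:

import numpy as np
from multiprocessing import shared_memory

a = np.arange(6, dtype=np.float64)
# Create a named block and copy the array into it
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
shared_a = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
shared_a[:] = a[:]
# Any process can attach to the same block by name (no copy is made)
other = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(a.shape, dtype=a.dtype, buffer=other.buf)
print(view)    # [0. 1. 2. 3. 4. 5.]
other.close()  # each attachment closes its own handle
shm.close()
shm.unlink()   # exactly one process destroys the block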
Generally, the exact speedup will depend on various factors such as the size of the dataset, the complexity of the model, the number of processes used, and the hardware specifications of the machine running the code.
It is likely that automatic parallelism during prediction with BLAS threads will be faster than manually splitting the prediction task among Python processes. Test and compare performance on your system.
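As a starting point for that comparison, the following sketch times XGBoost's thread-based prediction parallelism via the n_jobs parameter. Run it without setting OMP_NUM_THREADS; note that whether set_params(n_jobs=...) takes effect at prediction time can vary with the XGBoost version, so treat this as an assumption to verify.

import time
from sklearn.datasets import make_classification
from xgboost import XGBClassifier

if __name__ == '__main__':
    X, y = make_classification(n_samples=1000000, n_features=20, random_state=42)
    model = XGBClassifier(n_estimators=100, random_state=42)
    model.fit(X, y)
    for n in (1, 4):
        model.set_params(n_jobs=n)  # threads used during predict (version-dependent)
        start = time.perf_counter()
        model.predict(X)
        print(f"{n} thread(s): {time.perf_counter() - start:.2f} seconds")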