
XGBoost Parallel Prediction With a Process Pool (multiprocessing)

When dealing with massive datasets, making predictions with your trained XGBoost model can be extremely time-consuming.

Python’s concurrent.futures.ProcessPoolExecutor allows you to distribute the prediction workload across multiple processes, potentially providing a significant speedup compared to sequential prediction.

XGBoost automatically uses multiple threads (via OpenMP) when making predictions, and NumPy may use a multithreaded BLAS (Basic Linear Algebra Subprograms) library behind the scenes. This built-in threading should be disabled before using Python processes to make parallel predictions, to avoid contention (too many system threads competing with each other at the same time). This can be achieved by setting the 'OMP_NUM_THREADS' environment variable to "1" before NumPy and XGBoost are imported, so that each process runs single-threaded.

This example demonstrates how to perform parallel predictions using a process pool and compares the execution time against sequential prediction.

It also shows how to save the dataset and model in the main process and have each child process load them from file to minimize the overhead of inter-process communication (IPC).

import os
# Limit OpenMP/BLAS threads to one per process (set before importing NumPy and XGBoost)
os.environ['OMP_NUM_THREADS'] = "1"
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from concurrent.futures import ProcessPoolExecutor
import time
import joblib

# Function for sequential prediction
def predict_sequential():
    # Load the dataset and model from file in the main process
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
    # make predictions
    predictions = model.predict(X_test)
    # do something with the predictions...

# Function that defines the prediction task in each child process
def predict(ix, n_jobs):
    # Load the dataset and model in each child process
    X_test = joblib.load('X_test.joblib')
    model = joblib.load('model.joblib')
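    # Determine input data for prediction
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    # The last chunk also takes any rows left over by the integer division
    ix_e = len(X_test) if ix == n_jobs - 1 else ix_s + chunk_size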
    # make a prediction on the chosen chunk
    predictions = model.predict(X_test[ix_s:ix_e])
    # do something with the predictions...

# Function for parallel prediction
def predict_parallel(n_jobs):
    # Make predictions for each chunk in a separate process
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
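        futures = [executor.submit(predict, i, n_jobs) for i in range(n_jobs)]
        # Wait for every task so that an exception in a child process is not silently lost
        for future in futures:
            future.result()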

# protect the entry point
if __name__ == '__main__':
    # Generate a large synthetic dataset
    X, y = make_classification(n_samples=10000000, n_features=20, random_state=42)

    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.95, random_state=42)

    # Train an XGBoost model
    model = XGBClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Save the dataset and model
    joblib.dump(X_test, 'X_test.joblib')
    joblib.dump(model, 'model.joblib')

    # Time the sequential prediction
    start_sequential = time.perf_counter()
    predict_sequential()
    end_sequential = time.perf_counter()
    # Print the execution time
    print(f"Sequential prediction time: {end_sequential - start_sequential:.2f} seconds")

    # Time the parallel prediction
    start_parallel = time.perf_counter()
    predict_parallel(4)
    end_parallel = time.perf_counter()
    # Print the execution time
    print(f"Parallel prediction time: {end_parallel - start_parallel:.2f} seconds")

    # Print the speedup
    speedup = (end_sequential - start_sequential) / (end_parallel - start_parallel)
    print(f"Parallel prediction is {speedup:.2f} times faster than sequential prediction")

You may see results that look something like the following:

Sequential prediction time: 17.91 seconds
Parallel prediction time: 6.59 seconds
Parallel prediction is 2.72 times faster than sequential prediction
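
As a quick check on the sample output above, the speedup is the ratio of the two times, and dividing it by the number of worker processes (4 in this example) gives a rough parallel efficiency. The figures below are copied from the sample output; the variable names are illustrative only.

```python
# Times copied from the sample output above (seconds)
sequential_time = 17.91
parallel_time = 6.59
n_jobs = 4  # number of worker processes used in the example

# Speedup: how many times faster the parallel run was
speedup = sequential_time / parallel_time

# Efficiency: fraction of the ideal n_jobs-times speedup actually achieved
efficiency = speedup / n_jobs

print(f"Speedup: {speedup:.2f}x")
print(f"Efficiency: {efficiency:.0%}")
```

An efficiency below 100% is expected here, in part because each worker also pays the cost of loading the dataset and model from file.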

Here’s a step-by-step breakdown:

  1. We limit each process to a single OpenMP/BLAS thread via the 'OMP_NUM_THREADS' environment variable, set before NumPy and XGBoost are imported.
  2. We generate a large synthetic dataset using sklearn.datasets.make_classification.
  3. We split the data into train and test sets, with 95% of the data used for testing.
  4. We train an XGBClassifier on the training data.
  5. We save the test dataset and trained model to file using joblib.dump.
  6. We define two functions: predict_sequential for sequential prediction and predict_parallel for parallel prediction using ProcessPoolExecutor. The predict_parallel function submits multiple calls to the predict function, one for each chunk of the test set.
  7. In the predict function:
    • We load the test dataset and model in each child process using joblib.load.
    • We determine the chunk of the test dataset on which the task is to make a prediction.
    • We make a prediction for just the chunk.
    • We do something locally (in the process) with the predictions and do not transmit them back to the parent process, which avoids IPC overhead.
  8. We time the execution of sequential prediction and parallel prediction.
  9. We print the execution times and the speedup achieved with parallel prediction.
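
The chunk selection described in step 7 can be sketched in isolation. This is a minimal sketch, assuming you want the chunks to cover every row even when the row count is not a multiple of the number of workers; chunk_bounds is a hypothetical helper and is not part of the example above.

```python
def chunk_bounds(n_rows, n_jobs, ix):
    """Return (start, end) row indices for chunk ix out of n_jobs chunks.

    Hypothetical helper: spreads any leftover rows over the first chunks
    so that every row is covered and chunk sizes differ by at most one.
    """
    base, extra = divmod(n_rows, n_jobs)
    # Chunks before this one that received an extra row shift the start
    start = ix * base + min(ix, extra)
    # The first `extra` chunks are one row longer than the rest
    end = start + base + (1 if ix < extra else 0)
    return start, end


# Example: 10 rows split across 4 workers
bounds = [chunk_bounds(10, 4, ix) for ix in range(4)]
print(bounds)
```

Each worker would then slice its input with the returned pair, for example X_test[start:end].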

By using ProcessPoolExecutor, we can distribute the prediction workload across multiple processes.

To get the most benefit from parallelism with multiprocessing (Python processes), each child process should load the required model and input data itself and store its predictions independently. This minimizes the overhead of inter-process communication (IPC) that would otherwise be needed to transmit the model and data between parent and child processes.
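
One way to store predictions independently is for each worker to write its results to its own file and return only the file path. The sketch below uses the standard library only and makes several assumptions: fake_predict stands in for model.predict, the small in-memory list stands in for the dataset loaded from file, and predict_chunk_to_file and the file naming scheme are hypothetical.

```python
import json
import os
import tempfile
from concurrent.futures import ProcessPoolExecutor


def fake_predict(rows):
    # Placeholder for model.predict(): label 1 if the row sum is positive
    return [1 if sum(row) > 0 else 0 for row in rows]


def predict_chunk_to_file(ix, start, end, out_dir):
    # Stand-in for loading the saved dataset inside the worker
    data = [[i - 5, 1] for i in range(10)]
    predictions = fake_predict(data[start:end])
    # Store the predictions locally instead of returning them
    path = os.path.join(out_dir, f"predictions_{ix}.json")
    with open(path, "w") as f:
        json.dump(predictions, f)
    # Only a short string travels back to the parent process
    return path


# protect the entry point
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        bounds = [(0, 5), (5, 10)]
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [
                executor.submit(predict_chunk_to_file, ix, s, e, out_dir)
                for ix, (s, e) in enumerate(bounds)
            ]
            paths = [future.result() for future in futures]
        print(paths)
```

Returning a path keeps the data sent between processes small regardless of how many rows each chunk contains.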

Generally, the exact speedup will depend on various factors such as the size of the dataset, the complexity of the model, the number of processes used, and the hardware specifications of the machine running the code.

It is likely that the automatic parallelism of XGBoost's built-in multithreading will be faster than manually splitting the prediction task among Python processes. Test and compare performance on your system.


