
XGBoost Parallel Prediction With a Process Pool and Shared Memory

When dealing with massive datasets, making predictions with your trained XGBoost model can be extremely time-consuming.

Python’s concurrent.futures.ProcessPoolExecutor allows you to distribute the prediction workload across multiple processes, potentially providing a significant speedup compared to sequential prediction.

The dataset can be stored in shared memory and accessed from each process, rather than being transmitted to each child process or loaded from file separately by each child process.

NumPy and XGBoost automatically make use of multiple native threads behind the scenes when making predictions, via BLAS (Basic Linear Algebra Subprograms) and OpenMP. These threads must be limited before using Python processes to make parallel predictions, to avoid contention (too many system threads competing with each other at the same time). This can be achieved by setting the 'OMP_NUM_THREADS' environment variable to "1" before the libraries are imported, so that BLAS and OpenMP code is single-threaded.
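As a minimal sketch of the thread-limiting step on its own, the helper below sets the variable before any numerical library is imported. The helper name `limit_native_threads` is hypothetical, and the two extra variables (`OPENBLAS_NUM_THREADS` and `MKL_NUM_THREADS`) are an assumption about which BLAS backend your NumPy build uses; the full example in this tutorial sets only `OMP_NUM_THREADS`.

```python
import os

def limit_native_threads(n_threads=1):
    """Cap native thread pools via environment variables (hypothetical helper).

    Call this before importing numpy or xgboost, because native libraries
    typically read these variables when they first initialize.
    """
    value = str(n_threads)
    # OMP_NUM_THREADS is the variable used in this tutorial.
    # The other two are assumptions that cover OpenBLAS and MKL builds of NumPy.
    for name in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
        os.environ[name] = value
    return value

limit_native_threads(1)
# Import numerical libraries only after this point, for example:
# import numpy as np
# from xgboost import XGBClassifier
```

Child processes inherit the parent's environment, so a limit set this way in the parent should also apply inside each worker process.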

The complete example below demonstrates how to perform parallel predictions using a process pool and shared memory, and compares the execution time against sequential prediction.

import os
# Limit BLAS/OpenMP threads; must be set before importing numpy and xgboost
os.environ['OMP_NUM_THREADS'] = "1"
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from concurrent.futures import ProcessPoolExecutor
import time
import joblib
from multiprocessing import shared_memory

# Function for sequential prediction
def predict_sequential(name, dim):
    # Access the shared memory
    existing_shm = shared_memory.SharedMemory(name=name)
    X_test = np.ndarray(dim, dtype=np.float64, buffer=existing_shm.buf)
    model = joblib.load('model.joblib')
    predictions = model.predict(X_test)
    # do something with the predictions...
    # close shared memory
    existing_shm.close()

def predict(ix, n_jobs, name, dim):
    # Load the model
    model = joblib.load('model.joblib')
    # Access the shared memory
    existing_shm = shared_memory.SharedMemory(name=name)
    X_test = np.ndarray(dim, dtype=np.float64, buffer=existing_shm.buf)
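    # Determine input data for prediction
    chunk_size = len(X_test) // n_jobs
    ix_s = ix * chunk_size
    # The last chunk takes any remainder rows so that no row is skipped
    ix_e = len(X_test) if ix == n_jobs - 1 else ix_s + chunk_size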
    # make a prediction on the chosen chunk
    predictions = model.predict(X_test[ix_s:ix_e])
    # do something with the predictions...
    # close shared memory
    existing_shm.close()

# Function for parallel prediction
def predict_parallel(n_jobs, name, dim):
    # Make predictions for each chunk in a separate process
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
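        futures = [executor.submit(predict, i, n_jobs, name, dim) for i in range(n_jobs)]
        # Wait for every task; result() re-raises any exception from a child process
        for future in futures:
            future.result()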

# protect the entry point
if __name__ == '__main__':
    # Generate a large synthetic dataset
    X, y = make_classification(n_samples=10000000, n_features=20, random_state=42)

    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.95, random_state=42)

    # Train an XGBoost model
    model = XGBClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Save the model
    joblib.dump(model, 'model.joblib')

    # Create shared memory for X_test
    shm = shared_memory.SharedMemory(create=True, size=X_test.nbytes)
    # Now create an ndarray from the buffer of the shared memory
    shared_X_test = np.ndarray(X_test.shape, dtype=X_test.dtype, buffer=shm.buf)
    shared_X_test[:] = X_test[:]  # Copy the data to shared memory

    # Time the sequential prediction
    start_sequential = time.perf_counter()
    predict_sequential(shm.name, X_test.shape)
    end_sequential = time.perf_counter()
    # Print the execution time
    print(f"Sequential prediction time: {end_sequential - start_sequential:.2f} seconds")

    # Time the parallel prediction
    start_parallel = time.perf_counter()
    predict_parallel(4, shm.name, X_test.shape)
    end_parallel = time.perf_counter()
    # Print the execution time
    print(f"Parallel prediction time: {end_parallel - start_parallel:.2f} seconds")

    # Print the speedup
    speedup = (end_sequential - start_sequential) / (end_parallel - start_parallel)
    print(f"Parallel prediction is {speedup:.2f} times faster than sequential prediction")

    # Clean up shared memory
    shm.close()
    shm.unlink()

You may see results that look something like the following:

Sequential prediction time: 17.12 seconds
Parallel prediction time: 5.64 seconds
Parallel prediction is 3.04 times faster than sequential prediction

Here’s a step-by-step breakdown:

  1. We configure BLAS to be single-threaded via the 'OMP_NUM_THREADS' environment variable.
  2. We generate a large synthetic dataset using sklearn.datasets.make_classification.
  3. We split the data into train and test sets, with 95% of the data used for testing.
  4. We train an XGBClassifier on the training data.
  5. We save the trained model to file using joblib.dump and copy the test dataset into shared memory.
  6. We define two functions: predict_sequential for sequential prediction and predict_parallel for parallel prediction using ProcessPoolExecutor. The predict_parallel function submits multiple calls to the predict function, one for each chunk of the test set to predict.
  7. In the predict function:
    • We load the trained model from file in each child process using joblib.load.
    • We connect to the shared memory to access the test dataset, specifying the memory name and dataset dimensions.
    • We determine the chunk of the test dataset on which the task is to make a prediction.
    • We make a prediction for just the chunk.
    • We do something locally (in the process) with the predictions, but do not transmit them back to the parent process to avoid inter-process communication.
  8. We time the execution of sequential prediction and parallel prediction.
  9. We print the execution times and the speedup achieved with parallel prediction.
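The chunk selection in step 7 can be sketched in isolation. This is an illustrative variant, not part of the listing above: the function name `chunk_bounds` is hypothetical, and it lets the last chunk absorb any remainder rows so that every row is covered even when the row count is not divisible by the number of processes.

```python
def chunk_bounds(n_rows, n_jobs):
    """Return (start, end) index pairs that cover range(n_rows) exactly once."""
    chunk_size = n_rows // n_jobs
    bounds = []
    for ix in range(n_jobs):
        start = ix * chunk_size
        # The last chunk absorbs the remainder when n_rows % n_jobs != 0
        end = n_rows if ix == n_jobs - 1 else start + chunk_size
        bounds.append((start, end))
    return bounds

print(chunk_bounds(10, 4))  # [(0, 2), (2, 4), (4, 6), (6, 10)]
```

Each worker would then slice the shared array with its own pair, for example `X_test[start:end]`.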

By using ProcessPoolExecutor, we can distribute the prediction workload across multiple processes.

To get the most benefit from parallelism with multiprocessing (Python processes), each child process should load the model, access its input data, and store its predictions independently. This minimizes the overhead of inter-process communication that would otherwise be needed to transmit the model and data between parent and child processes.
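One way to store predictions independently is for each worker to write its chunk to its own file, which the parent can reassemble later if needed. The sketch below is an assumption about how that might look, not something the example above does: the function names and the `predictions_0000.npy` naming scheme are hypothetical.

```python
import os
import tempfile
import numpy as np

def save_chunk(predictions, ix, out_dir):
    """Write one chunk's predictions to its own file (would run in a worker)."""
    path = os.path.join(out_dir, f"predictions_{ix:04d}.npy")
    np.save(path, predictions)
    return path

def load_all_chunks(out_dir, n_jobs):
    """Reassemble predictions in chunk order (would run in the parent, later)."""
    parts = [
        np.load(os.path.join(out_dir, f"predictions_{ix:04d}.npy"))
        for ix in range(n_jobs)
    ]
    return np.concatenate(parts)

# Small in-process demonstration with two made-up chunks of class labels
with tempfile.TemporaryDirectory() as out_dir:
    save_chunk(np.array([0, 1]), 0, out_dir)
    save_chunk(np.array([1, 1, 0]), 1, out_dir)
    combined = load_all_chunks(out_dir, n_jobs=2)

print(combined.tolist())  # [0, 1, 1, 1, 0]
```

Because each worker writes to a distinct file, no locking or result pickling between processes is required.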

By using shared_memory, we can keep a single copy of the dataset in memory for all processes. The parent process creates the shared memory block, and each child process connects to it by name and defines a new NumPy array backed by the shared memory buffer.
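The create-then-attach pattern can be sketched in a single process with a small array. This is a simplified illustration under the assumption that the attaching side already knows the shape and dtype, since the shared block stores only raw bytes.

```python
import numpy as np
from multiprocessing import shared_memory

data = np.arange(12, dtype=np.float64).reshape(3, 4)

# Creator side: allocate a block and copy the array into it
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared[:] = data

# Consumer side (normally in a child process): attach by name.
# Shape and dtype are not stored in the block, so they are passed separately.
attached = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(data.shape, dtype=np.float64, buffer=attached.buf)
total = float(view.sum())  # 0 + 1 + ... + 11 = 66.0

# Drop array references before closing, then free the block
del view
attached.close()
del shared
shm.close()
shm.unlink()
```

Every process that attaches should call close(), while unlink() should be called only once, by the process that owns the block.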

Generally, the exact speedup will depend on various factors such as the size of the dataset, the complexity of the model, the number of processes used, and the hardware specifications of the machine running the code.

It is likely that automatic parallelism during prediction with BLAS threads will be faster than manually splitting the prediction task among Python processes. Test and compare performance on your system.
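When comparing approaches, a single timing can be noisy. The helper below is a generic sketch that takes the best of several runs; the name `best_time` is hypothetical, and the workload shown is a stand-in rather than an XGBoost prediction.

```python
import time

def best_time(fn, *args, repeats=3):
    """Return the fastest wall-clock time in seconds over several runs of fn(*args)."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn(*args)
        timings.append(time.perf_counter() - start)
    return min(timings)

# Stand-in workload; replace with your sequential and parallel prediction calls
elapsed = best_time(sum, range(100_000))
print(f"Best of 3 runs: {elapsed:.4f} seconds")
```

For example, you might time `predict_sequential` once with native threads enabled and once with them limited, and compare both against `predict_parallel`.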


