Ускорьте обучение многих моделей с помощью распределенных вычислений с Ray и Ray AIR

Введение / Мотивация

Даже в нынешнюю эпоху генеративного ИИ (Stable Diffusion, ChatGPT) и LLM (большие языковые модели) прогнозирование временных рядов по-прежнему является фундаментальной частью любого бизнеса, который зависит от цепочки поставок или ресурсов. Например, его можно использовать в:

Общим для всех этих вариантов использования является обучение множества моделей на разных сегментах данных. Обучение, настройка и параллельное развертывание тысяч моделей машинного обучения с использованием распределенных вычислений может оказаться непростой задачей! Типичное программное обеспечение для моделирования временных рядов не распространяется само по себе.

В этом блоге будут представлены мои советы по началу преобразования ваших рабочих нагрузок прогнозирования в распределенные вычисления. Я буду использовать новейшие API-интерфейсы Ray v2 с ARIMA, используя statsforecast, Prophet и PyTorch Forecasting. » библиотеки. Для данных я буду использовать популярный набор данных NYC Taxi dataset, который содержит исторические пикапы такси с отметкой времени и местоположением в Нью-Йорке.

Ray — это платформа с открытым исходным кодом для масштабирования рабочих нагрузок ИИ с использованием распределенных вычислений. Чтобы получить обзор Ray, ознакомьтесь с Документацией по Ray или этой вводной записью в блоге.

Этот блог состоит из 4 разделов:

  • Мультимодельное распределенное обучение с использованием Ray Core Multiprocessing.
  • Мультимодельная распределенная настройка с использованием Ray AIR.
  • Распределенная мультимодельная настройка нескольких более крупных моделей с использованием Ray AIR.
  • Мультимодельное распределенное развертывание с использованием Ray AIR и Ray Serve.

Раздел 1: Мультимодельное распределенное обучение с использованием Ray Core Multiprocessing

Еще в ноябре 2021 года я написал статью в блоге, демонстрирующую, как параллельно обучать множество моделей прогнозов (либо ARIMA, либо Prophet) с помощью Ray Core в облаке AWS. С тех пор Ray Multiprocessing — это большое улучшение, которое упрощает работу по сравнению с Ray Core API.

Ниже приведена схема кода. Полный, обновленный код находится на мой гитхаб. Во-первых, давайте начнем с пары импортов:

import time, dateutil
from typing import Tuple, List
import numpy as np
import pandas as pd
# Import libraries for reading from partitioned parquet.
import pyarrow.parquet as pq
import pyarrow.dataset as pds
# Import forecasting libraries.
import prophet
from prophet import Prophet
# Import Ray's multiprocessing library.
import ray
from ray.util.multiprocessing import Pool 
import tqdm

Далее давайте определим функции Python для предварительной обработки данных, обучения и оценки модели. Чтобы быстрее перейти к концепциям распределенных вычислений, мы собираемся представить, что данные временных рядов уже подготовлены и разбиты на отдельные файлы для каждой желаемой модели.

##########
# STEP 1. Define a Python function to read and preprocess a file of data. 
##########
def preprocess_data(file_path: str) -> Tuple[pd.DataFrame, np.int32]:

    # Read a single pyarrow parquet S3 file.
    data = pq.read_table(file_path,
           filters=[ ("pickup_location_id", "=", SAMPLE_UNIQUE_ID) ],
           columns=[ "pickup_at", "pickup_location_id", TARGET ])\
          .to_pandas()
    
    # Transform data.
    data["ds"] = data["pickup_at"].dt.to_period("D").dt.to_timestamp()
    data.rename(columns={TARGET: "y"}, inplace=True)
    data.rename(columns={"pickup_location_id": "unique_id"}, inplace=True)
    data.drop("pickup_at", inplace=True, axis=1)
    unique_id = data["unique_id"][0]
    return data, unique_id

##########
# STEP 2. Define Python functions to train and evaluate a model on a file of data.
##########
def train_model(file_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, 
    'prophet.forecaster.Prophet', np.int32]:
    
    # Prepare data from a single S3 file.
    data, unique_id = preprocess_data(file_path)

    # Split data into train, test.
    train_end = data.ds.max() - relativedelta(days=FORECAST_LENGTH - 1)
    train_df = data.loc[(data.ds <= train_end), :].copy()
    test_df = data.iloc[-(FORECAST_LENGTH):, :].copy()
    
    # Define Prophet model with 75% confidence interval.
    model = Prophet(interval_width=0.75, seasonality_mode="multiplicative")      

    # Train and fit Prophet model.
    model = model.fit(train_df[["ds", "y"]])
    return train_df, test_df, model, unique_id

def evaluate_model(model: 'prophet.forecaster.Prophet', train: pd.DataFrame, 
    valid: pd.DataFrame, input_value: np.int32) -> Tuple[float, pd.DataFrame]:

    # Inference model using FORECAST_LENGTH.
    future_dates = model.make_future_dataframe(
        periods=FORECAST_LENGTH, freq="D")
    future = model.predict(future_dates)
    
    # Merge in the actual y-values.
    future = pd.merge(future, train[['ds', 'y']], on=['ds'], how='left')
    future = pd.merge(future, valid[['ds', 'y']], on=['ds'], how='left')
    future['y'] = future.y_x.combine_first(future.y_y)
    future.drop(['y_x', 'y_y'], inplace=True, axis=1)
    future['unique_id'] = input_value
    
    # Calculate mean absolute forecast error.
    temp = future.copy()
    temp["forecast_error"] = np.abs(temp["yhat"] - temp["y"])
    temp.dropna(inplace=True)
    error = np.mean(temp["forecast_error"])
    return error, future

############
# STEP 3.  Define a calling function which calls all the above functions,
#          and will be called in parallel for every data file.
############
def train__and_evaluate(file_path: str) -> Tuple[pd.DataFrame, 
    'prophet.forecaster.Prophet', pd.DataFrame, float, np.int16]:
    
    # Read S3 file and train a Prophet model.
    train_df, valid_df, model, unique_id = train_model(file_path)
    
    # Inference model and evaluate error.
    error, future = evaluate_model(model, train_df, valid_df, unique_id)
    return valid_df, model, future, error, unique_id

Мы могли бы распараллелить это напрямую, используя вызовы API ядра Rayray.remote, но многопроцессорная библиотека Ray, одна из распределенных библиотек Ray, делает это проще.

Ниже, обертывание вызова pool с помощью tqdm дает хороший индикатор выполнения для отслеживания прогресса. Внутри Ray отправляет задачи рабочим в кластере Ray, который автоматически обрабатывает такие проблемы, как отказоустойчивость и оптимизация пакетной обработки.

start = time.time()
# Create a pool, where each worker is assigned 1 CPU by Ray.
pool = Pool(ray_remote_args={"num_cpus": 1})

# Use the pool to run `train_model` on the data, in batches of 1.
iterator = pool.imap_unordered(train__and_evaluate, models_to_train, chunksize=1)

# Track the progress using tqdm and retrieve the results into a list.
results = list(tqdm.tqdm(iterator, total=len(models_to_train)))

# Print some training stats.
time_ray_multiprocessing = time.time() - start
print(f"Total number of models: {len(results)}")
print(f"TOTAL TIME TAKEN: {time_ray_multiprocessing/60:.2f} minutes")
print(type(results[0][0]), type(results[0][1]), type(results[0][2]), 
      type(results[0][3]), type(results[0][4]))

Выше мы видим, что задание Рэя заняло менее 1 минуты, чтобы обучить 12 моделей.

Раздел 2: Мультимодельная распределенная настройка с использованием Ray AIR

Проницательный читатель мог заметить, что в приведенном выше разделе Ray Multiprocessing требовал, чтобы данные уже были организованы в один файл для каждой модели, которую вы хотите обучить. Но что, если ваши данные еще не организованы по модели? С помощью Ray AIR вы можете предварительно обрабатывать данные в одном и том же конвейере, одновременно обучая разные модели.

Еще одна проблема: что, если вы хотите смешивать и сопоставлять алгоритмы из более чем одной библиотеки одновременно? Ray Tune, входящая в состав Ray AIR, позволяет запускать параллельные испытания, чтобы найти наилучший выбор. алгоритма из любых библиотек Python AI/ML и гиперпараметров для каждого сегмента данных.

Ниже приведены шаги для предварительной обработки данных и автоматической настройки моделей. Хотя эти шаги специфичны для Ray AIR и его API, в целом они применяются для преобразования последовательного Python в распределенный Python:

  1. Определите функции Python для preprocess сегмента данных.
  2. Определите функции Python для train и evaluate модели сегмента данных.
  3. Определите вызывающую функцию train_models, которая вызывает все вышеперечисленные функции и будет вызываться параллельно для каждой перестановки в пространстве поиска Tune!
    Внутри этой функции train_models:
    📖 Входные параметры должны включать аргумент словаря конфигурации.
    📈 Метрика настройки (потери или ошибки модели) должны быть рассчитывается и сообщается с использованием session.report().
    ✔️ Checkpoint (сохранить) модель рекомендуется для обеспечения отказоустойчивости и легкого развертывания в дальнейшем.
  4. Настройте масштабирование распределенных вычислений.
  5. Определите пространство поиска Tune для всех параметров обучения.
  6. (Необязательно) Укажите стратегию поиска гиперпараметров.
  7. Проведите эксперимент.

Ниже приведен дополнительный код, который мы хотели бы добавить; полный код находится на моем гитхабе.

  • Приведенные ниже функции preprocess_data и train_model точно такие же, как и раньше, за исключением того, что они принимают список файлов вместо одного файла.
  • Функция train_models точно такая же, как train_and_evaluate, за исключением того, что она принимает список файлов вместо одного файла. Он также обучает алгоритм, переданный в конфигурации, вместо фиксированного алгоритма и выполняет контрольные точки.
import os
num_cpu = os.cpu_count()
# Import another forecasting library.
import statsforecast
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA
# Import Ray AIR libraries.
from ray import air, tune
from ray.air import session, ScalingConfig
from ray.air.checkpoint import Checkpoint

##########
# STEP 1. Define Python functions to read and prepare a segment of data.
##########
def preprocess_per_uniqueid(
    s3_files: List[str], sample_location_id: np.int32) -> pd.DataFrame:

    # Load data.
    df_list = [read_data(f, sample_location_id) for f in s3_files]
    df_raw = pd.concat(df_list, ignore_index=True)

    # Transform data.
    df = transform_df(df_raw)
    df.sort_values(by="ds", inplace=True)
    return df

##########
# STEP 2. Define Python functions to train and evaluate a model on a segment of data.
##########
def train_prophet(s3_files: List[str], sample_unique_id: np.int32,
    model_type: str) -> Tuple[pd.DataFrame, pd.DataFrame, 
                        'prophet.forecaster.Prophet', np.int32]:
    
    # Prepare data from a list of S3 files.
    data = preprocess_per_uniqueid(s3_files, sample_unique_id)

    # Split data into train, test.
    train_end = data.ds.max() - relativedelta(days=FORECAST_LENGTH - 1)
    train_df = data.loc[(data.ds <= train_end), :].copy()
    test_df = data.iloc[-(FORECAST_LENGTH):, :].copy()
    
    # Define Prophet model with 75% confidence interval.
    if model_type == "prophet_additive":
        model = Prophet(interval_width=0.75, seasonality_mode="additive")
    elif model_type == "prophet_multiplicative":
        model = Prophet(interval_width=0.75, seasonality_mode="multiplicative")     

    # Train and fit Prophet model.
    model = model.fit(train_df[["ds", "y"]])
    return train_df, test_df, model

# Train an ARIMA model. Full code not shown here.
def train_arima(): 

# Evaluate an ARIMA model. Full code not shown here.
def evaluate_arima(): 

############
# STEP 3.  Define a calling function `train_models`, which calls all 
#          the above functions, and will be called in parallel for every 
#          permutation in the Tune search space.
############
def train_models(config: dict) -> None:
    
    # Get Tune parameters
    file_list = config['params']['file_list']
    model_type = config['params']['algorithm']
    sample_unique_id = config['params']['location']
    
    # Train model.
    if model_type == "arima":
        # Train and fit the Prophet model.
        train_df, valid_df, model = \
            train_arima(file_list, sample_unique_id)
        # Inference model and evaluate error.
        error, future = \
            evaluate_arima(model, valid_df)
    else:
        # Train and fit the Prophet model.
        train_df, valid_df, model = \
            train_prophet(file_list, sample_unique_id, model_type)
        # Inference model and evaluate error.
        error, future = evaluate_model(model, train_df, valid_df, sample_unique_id)
    
    # Define a model checkpoint using AIR API.
    checkpoint = ray.air.checkpoint.Checkpoint.from_dict({
          "model": model,
          "valid_df": valid_df,
          "forecast_df": future,
          "location_id": sample_unique_id,
        })
    metrics = dict(error=error)
    session.report(metrics, checkpoint=checkpoint)

############
# STEP 4. Customize distributed compute scaling.
############
num_training_workers = min(num_cpu - 2, 32)
scaling_config = ScalingConfig(
    # Number of distributed workers.
    num_workers=num_training_workers,
    # Turn on/off GPU.
    use_gpu=False,
    # Specify resources used for trainer.
    trainer_resources={"CPU": 1},
    # Try to schedule workers on different nodes.
    placement_strategy="SPREAD")

############
# STEP 5. Define a search space dict of all config parameters.
############
search_space = {
    "scaling_config": scaling_config,
    "params": {
        "file_list": tune.grid_search([files_to_use]),
        "algorithm": tune.grid_search(algorithms_to_use),
        "location": tune.grid_search(models_to_train),
    },
}

# Optional STEP 6. Specify the hyperparameter tuning search strategy.

##########
# STEP 7. Run the experiment with Ray AIR APIs.
##########
# Define a tuner object.
tuner = tune.Tuner(
        train_models,
        param_space=search_space,
        tune_config=tune.TuneConfig(
            metric="error",
            mode="min",
        ),
        run_config=air.RunConfig(
            # Redirect logs to relative path instead of default ~/ray_results/.
            local_dir="my_Tune_logs",
            # Specify name to make logs easier to find in log path.
            name="tune_nyc",
        ),
    )
# Fit the tuner object.
results = tuner.fit()

На приведенных выше снимках экрана данные с января 2018 года были сгруппированы и агрегированы на ежедневном уровне. В прошлом я пытался сделать это в SageMaker, но одна только обработка данных занимала слишком много времени, не говоря уже о настройке такого количества моделей одновременно.

Раздел 3: Мультимодельная распределенная настройка (более крупные модели PyTorch)

Часто цель состоит в том, чтобы создать несколько более крупных моделей, например, одну модель по географической зоне, где таких зон всего несколько. Год назад, в декабре 2021 года, я написал статью в блоге, демонстрирующую, как использовать Ray Lightning для обучения более крупных моделей прогнозирования PyTorch. С тех пор большое улучшение заключается в том, что переключение разработки кода между ноутбуком и облаком стало более плавным благодаря Anyscale Workspaces.

Эти более крупные модели иногда называют «глобальными моделями», потому что только одна модель глубокой нейронной сети обучается одновременно для множества разных временных рядов. Вместо 1 модели на временной ряд (Prophet или ARIMA).

См. мой github для получения полного кода PyTorch Forecasting, показывающего последние API-интерфейсы Ray AIR с Ray Lightning. Вам нужно добавить идентификатор кластера к вашим данным, затем шаги для настройки такие же, как мы видели в разделе 2:

# Import forecasting libraries.
import torch
import pytorch_lightning as pl
import pytorch_forecasting as ptf 
import tensorboard as tb  
# Import ray libraries.
import ray_lightning
from ray_lightning import RayStrategy
from ray_lightning.tune import get_tune_resources, TuneReportCheckpointCallback
from ray import air, tune
from ray.tune.schedulers import ASHAScheduler

# Define a tuner object.
tuner = tune.Tuner(
        tune.with_resources(
            train_with_parameters,
            resources=get_tune_resources(num_workers=num_training_workers),
        ),
        tune_config=tune.TuneConfig(
            metric="loss",
            mode="min",
            scheduler=scheduler,
        ),
        run_config=air.RunConfig(
            # Redirect logs to relative path instead of default ~/ray_results/.
            local_dir="my_Tune_logs",
            # Specify name to make logs easier to find in log path.
            name="ptf_nyc",
        ),
        param_space=FORECAST_CONFIG,
    )

# Fit the tuner object.
results = tuner.fit()

# Get checkpoint for best model from results object, code not shown.

# Plot inference forecasts for some unique_ids.
some_unique_ids = [25, 41, 14, 24, 4]
for idx in some_unique_ids:
    best_model.plot_prediction(x, raw_predictions, idx=idx)

Обратите внимание, что в отличие от моделей ARIMA и Prophet, которые представляли собой одну модель для каждого уникального_идентификатора, каждая из этих более крупных моделей содержит выводы для многих уникальных_идентификаторов одновременно в одной модели.

Раздел 4. Мультимодельное распределенное развертывание с использованием Ray AIR и Ray Serve

Перед развертыванием необходимо решить, должно ли развертывание быть онлайновым, всегда работающим http-сервисом или автономным (сервис Python вызывается по запросу). Ниже я демонстрирую автономное развертывание с использованием новых Ray AIR Predictors с Ray Serve.

Шаги для автономного развертывания:

Шаг 1. Создайте пакетный предиктор с помощью контрольных точек Ray AIR.
Шаг 2. Создайте несколько тестовых данных.
Шаг 3. Выполнить batch_predictor.predict(test_data).

Замените шаг 3 выше следующими шагами для пользовательского предиктора:

Шаг 3. Определите класс развертывания Ray Serve с помощью декоратора Ray @serve.deployment.
Шаг 4. Разверните предиктор.
Шаг 5. Запросите развертывание и получите результат.

Шаги 3–5 выше необходимы только в том случае, если вы используете пользовательский предиктор (например, ARIMA, Prophet или PyTorch Forecasting).

В противном случае для интегрированных в Ray AIR библиотек машинного обучения (преобразователей HuggingFace, PyTorch, TensorFlow, Scikit-learn, XGBoost или LightGBM) достаточно вызвать batch_predictor.predict(test_data).

Продолжая пример прогнозирования PyTorch из предыдущего раздела, ниже приведен код развертывания. Полный код находится на моем гитхабе.

import pickle
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
# Import forecasting libraries. 
import torch
import pytorch_lightning as pl
import pytorch_forecasting as ptf
# Import ray libraries.
import ray
from ray import serve

##########
# STEP 1. Instantiate a batch predictor from checkpoint.
##########
batch_predictor = ptf.models.TemporalFusionTransformer.load_from_checkpoint(model_path)

##########
# STEP 2. Create some test data. 
##########
# Being lazy, pretend the last test data is our out-of-sample test data.
max_prediction_length = FORECAST_CONFIG['forecast_horizon']
new_prediction_data = df.copy()
new_prediction_data["time_idx"] = new_prediction_data["time_idx"] + max_prediction_length
# Convert data from pandas to PyTorch tensors.
_, _, test_loader = convert_pandas_pytorch_timeseriesdata(
    new_prediction_data, FORECAST_CONFIG)

##########
# STEP 3. Define a Ray Serve deployment class.
##########
@serve.deployment
class ForecastPredictor:
    def __init__(self, predictor, test_data):
        self.predictor = predictor
        self.test_data = test_data
        
    def predict(self):
        raw_predictions, x = \
          self.predictor.predict(self.test_data, mode="raw", return_x=True)
        return x, raw_predictions

    def __call__(self):
        x, raw_predictions = self.predict()
        return [x, raw_predictions]

##########
# STEP 4. Deploy the predictor.
##########
# Bind arguments to the Class constructor.
my_first_deployment = ForecastPredictor.bind(
    predictor=batch_predictor,
    test_data=test_loader)

##########
# STEP 5. Query the deployment and get the result.
##########
# Get handle from serve.run().
handle = serve.run(my_first_deployment)

# ray.get() the results from the handle.
ray_return = ray.get(handle.remote())
new_x = ray_return[0]
new_pred = ray_return[1]

На скриншоте выше справа показана наблюдаемость кластера Ray во время обучения и обслуживания. Если вам нужно выполнить постобработку результатов предиктора, у меня есть пример этого в конце этой другой записной книжки здесь.

Заключение

В заключение, в этом блоге показаны шаги, как обучать и настраивать множество моделей параллельно, используя распределенные вычисления с помощью Ray с открытым исходным кодом. Модели не обязательно должны быть одного типа, их можно смешивать и сопоставлять из любых библиотек AI/ML Python.

API-интерфейсы Ray AIR были четкими, интуитивно понятными и скрывали большую сложность распределенных вычислений, поэтому было легко выполнять множество сложных действий, таких как ранняя остановка, планирование ASHA, контрольные точки и развертывание.

Чтобы продолжить обучение:

  • Прочтите Ray docs для подробных пояснений и примеров.
  • Задавайте вопросы в Slack и Обсудить.
  • Используйте Anyscale, что упрощает развертывание кластера и запуск вашего кода в облаке (получите инвайт-код здесь).