Ускорьте обучение многих моделей с помощью распределенных вычислений с Ray и Ray AIR
Введение / Мотивация
Даже в нынешнюю эпоху генеративного ИИ (Stable Diffusion, ChatGPT) и LLM (большие языковые модели) прогнозирование временных рядов по-прежнему является фундаментальной частью любого бизнеса, который зависит от цепочки поставок или ресурсов. Например, его можно использовать в:
- Прогноз выполнения для управления запасами по географии.
- Прогнозирование спроса на разные товары по товарным категориям.
- Прогноз использования для предоставления ЦОД по ресурсам.
Общим для всех этих вариантов использования является обучение множества моделей на разных сегментах данных. Обучение, настройка и параллельное развертывание тысяч моделей машинного обучения с использованием распределенных вычислений может оказаться непростой задачей! Типичное программное обеспечение для моделирования временных рядов не распространяется само по себе.
В этом блоге будут представлены мои советы по началу преобразования ваших рабочих нагрузок прогнозирования в распределенные вычисления. Я буду использовать новейшие API-интерфейсы Ray v2 с ARIMA, используя statsforecast, Prophet и PyTorch Forecasting. » библиотеки. Для данных я буду использовать популярный набор данных NYC Taxi dataset, который содержит исторические пикапы такси с отметкой времени и местоположением в Нью-Йорке.
Ray — это платформа с открытым исходным кодом для масштабирования рабочих нагрузок ИИ с использованием распределенных вычислений. Чтобы получить обзор Ray, ознакомьтесь с Документацией по Ray или этой вводной записью в блоге.
Этот блог состоит из 4 разделов:
- Мультимодельное распределенное обучение с использованием Ray Core Multiprocessing.
- Мультимодельная распределенная настройка с использованием Ray AIR.
- Распределенная мультимодельная настройка нескольких более крупных моделей с использованием Ray AIR.
- Мультимодельное распределенное развертывание с использованием Ray AIR и Ray Serve.
Раздел 1: Мультимодельное распределенное обучение с использованием Ray Core Multiprocessing
Еще в ноябре 2021 года я написал статью в блоге, демонстрирующую, как параллельно обучать множество моделей прогнозов (либо ARIMA, либо Prophet) с помощью Ray Core в облаке AWS. С тех пор Ray Multiprocessing — это большое улучшение, которое упрощает работу по сравнению с Ray Core API.
Ниже приведена схема кода. Полный, обновленный код находится на мой гитхаб. Во-первых, давайте начнем с пары импортов:
import time, dateutil from typing import Tuple, List import numpy as np import pandas as pd # Import libraries for reading from partitioned parquet. import pyarrow.parquet as pq import pyarrow.dataset as pds # Import forecasting libraries. import prophet from prophet import Prophet # Import Ray's multiprocessing library. import ray from ray.util.multiprocessing import Pool import tqdm
Далее давайте определим функции Python для предварительной обработки данных, обучения и оценки модели. Чтобы быстрее перейти к концепциям распределенных вычислений, мы собираемся представить, что данные временных рядов уже подготовлены и разбиты на отдельные файлы для каждой желаемой модели.
########## # STEP 1. Define a Python function to read and preprocess a file of data. ########## def preprocess_data(file_path: str) -> Tuple[pd.DataFrame, np.int32]: # Read a single pyarrow parquet S3 file. data = pq.read_table(file_path, filters=[ ("pickup_location_id", "=", SAMPLE_UNIQUE_ID) ], columns=[ "pickup_at", "pickup_location_id", TARGET ])\ .to_pandas() # Transform data. data["ds"] = data["pickup_at"].dt.to_period("D").dt.to_timestamp() data.rename(columns={TARGET: "y"}, inplace=True) data.rename(columns={"pickup_location_id": "unique_id"}, inplace=True) data.drop("pickup_at", inplace=True, axis=1) unique_id = data["unique_id"][0] return data, unique_id ########## # STEP 2. Define Python functions to train and evaluate a model on a file of data. ########## def train_model(file_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, 'prophet.forecaster.Prophet', np.int32]: # Prepare data from a single S3 file. data, unique_id = preprocess_data(file_path) # Split data into train, test. train_end = data.ds.max() - relativedelta(days=FORECAST_LENGTH - 1) train_df = data.loc[(data.ds <= train_end), :].copy() test_df = data.iloc[-(FORECAST_LENGTH):, :].copy() # Define Prophet model with 75% confidence interval. model = Prophet(interval_width=0.75, seasonality_mode="multiplicative") # Train and fit Prophet model. model = model.fit(train_df[["ds", "y"]]) return train_df, test_df, model, unique_id def evaluate_model(model: 'prophet.forecaster.Prophet', train: pd.DataFrame, valid: pd.DataFrame, input_value: np.int32) -> Tuple[float, pd.DataFrame]: # Inference model using FORECAST_LENGTH. future_dates = model.make_future_dataframe( periods=FORECAST_LENGTH, freq="D") future = model.predict(future_dates) # Merge in the actual y-values. future = pd.merge(future, train[['ds', 'y']], on=['ds'], how='left') future = pd.merge(future, valid[['ds', 'y']], on=['ds'], how='left') future['y'] = future.y_x.combine_first(future.y_y) future.drop(['y_x', 'y_y'], inplace=True, axis=1) future['unique_id'] = input_value # Calculate mean absolute forecast error. temp = future.copy() temp["forecast_error"] = np.abs(temp["yhat"] - temp["y"]) temp.dropna(inplace=True) error = np.mean(temp["forecast_error"]) return error, future ############ # STEP 3. Define a calling function which calls all the above functions, # and will be called in parallel for every data file. ############ def train__and_evaluate(file_path: str) -> Tuple[pd.DataFrame, 'prophet.forecaster.Prophet', pd.DataFrame, float, np.int16]: # Read S3 file and train a Prophet model. train_df, valid_df, model, unique_id = train_model(file_path) # Inference model and evaluate error. error, future = evaluate_model(model, train_df, valid_df, unique_id) return valid_df, model, future, error, unique_id
Мы могли бы распараллелить это напрямую, используя вызовы API ядра Rayray.remote
, но многопроцессорная библиотека Ray, одна из распределенных библиотек Ray, делает это проще.
Ниже, обертывание вызова pool
с помощью tqdm
дает хороший индикатор выполнения для отслеживания прогресса. Внутри Ray отправляет задачи рабочим в кластере Ray, который автоматически обрабатывает такие проблемы, как отказоустойчивость и оптимизация пакетной обработки.
start = time.time() # Create a pool, where each worker is assigned 1 CPU by Ray. pool = Pool(ray_remote_args={"num_cpus": 1}) # Use the pool to run `train_model` on the data, in batches of 1. iterator = pool.imap_unordered(train__and_evaluate, models_to_train, chunksize=1) # Track the progress using tqdm and retrieve the results into a list. results = list(tqdm.tqdm(iterator, total=len(models_to_train))) # Print some training stats. time_ray_multiprocessing = time.time() - start print(f"Total number of models: {len(results)}") print(f"TOTAL TIME TAKEN: {time_ray_multiprocessing/60:.2f} minutes") print(type(results[0][0]), type(results[0][1]), type(results[0][2]), type(results[0][3]), type(results[0][4]))
Выше мы видим, что задание Рэя заняло менее 1 минуты, чтобы обучить 12 моделей.
Раздел 2: Мультимодельная распределенная настройка с использованием Ray AIR
Проницательный читатель мог заметить, что в приведенном выше разделе Ray Multiprocessing требовал, чтобы данные уже были организованы в один файл для каждой модели, которую вы хотите обучить. Но что, если ваши данные еще не организованы по модели? С помощью Ray AIR вы можете предварительно обрабатывать данные в одном и том же конвейере, одновременно обучая разные модели.
Еще одна проблема: что, если вы хотите смешивать и сопоставлять алгоритмы из более чем одной библиотеки одновременно? Ray Tune, входящая в состав Ray AIR, позволяет запускать параллельные испытания, чтобы найти наилучший выбор. алгоритма из любых библиотек Python AI/ML и гиперпараметров для каждого сегмента данных.
Ниже приведены шаги для предварительной обработки данных и автоматической настройки моделей. Хотя эти шаги специфичны для Ray AIR и его API, в целом они применяются для преобразования последовательного Python в распределенный Python:
- Определите функции Python для
preprocess
сегмента данных. - Определите функции Python для
train
иevaluate
модели сегмента данных. - Определите вызывающую функцию
train_models
, которая вызывает все вышеперечисленные функции и будет вызываться параллельно для каждой перестановки в пространстве поиска Tune!
Внутри этой функции train_models:
📖 Входные параметры должны включать аргумент словаря конфигурации.
📈 Метрика настройки (потери или ошибки модели) должны быть рассчитывается и сообщается с использованиемsession.report()
.
✔️Checkpoint
(сохранить) модель рекомендуется для обеспечения отказоустойчивости и легкого развертывания в дальнейшем. - Настройте масштабирование распределенных вычислений.
- Определите пространство поиска Tune для всех параметров обучения.
- (Необязательно) Укажите стратегию поиска гиперпараметров.
- Проведите эксперимент.
Ниже приведен дополнительный код, который мы хотели бы добавить; полный код находится на моем гитхабе.
- Приведенные ниже функции
preprocess_data
иtrain_model
точно такие же, как и раньше, за исключением того, что они принимают список файлов вместо одного файла. - Функция
train_models
точно такая же, какtrain_and_evaluate
, за исключением того, что она принимает список файлов вместо одного файла. Он также обучает алгоритм, переданный в конфигурации, вместо фиксированного алгоритма и выполняет контрольные точки.
import os num_cpu = os.cpu_count() # Import another forecasting library. import statsforecast from statsforecast import StatsForecast from statsforecast.models import AutoARIMA # Import Ray AIR libraries. from ray import air, tune from ray.air import session, ScalingConfig from ray.air.checkpoint import Checkpoint ########## # STEP 1. Define Python functions to read and prepare a segment of data. ########## def preprocess_per_uniqueid( s3_files: List[str], sample_location_id: np.int32) -> pd.DataFrame: # Load data. df_list = [read_data(f, sample_location_id) for f in s3_files] df_raw = pd.concat(df_list, ignore_index=True) # Transform data. df = transform_df(df_raw) df.sort_values(by="ds", inplace=True) return df ########## # STEP 2. Define Python functions to train and evaluate a model on a segment of data. ########## def train_prophet(s3_files: List[str], sample_unique_id: np.int32, model_type: str) -> Tuple[pd.DataFrame, pd.DataFrame, 'prophet.forecaster.Prophet', np.int32]: # Prepare data from a list of S3 files. data = preprocess_per_uniqueid(s3_files, sample_unique_id) # Split data into train, test. train_end = data.ds.max() - relativedelta(days=FORECAST_LENGTH - 1) train_df = data.loc[(data.ds <= train_end), :].copy() test_df = data.iloc[-(FORECAST_LENGTH):, :].copy() # Define Prophet model with 75% confidence interval. if model_type == "prophet_additive": model = Prophet(interval_width=0.75, seasonality_mode="additive") elif model_type == "prophet_multiplicative": model = Prophet(interval_width=0.75, seasonality_mode="multiplicative") # Train and fit Prophet model. model = model.fit(train_df[["ds", "y"]]) return train_df, test_df, model # Train an ARIMA model. Full code not shown here. def train_arima(): # Evaluate an ARIMA model. Full code not shown here. def evaluate_arima(): ############ # STEP 3. Define a calling function `train_models`, which calls all # the above functions, and will be called in parallel for every # permutation in the Tune search space. ############ def train_models(config: dict) -> None: # Get Tune parameters file_list = config['params']['file_list'] model_type = config['params']['algorithm'] sample_unique_id = config['params']['location'] # Train model. if model_type == "arima": # Train and fit the Prophet model. train_df, valid_df, model = \ train_arima(file_list, sample_unique_id) # Inference model and evaluate error. error, future = \ evaluate_arima(model, valid_df) else: # Train and fit the Prophet model. train_df, valid_df, model = \ train_prophet(file_list, sample_unique_id, model_type) # Inference model and evaluate error. error, future = evaluate_model(model, train_df, valid_df, sample_unique_id) # Define a model checkpoint using AIR API. checkpoint = ray.air.checkpoint.Checkpoint.from_dict({ "model": model, "valid_df": valid_df, "forecast_df": future, "location_id": sample_unique_id, }) metrics = dict(error=error) session.report(metrics, checkpoint=checkpoint) ############ # STEP 4. Customize distributed compute scaling. ############ num_training_workers = min(num_cpu - 2, 32) scaling_config = ScalingConfig( # Number of distributed workers. num_workers=num_training_workers, # Turn on/off GPU. use_gpu=False, # Specify resources used for trainer. trainer_resources={"CPU": 1}, # Try to schedule workers on different nodes. placement_strategy="SPREAD") ############ # STEP 5. Define a search space dict of all config parameters. ############ search_space = { "scaling_config": scaling_config, "params": { "file_list": tune.grid_search([files_to_use]), "algorithm": tune.grid_search(algorithms_to_use), "location": tune.grid_search(models_to_train), }, } # Optional STEP 6. Specify the hyperparameter tuning search strategy. ########## # STEP 7. Run the experiment with Ray AIR APIs. ########## # Define a tuner object. tuner = tune.Tuner( train_models, param_space=search_space, tune_config=tune.TuneConfig( metric="error", mode="min", ), run_config=air.RunConfig( # Redirect logs to relative path instead of default ~/ray_results/. local_dir="my_Tune_logs", # Specify name to make logs easier to find in log path. name="tune_nyc", ), ) # Fit the tuner object. results = tuner.fit()
На приведенных выше снимках экрана данные с января 2018 года были сгруппированы и агрегированы на ежедневном уровне. В прошлом я пытался сделать это в SageMaker, но одна только обработка данных занимала слишком много времени, не говоря уже о настройке такого количества моделей одновременно.
Раздел 3: Мультимодельная распределенная настройка (более крупные модели PyTorch)
Часто цель состоит в том, чтобы создать несколько более крупных моделей, например, одну модель по географической зоне, где таких зон всего несколько. Год назад, в декабре 2021 года, я написал статью в блоге, демонстрирующую, как использовать Ray Lightning для обучения более крупных моделей прогнозирования PyTorch. С тех пор большое улучшение заключается в том, что переключение разработки кода между ноутбуком и облаком стало более плавным благодаря Anyscale Workspaces.
Эти более крупные модели иногда называют «глобальными моделями», потому что только одна модель глубокой нейронной сети обучается одновременно для множества разных временных рядов. Вместо 1 модели на временной ряд (Prophet или ARIMA).
См. мой github для получения полного кода PyTorch Forecasting, показывающего последние API-интерфейсы Ray AIR с Ray Lightning. Вам нужно добавить идентификатор кластера к вашим данным, затем шаги для настройки такие же, как мы видели в разделе 2:
# Import forecasting libraries. import torch import pytorch_lightning as pl import pytorch_forecasting as ptf import tensorboard as tb # Import ray libraries. import ray_lightning from ray_lightning import RayStrategy from ray_lightning.tune import get_tune_resources, TuneReportCheckpointCallback from ray import air, tune from ray.tune.schedulers import ASHAScheduler # Define a tuner object. tuner = tune.Tuner( tune.with_resources( train_with_parameters, resources=get_tune_resources(num_workers=num_training_workers), ), tune_config=tune.TuneConfig( metric="loss", mode="min", scheduler=scheduler, ), run_config=air.RunConfig( # Redirect logs to relative path instead of default ~/ray_results/. local_dir="my_Tune_logs", # Specify name to make logs easier to find in log path. name="ptf_nyc", ), param_space=FORECAST_CONFIG, ) # Fit the tuner object. results = tuner.fit() # Get checkpoint for best model from results object, code not shown. # Plot inference forecasts for some unique_ids. some_unique_ids = [25, 41, 14, 24, 4] for idx in some_unique_ids: best_model.plot_prediction(x, raw_predictions, idx=idx)
Обратите внимание, что в отличие от моделей ARIMA и Prophet, которые представляли собой одну модель для каждого уникального_идентификатора, каждая из этих более крупных моделей содержит выводы для многих уникальных_идентификаторов одновременно в одной модели.
Раздел 4. Мультимодельное распределенное развертывание с использованием Ray AIR и Ray Serve
Перед развертыванием необходимо решить, должно ли развертывание быть онлайновым, всегда работающим http-сервисом или автономным (сервис Python вызывается по запросу). Ниже я демонстрирую автономное развертывание с использованием новых Ray AIR Predictors с Ray Serve.
Шаги для автономного развертывания:
Шаг 1. Создайте пакетный предиктор с помощью контрольных точек Ray AIR.
Шаг 2. Создайте несколько тестовых данных.
Шаг 3. Выполнить batch_predictor.predict(test_data)
.
Замените шаг 3 выше следующими шагами для пользовательского предиктора:
Шаг 3. Определите класс развертывания Ray Serve с помощью декоратора Ray @serve.deployment
.
Шаг 4. Разверните предиктор.
Шаг 5. Запросите развертывание и получите результат.
Шаги 3–5 выше необходимы только в том случае, если вы используете пользовательский предиктор (например, ARIMA, Prophet или PyTorch Forecasting).
В противном случае для интегрированных в Ray AIR библиотек машинного обучения (преобразователей HuggingFace, PyTorch, TensorFlow, Scikit-learn, XGBoost или LightGBM) достаточно вызвать batch_predictor.predict(test_data)
.
Продолжая пример прогнозирования PyTorch из предыдущего раздела, ниже приведен код развертывания. Полный код находится на моем гитхабе.
import pickle import numpy as np import pandas as pd import pyarrow import pyarrow.parquet as pq # Import forecasting libraries. import torch import pytorch_lightning as pl import pytorch_forecasting as ptf # Import ray libraries. import ray from ray import serve ########## # STEP 1. Instantiate a batch predictor from checkpoint. ########## batch_predictor = ptf.models.TemporalFusionTransformer.load_from_checkpoint(model_path) ########## # STEP 2. Create some test data. ########## # Being lazy, pretend the last test data is our out-of-sample test data. max_prediction_length = FORECAST_CONFIG['forecast_horizon'] new_prediction_data = df.copy() new_prediction_data["time_idx"] = new_prediction_data["time_idx"] + max_prediction_length # Convert data from pandas to PyTorch tensors. _, _, test_loader = convert_pandas_pytorch_timeseriesdata( new_prediction_data, FORECAST_CONFIG) ########## # STEP 3. Define a Ray Serve deployment class. ########## @serve.deployment class ForecastPredictor: def __init__(self, predictor, test_data): self.predictor = predictor self.test_data = test_data def predict(self): raw_predictions, x = \ self.predictor.predict(self.test_data, mode="raw", return_x=True) return x, raw_predictions def __call__(self): x, raw_predictions = self.predict() return [x, raw_predictions] ########## # STEP 4. Deploy the predictor. ########## # Bind arguments to the Class constructor. my_first_deployment = ForecastPredictor.bind( predictor=batch_predictor, test_data=test_loader) ########## # STEP 5. Query the deployment and get the result. ########## # Get handle from serve.run(). handle = serve.run(my_first_deployment) # ray.get() the results from the handle. ray_return = ray.get(handle.remote()) new_x = ray_return[0] new_pred = ray_return[1]
На скриншоте выше справа показана наблюдаемость кластера Ray во время обучения и обслуживания. Если вам нужно выполнить постобработку результатов предиктора, у меня есть пример этого в конце этой другой записной книжки здесь.
Заключение
В заключение, в этом блоге показаны шаги, как обучать и настраивать множество моделей параллельно, используя распределенные вычисления с помощью Ray с открытым исходным кодом. Модели не обязательно должны быть одного типа, их можно смешивать и сопоставлять из любых библиотек AI/ML Python.
API-интерфейсы Ray AIR были четкими, интуитивно понятными и скрывали большую сложность распределенных вычислений, поэтому было легко выполнять множество сложных действий, таких как ранняя остановка, планирование ASHA, контрольные точки и развертывание.
Чтобы продолжить обучение: