Существует множество способов развертывания моделей машин в производстве. Один из распространенных способов — реализовать его в виде веб-сервиса. И самый популярный тип — REST API. Что он делает, так это развертывание и запуск 24/7, ожидание получения запроса от клиента в JSON, извлечение входных данных и отправка их в модель ML для прогнозирования результата. Затем результат заворачивается в ответ и возвращается пользователю.

Вы начинаете искать эту проблему в Google с «Развернуть машинное обучение как REST API». Вы получите миллион результатов. Если вы потрудитесь и прочитаете об этом. Среди многих лучших результатов вы увидите общий шаблон для решения этой проблемы, как показано на диаграмме ниже. Популярный способ — нам нужен веб-фреймворк для создания API (Flask, Diango или FastAPI). Далее нам потребуется машинное обучение, чтобы принимать входные данные и возвращать прогноз. Чтобы помочь системе работать в производственной среде, нам потребуется дополнительный WSGI (если мы используем Flask) или ASGI (если мы используем FastAPI), обернутый вне веб-модуля.

Здесь замечено, что модели машинного обучения при таком подходе часто реализуются в одном блоке кода с веб-фреймворком (Flask/FastAPI/…). Это означает, что модели машинного обучения запускаются в том же процессе, что и веб-модуль. Это приводит ко многим проблемам:

  • С одним процессом Flask/FastAPI он может начинаться только с одного процесса модели ML.
  • В один момент времени модель ML может обрабатывать только один запрос.
  • Если мы хотим масштабировать приложение, мы можем использовать WSGI, например guvicorn, или ASGI, например uvicorn, для создания множества дочерних процессов, что увеличит количество как веб-модулей, так и моделей мл, поскольку они реализуются в одном процессе.
  • При выполнении некоторых тяжелых задач моделям машинного обучения может потребоваться много времени (даже несколько секунд) для выполнения логического вывода. Поскольку они создаются в одном и том же блоке кода веб-модуля, он будет блокировать другие запросы, когда нам нужно дождаться завершения всех задач, прежде чем мы сможем обработать следующие.

Так есть ли лучшие способы? Есть ли способы справиться с тяжелыми и длительными задачами, не блокируя запросы от клиентов? Сегодня я представлю подход, который, возможно, не нов, но, похоже, еще не применялся для развертывания машинного обучения ML в рабочей среде: использование распределенной системы с очередью задач.

Что такое очередь задач?

Очереди задач используются как механизм для распределения работы между потоками или машинами.

Ввод очереди задач — это единица работы, называемая задачей, выделенные рабочие процессы затем постоянно отслеживают очередь для выполнения новой работы. (Сельдерей Гитхаб)

Очередь задач — это инструмент, позволяющий запускать различные части программного обеспечения на отдельной машине/процессе/потоке. В приложении есть некоторые части (задачи), которые часто выполняются в течение длительного времени или мы не знаем, когда они закончатся. С этими задачами лучше запускать их в отдельном процессе или на распределенной машине, и когда они закончат выполнение, нам будет отправлено уведомление для проверки результата. Это не будет блокировать другие части. Это подходит для длительных задач, таких как отправка электронных писем, сканирование веб-контента или, в данном случае, запуск моделей машинного обучения. Рассмотрим приведенное ниже изображение.

Архитектура распределенной очереди задач содержит три основных модуля: производитель, потребитель и брокер сообщений.

  1. Клиент отправляет запрос нашему приложению Flask (Producer).
  2. Производитель отправляет сообщение задачи брокеру сообщений.
  3. ML Workers (Consumer) потребляют сообщения от брокера сообщений. После завершения задачи она сохраняет результат в Message Broker и обновляет статус задачи.
  4. После отправки задачи брокеру сообщений приложение FastAPI также может отслеживать состояние задачи из брокера сообщений. Когда статус выполнен, он извлекает результат и возвращает его клиенту.

Три модуля запускаются в разных процессах или распределенных машинах, чтобы они могли жить независимо. Существует множество инструментов для разработки очередей задач, распределенных на многих языках программирования, в этом блоге я сосредоточусь на Python и использую Celery, самый популярный инструмент для очередей задач в проектах Python. Чтобы больше узнать о преимуществах Celery и системы распределенной очереди задач, вы можете прочитать потрясающее объяснение. Теперь давайте перейдем к следующей проблеме

Модель прогнозирования продолжительности поездки

Чтобы проиллюстрировать это, я попытаюсь построить простую модель машинного обучения, которая может помочь предсказать среднее время в пути с учетом места получения, места высадки и продолжительности поездки. Это будет регрессионная модель. Обратите внимание, что в этом блоге я не занимаюсь созданием модели точности, а использую ее только для настройки API веб-сервиса. Вес модели и то, как мы можем построить модель, можно найти в этой ссылке.

Обзорные модули API

Мы разработаем веб-API для обслуживания модели машинного обучения, состоящей из 3 модулей: Web, Redis и ML Model. Эти модули докеризуются и развертываются в контейнерах.

.
├── apps
│   └── api
│       ├── api_routers.py
│       └── main.py
├── boot
│   ├── docker
│   │   ├── celery
│   │   │   ├── cuda90.yml
│   │   │   └── trip
│   │   │       ├── Dockerfile
│   │   │       └── entrypoint.sh
│   │   ├── compose
│   │   │   └── trip_duration_prediction
│   │   │       ├── docker-compose.cpu.yml
│   │   │       ├── docker-compose.dev.yml
│   │   │       ├── docker-compose.yml
│   │   │       ├── docker-services.sh
│   │   │       ├── my_build.sh
│   │   └── uvicorn
│   │       ├── Dockerfile
│   │       ├── entrypoint.sh
│   │       └── requirements.txt
│   └── uvicorn
│       └── config.py
├── config.py
├── core
│   ├── managers
│   ├── schemas
│   │   ├── api_base.py
│   │   ├── health.py
│   │   └── trip.py
│   ├── services
│   │   ├── trip_duration_api.py
│   │   └── trip_duration_prediction_task.py
│   └── utilities
├── repo
│   ├── logs
│   └── models
│       └── lin_reg.bin
├── tasks
│   └── trip
│       └── tasks.py
└── tests
    ├── http_test
    │   └── test_api.py
    └── model_test
        └── test_trip_prediction_task.py

Вот подробная информация о структурах папок репо:

  • apps: определяет основное приложение и маршрутизаторы API веб-модуля с использованием FastAPI.
  • boot: определяет образы Dockerfile для веб-модуля, модуля ML и файла docker-compose, чтобы связать 3 модуля. Он также содержит конфигурацию для каждого образа докера и соответствующие файлы yml для библиотек пакетов.
  • config.py: файл конфигурации определяет различные конфигурации для CELERY_BROKER_URL, CELERY_RESULT_BACKEND, TRIP_DURATION_MODEL, TRIP_DURATION_THRESHOLD…
  • core: определяет все сценарии реализации, используемые в модулях Web, Redis и Worker.
  • repo: сохраняет журналы приложений и задач при запуске API. Он также сохранил вес модели
  • задачи: определение сценариев задач Celery.
  • tests: определяет модульный тест для API.

Веб-модуль

В веб-модуле я использую FastAPI в качестве веб-фреймворка. FastAPI предоставляет множество нишевых функций, таких как: супербыстрый, интегрированный с Uvicorn, автоматическая проверка типа проверки с помощью Pydantic, автоматическое создание документов и многое другое… Давайте посмотрим, с чего я запускаю приложение FastAPI.

boot/docker/uvicorn/entrypoint.sh

Здесь я запускаю приложение FastAPI

Затем я определяю маршрутизаторы API и схему сообщений.

apps/api/api_routers.py

core/schemas/trip.py

core/services/trip_duration_api.py

Затем я определяю класс с именем trip_duration_api.py, в котором я обрабатываю логику запроса.

  • В функции process_request_apiонбудет собирать информацию о запросе
  • Функция call_celery_matching добавляет в очередь брокера сообщений Redis, развернутого в другом контейнере, в качестве задачи. Модули машинного обучения, развернутые в других контейнерах, будут извлекать задачи из Redis и начинать работать над этим. Результатом является обещание, которое уведомит заднюю часть веб-модуля, когда рабочий закончит задачу или по истечении времени истечения срока действия. Обратите внимание на строки 29 и 35, где необходимо ввести task_celery.task_process_trip в качестве имени задачи Celery и task_celery.task_trip_queue в качестве очереди Celery.
  • Строка с 12 по 14 помогает веб-модулю подключаться к модулю машинного обучения через Celery.

Все объединено и встроено в образ докера.

Веб-файл Docker

Рабочий модуль

В celery каждое задание, которое может быть выполнено в отдельном процессе или машине, называется заданием. Задача может варьироваться от сканирования веб-контента и отправки электронных писем до даже сложных моделей работающих машин. Задача может запускаться во время выполнения или периодически. Когда он развернут, каждый воркер может быть запущен в процессе, в зеленом потоке… в зависимости от используемого нами типа Celery. Чтобы лучше понять исполняющий пул Celery, подробнее можно прочитать в этом блоге. В этом API я выбираю тип пула Celery gevent. Отправную точку Celery можно найти в boot/docker/celery/trip/entrypoint.sh.

Обратите внимание, что в строке 18 я выбираю тип Celery gevent. Prefetch-multiplier – это количество сообщений, которые необходимо предварительно загрузить за один раз, это означает, что для каждого рабочего процесса будет резервироваться только одна задача за раз. Параллелизм — это количество зеленых потоков, созданных для каждого экземпляра Celery.

Конфигурация сельдерея

Конфигурация Celery определена в config.py.

Приведенный выше файл содержит все конфигурации, необходимые для запуска Celery. Строки 6 и 7 настраивают URL-адрес брокера и серверную часть результатов, которой в данном случае является Redis. Эти конфигурации будут получены из файла env образа докера, который я объясню позже при определении файла docker-compose.

Когда производитель отправляет сообщение брокеру сообщений, ему необходимо определить, какую задачу он намеревается использовать и в какой очереди. Затем, основываясь на имени очереди и имени задачи, Celery может назначить сообщение правильному обработчику-потребителю, работающему над этой задачей. Поэтому в строке 2 я определяю имя очереди как «tasks.trip», а имя_задачи как «tasks.trip.tasks.predict_ride». Напомним, что эти параметры используются в файле core/services/trip_duration_api.py, когда веб-модуль выполняет задачу Celery.

Задание с сельдереем

Задача сельдерея реализована в tasks/trip/tasks.py.

В строках с 9 по 11 я присваиваю задаче соответствующее имя задачи. Поэтому позже, когда клиент вызовет задачу поездки, Celery инициирует реализацию задачи в файле сценария. В строке 14 я установил максимальное время выполнения задачи 60 секунд, что означает, что если задача не завершится в течение 60 секунд, задача завершится ошибкой и сообщит об ошибке обратно клиенту. .

Задача прогнозирования продолжительности поездки

Приведенный выше файл является основным местом для реализации модели ML. Со строк с 17 по 19 я загружаю вес модели, хранящийся в папке repo/models. Другие части не требуют пояснений, когда регрессионная модель принимает входные данные, содержащие место посадки, место высадки и расстояние поездки, а затем прогнозирует время в пути.

Подключайте все с помощью Docker Compose

Как я объяснил в начале, нам понадобятся 3 модуля: модули Web, Redis и ML. Чтобы соединить три части и позволить им взаимодействовать друг с другом, я использую docker-compose для определения определения трех образов Docker. При запуске приложения будут созданы три соответствующих контейнера, которые будут связаны друг с другом в сети докеров. Подробную информацию о docker-compose можно найти в boot/docker/compose/trip_duration_prediction/docker-compose.yml.

Файл .env, который содержит все параметры, работающие при запуске docker-compose, можно найти по адресу boot/docker/compose/trip_duration_prediciton/.env.

Тестирование приложения

Позвольте мне запустить все приложение. Как видно на изображении ниже, при запуске API есть три запущенных контейнера.

Веб-контейнер работает на порту 8182, где мы можем получить доступ к документации API по адресу: localhost:8182/docs. Это одна из нишевых функций FastAPI, где у нас будет документация по Swagger сразу после того, как мы закончим реализацию API без каких-либо усилий.

Затем давайте попробуем запустить конечную точку API в /v1/trip/predict,просмотрим прогноз и проверим возврат журнала.

Как только запрос будет отправлен от клиента к веб-модулю, он будет обработан асинхронно с использованием Celery в рабочем потоке как отдельный процесс или поток. Это дает множество преимуществ:

  • Тяжелая задача обрабатывается в отдельном процессе/потоке, что может помочь увеличить количество запросов, которые мы можем обработать, поскольку не будет блокировать вызов клиента.
  • Модуль машинного обучения реализован в другом потоке, упакованном в отдельный образ докера, что означает, что специалисты по данным или инженеры по машинному обучению могут хранить свой код реализации и пакеты независимо друг от друга.
  • Если количество запросов увеличится, мы можем легко увеличить количество модулей машинного обучения, чтобы справиться с всплеском запросов, в то время как для веб-модуля можно оставить то же самое.

Заключение

В этом блоге я рассказал, как использовать распределенную архитектуру очереди задач для реализации API для обслуживания модуля машинного обучения. Использование Celery, FastAPI и Redis может помочь лучше справляться с длительными задачами, такими как запущенный процесс мл, и, следовательно, повысить общую производительность.

Кредит

Эта оригинальная идея время от времени развивалась и совершенствовалась, когда я работал в своей предыдущей компании. Спасибо Шанхонгу и Джонатану, замечательным бывшим коллегам, я узнал от них много хорошего.

Если вы хотите обратиться к полному коду, отметьте: