Существует множество способов развертывания моделей машин в производстве. Один из распространенных способов — реализовать его в виде веб-сервиса. И самый популярный тип — REST API. Что он делает, так это развертывание и запуск 24/7, ожидание получения запроса от клиента в JSON, извлечение входных данных и отправка их в модель ML для прогнозирования результата. Затем результат заворачивается в ответ и возвращается пользователю.
Вы начинаете искать эту проблему в Google с «Развернуть машинное обучение как REST API». Вы получите миллион результатов. Если вы потрудитесь и прочитаете об этом. Среди многих лучших результатов вы увидите общий шаблон для решения этой проблемы, как показано на диаграмме ниже. Популярный способ — нам нужен веб-фреймворк для создания API (Flask, Diango или FastAPI). Далее нам потребуется машинное обучение, чтобы принимать входные данные и возвращать прогноз. Чтобы помочь системе работать в производственной среде, нам потребуется дополнительный WSGI (если мы используем Flask) или ASGI (если мы используем FastAPI), обернутый вне веб-модуля.
Здесь замечено, что модели машинного обучения при таком подходе часто реализуются в одном блоке кода с веб-фреймворком (Flask/FastAPI/…). Это означает, что модели машинного обучения запускаются в том же процессе, что и веб-модуль. Это приводит ко многим проблемам:
- С одним процессом Flask/FastAPI он может начинаться только с одного процесса модели ML.
- В один момент времени модель ML может обрабатывать только один запрос.
- Если мы хотим масштабировать приложение, мы можем использовать WSGI, например guvicorn, или ASGI, например uvicorn, для создания множества дочерних процессов, что увеличит количество как веб-модулей, так и моделей мл, поскольку они реализуются в одном процессе.
- При выполнении некоторых тяжелых задач моделям машинного обучения может потребоваться много времени (даже несколько секунд) для выполнения логического вывода. Поскольку они создаются в одном и том же блоке кода веб-модуля, он будет блокировать другие запросы, когда нам нужно дождаться завершения всех задач, прежде чем мы сможем обработать следующие.
Так есть ли лучшие способы? Есть ли способы справиться с тяжелыми и длительными задачами, не блокируя запросы от клиентов? Сегодня я представлю подход, который, возможно, не нов, но, похоже, еще не применялся для развертывания машинного обучения ML в рабочей среде: использование распределенной системы с очередью задач.
Что такое очередь задач?
Очереди задач используются как механизм для распределения работы между потоками или машинами.
Ввод очереди задач — это единица работы, называемая задачей, выделенные рабочие процессы затем постоянно отслеживают очередь для выполнения новой работы. (Сельдерей Гитхаб)
Очередь задач — это инструмент, позволяющий запускать различные части программного обеспечения на отдельной машине/процессе/потоке. В приложении есть некоторые части (задачи), которые часто выполняются в течение длительного времени или мы не знаем, когда они закончатся. С этими задачами лучше запускать их в отдельном процессе или на распределенной машине, и когда они закончат выполнение, нам будет отправлено уведомление для проверки результата. Это не будет блокировать другие части. Это подходит для длительных задач, таких как отправка электронных писем, сканирование веб-контента или, в данном случае, запуск моделей машинного обучения. Рассмотрим приведенное ниже изображение.
Архитектура распределенной очереди задач содержит три основных модуля: производитель, потребитель и брокер сообщений.
- Клиент отправляет запрос нашему приложению Flask (Producer).
- Производитель отправляет сообщение задачи брокеру сообщений.
- ML Workers (Consumer) потребляют сообщения от брокера сообщений. После завершения задачи она сохраняет результат в Message Broker и обновляет статус задачи.
- После отправки задачи брокеру сообщений приложение FastAPI также может отслеживать состояние задачи из брокера сообщений. Когда статус выполнен, он извлекает результат и возвращает его клиенту.
Три модуля запускаются в разных процессах или распределенных машинах, чтобы они могли жить независимо. Существует множество инструментов для разработки очередей задач, распределенных на многих языках программирования, в этом блоге я сосредоточусь на Python и использую Celery, самый популярный инструмент для очередей задач в проектах Python. Чтобы больше узнать о преимуществах Celery и системы распределенной очереди задач, вы можете прочитать потрясающее объяснение. Теперь давайте перейдем к следующей проблеме
Модель прогнозирования продолжительности поездки
Чтобы проиллюстрировать это, я попытаюсь построить простую модель машинного обучения, которая может помочь предсказать среднее время в пути с учетом места получения, места высадки и продолжительности поездки. Это будет регрессионная модель. Обратите внимание, что в этом блоге я не занимаюсь созданием модели точности, а использую ее только для настройки API веб-сервиса. Вес модели и то, как мы можем построить модель, можно найти в этой ссылке.
Обзорные модули API
Мы разработаем веб-API для обслуживания модели машинного обучения, состоящей из 3 модулей: Web, Redis и ML Model. Эти модули докеризуются и развертываются в контейнерах.
. ├── apps │ └── api │ ├── api_routers.py │ └── main.py ├── boot │ ├── docker │ │ ├── celery │ │ │ ├── cuda90.yml │ │ │ └── trip │ │ │ ├── Dockerfile │ │ │ └── entrypoint.sh │ │ ├── compose │ │ │ └── trip_duration_prediction │ │ │ ├── docker-compose.cpu.yml │ │ │ ├── docker-compose.dev.yml │ │ │ ├── docker-compose.yml │ │ │ ├── docker-services.sh │ │ │ ├── my_build.sh │ │ └── uvicorn │ │ ├── Dockerfile │ │ ├── entrypoint.sh │ │ └── requirements.txt │ └── uvicorn │ └── config.py ├── config.py ├── core │ ├── managers │ ├── schemas │ │ ├── api_base.py │ │ ├── health.py │ │ └── trip.py │ ├── services │ │ ├── trip_duration_api.py │ │ └── trip_duration_prediction_task.py │ └── utilities ├── repo │ ├── logs │ └── models │ └── lin_reg.bin ├── tasks │ └── trip │ └── tasks.py └── tests ├── http_test │ └── test_api.py └── model_test └── test_trip_prediction_task.py
Вот подробная информация о структурах папок репо:
- apps: определяет основное приложение и маршрутизаторы API веб-модуля с использованием FastAPI.
- boot: определяет образы Dockerfile для веб-модуля, модуля ML и файла docker-compose, чтобы связать 3 модуля. Он также содержит конфигурацию для каждого образа докера и соответствующие файлы yml для библиотек пакетов.
- config.py: файл конфигурации определяет различные конфигурации для CELERY_BROKER_URL, CELERY_RESULT_BACKEND, TRIP_DURATION_MODEL, TRIP_DURATION_THRESHOLD…
- core: определяет все сценарии реализации, используемые в модулях Web, Redis и Worker.
- repo: сохраняет журналы приложений и задач при запуске API. Он также сохранил вес модели
- задачи: определение сценариев задач Celery.
- tests: определяет модульный тест для API.
Веб-модуль
В веб-модуле я использую FastAPI в качестве веб-фреймворка. FastAPI предоставляет множество нишевых функций, таких как: супербыстрый, интегрированный с Uvicorn, автоматическая проверка типа проверки с помощью Pydantic, автоматическое создание документов и многое другое… Давайте посмотрим, с чего я запускаю приложение FastAPI.
boot/docker/uvicorn/entrypoint.sh
Здесь я запускаю приложение FastAPI
Затем я определяю маршрутизаторы API и схему сообщений.
apps/api/api_routers.py
core/schemas/trip.py
core/services/trip_duration_api.py
Затем я определяю класс с именем trip_duration_api.py, в котором я обрабатываю логику запроса.
- В функции process_request_apiонбудет собирать информацию о запросе
- Функция call_celery_matching добавляет в очередь брокера сообщений Redis, развернутого в другом контейнере, в качестве задачи. Модули машинного обучения, развернутые в других контейнерах, будут извлекать задачи из Redis и начинать работать над этим. Результатом является обещание, которое уведомит заднюю часть веб-модуля, когда рабочий закончит задачу или по истечении времени истечения срока действия. Обратите внимание на строки 29 и 35, где необходимо ввести task_celery.task_process_trip в качестве имени задачи Celery и task_celery.task_trip_queue в качестве очереди Celery.
- Строка с 12 по 14 помогает веб-модулю подключаться к модулю машинного обучения через Celery.
Все объединено и встроено в образ докера.
Веб-файл Docker
Рабочий модуль
В celery каждое задание, которое может быть выполнено в отдельном процессе или машине, называется заданием. Задача может варьироваться от сканирования веб-контента и отправки электронных писем до даже сложных моделей работающих машин. Задача может запускаться во время выполнения или периодически. Когда он развернут, каждый воркер может быть запущен в процессе, в зеленом потоке… в зависимости от используемого нами типа Celery. Чтобы лучше понять исполняющий пул Celery, подробнее можно прочитать в этом блоге. В этом API я выбираю тип пула Celery gevent. Отправную точку Celery можно найти в boot/docker/celery/trip/entrypoint.sh.
Обратите внимание, что в строке 18 я выбираю тип Celery gevent. Prefetch-multiplier – это количество сообщений, которые необходимо предварительно загрузить за один раз, это означает, что для каждого рабочего процесса будет резервироваться только одна задача за раз. Параллелизм — это количество зеленых потоков, созданных для каждого экземпляра Celery.
Конфигурация сельдерея
Конфигурация Celery определена в config.py.
Приведенный выше файл содержит все конфигурации, необходимые для запуска Celery. Строки 6 и 7 настраивают URL-адрес брокера и серверную часть результатов, которой в данном случае является Redis. Эти конфигурации будут получены из файла env образа докера, который я объясню позже при определении файла docker-compose.
Когда производитель отправляет сообщение брокеру сообщений, ему необходимо определить, какую задачу он намеревается использовать и в какой очереди. Затем, основываясь на имени очереди и имени задачи, Celery может назначить сообщение правильному обработчику-потребителю, работающему над этой задачей. Поэтому в строке 2 я определяю имя очереди как «tasks.trip», а имя_задачи как «tasks.trip.tasks.predict_ride». Напомним, что эти параметры используются в файле core/services/trip_duration_api.py, когда веб-модуль выполняет задачу Celery.
Задание с сельдереем
Задача сельдерея реализована в tasks/trip/tasks.py.
В строках с 9 по 11 я присваиваю задаче соответствующее имя задачи. Поэтому позже, когда клиент вызовет задачу поездки, Celery инициирует реализацию задачи в файле сценария. В строке 14 я установил максимальное время выполнения задачи 60 секунд, что означает, что если задача не завершится в течение 60 секунд, задача завершится ошибкой и сообщит об ошибке обратно клиенту. .
Задача прогнозирования продолжительности поездки
Приведенный выше файл является основным местом для реализации модели ML. Со строк с 17 по 19 я загружаю вес модели, хранящийся в папке repo/models. Другие части не требуют пояснений, когда регрессионная модель принимает входные данные, содержащие место посадки, место высадки и расстояние поездки, а затем прогнозирует время в пути.
Подключайте все с помощью Docker Compose
Как я объяснил в начале, нам понадобятся 3 модуля: модули Web, Redis и ML. Чтобы соединить три части и позволить им взаимодействовать друг с другом, я использую docker-compose для определения определения трех образов Docker. При запуске приложения будут созданы три соответствующих контейнера, которые будут связаны друг с другом в сети докеров. Подробную информацию о docker-compose можно найти в boot/docker/compose/trip_duration_prediction/docker-compose.yml.
Файл .env, который содержит все параметры, работающие при запуске docker-compose, можно найти по адресу boot/docker/compose/trip_duration_prediciton/.env.
Тестирование приложения
Позвольте мне запустить все приложение. Как видно на изображении ниже, при запуске API есть три запущенных контейнера.
Веб-контейнер работает на порту 8182, где мы можем получить доступ к документации API по адресу: localhost:8182/docs. Это одна из нишевых функций FastAPI, где у нас будет документация по Swagger сразу после того, как мы закончим реализацию API без каких-либо усилий.
Затем давайте попробуем запустить конечную точку API в /v1/trip/predict,просмотрим прогноз и проверим возврат журнала.
Как только запрос будет отправлен от клиента к веб-модулю, он будет обработан асинхронно с использованием Celery в рабочем потоке как отдельный процесс или поток. Это дает множество преимуществ:
- Тяжелая задача обрабатывается в отдельном процессе/потоке, что может помочь увеличить количество запросов, которые мы можем обработать, поскольку не будет блокировать вызов клиента.
- Модуль машинного обучения реализован в другом потоке, упакованном в отдельный образ докера, что означает, что специалисты по данным или инженеры по машинному обучению могут хранить свой код реализации и пакеты независимо друг от друга.
- Если количество запросов увеличится, мы можем легко увеличить количество модулей машинного обучения, чтобы справиться с всплеском запросов, в то время как для веб-модуля можно оставить то же самое.
Заключение
В этом блоге я рассказал, как использовать распределенную архитектуру очереди задач для реализации API для обслуживания модуля машинного обучения. Использование Celery, FastAPI и Redis может помочь лучше справляться с длительными задачами, такими как запущенный процесс мл, и, следовательно, повысить общую производительность.
Кредит
Эта оригинальная идея время от времени развивалась и совершенствовалась, когда я работал в своей предыдущей компании. Спасибо Шанхонгу и Джонатану, замечательным бывшим коллегам, я узнал от них много хорошего.
Если вы хотите обратиться к полному коду, отметьте: