Использование Hadoop для развертывания модели машинного обучения

Эта запись в блоге основана на идеях, начатых в трех предыдущих записях блога.

В этом сообщении блога я покажу, как развернуть ту же модель машинного обучения, которую я развернул как пакетное задание в этом сообщении блога, как очередь задач в этом сообщении блога, внутри AWS Lambda в этом сообщении блога. , в качестве потокового приложения Kafka в этом сообщении блога и службы gRPC в этом сообщении блога.

Код в этом сообщении блога можно найти в этом репозитории github.

Введение

Из-за растущей потребности в обработке больших объемов данных на многих компьютерах в 2006 году был начат проект Hadoop. Hadoop - это набор программных компонентов, которые помогают решать проблемы обработки больших объемов данных с использованием кластеров компьютеров. Hadoop поддерживает массовое хранение данных с помощью компонента HDFS и крупномасштабную обработку данных с помощью компонента MapReduce. Кластеры Hadoop стали центральной частью инфраструктуры многих компаний благодаря своей полезности.

В этом сообщении блога мы сосредоточимся на компоненте MapReduce в Hadoop, поскольку мы будем развертывать модель машинного обучения, которая представляет собой процесс, требующий значительных вычислительных ресурсов. MapReduce - это среда программирования для обработки данных, которая полезна для обработки больших объемов распределенных данных. MapReduce может обрабатывать ошибки и сбои в вычислениях. MapReduce также по своей сути параллелен по своей природе, но абстрагирует этот факт, делая код похожим на однопроцессный код.

Hadoop и MapReduce используются почти исключительно для обработки больших наборов данных. Несмотря на то, что модели машинного обучения обучаются на больших наборах данных, мы сосредоточимся на использовании MapReduce для выполнения прогнозов. Hadoop и MapReduce следует учитывать, когда пакетное задание прогнозирования необходимо выполнить для миллионов или миллиардов записей. Это сообщение в блоге похоже на предыдущее сообщение в блоге, в котором модель машинного обучения развертывалась как пакетное задание, но в нем основное внимание уделялось мелкомасштабным пакетным заданиям, которые могли быстро выполняться на отдельных машинах.

Поскольку результаты задания пакетного прогнозирования сохраняются, и клиенты получают к ним доступ позже, пользователь не может напрямую взаимодействовать с моделью. Это означает, что клиент, который использует прогнозы, созданные моделью, не может запрашивать прогнозы непосредственно из программного компонента модели машинного обучения и должен получить доступ к набору данных, созданных пакетным заданием, для получения прогнозов от модели.

Структура пакета

Для начала я настроил структуру проекта для пакета заданий:

- data (data files used for testing the job)
- model_map_reduce_job (python package for the map reduce job)
    - __init__.py
    - config.py
    - ml_model_map_reduce_job.py
    - ml_model_manager.py
- tests ( unit tests )
- Makefle
- mrjob.conf (configuration file for MapReduce framework)
- README.md
- requirements.txt
- setup.py
- test_requirements.txt

Эту структуру можно увидеть в репозитории github.

Создание картыСокращение рабочих мест

Задание MapReduce состоит из двух основных шагов: шага карты и шаг сокращения. Оба шага реализованы как простые функции, которые получают данные, обрабатывают их и возвращают результаты. Шаг карты отвечает за реализацию фильтрации и сортировки, а шаг уменьшения отвечает за вычисление совокупных результатов. Система MapReduce отвечает за запуск, управление и остановку кода в функциях map и reduce, за сериализацию и десериализацию данных, а также за управление избыточностью и отказоустойчивостью выполнения функций map и reduce.

Реализация MapReduce, предоставляемая Hadoop, может выполнять обработку данных с помощью функций map и reduce, реализованных на многих различных языках программирования, с использованием потокового интерфейса. В этом сообщении блога мы будем использовать этот интерфейс для запуска задания прогнозирования модели с использованием Python. Это значительно упрощает развертывание модели, поскольку нам не нужно переписывать код прогнозирования модели, чтобы развернуть ее в кластере Hadoop. Мы будем использовать пакет mrjob python для написания задания MapReduce.

Установка модели

Чтобы написать задание MapReduce, способное обрабатывать любую модель машинного обучения, мы начнем с установки модели в среду. Для этого мы можем использовать ту же модель, что и раньше, пакет iris_model. Этот пакет можно установить из репозитория git с помощью этой команды:

pip install git+https://github.com/schmidtbri/ml-model-abc-improvements

Теперь, когда у нас есть модель, установленная в среде, мы можем опробовать ее, открыв интерпретатор Python и введя этот код:

from iris_model.iris_predict import IrisModel
>>> model = IrisModel()
>>> model.predict({“sepal_length”:1.1, “sepal_width”: 1.2, “petal_width”: 1.3, “petal_length”: 1.4})
{‘species’: ‘setosa’}

Чтобы загрузить модель в задание MapReduce, мы укажем на класс IrisModel в файле конфигурации. Файл конфигурации выглядит так:

class Config(dict):
    models = [{
        “module_name”: “iris_model.iris_predict”,
        “class_name”: “IrisModel”
    }]

Приведенный выше код можно найти здесь.

Эта конфигурация будет использоваться заданием для динамической загрузки пакетов модели. Поля module_name и class_name позволяют заданию импортировать класс, содержащий реализацию алгоритма прогнозирования модели. Список моделей может содержать указатели на многие модели, поэтому нет никаких ограничений на то, сколько моделей может быть размещено заданием MapReduce.

Управление моделями

Как и в предыдущих сообщениях блога, мы будем использовать одноэлементный объект для управления объектами модели машинного обучения, которые будут использоваться для прогнозирования. Класс, из которого создается одноэлементный объект, называется ModelManager. Класс отвечает за создание экземпляров объектов MLModel, управление экземплярами, возврат информации об объектах MLModel и возврат ссылок на объекты при необходимости. Код для класса ModelManager можно найти здесь. Полное объяснение кода в классе см. В этом сообщении в блоге.

Класс MLModelMapReduceJob

Теперь у нас установлен пакет модели и класс ModelManager для управления им, поэтому мы можем приступить к написанию самого задания MapReduce. Задание MapReduce определяется как подкласс базового класса MRJob, который определяет методы map () и reduce (), реализующие функциональные возможности задания. Для начала загрузим правильную конфигурацию, обратившись к переменной среды APP_SETTINGS:

configuration = __import__(“model_mapreduce_job”). \
    __getattribute__(“config”). \
    __getattribute__(os.environ[“APP_SETTINGS”])

Приведенный выше код можно найти здесь.

После загрузки конфигурации мы создадим экземпляр синглтона ModelManager, который будет содержать ссылки на объекты модели, которые мы хотим разместить в этом задании MapReduce:

model_manager = ModelManager()
model_manager.load_models(Config.models)

Приведенный выше код можно найти здесь.

Поместив эту инициализацию в верхнюю часть модуля, мы можем быть уверены, что модели инициализируются только один раз, когда модуль загружается интерпретатором python.

Теперь мы можем написать класс, составляющий задание MapReduce:

class MLModelMapReduceJob(MRJob):
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONProtocol
    DIRS = [‘../model_mapreduce_job’]

Приведенный выше код можно найти здесь.

Свойства классов INPUT_PROTOCOL и OUTPUT_PROTOCOL определяют протоколы ввода и вывода шагов MapReduce. Протокол - это фрагмент кода, который считывает и записывает данные в файловую систему, полезно абстрагироваться от карты и сокращать количество шагов от формата, в котором хранятся данные. Свойство класса DIRS сообщает пакету MrJob, что код в этом модуле зависит от кода внутри каталога model_map_reduce, это заставляет MrJob копировать код всякий раз, когда он создает пакет развертывания для этого задания. Эти параметры помогают упростить код и развертывание задания.

Класс задания должен быть инициализирован, поэтому мы добавим метод __init __ ():

def __init__(self, *args, **kwargs):
    super(MLModelMapReduceJob, self).__init__(*args, **kwargs)
    
    self._model = model_manager.get_model(
        self.options.model_qualified_name)
    if self._model is None:
        raise ValueError(“‘{}’ not found in the ModelManager 
            instance.”.format(self.options.model_qualified_name))

Приведенный выше код можно найти здесь.

Метод __init__ сначала вызывает метод __init__ базового класса MrJob, чтобы он мог выполнить инициализацию на уровне инфраструктуры. Затем мы запрашиваем у синглтона ModelManager экземпляр модели, которую мы хотим разместить в задании MapReduce. Доступ к полному имени модели осуществляется из переменной self.options.model_qualified_name, которая задается параметром командной строки. Наконец, мы проверяем, действительно ли объект модели был возвращен ModelManager, и вызываем исключение, если это не так.

Затем задание MapReduce должно иметь возможность запускаться на любой модели, находящейся внутри экземпляра ModelManager. Для поддержки этого мы добавим в задание параметр командной строки, который принимает полное имя модели, которую мы хотим запустить:

def configure_args(self):
    super(MLModelMapReduceJob, self).configure_args()
    
    self.add_passthru_arg(‘--model_qualified_name’, 
        type=str, help=’Qualified name of the model.’)

Приведенный выше код можно найти здесь.

Эта функция позволяет нам расширить параметры командной строки, уже поддерживаемые фреймворком MrJob. Аргумент командной строки проходит через платформу и сохраняется в объекте self.options, который мы использовали в коде метода __init__, чтобы выбрать модель, которую мы хотим использовать для задания.

Теперь, когда у нас есть инициализированный класс задания, мы можем написать код, который фактически выполняет работу MapReduce. Функция mapper выглядит так:

def mapper(self, _, data):
    prediction = None
    try:
        prediction = self._model.predict(data=data)
    except Exception as e:
        prediction = None
    yield data, prediction

Приведенный выше код можно найти здесь.

Эта функция очень проста: она принимает словарь в аргументе «данные», делает прогноз с помощью модели и возвращает кортеж входных и выходных данных предсказания. Аргумент данных - это словарь, потому что мы использовали «JSONValueProtocol» в качестве INPUT_PROTOCOL для этого задания. Этот протокол десериализует строку JSON в собственный объект Python. Используя этот протокол, мы избавили себя от необходимости десериализации ввода в JSON на этапе сопоставления. Если модель не может сделать прогноз, в качестве прогноза возвращается None. Параметр OUTPUT_PROTOCOL установлен на «JSONProtocol», который сериализует пару «ключ-значение» в две строки JSON, разделенные символом табуляции.

Результатом шага сопоставления всегда является пара «ключ-значение», в которой ключ должен быть уникальным на всех входах шага. Если какой-либо ввод повторяется, шаг картографа сделает для него прогноз, но инфраструктура MapReduce вернет только один результат для ключа к следующему шагу. Такое поведение накладывает ограничение на нашу модель: она всегда должна давать один и тот же прогноз при одинаковых входных данных, то есть модель должна делать прогнозы детерминированно. Если модель не является детерминированной, инфраструктура MapReduce выберет первый прогноз, сделанный для входной записи. В некоторых ситуациях это может не иметь значения, но может нарушить работу любых шагов, использующих результаты этого шага, если такое поведение не обрабатывается правильно.

Этому заданию MapReduce не требуется шаг сокращения, поскольку нам нужно только делать прогнозы и возвращать результаты. Однако это задание можно комбинировать с другими заданиями MapReduce, которые действительно используют шаги сокращения, чтобы сделать конвейер обработки более сложным.

Проверка работы

Теперь, когда у нас есть код для задания MapReduce, мы протестируем его локально на небольшом файле данных. Из-за параметров протокола ввода и вывода модель может принимать файлы JSON в качестве входных данных и создавать файлы JSON в качестве выходных. Вот пример JSON, который мы скармливаем заданию:

{“sepal_length”: 5.0, “sepal_width”: 3.2, “petal_length”: 1.2, “petal_width”: 0.2}
{ “sepal_length”: 5.5, “sepal_width”: 3.5, “petal_length”: 1.3, “petal_width”: 0.2}
...

Файл данных можно найти здесь.

Чтобы выполнить задание локально, необходимо выполнить следующие команды:

export PYTHONPATH=./
export APP_SETTINGS=ProdConfig
python model_mapreduce_job/ml_model_map_reduce_job.py \
    --model_qualified_name iris_model ./data/input.ldjson > \
    data/output.ldjson

После выполнения задания выходные данные шага карты будут находиться в папке / data. Входная строка json и результирующий прогноз будут в одной строке файла, разделенной символом табуляции. В одной входной строке был JSON со схемой, которую модель не могла принять, поэтому выходные данные должны содержать нулевое предсказание для этого входа. Аргумент командной строки - model_qualified_name указывает заданию использовать модель iris_model из ModelManager при запуске задания.

Развертывание в AWS

Пакет mrjob поддерживает выполнение заданий в сервисе AWS Elastic Map Reduce (EMR). Чтобы запустить модельное задание, нам понадобится учетная запись в AWS. Для взаимодействия с AWS нам необходимо установить пакеты python boto3 и awscli:

pip install boto3 awscli

Далее мы настроим ключи доступа к API. Набор ключей доступа можно сгенерировать и настроить, следуя этим инструкциям. Конфигурация будет выглядеть так:

aws configure
AWS Access Key ID [*******************]: xxxxxxxxxxxxxxxxxx
AWS Secret Access Key [******************]:xxxxxxxxxxxxxxxxxxx
Default region name [us-east-2]: us-east-1
Default output format [None]:

Чтобы запустить модельное задание в AWS EMR, нам сначала нужно настроить роль по умолчанию, которую будет выполнять задание. Простой способ сделать это уже поддерживается в инструменте AWS CLI. Команда выглядит так:

aws emr create-default-roles

Чтобы настроить среду выполнения в узлах перед запуском кода прогнозирования модели, нам потребуется выполнить несколько команд. Пакет mrjob поддерживает это с помощью файла конфигурации mrjob.conf. Конфигурационный файл написан на YAML и выглядит так:

runners:
  emr:
    bootstrap:
    - sudo yum update -y
    - sudo yum install git -y
    - sudo pip-3.6 install -r ./requirements.txt#
    setup:
    - export PYTHONPATH=$PYTHONPATH:model_mapreduce_job/#
    - export APP_SETTINGS=ProdConfig

Файл можно найти здесь.

Этот файл может содержать конфигурацию для нескольких типов бегунов, пока мы настроим только бегунок EMR. Раздел начальной загрузки содержит команды, которые будут выполняться один раз при первом создании узла кластера. В этом разделе мы обновляем менеджер пакетов yum, устанавливаем клиент git и устанавливаем все зависимости Python, необходимые для запуска пакета модели из файла requirements.txt в проекте.

В разделе настройки содержатся команды, которые будут выполняться при каждом запуске задания MapReduce. В этом разделе мы настраиваем переменную среды PYTHONPATH, которая понадобится интерпретатору Python для поиска файлов кода, составляющих задание. Мы также устанавливаем переменную среды APP_SETTINGS, которая сообщает заданию, в какой среде оно выполняется, а пока мы выполняем задание с настройками ProdConfiguration.

Теперь, когда у нас есть учетные данные и конфигурация, мы можем запустить задание в AWS. Команда выглядит так:

python model_mapreduce_job/ml_model_map_reduce_job.py \
  --conf-path=./mrjob.conf \
  -r emr \
  --iam-service-role EMR_DefaultRole \
  --model_qualified_name iris_model ./data/input.ldjson

Пакет mrjob создаст корзину S3 для задания, загрузит код и данные в корзину S3, создаст кластер EMR для задания и выполнит задание. Результаты работы будут сохранены в той же корзине S3.

Закрытие

Используя структуру MapReduce, мы можем делать большое количество прогнозов на кластере компьютеров. Из-за простой конструкции инфраструктуры MapReduce многие сложности, связанные с выполнением задания на многих компьютерах, абстрагируются. Этот вариант развертывания моделей машинного обучения позволяет нам развертывать задания прогнозирования моделей для действительно массивных наборов данных.

Благодаря построению задания прогнозирования с использованием интерфейса MLModel развертывание модели как задания MapReduce значительно упрощается. Задание MapReduce, которое мы создали в этом сообщении в блоге, способно разместить любую модель машинного обучения, которая использует интерфейс MLModel, что делает код многоразовым. И снова интерфейс MLModel позволил нам абстрагироваться от сложности построения модели машинного обучения от сложностей развертывания модели машинного обучения.

Одним из недостатков реализации является тот факт, что она принимает только файлы в кодировке LDJSON в качестве входных данных для задания. Это сделано для простоты, поскольку наличие имен полей вместе с данными упрощает понимание кода. Улучшение кода могло бы состоять в том, чтобы включить другие протоколы, чтобы мы могли использовать другие типы файлов с заданием. Более того, было бы легко сделать выбор протоколов ввода и вывода параметром командной строки, который можно было бы выбрать во время выполнения.