Параллельная пакетная обработка в Python

Обработка пакетами с использованием joblib и отображение прогресса с помощью tqdm

Joblib — отличный инструмент для распараллеливания, но иногда лучше обрабатывать рабочую нагрузку пакетами, а не итеративным способом по умолчанию. В этой статье я покажу:

  1. Стандартный способ распараллеливания с помощью joblib и tqdm
  2. Почему и когда не работает
  3. Параллельное использование пакетов
  4. Заставьте прогресс снова работать

Весь код доступен на Github. Не стесняйтесь связаться со мной, если у вас есть какие-либо вопросы.

Код также доступен в виде пакета Pypi: pip install tqdm_batch

1) Прямой метод распараллеливания с использованием joblib

В 2021 году почти каждый процессор, который мы покупаем, будет многоядерным. Мой текущий ноутбук (Dell XPS) оснащен процессором Intel i7 с 6 ядрами и технологией Hyper-Threading, что дает в вашем распоряжении 12 ядер. Даже современные мобильные телефоны имеют несколько процессоров и обладают огромной вычислительной мощностью. Ядра в этих архитектурах ЦП могут быть идентичными, т. Е. Каждое ядро ​​​​имеет одинаковую вычислительную мощность или иметь индивидуальные характеристики. Примером последнего является архитектура ARM big.LITTLE, в которой высокопроизводительные ядра сочетаются с ядрами с низким энергопотреблением.

При работе с большими наборами данных мы все были там: не можем ли мы использовать все наши ядра и ускорить обработку? Конечно, да, но многопроцессорность вовсе не тривиальна.

Мы все были там: не можем ли мы использовать все наши ядра и ускорить обработку?

Python имеет встроенную многопроцессорность и многопоточность с модулем многопроцессорность и поточность. Это все, что вам нужно, чтобы начать работу как с многопроцессорной обработкой, так и с многопоточностью, но для этого потребуется некоторый шаблонный код. Это может быть сложно, особенно с точки зрения науки о данных, когда у нас есть этот цикл последовательной обработки, и мы просто хотим запустить его параллельно.

Чтобы упростить нашу многоядерную обработку, мы можем использовать фантастические библиотеки, такие как joblib или pathos. На веб-сайте joblib автор говорит, что циклы поразительно параллельны, и они, безусловно, правильны, поскольку начать работу очень легко. Давайте создадим пример:

Это просто фиктивная функция, которая аппроксимирует Пи, но с порядком 6, и для ее вычисления требуется около 200 мс. Есть еще две переменные. Первая переменная — это строка, которая будет единственным обрабатываемым элементом. Это может быть имя файла, который метод должен открыть и загрузить, или строка, которую нам нужно нормализовать. В этой фиктивной функции мы просто игнорируем ее. Полезная нагрузка будет использоваться для отображения ограничений, и ее также можно пока игнорировать. Давайте получим некоторые данные времени выполнения для этой функции:

Последовательный запуск, то есть просто цикл for, обрабатывающий каждую строку, занимает немногим менее 100 секунд. В этом примере мы можем ясно увидеть красоту joblib. Нам не нужно было ничего менять, чтобы обрабатывать это параллельно, и результат почти в пять раз быстрее при использовании 8 ядер. Как правило, это прекрасно работает, но мы должны помнить об ограничениях.

2) Ограничения и почему это не всегда работает

Чтобы понять ограничения, нам нужно немного объяснить разницу между многопроцессорностью и многопоточностью и то, как все это связано с глобальной блокировкой интерпретатора Python (GIL). Когда вы пытаетесь использовать несколько ядер, вы, вероятно, уже слышали о GIL. Есть много замечательных статей о том, что такое GIL, но вкратце это блокировка, которая предотвращает одновременный доступ нескольких процессов к интерпретатору Python. Это защищает от странных проблем, таких как удаление переменной одним процессом, в то время как другой хочет прочитать ее в то же время. Большим недостатком GIL является то, что если у вас есть тяжелая задача Python (много кода Python) и вы пытаетесь запустить ее параллельно, каждая задача должна ждать своей очереди для интерпретатора. Они ограничены GIL.

Многопроцессорная обработка — это один из способов параллельного выполнения задач, при котором создается изолированный процесс с собственным интерпретатором Python. Многопроцессорная задача не имеет проблемы GIL. Однако, поскольку мы запускаем изолированный процесс, задача не имеет доступа к той же памяти, что и основная программа, и все необходимые данные должны быть отправлены (сериализованы) в этот процесс. Сама сериализация выполняется с использованием встроенного в Python Pickle (или Dill), который делает замороженную копию памяти всех необходимых объектов. Некоторые объекты не могут быть сериализованы, и вы получите сообщение об ошибке. Например, ссылка на UI из процесса 1 не может быть передана процессу 2 (это тоже не имеет смысла). Эта сериализация, то есть замораживание состояния объектов в байтах для отправки в процесс, является причиной накладных расходов (дополнительная память и ввод-вывод). Например, когда каждому процессу нужен словарь для некоторых переводов, многопроцессорная библиотека создаст копию этого словаря для каждого процесса. Если этот словарь большой, то нередко накладные расходы занимают больше времени, чем последовательная обработка.

Нередко эти накладные расходы занимают больше времени, чем последовательная обработка.

Еще один метод параллельного выполнения задач — многопоточность. Большим преимуществом является то, что он разделяет память с основной программой и не требует сериализации, следовательно, гораздо меньше накладных расходов. Но цена в том, что вам придется работать с GIL. Это не должно быть проблемой, если задача использует множество скомпилированных библиотек, таких как Numpy, которые не требуют GIL.

По умолчанию joblib использует многопоточность, но ее можно настроить на многопоточность с помощью параметра prefer='threads'. Вы можете поиграть с этим параметром, и вы можете получить преимущество, если задачи не являются тяжелыми для Python. Как правило, многопроцессорная обработка по умолчанию является очень разумным значением по умолчанию.

Когда многопроцессорные задачи требуют больших объемов данных для обработки, например. большая модель для некоторой классификации, эта модель должна быть сериализована для каждой задачи, что создает большие накладные расходы. Исходный пример joblib запускает новый процесс для каждой строки в элементах, это означает, что он должен сериализовать все для каждой итерации, что далеко не идеально:

Накладные расходы настолько велики, что время выполнения в четыре раза больше, чем при выполнении их по одному. Означает ли это, что мы не можем распараллелить такие рабочие нагрузки? Конечно, нет! Используя подход пакетного процесса: сначала разделите рабочую нагрузку на пакеты одинакового размера и позвольте каждому процессу работать с ними.

3) Распараллелить с помощью пакетов

Мы видели, что для многопроцессорной обработки из-за сериализации больших требований к данным для каждой задачи накладные расходы могут быть огромными. Решение простое: уменьшить количество сериализаций. Вместо сериализации для каждого элемента мы создадим дополнительную функцию-оболочку, которая работает с пакетом внутри процесса. Это приводит к сериализации данных только один раз для каждого процесса.

Это дает нам такую ​​же экономию, как и наш последовательный подход без больших требований к данным. Волшебная функция %%time также показывает нам кое-что интересное: мы тратим на этот процесс всего 1,33 секунды. Остальные 23 секунды тратятся на другие процессы, к которым %%time не может получить доступ.

Еще одна вещь, которую вы могли заметить, это то, что индикатор выполнения больше не работает. Tqdm связан с пакетами и отправляет все пакеты и должен ждать завершения процессов. Мы не видим на уровне элемента, сколько уже обработано. И поскольку у нас нет доступа к памяти, не так просто показать один (или несколько) индикаторов выполнения.

4) Давайте исправим индикатор выполнения

Создание индикатора выполнения при многопроцессорной обработке — непростая задача. При использовании последовательного подхода мы можем оставить tqdm в основном прогрессе (коллективном прогрессе). Он видит каждый элемент, который передается процессу. Подключение индикатора выполнения через сериализацию невозможно, так как это потребует от процесса записи в часть основной памяти программы. Это по определению не разрешено и является причиной того, что вы получаете ошибку сериализации, когда пытаетесь добавить объект progress_bar в параметры функции.

Чтобы решить эту проблему, мы можем использовать очередь, своего рода список, который позволяет процессам взаимодействовать друг с другом. Чтобы сделать это еще более простым, давайте создадим полную оболочку, чтобы мы могли предоставить список элементов и функций, при этом наша оболочка разделит работу между всеми процессами.

Этот код также доступен на Github и в виде пакета на PyPi.

Используя этот метод, мы создали встраиваемую замену, для которой вам нужно всего лишь написать метод, обрабатывающий одну строку. batch_process перенесет эту функцию в пакетный процессор, который также обновляет индикатор выполнения. Все параметры функции могут быть добавлены с помощью аргументов, поскольку они передаются функции обработки, включая необходимые большие данные и модели. Результат аналогичен исходному запуску с небольшими накладными расходами. tqdm_batch добавлен в pypi, так что вы можете сделать pip install tqdm_batch и попробовать сами.

Количество работников также не соответствует вашим ожиданиям, поскольку они не масштабируются линейно. В определенный момент добавление дополнительных рабочих не дает никакой выгоды. Я также заметил, что мои ядра с гиперпоточностью также не дают никакого прироста производительности. Вот обзор требуемого времени вычислений по сравнению с количеством рабочих:

Округлять

Надеюсь, вам было интересно поиграть с многопроцессорной обработкой и вы поняли, почему она работает и каковы ограничения. Вы можете использовать batch_process в качестве замены для joblib и tqdm, чтобы выполнять обработку пакетами без изменения исходного кода.

Определенно есть некоторые улучшения, которые можно было бы сделать. Прежде всего, я искал расширение для joblib, которого не существовало. Оглядываясь назад, я мог бы полностью отказаться от зависимости joblib и создать все на ванильном Python. Кроме того, индикатор выполнения не должен быть в отдельном потоке. Возможно, я изменю это в будущей версии (или я принимаю ваш запрос на включение (-:).

Это подводит меня к концу этой статьи. Пожалуйста, дайте мне знать, если у вас есть какие-либо комментарии! Смело подключайтесь на LinkedIn.