Использование механизмов многопроцессорного параллелизма в задачах Celery

Я пытаюсь взаимодействовать с устройством, которое может принимать только одно TCP-соединение (ограничения памяти), поэтому просто запуск соединения для каждого рабочего потока не вариант, как в обычной ситуации клиент-сервер, такой как соединение с базой данных.

Я попытался использовать словарь Multiprocessing Manager, который глобально доступен между потоками, в формате:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}

И такая задача:

from celery import shared_task
from .celery import manager, clients

@shared_task
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    # Create client connection if one does not exist.
    conn = None
    addr, port = controller
    if controller not in clients:
        conn = Client(addr, port)
        conn.connect()
        lock = manager.RLock()
        clients[controller] = (conn, lock,)
        print("New controller connection to %s:%s" % (addr, port,))
    else:
        conn, lock = clients[controller]

    try:
        f = getattr(conn, commandname) # See if connection.commandname() exists.
    except Exception:
        raise Exception("command: %s not known." % (commandname))

    with lock:
        res = f(*args)
        return res

Однако задача завершится ошибкой сериализации, например:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

Несмотря на то, что я не вызываю задачу с несериализуемым значением, и задача не пытается вернуть несериализуемое значение, кажется, Celery одержим попытками сериализовать этот глобальный объект?

Что мне не хватает? Как бы вы сделали соединения клиентских устройств, используемые в задачах Celery, потокобезопасными и доступными между потоками? Пример кода?


person Daniel Devine    schedule 24.03.2016    source источник
comment
Я не уверен, что это сработает в вашей ситуации, но я только что вспомнил, что читал о multiprocessing.reduction, который должен разрешить совместное использование соединений сокетов между процессами. см. пример в этом блоге.   -  person antikantian    schedule 24.03.2016
comment
Клиент не работает с необработанным сокетом, это объект соединения Twisted, у которого есть протокол. Использование необработанного сокета или воссоздание объекта соединения Twisted из fd нетривиально.   -  person Daniel Devine    schedule 24.03.2016
comment
В итоге я придумал, как обернуть протокол Twisted вокруг существующего сокета, однако в моем случае это не сработало, потому что потребители Celery как отдельные дочерние процессы основного рабочего процесса не могли получить доступ к необходимым файловым дескрипторам. (хранится в Redis), а создание путаницы каналов unix для совместного использования FD — это слишком много хакерства. Проблема с моей ситуацией заключается в том, что устройство ограничено в памяти и просто не может иметь несколько подключений... Поэтому я решил просто иметь пул рабочих с одним потребителем и одним устройством на каждом. Не хорошо!   -  person Daniel Devine    schedule 30.03.2016


Ответы (3)


Как насчет реализации диспетчера распределенных блокировок с помощью Redis? Клиент Python Redis имеет встроенную функцию блокировки. Также см. этот документ на redis.io. Даже если вы используете RabbitMQ или другого брокера, Redis очень легковесен.

Например, как декоратор:

from functools import wraps

def device_lock(block=True):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return_value = None
            have_lock = False
            lock = redisconn.lock('locks.device', timeout=2, sleep=0.01)
            try:
                have_lock = lock.acquire(blocking=block)
                if have_lock:
                    return_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return return_value
        return wrapper
    return decorator

@shared_task
@device_lock
def send_command(controller, commandname, args):
    """Send a command to the controller."""
    ...

Вы также можете использовать этот подход из кулинарной книги задач Celery:

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    logger.debug('Importing feed: %s', feed_url)
    if acquire_lock():
        try:
            feed = Feed.objects.import_feed(feed_url)
        finally:
            release_lock()
        return feed.url

    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)
person antikantian    schedule 24.03.2016
comment
Я знал об этих решениях, однако причина, по которой я его не использовал, заключается в том, что он не делает то, что я хотел, а именно просто разделяет фактический объект соединения между процессами и использует уже открытое соединение. Я пытаюсь избежать отключения и повторного подключения при каждом запуске задачи. Я могу сохранить объект соединения как глобальный, если запущу рабочего с одним потоком и повторно использую его. Я рассматриваю возможность использования пула однопроцессорных рабочих для этих клиентов. В противном случае, если я решу просто подключаться каждый раз, когда отправляю сообщение, я буду выполнять блокировку с помощью Redis. Среди прочих решений... - person Daniel Devine; 24.03.2016

 ...
self._send_bytes(ForkingPickler.dumps(obj))
 File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

Покопавшись в Интернете, я понял, что, вероятно, пропустил что-то важное в трассировке. Посмотрев на трассировку поближе, я понял, что это не Celery пытается замариновать объект подключения, а скорее Multiprocessing.reduction. Редукция используется для сериализации с одной стороны и восстановления с другой.

У меня есть несколько альтернативных подходов к решению этой проблемы, однако ни один из них на самом деле не делает то, что я изначально хотел, а именно просто заимствовать объект подключения клиентской библиотеки и использовать его, что просто невозможно с многопроцессорностью и предварительным ответвлением.

person Daniel Devine    schedule 24.03.2016
comment
Ах, я думаю, мой ответ был немного поспешным, так как вы хотите передать один и тот же объект соединения. Mutliprocessing и prefork обычно плохо работают с соединениями и вводом-выводом между процессами. Обычно вы хотите установить соединение после разветвления. Рассматривали ли вы переход с prefork на eventlet или gevent для параллелизма, а затем реализовать пул соединений? - person antikantian; 24.03.2016

Вы пытались использовать gevent или eventlet celery worker вместо процессов и потоков? В этом случае вы сможете использовать глобальную переменную или threading.local() для совместного использования объекта соединения.

person kirax    schedule 26.03.2016
comment
Я получал блокировки с помощью eventlet. Я мог бы приложить больше усилий, чтобы выяснить, почему, но у меня мало стимулов, потому что блокирующий характер ввода-вывода того, что я пытаюсь сделать, не подходит для природы цикла событий eventlet/gevent. - person Daniel Devine; 29.03.2016