Я пытаюсь взаимодействовать с устройством, которое может принимать только одно TCP-соединение (ограничения памяти), поэтому просто запуск соединения для каждого рабочего потока не вариант, как в обычной ситуации клиент-сервер, такой как соединение с базой данных.
Я попытался использовать словарь Multiprocessing Manager, который глобально доступен между потоками, в формате:
clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
И такая задача:
from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
"""Send a command to the controller."""
# Create client connection if one does not exist.
conn = None
addr, port = controller
if controller not in clients:
conn = Client(addr, port)
conn.connect()
lock = manager.RLock()
clients[controller] = (conn, lock,)
print("New controller connection to %s:%s" % (addr, port,))
else:
conn, lock = clients[controller]
try:
f = getattr(conn, commandname) # See if connection.commandname() exists.
except Exception:
raise Exception("command: %s not known." % (commandname))
with lock:
res = f(*args)
return res
Однако задача завершится ошибкой сериализации, например:
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Несмотря на то, что я не вызываю задачу с несериализуемым значением, и задача не пытается вернуть несериализуемое значение, кажется, Celery одержим попытками сериализовать этот глобальный объект?
Что мне не хватает? Как бы вы сделали соединения клиентских устройств, используемые в задачах Celery, потокобезопасными и доступными между потоками? Пример кода?
multiprocessing.reduction
, который должен разрешить совместное использование соединений сокетов между процессами. см. пример в этом блоге. - person antikantian   schedule 24.03.2016