Pika и RabbitMQ — обработчик асинхронного класса

Я пытаюсь создать класс, который сможет создавать и возвращать объект, который можно вызывать в определенные моменты времени, и отправлять сообщение RabbitMQ.

Я использую Pika, и я пробовал этот код, но он не запустится, если я не вызову ioloop, но мне он не нужен, и ioloop застревает в остальной части моего кода, который называется этим классом.

есть идеи, как этого добиться?

import pika

импорт ConfigurationManager импорт json импорт gc

класс Отправитель сообщения:

def __init__(self,brokerUrl,queueName):
    credentials = pika.PlainCredentials('admin', 'admin')        
    self.brokerUrl = brokerUrl
    self.queueName = queueName        
    self.channel = None
    self.connectionParameters = pika.ConnectionParameters(host=self.brokerUrl, credentials=credentials)
    self.connection = pika.SelectConnection(self.connectionParameters, on_open_callback=self.onOpen, custom_ioloop=None)                        

def onOpen(self, connection):
    connection.channel(self.openChannel)

def openChannel(self, channel):
    channel.queue_declare(None, queue=self.queueName, durable=True) 
    self.channel = channel

def createMessageJson(self,commandType,searchId,senderChannel,data,runId=None, originalMessage=None):
    rtnDict = {}
    rtnDict["commandType"] = commandType
    rtnDict["searchId"] = searchId
    rtnDict["receipientChannel"]  = self.queueName
    rtnDict["senderChannel"] = senderChannel
    rtnDict["data"] = data
    if 'executionPlanSchema' in data:
        rtnDict['executionPlanSchema'] = data['executionPlanSchema']
    if runId != None:
        rtnDict["runId"] = runId
    if originalMessage != None:
        rtnDict["originalMessage"] = originalMessage
    return rtnDict

#def sendMessage(self,msg):
#    tornado.ioloop.IOLoop.current().run_sync(lambda: self.send(msg))

def sendMessageAsync(self,msg):
    msg = json.dumps(msg)
    self.channel.basic_publish(exchange='', routing_key=self.queueName, body=msg)    
    gc.collect()

def sendMessage(self,msg):
    msg = json.dumps(msg)
    self.channel.basic_publish(exchange='', routing_key=self.queueName, body=msg)
    gc.collect()

person Ran Landau    schedule 18.01.2018    source источник


Ответы (1)


Команда RabbitMQ следит за этим списком рассылки и лишь иногда отвечает на вопросы в StackOverflow. .


Вместо этого попробуйте использовать BlockingConnection.

person Luke Bakken    schedule 20.01.2018