最新消息: 生命不止,奋斗不息……

rabbitMQ消息系统

Python admin 728浏览 0评论

好久没有写文章了,最近项目需要,研究了下rabbitMQ的队列消息。小组也写了自己的队列通知服务。在此,记录一下学习心得。

producer->consumer

概念:

Kombu是一个AMQP(Advanced Message Queuing Protocol)消息框架。所谓框架,就是一个软件的半成品,是为了提高开发效率而开发的。

AMQP是一个协议,而RabbitMQ是对这个协议的一个实现。

Kombu和RabbitMQ的关系是什么呢?

我觉得就好像javaAPI和Structs/Hibernate这些框架的关系一样,Kombu对RabbitMQ提供的API进行了封装,使得 程序更加面向对象化,比如封装出了Exchange, Queue等这些类,使得对RabbitMQ的操作更加简单,同时,功能更加强悍。

下面来结合实例分析一下:

client.py,消息生产脚本,用来写入消息到队列中,用了kombu库。向rabbitMQ中写入消息。

  1 #!/usr/bin/env python
  2 from kombu.pools import producers
  3 from queues import task_exchange
  4 
  5 priority_to_routing_key = {'high':'hipri', 'mid':'midpri', 'low':'lopri'}
  6 
  7 def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
  8     payload = {'fun':fun, 'args':args, 'kwargs':kwargs}
  9     routing_key = priority_to_routing_key[priority]
 10 
 11     with producers[connection].acquire(block=True) as producer:
 12         producer.publish(payload,
 13                         serializer='pickle',
 14                         compression='bzip2',
 15                         exchange=task_exchange,
 16                         routing_key=routing_key)
 17 
 18 if __name__ == '__main__':
 19     from kombu import Connection
 20     from tasks import hello_task
 21     connection = Connection('amqp://admin:admin@localhost:5672//')
 22     send_as_task(connection, fun=hello_task, args=('World', ), kwargs={}, priority='high')

worker.py,消息消费程序。调用第三方接口,消费掉消息。在消费脚本中,添加了日志记录的功能。消息消费后,确认ACK。这里其实可以加上未消费掉或者回调接口挂掉后重新放入队列的机制(加上socket超时,message.requeue())

  1 #!/usr/bin/env python
  2 from kombu.mixins import ConsumerMixin
  3 from kombu.utils import kwdict, reprcall
  4 import logging
  5 from log import logger
  6 from queues import task_queues
  7 
  8 class Worker(ConsumerMixin):
  9 
 10     def __init__(self, connection):
 11         self.connection = connection
 12 
 13     def get_consumers(self, Consumer, channel):
 14         return [Consumer(queues = task_queues,
 15                         accept = ['pickle', 'json'],
 16                         callbacks = [self.process_task])]
 17 
 18     def process_task(self, body, message):
 19         fun = body['fun']
 20         args = body['args']
 21         kwargs = body['kwargs']
 22         info = {"fun":fun, "args": args, "kwargs": kwargs}
 23         logger.info(info)
 24         try:
 25             fun(*args, **kwdict(kwargs))
 26         except Exception as exc:
 27             logger.error('task raised exception:%r', exc)
 28         message.ack()
 29     
 30 
 31 if __name__ == '__main__':
 32     from kombu import Connection
 33     with Connection('amqp://admin:admin@localhost:5672//') as conn:
 34         try:
 35             worker = Worker(conn)
 36             worker.run()
 37         except KeyboardInterrupt:
 38             print('bye bye')

queues.py,队列和交换机配置。指定交换机名和消息发送机制。

  1 #!/usr/bin/env python
  2 from kombu import Exchange, Queue
  3 
  4 task_exchange = Exchange('tasks', type='direct')
  5 task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
  6                 Queue('midpri', task_exchange, routing_key='midpri'),
  7                 Queue('lopri', task_exchange, routing_key='lopri')]
  8

tasks.py,消费脚本需要调用的消费接口,可以为一个URL。

  1 #!/usr/bin/env python                       
  2 def hello_task(who="world"):
  3     print("Hello %s" % (who, ))

log.py,记录日志,在worker.py中有调用记录日志方法的代码。采用logging模块,文件记录的方式。如果有按天切割日志的需求,可以用TimedRotatingFileHandler

  1 #!/usr/bin/env python
  2 import logging
  3 
  4 #create a logger
  5 logger = logging.getLogger('mylogger')
  6 logger.setLevel(logging.DEBUG)
  7 
  8 #create a handler,use for writing log to file                                                                                                                      
  9 filehandler = logging.FileHandler('./log/worker.log')
 10 filehandler.setLevel(logging.DEBUG)
 11 
 12 #create another handler,user for writing log to terminal
 13 ch = logging.StreamHandler()
 14 ch.setLevel(logging.DEBUG)
 15 
 16 #define format
 17 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 18 filehandler.setFormatter(formatter)
 19 filehandler.setFormatter(formatter)
 20 
 21 #add handler for logger
 22 logger.addHandler(filehandler)
 23 logger.addHandler(ch)

这里只是很简单的几个脚本。可以加上Process,让日志,消息写入,消息消费各自有自己的进程。各个进程只负责自己单独的业务。加入supervisord服务,监控脚本,自动重启。另外,结合rabbitMQ 的UI management。可以在视图中看到详细的队列信息。包括速率,未消费的等等。非常方便。

转载请注明:IT世界 » rabbitMQ消息系统

您必须 登录 才能发表评论!