#!/usr/bin/env python
"""Publish "add" tasks to a RabbitMQ queue.

The connection, channel and queue declaration happen at module import time;
``queue_put`` then publishes on that shared channel.
"""
import json
import sys

import pika

from config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD

QUEUE_NAME = 'add_task_queue'

params = pika.ConnectionParameters(
    host=RMQ_HOST,
    port=RMQ_PORT,
    credentials=pika.credentials.PlainCredentials(RMQ_USER, RMQ_PASSWORD),
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
# durable=True asks the broker to keep the queue itself across restarts.
channel.queue_declare(queue=QUEUE_NAME, durable=True)


def queue_put(func_args):
    """Serialize *func_args* as JSON and publish it to ``QUEUE_NAME``.

    Args:
        func_args: List of function arguments; must be JSON-serializable.

    Returns:
        Whatever ``channel.basic_publish`` returns.

    Raises:
        TypeError: If *func_args* cannot be encoded by ``json.dumps``.
    """
    message = json.dumps(func_args)
    return channel.basic_publish(
        exchange='',
        routing_key=QUEUE_NAME,
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
    )


if __name__ == '__main__':
    try:
        queue_put([1, 2])
    finally:
        # Close the connection even when publishing fails; skip the call if
        # the connection is already down so the original error is not masked.
        if connection.is_open:
            connection.close()
#!/usr/bin/env python
"""Worker that consumes "add" tasks from a RabbitMQ queue.

Each message body is a JSON list of arguments; the worker adds them, hands
the result to ``save_result`` and then acknowledges the message.
"""
import json
import time

import pika

from config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD

QUEUE_NAME = 'add_task_queue'

params = pika.ConnectionParameters(
    host=RMQ_HOST,
    port=RMQ_PORT,
    credentials=pika.credentials.PlainCredentials(RMQ_USER, RMQ_PASSWORD),
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
# durable=True asks the broker to keep the queue itself across restarts.
channel.queue_declare(queue=QUEUE_NAME, durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def add(a, b):
    """Return ``a + b``."""
    return a + b


def save_result(args, result):
    """Record *result* for *args*; currently it is only printed."""
    print("save_result for", args, ":", result)
    # NOTE: placeholder - persist using mysql or whatever storage...


def callback(ch, method, properties, body):
    """Process one delivered message and acknowledge it.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties (unused).
        body: Raw message bytes holding a JSON-encoded argument list.
    """
    print(" [x] Received %r" % body)
    args = json.loads(body.decode())
    result = add(*args)
    save_result(args, result)
    print(" [x] Done")
    # Ack last: if anything above raises, the message stays unacknowledged.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Limit this worker to one unacknowledged message at a time.
channel.basic_qos(prefetch_count=1)
try:
    # pika >= 1.0 signature.
    channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
except TypeError:
    # pika < 1.0 takes the callback first and has no on_message_callback
    # keyword, so the call above fails before registering anything.
    channel.basic_consume(callback, queue=QUEUE_NAME)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit; shut down without a traceback.
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()