Table of Contents
HelloWorld
Introduction
code
Work queue (task queue)
Message ack
Message persistence
Fair message distribution
Group sending
exchange
Temporary queue
Bind exchange and queue
Use the newly created logs exchange
Routing
Use direct exchange
Use topic exchange
 RPC
Home Backend Development Python Tutorial RabbitMQ quick start python tutorial

RabbitMQ quick start python tutorial

Mar 09, 2017 am 09:28 AM
pika python rabbitmq

HelloWorld

Introduction

RabbitMQ: Accepting messages and then delivering them can be regarded as a "post office". Senders and receivers interact through queues. The size of the queue can be considered unlimited. Multiple senders can send messages to a queue, and multiple receivers can also receive messages from a queue.

code

The protocol used by rabbitmq is amqp, and the recommended client for python is pika

pip install pika -i https://pypi.douban.com/simple/
Copy after login

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
Copy after login

The link here is For this machine, if you want to connect to a server on another machine, just fill in the address or host name.

Next we start sending messages. Be careful to ensure that the queue that accepts messages exists, otherwise rabbitmq will discard the message

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
Copy after login

RabbitMQ requires 1GB of free disk space by default, otherwise it will be sent will fail.

At this time, a message has been stored in the local queue hello. If you use rabbitmqctl list_queues, you can see that

hello 1
Copy after login

indicates that there is a message stored in the hello queue.

receive .py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
Copy after login

Still connect to the server first, the same as when sending before

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
Copy after login

Work queue (task queue)

The work queue is used to distribute time-consuming tasks to multiple of a work process. Instead of doing resource-consuming tasks immediately (you need to wait for these tasks to complete), schedule these tasks for later execution. For example, we send the task as a message to the queue, start a worker process to accept and finally execute it, and can start multiple worker processes to work. This applies to web applications where complex tasks should not be completed within the processing window of an http request.

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
Copy after login

The way to allocate messages is polling, that is, each worker process gets the same number of messages.

Message ack

If a message is assigned to a worker process, but the worker process crashes before processing is completed, the message may be lost, because once rabbitmq distributes a message to the worker process, it deletes the message.

In order to prevent message loss, rabbitmq provides ack, that is, after the worker process receives the message and processes it, it sends ack to rabbitmq to inform rabbitmq that the message can be deleted from the queue at this time. If the worker process dies and rabbitmq does not receive the ack, the message will be redistributed to other worker processes. There is no need to set a timeout, even if the task takes a long time it can be processed.

ack is enabled by default. Previously, our worker process specified no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
Copy after login

Callback with ack:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Copy after login

Message persistence

However, sometimes RabbitMQ is restarted and messages are lost. Persistence can be set when creating the queue:
(The nature of the queue cannot be changed once determined)

channel.queue_declare(queue='task_queue', durable=True)
Copy after login

At the same time, the persistence attribute of the message must also be set when sending the message:

channel .basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
Copy after login

However, if RabbitMQ has just received a message and has not had time to store it, the message will still be lost. At the same time, RabbitMQ does not save every message it receives. If you need a more complete guarantee, you need to use publisher confirm

Fair message distribution

Polling mode message distribution may not be fair, for example, if odd numbers of messages are heavy tasks, Some processes will always run heavy tasks. Even if there is a backlog of messages on a worker process that has not been processed, for example, many acks have not been sent, RabbitMQ will still send messages to it in order. You can add settings in the receiving process:

channel.basic_qos(prefetch_count=1)
Copy after login

Inform RabbitMQ so that if a worker process does not send back an ack, no more messages will be allocated to it

Group sending

Under normal circumstances, a message is sent. Give a worker process and then complete it. Sometimes you want to send a message to multiple processes at the same time:

exchange

Does the sender directly send the message to the queue? In fact, the sender does not send the message directly to the queue. The sender can only send messages to the exchange without knowing which queue the message will be sent to. On the one hand, the exchange receives messages from the producer, and on the other hand, it pushes them to the queue. Therefore, as an exchange, it needs to know when the message is received. What does it need to do, should it be added to a special queue, put into many queues, or discarded. Exchange has direct, topic, headers, fanout and other types, and the one used for mass sending is fanout before publishing. When sending a message, the value of exchange is '', which means using default exchange.

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
Copy after login

Temporary queue

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
Copy after login

In this way, result.method.queue is the queue name, which can be used when sending or receiving.

Bind exchange and queue

channel.queue_bind(exchange='logs',
               queue='hello')
Copy after login

Send a copy of logs to hello when sending a message

Use the newly created logs exchange

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
Copy after login
.

Routing

Bind has been used before, that is, the relationship between exchange and queue is established (the queue is interested in messages from the exchange). You can also specify the routing_key option when binding.

Use direct exchange

Send the message corresponding to the routing key to the queue bound to the same routing key

channel.exchange_declare(exchange='direct_logs',
                     type='direct')
Copy after login

Send function to publish messages of different severity:

channel.basic_publish(exchange='direct_logs',
                  routing_key=severity,
                  body=message)
Copy after login

Accept function Binding corresponding to severity:

channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key=severity)
Copy after login

Use topic exchange

The direct exchange used before can only bind one routing key, you can use this topic exchange that can separate the routing keys. ,For example:

"stock.usd.nyse" "nyse.vmw"
Copy after login

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
Copy after login

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
Copy after login

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
Copy after login

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
Copy after login

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Copy after login

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
Copy after login

                                               

The above is the detailed content of RabbitMQ quick start python tutorial. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

PHP and Python: Different Paradigms Explained PHP and Python: Different Paradigms Explained Apr 18, 2025 am 12:26 AM

PHP is mainly procedural programming, but also supports object-oriented programming (OOP); Python supports a variety of paradigms, including OOP, functional and procedural programming. PHP is suitable for web development, and Python is suitable for a variety of applications such as data analysis and machine learning.

Choosing Between PHP and Python: A Guide Choosing Between PHP and Python: A Guide Apr 18, 2025 am 12:24 AM

PHP is suitable for web development and rapid prototyping, and Python is suitable for data science and machine learning. 1.PHP is used for dynamic web development, with simple syntax and suitable for rapid development. 2. Python has concise syntax, is suitable for multiple fields, and has a strong library ecosystem.

PHP and Python: A Deep Dive into Their History PHP and Python: A Deep Dive into Their History Apr 18, 2025 am 12:25 AM

PHP originated in 1994 and was developed by RasmusLerdorf. It was originally used to track website visitors and gradually evolved into a server-side scripting language and was widely used in web development. Python was developed by Guidovan Rossum in the late 1980s and was first released in 1991. It emphasizes code readability and simplicity, and is suitable for scientific computing, data analysis and other fields.

Python vs. JavaScript: The Learning Curve and Ease of Use Python vs. JavaScript: The Learning Curve and Ease of Use Apr 16, 2025 am 12:12 AM

Python is more suitable for beginners, with a smooth learning curve and concise syntax; JavaScript is suitable for front-end development, with a steep learning curve and flexible syntax. 1. Python syntax is intuitive and suitable for data science and back-end development. 2. JavaScript is flexible and widely used in front-end and server-side programming.

How to run sublime code python How to run sublime code python Apr 16, 2025 am 08:48 AM

To run Python code in Sublime Text, you need to install the Python plug-in first, then create a .py file and write the code, and finally press Ctrl B to run the code, and the output will be displayed in the console.

Can vs code run in Windows 8 Can vs code run in Windows 8 Apr 15, 2025 pm 07:24 PM

VS Code can run on Windows 8, but the experience may not be great. First make sure the system has been updated to the latest patch, then download the VS Code installation package that matches the system architecture and install it as prompted. After installation, be aware that some extensions may be incompatible with Windows 8 and need to look for alternative extensions or use newer Windows systems in a virtual machine. Install the necessary extensions to check whether they work properly. Although VS Code is feasible on Windows 8, it is recommended to upgrade to a newer Windows system for a better development experience and security.

Where to write code in vscode Where to write code in vscode Apr 15, 2025 pm 09:54 PM

Writing code in Visual Studio Code (VSCode) is simple and easy to use. Just install VSCode, create a project, select a language, create a file, write code, save and run it. The advantages of VSCode include cross-platform, free and open source, powerful features, rich extensions, and lightweight and fast.

Can visual studio code be used in python Can visual studio code be used in python Apr 15, 2025 pm 08:18 PM

VS Code can be used to write Python and provides many features that make it an ideal tool for developing Python applications. It allows users to: install Python extensions to get functions such as code completion, syntax highlighting, and debugging. Use the debugger to track code step by step, find and fix errors. Integrate Git for version control. Use code formatting tools to maintain code consistency. Use the Linting tool to spot potential problems ahead of time.

See all articles