使用 ZeroMQ 在分布式系统中发送消息
使用 ZeroMQ 在分布式系统中发送消息
让我们使用 Python 来开发不同的消息传递模式。
您需要观看以下视频才能按照分步命令进行操作。
慢慢来;确保在运行命令之前仔细检查它们。
- 以下视频演示了本教程中使用的命令。
我在我的 GCP 虚拟机上运行本教程,但也可以在本地运行它 ✅
本教程使用 ZeroMQ 介绍 Python3 中套接字的概念。 ZeroMQ 是一种开发套接字的简单方法,允许分布式进程通过发送消息相互通信。
- 最简单的形式是,一个套接字(节点)“监听”特定的 IP 端口,同时另一个套接字伸出来形成连接。使用套接字,我们可以拥有一对一、一对多和多对多连接模式。
我们今天将研究的消息传递模式如下:
- 配对: 排他性、一对一的通信,两个同伴相互通信。通信是双向的,套接字中没有存储特定的状态。服务器监听某个端口,客户端连接到该端口。
- 客户端 – 服务器:客户端连接到一台或多台服务器。该模式允许请求-响应模式。客户端发送请求“zmq.REQ”并接收回复。
- 发布/订阅: 一种传统的通信模式,消息的发送者(称为发布者)将消息发送到特定的接收者(称为订阅者)。消息的发布无需知道该知识的订阅者是什么或是否存在。多个订阅者订阅由发布者发布的消息/主题,或者一个订阅者可以连接到多个发布者。
- 推拉套接字(又名管道):让您将消息分发给排列在管道中的多个工作人员。 Push 套接字会将发送的消息均匀分发到其 Pull 客户端。这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。
? 注意: 使用套接字可能会很棘手,使用相同的端口号/相同的套接字一次又一次运行相同的代码,可能会导致连接“挂起”(服务器看起来像是正在运行,但它不能接受连接)。发生这种情况是因为我们没有正确关闭和销毁之前的连接。
解决这个问题最合适的方法是关闭套接字并销毁 ZeroMQ 上下文。有关更多详细信息,请参阅第 2 阶段和第 3 阶段的 try – catch 块。
在本教程中,您可能会遇到此类问题,例如,在同一端口中多次运行同一服务器。如果您遇到挂起问题,建议您终止 Python 进程,清理 TCP 端口号,然后再次运行服务器(请参阅步骤 11)。
第 1 阶段:将服务器与客户端配对
让我们首先创建一个新的虚拟机,然后我们将安装Python3。
- 保留虚拟机内部 IP 的副本,在本教程中我们将使用内部 IP 地址。
- 打开一个新的终端连接并运行以下命令(一个接一个)。最后一个命令安装 ZeroMQ。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
出现提示时输入:Y。
如今许多应用程序都包含跨网络的组件,因此消息传递至关重要。今天我们将使用 TCP 进行消息传输。
您可以使用 VSC 访问您的虚拟机,也可以使用 SSH 运行命令并使用 pico 编辑文件,在我的例子中,我将使用 SSH。
?确保仔细复制代码。
我们需要创建第一个ZeroMQ 服务器,该服务器一次只允许与一个客户端绑定。
创建一个名为pair-server.py的新文件,然后输入以下代码。
代码使用 zmq.PAIR 模式创建一个新套接字,然后将服务器绑定到特定的 IP 端口(我们已经在 GCP 中打开)。请注意,在我们停止服务器之前,服务器不会停止运行。
查看评论以了解其工作原理。
确保更改 ;这是 GCP 虚拟机的 内部 IP 地址;客户端端口应与服务器端口相同。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
先不要运行服务器,首先让我们创建客户端。
创建客户端并花一点时间检查评论。我将其命名为pair-client.py。
确保更改 ;端口应与服务器中的端口相同。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
我们需要两个个终端窗口来运行PAIR示例。我们将在一个窗口上运行服务器,在另一个窗口上运行客户端。现在,按如下方式运行它。
- 运行服务器
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
- 运行客户端
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
检查输出,我们刚刚创建了一个新的 PAIR 套接字。
- 当客户端完成连接时,脚本将终止。然后停止服务器(ctrl c)并杀死它。
在再次运行之前,我们需要清除 TCP 连接。为此,请使用以下命令。
$ python3 pair-server.py
?备注:
我们一次只能运行一个PAIR,这意味着我们不能有多个客户端,记住这是一个PAIR,第一个客户端将锁定套接字.
如果我们运行服务器一次,客户端运行两次,第二个客户端将“挂起”,这意味着第二个客户端将等待新服务器连接。
如果我们想要多次运行该对,我们需要终止服务器并清除 TCP 连接。
PAIR 当客户端需要独占访问服务器时是理想的选择。
我们可以将多个服务器作为一对连接到多个客户端,但我们需要使用不同的端口号进行连接。
每个阶段都是相互独立的,因此,停止服务器,清除 TCP 端口,然后进入下一阶段。
第 2 阶段:将服务器与多个客户端配对
让我们创建一个客户端-服务器连接,其中多个客户端将连接到单个服务器。这是最流行的消息传递模式。
- 让我们在 REP-REQ(回复请求)模式的上下文中创建一个服务器。
- 我们将调用服务器rep-server.py,使用端口5555。
$ python3 pair-client.py
现在我们将开发两个功能相同的客户端。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
让我们创建该客户端的副本并进行相应的编辑。运行以下命令来制作新副本。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
然后编辑req-client2.py并将客户端1更改为客户端2。
让我们编辑打印和套接字消息(第 8 行和第 9 行)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
要运行此示例,我们需要三个 个终端窗口,一个用于服务器,两个用于客户端。在第一个终端中运行以下命令。
- 让我们启动服务器
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
- 让我们启动第一个客户端
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
- 让我们启动第二个客户端
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
检查窗口的输出,我们刚刚创建了两个与一台服务器通信的客户端。您可以拥有任意数量的客户端,您将需要创建客户端,即使具有连接到一台服务器的不同功能。
? 备注:
客户端-服务器是最广泛使用的模式,当我们安装和运行 Apache HTTP 服务器时,我们已经在第 1 类中使用了它。
停止服务器并清理 TCP 端口 5555
- 杀死服务器:
重击
$ sudo fusion -k 5555/tcp
第 3 阶段:将服务器与客户端配对
发布-订阅模式是一种非常常见的方法,用于控制向订阅上下文的许多客户端广播数据,服务器将数据发送到一个或多个客户端。
$ python3 pair-server.py
让我们首先创建一个简单的示例。
$ python3 pair-client.py
让我们创建一个新文件,命名为 pub_server.py。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
- 该命令将指示 python 以特定的方式运行服务器
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
创建一个新文件 pub_client.py。
* 该脚本接受来自命令行的三个参数(即 IP 和两个端口)。
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
我们已准备好运行我们的pub-sub应用程序!我们需要三个个终端窗口。在第一个终端中运行:
$ cp req-client1.py req-client2.py
- 在第二个终端中运行:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
- 每个服务器都会生成天气数据。例如:
- 邮政编码,例如:10001
- 温带,例如:-68
让我们运行客户端以通过邮政编码连接并订阅数据,例如 10001 (NYC)。请记住,客户端脚本订阅了两个服务器实例。运行下一个命令:
$ python3 rep-server.py
- 完成杀死服务器(ctrl z)并清除 TCP 端口后,运行以下命令:
$ python3 req-client1.py
$ python3 req-client2.py
第 4 阶段:推/拉:使用管道模式**
推/拉套接字可让您将消息分发给排列在管道中的多个工作人员。这对于并行运行代码非常有用。 Push 套接字会将消息均匀分发到其 Pull 客户端,客户端将响应发送到另一个称为收集器的服务器。
这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。
我们将实现以下功能。
生产者将向消费者推送 0 到 10 的随机数。
同一消费者的两个实例将拉取数字并执行繁重的任务。
任务可以是任何繁重的计算,例如矩阵乘法。
为了简单起见,我们的“繁重任务”将只返回相同的数字。
消费者会将各个结果(繁重的任务计算)推送到结果收集器,该收集器将汇总结果。
为了简单起见,结果收集器的实例将拉取结果并计算每个消费者的部分总和。如果需要,我们可以轻松地将两个部分和相加。
-
让我们看一个简单的例子。
- 生产者生成 [1,2,3,4,5]。
- 消费者1接收到[2,4],然后计算一个繁重的任务并将结果转发给结果收集器。
- 消费者2收到[1,3,5],然后计算一个繁重的任务,并将结果转发给结果收集器。
- 结果收集器计算计数和部分总和,例如:
- Consumer1[2,4],这意味着从 Consumer1 收到 2 个数字,它们的总和为 6。
- Consumer2[1,3,5],表示从该 Consumer2 收到 3 个数字,其总和为 9。
此示例演示了分布式处理并行处理的潜力。
首先,让我们创建在端口 5555 上运行的名为 Producer.py 的生产者,确保您调整了您的 .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
然后创建consumer.py如下。不要忘记更改代码中的两个 s。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
最后,让我们开发collector.py,再次更改.
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
确保没有缩进错误!
$ python3 pair-server.py
首先,我们需要运行collector.py,收集器将等待数据被收集,直到我们启动生产者。
$ python3 pair-client.py
- 然后,我们将一一启动消费者,在不同的终端窗口中运行每个命令。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
- 在另一个终端中运行相同的命令。
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
- 最后,我们将启动生产者,开始将数据发送到我们的管道。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
干得好! ?您使用 ZeroMQ 来开发消息传递模式!
以上是使用 ZeroMQ 在分布式系统中发送消息的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。 Python以简洁和强大的生态系统着称,C 则以高性能和底层控制能力闻名。

Python在游戏和GUI开发中表现出色。1)游戏开发使用Pygame,提供绘图、音频等功能,适合创建2D游戏。2)GUI开发可选择Tkinter或PyQt,Tkinter简单易用,PyQt功能丰富,适合专业开发。

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。
