PHP和RabbitMQ:高级示例
>
钥匙要点- >使用PHP和RabbitMQ在多名工人中不同步处理数据,从而提高了高交易环境中的效率。
- >在兔子中实现持久消息,以防止服务器崩溃期间的数据丢失,通过将消息“ evely_mode”设置为2,并将队列宣布为持久。
- >使用RabbitMQ中的服务质量(QoS)设置来控制工人之间的消息分布,确保没有任何单个工人被不知所措。
- 通过发送需要响应的消息,可用于使用RABBITMQ进行RPC(远程过程调用) >设置RPC响应的独家,临时队列,以确保在客户端和服务器之间正确且安全地定向消息。
- >示例1:发送请求以在几个工人中对数据异步处理
- 在上一部分的示例中,我们有一个生产者,一个消费者。 如果消费者死亡,则消息将继续堆积在队列中,直到消费者再次开始。然后,它将处理所有消息。 在并发的用户环境中,每分钟请求相当多,这可能是不理想的。 幸运的是,扩展消费者非常容易,但让我们实现另一个示例。
- > >假设我们有一项发票生成服务,用户只需要提供发票号码,系统将自动生成PDF文件并将其通过电子邮件发送给用户。 如果生成过程运行的服务器受到资源有限,则生成和发送电子邮件可能需要几秒钟。 现在假设我们需要每秒支持几笔交易,我们如何在不压倒服务器的情况下完成此操作?
让我们看一下我们的制作人班:
> WorkerSender :: execute()方法将收到一个发票号。 接下来,我们像往常一样创建一个连接,频道和队列。
>请注意,这次,在创建消息对象时,构造函数将接收第二个参数:array('veliver_mode'=> 2)。 在这种情况下,我们要声明,如果RabbitMQ服务器崩溃,则不会丢失消息。 请注意,为了使这项工作,队列也必须被声明持久。
可以使用以下代码接收表单数据并执行生产者: 请使用您感到满意的输入消毒/验证。
>
现在,我们如何发送Ack? 请查看Workerreceiver :: process()方法(接收到消息时,该方法称为回调方法)。对生成PDF()和sendemail()方法的调用只是虚拟方法,它将模拟完成这两个任务所花费的时间。 $ msg参数不仅包含从生产者发送的有效载荷,还包含有关生产者使用的对象的信息。 我们可以使用$ msg-> velivery_info ['channel']提取有关生产者使用的通道的信息(这是我们使用$ Connection-> Channel();)为消费者打开的通道的对象类型。 由于我们需要向生产者的频道发送确认我们已经完成了该过程的确认,因此我们将使用其basic_ack()方法,将作为参数发送作为参数($ msg-> delivery_info ['evely_tag'])rabbitmq自动生成的rabbitmq正确地将ACK属于哪个消息相关联。 我们如何解雇工人?只需创建以下文件,请调用Workerreceiver :: listing()方法:
>
>另一个很大的更改是,在执行$ channel-> basic_consume()时,生产者也将成为队列的消费者,请注意,我们在声明队列时提供了$ callback_queue值。 像每个消费者一样,我们将在该过程收到响应时声明一个回调要执行。
接下来,我们必须为消息创建一个相关ID,这无非是每个消息的唯一标识符。 在示例中,我们正在使用uniqid()的输出,但是您可以使用自己喜欢的任何机制(只要它不创建种族条件,就不需要是强大的,加密的安全RNG)。 现在,让我们创建一条消息,与我们在前两个示例中使用的消息相比,它具有重要的变化。除了分配一个包含我们要身份验证的凭证的JSON编码字符串外,我们还必须提供给AMQPMESSAGE构造函数一个具有两个定义的属性的数组: 。
RPC消费者呢? 在这里是:
’)。 我们将定义QoS参数,因为我们将停用自动ACK,因此我们可以在验证凭据验证并具有结果时通知。
魔术来自声明的回调。一旦我们对凭据进行身份验证(是的,我知道该过程是针对静态用户名/密码值完成的,本教程并不是关于如何对凭据进行身份验证;)),我们必须创建具有相同关联ID的返回消息(生产者)创建。 我们可以使用$ req-> get('correlation_id')从请求消息中提取此值,以相同的方式传递此值。
>发布消息后,我们必须使用$ req-> delivery_info ['channel''] - > basic_ack()将ACK通知发送到频道,使用$ req-> evelyse_info ['velivery_tag'' ]这样生产者可以停止等待。 >关于RabbitMQ和AMQP的更多要说,例如虚拟主机,交换类型,服务器管理等……您可以在此处和文档页面上找到更多的应用程序模式(例如路由,主题)。 还有一个命令行工具来管理RabbitMQ,还有一个基于Web的接口。 如果您喜欢这个教程系列,并且想查看有关MQ和更多现实世界用例的更多信息,请在下面的评论中告诉我们! >我如何在php中从php? 我如何处理rabbitmq中的PHP中的错误可以使用try-catch块在兔子中使用PHP中的兔子中的错误处理?发生错误时,PHP AMQP扩展会引发AMQPException类的异常。您可以捕获这些异常并根据您的应用程序的需求处理它们。 >如何使用PHP?<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
预摘要大小
>示例2:发送RPC请求并期望回复
到目前为止,我们一直在向RabbitMQ服务器发送消息,而无需等待回复。 对于异步过程,这可能比用户愿意花费更多的时间只是为了查看“确定”消息,这是可以的。 但是,如果我们真的需要答复怎么办?假设一个复杂的计算结果,因此我们可以将其显示给用户?<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
>一旦我们收到来自频道的响应,将调用回调方法(rpcsender :: onResponse())。 此方法将与已接收的相关ID与生成的相关ID匹配,如果它们是相同的,则将设置响应主体,从而破坏while循环。<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span> <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>
</span><span> <span>/**
</span></span><span><span> * Sends an invoice generation task to the workers
</span></span><span><span> *
</span></span><span><span> * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span> */
</span></span><span> <span>public function execute($invoiceNum)
</span></span><span> <span>{
</span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span> <span>$channel = $connection->channel();
</span></span><span>
</span><span> <span>$channel->queue_declare(
</span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->basic_publish(
</span></span><span> <span>$msg, #message
</span></span><span> <span>'', #exchange
</span></span><span> <span>'invoice_queue' #routing key (queue)
</span></span><span> <span>);
</span></span><span>
</span><span> <span>$channel->close();
</span></span><span> <span>$connection->close();
</span></span><span> <span>}
</span></span><span><span>}</span></span>
<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span> <span>$msg = new AMQPMessage(
</span></span><span> <span>$invoiceNum,
</span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span> <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
>关于PHP RabbitMQ高级示例的常见问题(常见问题解答)
>兔子在PHP中的作用是什么?它通过使其能够更有效地处理高负载和复杂任务,在PHP应用中起着至关重要的作用。 RabbitMQ使用高级消息排队协议(AMQP)来促进应用程序不同部分之间的消息交换。这允许流程解耦,使应用程序更具可扩展性和弹性。你的机器。这可以通过官方的RabbitMQ网站完成。安装服务器后,您可以安装PHP AMQP扩展名,该扩展提供了与RabbitMQ交互的必要功能。可以使用命令pecl install amqp。 AMQPCHANNEL课程。此方法采用多个参数,包括交换的名称,交换的类型(直接,主题,粉丝或标题)以及可选参数,例如被动,耐用,auto_delete和grignestes。我是否在php中的兔子队列中发送消息?带有消息内容的AMQPMESSAGE类。然后,您可以使用AMQPCHANNEL类的Basic_Publish方法将消息发送到队列。 BASIC_PUBLISH方法将消息,交换和路由密钥作为参数。
>>
>如何确保使用php的兔子中的消息持久性? AMQPMESSAGE类的velivery_mode属性到2。这将使RabbitMQ将消息存储在磁盘上,即使RabbitMQ服务器崩溃或重新启动。
>如何通过php?
在兔子中的优先级队列在PHP中实现优先级排队,可以通过在声明队列时设置X-Max-Prifority参数来实现。然后,在发送消息时,您可以将AMQPMESSAGE类的优先属性设置为0和您指定的最大优先级之间的值。我如何在PHP中使用RabbitMQ作为RPC? > RABBITMQ可以通过向回调队列发送带有回复属性设置的消息,用于PHP中的远程过程调用(RPC)。然后,服务器可以将响应发送到回调队列,并且客户端可以从那里消耗响应。
>>监视PHP中的RabbitMQ中的RabbitMQ。 RabbitMQ管理插件,该插件提供了一个基于Web的接口,用于监视和管理RabbitMQ服务器。您还可以使用AMQPCHANNEL类的方法来获取有关频道状态的信息,例如未经确定的消息的数量。
>
以上是PHP和RabbitMQ:高级示例的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

JWT是一种基于JSON的开放标准,用于在各方之间安全地传输信息,主要用于身份验证和信息交换。1.JWT由Header、Payload和Signature三部分组成。2.JWT的工作原理包括生成JWT、验证JWT和解析Payload三个步骤。3.在PHP中使用JWT进行身份验证时,可以生成和验证JWT,并在高级用法中包含用户角色和权限信息。4.常见错误包括签名验证失败、令牌过期和Payload过大,调试技巧包括使用调试工具和日志记录。5.性能优化和最佳实践包括使用合适的签名算法、合理设置有效期、

会话劫持可以通过以下步骤实现:1.获取会话ID,2.使用会话ID,3.保持会话活跃。在PHP中防范会话劫持的方法包括:1.使用session_regenerate_id()函数重新生成会话ID,2.通过数据库存储会话数据,3.确保所有会话数据通过HTTPS传输。

PHP8.1中的枚举功能通过定义命名常量增强了代码的清晰度和类型安全性。1)枚举可以是整数、字符串或对象,提高了代码可读性和类型安全性。2)枚举基于类,支持面向对象特性,如遍历和反射。3)枚举可用于比较和赋值,确保类型安全。4)枚举支持添加方法,实现复杂逻辑。5)严格类型检查和错误处理可避免常见错误。6)枚举减少魔法值,提升可维护性,但需注意性能优化。

SOLID原则在PHP开发中的应用包括:1.单一职责原则(SRP):每个类只负责一个功能。2.开闭原则(OCP):通过扩展而非修改实现变化。3.里氏替换原则(LSP):子类可替换基类而不影响程序正确性。4.接口隔离原则(ISP):使用细粒度接口避免依赖不使用的方法。5.依赖倒置原则(DIP):高低层次模块都依赖于抽象,通过依赖注入实现。

静态绑定(static::)在PHP中实现晚期静态绑定(LSB),允许在静态上下文中引用调用类而非定义类。1)解析过程在运行时进行,2)在继承关系中向上查找调用类,3)可能带来性能开销。

RESTAPI设计原则包括资源定义、URI设计、HTTP方法使用、状态码使用、版本控制和HATEOAS。1.资源应使用名词表示并保持层次结构。2.HTTP方法应符合其语义,如GET用于获取资源。3.状态码应正确使用,如404表示资源不存在。4.版本控制可通过URI或头部实现。5.HATEOAS通过响应中的链接引导客户端操作。

在PHP中,异常处理通过try,catch,finally,和throw关键字实现。1)try块包围可能抛出异常的代码;2)catch块处理异常;3)finally块确保代码始终执行;4)throw用于手动抛出异常。这些机制帮助提升代码的健壮性和可维护性。

匿名类在PHP中的主要作用是创建一次性使用的对象。1.匿名类允许在代码中直接定义没有名字的类,适用于临时需求。2.它们可以继承类或实现接口,增加灵活性。3.使用时需注意性能和代码可读性,避免重复定义相同的匿名类。
