Table of Contents
Preface
Storage key:
Execute command
Mode difference
Public operations
Publishing process
Publishing parameters
Immediate execution
Delayed publishing
Execution process
Pushing tasks from other projects
PHP version
Go version
Analyze think-queue (analysis around redis)

Jul 26, 2021 pm 04:00 PM
php redis thinkphp5

Preface

Before reading this analysis, make sure you understand how a message queue is implemented.

The tp5 message queue supports three drivers: database, redis, and Topthink, which is TP's own official implementation.
This chapter analyzes the redis driver.

Storage key:

Key                         Type     Description
queues:queueName            list     Tasks to be executed
think:queue:restart         string   Timestamp of the queue restart
queues:queueName:delayed    zSet     Delayed tasks
queues:queueName:reserved   zSet     Tasks that failed and are waiting to be re-executed
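
As a small illustration of the naming scheme in the table, the sketch below derives the three per-queue key names from a queue name. The helper `queueKeys` and the `QueueKeys` type are hypothetical names introduced here; think-queue builds these names internally.

```go
package main

import "fmt"

// QueueKeys groups the redis key names listed in the table for one queue.
// The type is hypothetical and exists only for this illustration.
type QueueKeys struct {
	Pending  string // list: tasks to be executed
	Delayed  string // zSet: delayed tasks
	Reserved string // zSet: tasks waiting to be re-executed
}

// RestartKey is the string key that holds the restart timestamp.
const RestartKey = "think:queue:restart"

// queueKeys derives the key names for a queue by following the table's pattern.
func queueKeys(name string) QueueKeys {
	base := "queues:" + name
	return QueueKeys{
		Pending:  base,
		Delayed:  base + ":delayed",
		Reserved: base + ":reserved",
	}
}

func main() {
	k := queueKeys("test")
	fmt.Println(k.Pending, k.Delayed, k.Reserved, RestartKey)
}
```

When inspecting a running queue with a redis client, these are the names to look for, assuming the queue was published under the name test.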

Execute command

The difference between work and listen is explained below.

Command                      Description
php think queue:work         Listen to the queue
php think queue:listen       Listen to the queue
php think queue:restart      Restart the queue
php think queue:subscribe    Not available yet; possibly reserved for an official idea that has not been implemented

Behavior tags

Tag                       Description
worker_daemon_start       Daemon start
worker_memory_exceeded    Memory exceeded
worker_queue_restart      Restart daemon
worker_before_process     Before the task starts execution
worker_before_sleep       Delayed task execution
queue_failed              Task execution failed

Command parameters

Parameter   Default value   Usable modes   Description
queue       null            work,listen    Name of the task to be executed
daemon      null            work           Perform tasks as a daemon process
delay       0               work,listen    Time to re-execute after failure
force       null            work           Force the worker to run even in maintenance mode
memory      128M            work,listen    Limit maximum memory
sleep       3               work,listen    Waiting time when there is no task
tries       0               work,listen    Maximum number of attempts after task failure

Mode difference

1: Different execution principles
work: single-process mode.
Without the daemon parameter, the work process handles the next message and then exits. If there is no new message, it sleeps for a while and then exits.
With the daemon parameter, the work process handles the messages in the queue in a loop and only exits once its memory usage exceeds the configured limit. Whenever there is no new message, it sleeps for a while in that iteration.

listen: parent-process plus child-process mode.
The parent process creates a work child process that runs once and handles the next message in the queue. When that child exits, the parent process picks up its exit signal and creates a new single-run work child process.
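
The control flow described above can be sketched with a small in-memory model. This is an illustration only and not think-queue's code: the `worker` type, the `shouldStop` callback (standing in for the memory check) and the `listen` function are hypothetical, and the model `listen` returns once the queue is empty so the example terminates, whereas the real parent keeps running.

```go
package main

import "fmt"

// worker is a simplified in-memory model of a queue:work process.
type worker struct {
	queue     []string // pending messages, oldest first
	processed []string // messages handled so far
	sleeps    int      // how many times the queue was found empty
}

// step handles the next message, or "sleeps" when the queue is empty.
func (w *worker) step() {
	if len(w.queue) == 0 {
		w.sleeps++ // a real worker would pause for --sleep seconds here
		return
	}
	w.processed = append(w.processed, w.queue[0])
	w.queue = w.queue[1:]
}

// run models the two work variants. Without daemon it performs one step and
// returns. With daemon it loops until shouldStop reports true, which stands
// in for the memory-limit check.
func (w *worker) run(daemon bool, shouldStop func(*worker) bool) {
	for {
		w.step()
		if !daemon || shouldStop(w) {
			return
		}
	}
}

// listen models the parent process: it starts a fresh single-run worker for
// each message and reports how many children it created.
func listen(queue []string) (processed []string, children int) {
	for len(queue) > 0 {
		child := &worker{queue: queue}
		child.run(false, nil)
		processed = append(processed, child.processed...)
		queue = child.queue
		children++
	}
	return processed, children
}

func main() {
	once := &worker{queue: []string{"a", "b", "c"}}
	once.run(false, nil)
	fmt.Println(once.processed, once.queue) // [a] [b c]

	daemon := &worker{queue: []string{"a", "b", "c"}}
	// Hypothetical stop condition: quit after the first idle iteration.
	daemon.run(true, func(w *worker) bool { return w.sleeps > 0 })
	fmt.Println(daemon.processed, daemon.sleeps) // [a b c] 1

	done, children := listen([]string{"a", "b", "c"})
	fmt.Println(done, children) // [a b c] 3
}
```

The model shows why listen costs more per message: every message gets its own child, while a daemon worker reuses one process for all of them.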

2: Different exit timing
work: see above.
listen: under normal circumstances the parent process keeps running, except in the following two cases:
01: A work child process runs longer than the --timeout value given on the listen command line. The child is then forcibly terminated, and the listen parent process throws a ProcessTimeoutException and exits.

Developers can choose to catch this exception so that the parent process keeps running.
02: The parent process leaks memory for some reason. Once the memory used by the parent process itself exceeds the --memory value given on the command line, both the parent and the child process exit. Under normal circumstances the memory used by the listen process itself is stable.

3: Different performance
work: loops inside the script, so the framework scripts are loaded once, when the command starts.

listen: starts a new work process after each task, and that process reloads the framework scripts.

Therefore work mode performs better than listen mode.
Note: after a code update, work mode needs a manual php think queue:restart to restart the queue before the changes take effect. In listen mode the changes take effect automatically, without any extra step.

4: Timeout control capability
work: in essence it can neither control how long the process itself runs nor limit the execution time of the task being executed.
listen: can limit the timeout of the work child processes it creates.

The timeout parameter limits the maximum time a work child process is allowed to run. A child process that has not finished within this limit is forcibly terminated.

The difference between expire and timeout

expire is set in the configuration file and is the expiration time of a task. It is global and affects all work processes.
timeout is set on the command line and is the timeout of a work child process. It applies only to the listen command currently being run, and its target is the work child process.
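
The parent-side timeout can be sketched as follows. This is a model under stated assumptions, not think-queue's implementation: a goroutine plays the role of the work child process, `runChild` and `demo` are hypothetical names, and `errTimeout` stands in for ProcessTimeoutException. A goroutine cannot be killed from outside, so the sketch only reports the timeout where a real parent would terminate the child.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout stands in for the ProcessTimeoutException mentioned above.
var errTimeout = errors.New("child exceeded timeout")

// runChild runs task in a goroutine that plays the role of a work child
// process and waits at most timeout for it to finish.
func runChild(task func(), timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		task()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		// A real parent would forcibly terminate the child process here.
		return errTimeout
	}
}

// demo runs one task that finishes at once and one that overruns its limit.
func demo() (fastErr, slowErr error) {
	fastErr = runChild(func() {}, 500*time.Millisecond)
	slowErr = runChild(func() { time.Sleep(time.Second) }, 50*time.Millisecond)
	return fastErr, slowErr
}

func main() {
	fastErr, slowErr := demo()
	fmt.Println("fast task:", fastErr)
	fmt.Println("slow task:", slowErr)
}
```

A caller that wants the parent to keep going, as the text suggests, would check for `errTimeout` and continue with the next child instead of exiting.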

5: Different usage scenarios

work is suitable when:
01: There are many tasks
02: Performance requirements are high
03: Task execution time is short
04: The consumer class contains no infinite loops, sleep(), exit(), die(), or other logic that easily leads to bugs

listen is suitable when:

01: There are few tasks
02: Task execution time is long
03: Task execution time needs to be strictly limited

Public operations

Since this analysis is based on redis, we only need to look at src/queue/connector/redis.php.
01: First, the magic method __callStatic in src/Queue.php is called
02: __callStatic calls buildConnector
03: buildConnector first loads the configuration file; if there is none, tasks are executed synchronously
04: A connection is created according to the configuration file, and the configuration is passed in

Operations in the constructor of the redis.php class:
01: Check whether the redis extension is installed
02: Merge the configuration
03: Detect whether the redis extension or pRedis is in use
04: Create the connection object
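
The two lists above boil down to "pick a driver from the configuration, fall back to synchronous execution, then merge the given options over defaults". A rough Go model of that flow is sketched below. Everything in it is an assumption made for illustration: the `connector` type, the `connector` configuration key, and the default host and port values are not taken from think-queue's source.

```go
package main

import (
	"fmt"
	"strings"
)

// connector is a hypothetical stand-in for a queue driver instance.
type connector struct {
	driver  string
	options map[string]string
}

// defaults are assumed values for this sketch only.
var defaults = map[string]string{"host": "127.0.0.1", "port": "6379"}

// buildConnector models the steps above: choose the driver named in the
// configuration, fall back to "sync" when none is configured, and merge the
// supplied options over the defaults.
func buildConnector(config map[string]string) connector {
	driver := "sync"
	if name, ok := config["connector"]; ok && name != "" {
		driver = strings.ToLower(name)
	}

	merged := make(map[string]string, len(defaults)+len(config))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range config {
		merged[k] = v
	}
	return connector{driver: driver, options: merged}
}

func main() {
	fmt.Println(buildConnector(map[string]string{}).driver)

	c := buildConnector(map[string]string{"connector": "Redis", "port": "6380"})
	fmt.Println(c.driver, c.options["host"], c.options["port"])
}
```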

Publishing process

Publishing parameters

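Parameter name   Default value   Description                       Methods that can be used
$job             None            The class that performs the task  push,later
$data            empty           Task data                         push,later
$queue           default         Task name                         push,later
$delay           null            Delay time                        later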

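Immediate execution

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

After some internal processing, an array is built, serialized, and pushed with rPush to redis under the key queues:queueName.
Array structure:

[
    'job' => $job, // class that executes the task
    'data' => $data, // task data
    'id'=>'xxxxx' // task ID
]

The payload is written to redis and the queue ID is returned.
The internal processing in between is too long to cover here.

Delayed publishing

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

Much the same as above.
After some internal processing, an array is built, serialized, and added with zAdd to redis under the key queues:queueName:delayed, with the current timestamp + $delay as the score.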
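
To make the score arithmetic concrete, the sketch below models the delayed zSet as an in-memory map from payload to score. It assumes that a delayed task becomes due once its score is less than or equal to the current timestamp; the helpers `delayedScore` and `dueJobs` are hypothetical and no redis connection is involved.

```go
package main

import (
	"fmt"
	"sort"
)

// delayedScore returns the zSet score for a task published at now
// (Unix seconds) with the given delay in seconds.
func delayedScore(now, delay int64) int64 {
	return now + delay
}

// dueJobs returns the payloads whose score is at or before now,
// ordered by score and then by payload.
func dueJobs(delayed map[string]int64, now int64) []string {
	due := []string{}
	for payload, score := range delayed {
		if score <= now {
			due = append(due, payload)
		}
	}
	sort.Slice(due, func(i, j int) bool {
		if delayed[due[i]] != delayed[due[j]] {
			return delayed[due[i]] < delayed[due[j]]
		}
		return due[i] < due[j]
	})
	return due
}

func main() {
	now := int64(1000)
	delayed := map[string]int64{
		"job-a": delayedScore(now, 100), // score 1100
		"job-b": delayedScore(now, 10),  // score 1010
	}
	fmt.Println(dueJobs(delayed, now+50))  // [job-b]
	fmt.Println(dueJobs(delayed, now+100)) // [job-b job-a]
}
```

Using the timestamp as the score keeps the set ordered by due time, so the earliest task is always at the front.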

Execution process

Execution has two modes, work and listen; the differences were described above. There is too much code logic to cover here, so it is left for a later article.
Finally, a word on using the tags.

    // Daemon start
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    // Memory exceeded
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    // Restart daemon
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    // Before the task starts execution
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    // Delayed task execution
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    // Task execution failed
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

    public function run(Output $output)
    {
        $output->write('<info>Task execution failed</info>', true);
    }

Running php think queue:work --queue test --daemon in the console
prints the following, in order:

Daemon start
Delayed task execution

Failure handling: if a task fails or its number of executions reaches the maximum,
queue_failed is triggered.

Put the failure logic, such as an email notification or a log entry, in the run method of the behavior class (app\index\behavior@run).
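
The tag mechanism is a small publish/subscribe registry: handlers are registered under a tag name and called when the worker reaches that point. A rough Go model is sketched below. The `hooks` type, `runJob` and `demo` are hypothetical, and the sketch only covers the "task fails" case, not the maximum-attempts case.

```go
package main

import (
	"errors"
	"fmt"
)

// hooks maps a tag name to its handlers, mirroring the tag configuration.
type hooks map[string][]func(job string)

// listen registers a handler under a tag.
func (h hooks) listen(tag string, fn func(job string)) {
	h[tag] = append(h[tag], fn)
}

// fire calls every handler registered under a tag, in registration order.
func (h hooks) fire(tag, job string) {
	for _, fn := range h[tag] {
		fn(job)
	}
}

// runJob fires worker_before_process, runs the job, and fires queue_failed
// when the job returns an error.
func runJob(h hooks, name string, job func() error) {
	h.fire("worker_before_process", name)
	if err := job(); err != nil {
		h.fire("queue_failed", name)
	}
}

// demo runs one succeeding and one failing job and records the fired hooks.
func demo() []string {
	var log []string
	h := hooks{}
	h.listen("worker_before_process", func(job string) { log = append(log, "before:"+job) })
	h.listen("queue_failed", func(job string) { log = append(log, "failed:"+job) })

	runJob(h, "ok", func() error { return nil })
	runJob(h, "bad", func() error { return errors.New("boom") })
	return log
}

func main() {
	fmt.Println(demo()) // [before:ok before:bad failed:bad]
}
```

In the tp5 project the equivalent of the `queue_failed` handler is the behavior class registered in the tag configuration.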

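Finally, let's look at how to push messages to a tp project's queue from another framework or project, for example when the two projects are separate and the other one does not use the tp5 framework.

Pushing tasks from other projects

PHP version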

<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
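        // The loop body and the lines up to the call at the bottom were lost
        // in the source and are reconstructed here; random_int() is an assumed choice.
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[random_int(0, strlen($str) - 1)];
        }

        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');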

Go version

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

// push appends a task to the tail of the list queues:<queue>.
func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    conn := RedisClient.Get()
    defer conn.Close() // return the connection to the pool

    _, _ = conn.Do("RPUSH", queueName, payload)
}

// later adds a task to the sorted set queues:<queue>:delayed,
// scored with the Unix timestamp at which it becomes due.
func later(delay int, job string, data interface{}, queue string) {
    op := time.Now().Add(time.Duration(delay) * time.Second).Unix()
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    conn := RedisClient.Get()
    defer conn.Close() // return the connection to the pool

    _, _ = conn.Do("ZADD", queueName, op, payload)
}

// createPayload builds the task data in the expected format
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// random creates a random string of length n
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

