Hàng đợi Redis

Hàng đợi tin nhắn dựa trên Redis, hỗ trợ xử lý tin nhắn với độ trễ.

Cài đặt

composer require webman/redis-queue

Tệp cấu hình

Tệp cấu hình redis sẽ được tự động tạo tại {dự án chính}/config/plugin/webman/redis-queue/redis.php, nội dung tương tự như sau:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Mật khẩu, tham số tùy chọn
            'db' => 0,            // Cơ sở dữ liệu
            'max_attempts'  => 5, // Số lần thử lại sau khi tiêu thụ thất bại
            'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
        ]
    ],
];

Thử lại khi tiêu thụ thất bại

Nếu tiêu thụ thất bại (phát sinh ngoại lệ), tin nhắn sẽ được đưa vào hàng đợi trì hoãn, chờ thử lại lần sau. Số lần thử lại được điều khiển bởi tham số max_attempts, khoảng thời gian thử lại được kiểm soát bởi retry_secondsmax_attempts. Ví dụ, nếu max_attempts là 5, retry_seconds là 10, khoảng thời gian thử lại lần thứ nhất là 1*10 giây, khoảng thời gian thử lại lần thứ hai là 2*10 giây, khoảng thời gian thử lại lần thứ ba là 3*10 giây, và cứ tiếp tục như vậy cho đến khi thử lại 5 lần. Nếu vượt quá số lần thử lại được đặt bởi max_attempts, tin nhắn sẽ được đưa vào hàng đợi thất bại với khóa là {redis-queue}-failed.

Gửi tin nhắn (đồng bộ)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Tên hàng đợi
        $queue = 'send-mail';
        // Dữ liệu, có thể truyền trực tiếp dưới dạng mảng, không cần phải tuần tự hóa
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Gửi tin nhắn
        Redis::send($queue, $data);
        // Gửi tin nhắn trì hoãn, tin nhắn sẽ được xử lý sau 60 giây
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Gửi thành công Redis::send() sẽ trả về true, nếu không sẽ trả về false hoặc ném ra ngoại lệ.

Mẹo
Thời gian tiêu thụ hàng đợi trì hoãn có thể có sai lệch, chẳng hạn như tốc độ tiêu thụ thấp hơn tốc độ sản xuất dẫn đến hàng đợi bị dồn, từ đó dẫn đến độ trễ trong việc tiêu thụ, cách khắc phục là mở thêm một số tiến trình tiêu thụ.

Gửi tin nhắn (phi tâm)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Tên hàng đợi
        $queue = 'send-mail';
        // Dữ liệu, có thể truyền trực tiếp dưới dạng mảng, không cần phải tuần tự hóa
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Gửi tin nhắn
        Client::send($queue, $data);
        // Gửi tin nhắn trì hoãn, tin nhắn sẽ được xử lý sau 60 giây
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() không có giá trị trả về, nó thuộc về hình thức đẩy phi tâm, không đảm bảo tin nhắn %100 được gửi đến redis.

Mẹo
Nguyên lý của Client::send() là tạo một hàng đợi trong bộ nhớ cục bộ, phi tâm đồng bộ hóa tin nhắn vào Redis (tốc độ đồng bộ hóa rất nhanh, khoảng 10.000 tin nhắn mỗi giây). Nếu tiến trình khởi động lại và dữ liệu trong hàng đợi bộ nhớ cục bộ chưa được đồng bộ hóa hoàn tất, có thể gây ra mất tin nhắn. Việc gửi tin nhắn phi tâm bằng Client::send() thích hợp cho việc gửi những tin nhắn không quan trọng.

Mẹo
Client::send() là phi tâm, nó chỉ có thể được sử dụng trong môi trường chạy của workerman, hãy sử dụng giao diện đồng bộ Redis::send() cho các kịch bản dòng lệnh.

Gửi tin nhắn từ các dự án khác

Đôi khi bạn cần gửi tin nhắn từ các dự án khác và không thể sử dụng webman\redis-queue, có thể tham khảo hàm dưới đây để gửi tin nhắn vào hàng đợi.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Trong đó, tham số $redis là thể hiện của redis. Ví dụ cách sử dụng mở rộng redis như sau:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data = ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Tiêu thụ

Tệp cấu hình tiến trình tiêu thụ nằm tại {dự án chính}/config/plugin/webman/redis-queue/process.php.
Thư mục người tiêu thụ nằm tại {dự án chính}/app/queue/redis/.

Thực hiện lệnh php webman redis-queue:consumer my-send-mail sẽ tạo ra tệp {dự án chính}/app/queue/redis/MyMailSend.php

Mẹo
Nếu lệnh không tồn tại cũng có thể tự tạo thủ công

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Tên hàng đợi cần tiêu thụ
    public $queue = 'send-mail';

    // Tên kết nối, tương ứng với plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // Tiêu thụ
    public function consume($data)
    {
        // Không cần phải giải mã
        var_export($data); // Xuất ra ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Gọi lại khi tiêu thụ thất bại
    /* 
    $package = [
        'id' => 1357277951, // ID tin nhắn
        'time' => 1709170510, // Thời gian tin nhắn
        'delay' => 0, // Thời gian trì hoãn
        'attempts' => 2, // Số lần tiêu thụ
        'queue' => 'send-mail', // Tên hàng đợi
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Nội dung tin nhắn
        'max_attempts' => 5, // Số lần thử lại tối đa
        'error' => 'Thông tin lỗi' // Thông tin lỗi
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "Tiêu thụ thất bại\n";
        echo $e->getMessage() . "\n";
        // Không cần phải giải mã
        var_export($package); 
    }
}

Lưu ý
Trong quá trình tiêu thụ không phát sinh ngoại lệ và Error sẽ được coi là thành công, nếu không sẽ bị coi là thất bại, và vào hàng đợi thử lại.
redis-queue không có cơ chế xác nhận, bạn có thể xem đây như một xác nhận tự động (nếu không phát sinh ngoại lệ hoặc Error). Nếu trong quá trình tiêu thụ muốn đánh dấu tin nhắn hiện tại không thành công, có thể ném ra ngoại lệ để cho tin nhắn hiện tại vào hàng đợi thử lại. Điều này thực tế cũng giống như cơ chế xác nhận.

Mẹo
Các người tiêu thụ hỗ trợ nhiều máy chủ đa tiến trình, và cùng một tin nhắn sẽ không bị tiêu thụ lại. Các tin nhắn đã được tiêu thụ sẽ tự động bị xóa khỏi hàng đợi, không cần phải xóa thủ công.

Mẹo
Tiến trình tiêu thụ có thể tiêu thụ cùng lúc nhiều loại hàng đợi khác nhau, việc thêm hàng đợi mới không cần phải sửa đổi cấu hình trong process.php, chỉ cần thêm các lớp Consumer tương ứng trong app/queue/redis và sử dụng thuộc tính lớp $queue để chỉ định tên hàng đợi cần tiêu thụ.

Mẹo
Người dùng windows cần thực hiện php windows.php để khởi động webman, nếu không sẽ không khởi động tiến trình tiêu thụ.

Mẹo
Gọi lại onConsumeFailure sẽ được kích hoạt mỗi khi tiêu thụ thất bại, bạn có thể xử lý logic sau khi thất bại ở đây. (Tính năng này yêu cầu webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1)

Thiết lập tiến trình tiêu thụ khác nhau cho các hàng đợi khác nhau

Mặc định, tất cả người tiêu thụ chia sẻ cùng một tiến trình tiêu thụ. Nhưng đôi khi chúng ta cần tách riêng việc tiêu thụ của một số hàng đợi, chẳng hạn như việc tiêu thụ chậm được đưa vào một nhóm tiến trình, trong khi việc tiêu thụ nhanh được đưa vào một nhóm tiến trình khác. Để làm điều này, chúng ta có thể chia các người tiêu thụ thành hai thư mục, chẳng hạn như app_path() . '/queue/redis/fast'app_path() . '/queue/redis/slow' (Lưu ý rằng không gian tên của các lớp tiêu thụ cần được thay đổi tương ứng), cấu hình như sau:

return [
    ...bỏ qua các cấu hình khác...

    'redis_consumer_fast'  => [ // key là tùy chỉnh, không có giới hạn định dạng, ở đây đặt tên là redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Thư mục lớp tiêu thụ
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // key là tùy chỉnh, không có giới hạn định dạng, ở đây đặt tên là redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Thư mục lớp tiêu thụ
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

Như vậy, người tiêu thụ cho công việc nhanh sẽ được đưa vào thư mục queue/redis/fast, trong khi người tiêu thụ cho công việc chậm sẽ được đưa vào thư mục queue/redis/slow, đạt được mục đích chỉ định tiến trình tiêu thụ cho hàng đợi.

Cấu hình nhiều redis

Cấu hình

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Mật khẩu, kiểu chuỗi, tham số tùy chọn
            'db' => 0,            // Cơ sở dữ liệu
            'max_attempts'  => 5, // Số lần thử lại sau khi tiêu thụ thất bại
            'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Mật khẩu, kiểu chuỗi, tham số tùy chọn
            'db' => 0,            // Cơ sở dữ liệu
            'max_attempts'  => 5, // Số lần thử lại sau khi tiêu thụ thất bại
            'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
        ]
    ],
];

Lưu ý rằng cấu hình đã thêm một cấu hình redis với key là other

Gửi tin nhắn đến nhiều redis

// Gửi tin nhắn đến hàng đợi có key là `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  Tương đương
Client::send($queue, $data);
Redis::send($queue, $data);

// Gửi tin nhắn đến hàng đợi có key là `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Tiêu thụ nhiều redis

Trong cấu hình tiêu thụ, hàng đợi có key là other sẽ gửi tin nhắn

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Tên hàng đợi cần tiêu thụ
    public $queue = 'send-mail';

    // === Thiết lập ở đây là other, đại diện cho hàng đợi có key là other trong cấu hình tiêu thụ ===
    public $connection = 'other';

    // Tiêu thụ
    public function consume($data)
    {
        // Không cần phải giải mã
        var_export($data);
    }
}

Các vấn đề thường gặp

Tại sao lại có lỗi Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)

Lỗi này chỉ tồn tại trong giao diện gửi phi tâm Client::send(). Giao diện gửi phi tâm sẽ đầu tiên lưu tin nhắn trong bộ nhớ cục bộ, khi tiến trình rảnh sẽ gửi tin nhắn đến Redis. Nếu tốc độ tiếp nhận của Redis chậm hơn tốc độ sản xuất tin nhắn, hoặc tiến trình bận rộn với công việc khác không có đủ thời gian để đồng bộ hóa tin nhắn trong bộ nhớ, sẽ dẫn đến tình trạng tắc nghẽn tin nhắn. Nếu có tin nhắn tắc nghẽn trên 600 giây, lỗi này sẽ được kích hoạt.

Giải pháp: Sử dụng giao diện gửi đồng bộ Redis::send() để gửi tin nhắn.