Hàng đợi Redis
Hàng đợi tin nhắn dựa trên Redis, hỗ trợ xử lý tin nhắn với độ trễ.
Cài đặt
composer require webman/redis-queue
Tệp cấu hình
Tệp cấu hình redis sẽ được tự động tạo tại {dự án chính}/config/plugin/webman/redis-queue/redis.php
, nội dung tương tự như sau:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Mật khẩu, tham số tùy chọn
'db' => 0, // Cơ sở dữ liệu
'max_attempts' => 5, // Số lần thử lại sau khi tiêu thụ thất bại
'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
]
],
];
Thử lại khi tiêu thụ thất bại
Nếu tiêu thụ thất bại (phát sinh ngoại lệ), tin nhắn sẽ được đưa vào hàng đợi trì hoãn, chờ thử lại lần sau. Số lần thử lại được điều khiển bởi tham số max_attempts
, khoảng thời gian thử lại được kiểm soát bởi retry_seconds
và max_attempts
. Ví dụ, nếu max_attempts
là 5, retry_seconds
là 10, khoảng thời gian thử lại lần thứ nhất là 1*10
giây, khoảng thời gian thử lại lần thứ hai là 2*10
giây, khoảng thời gian thử lại lần thứ ba là 3*10
giây, và cứ tiếp tục như vậy cho đến khi thử lại 5 lần. Nếu vượt quá số lần thử lại được đặt bởi max_attempts
, tin nhắn sẽ được đưa vào hàng đợi thất bại với khóa là {redis-queue}-failed
.
Gửi tin nhắn (đồng bộ)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Tên hàng đợi
$queue = 'send-mail';
// Dữ liệu, có thể truyền trực tiếp dưới dạng mảng, không cần phải tuần tự hóa
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Gửi tin nhắn
Redis::send($queue, $data);
// Gửi tin nhắn trì hoãn, tin nhắn sẽ được xử lý sau 60 giây
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Gửi thành công Redis::send()
sẽ trả về true, nếu không sẽ trả về false hoặc ném ra ngoại lệ.
Mẹo
Thời gian tiêu thụ hàng đợi trì hoãn có thể có sai lệch, chẳng hạn như tốc độ tiêu thụ thấp hơn tốc độ sản xuất dẫn đến hàng đợi bị dồn, từ đó dẫn đến độ trễ trong việc tiêu thụ, cách khắc phục là mở thêm một số tiến trình tiêu thụ.
Gửi tin nhắn (phi tâm)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Tên hàng đợi
$queue = 'send-mail';
// Dữ liệu, có thể truyền trực tiếp dưới dạng mảng, không cần phải tuần tự hóa
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Gửi tin nhắn
Client::send($queue, $data);
// Gửi tin nhắn trì hoãn, tin nhắn sẽ được xử lý sau 60 giây
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
không có giá trị trả về, nó thuộc về hình thức đẩy phi tâm, không đảm bảo tin nhắn %100 được gửi đến redis.
Mẹo
Nguyên lý củaClient::send()
là tạo một hàng đợi trong bộ nhớ cục bộ, phi tâm đồng bộ hóa tin nhắn vào Redis (tốc độ đồng bộ hóa rất nhanh, khoảng 10.000 tin nhắn mỗi giây). Nếu tiến trình khởi động lại và dữ liệu trong hàng đợi bộ nhớ cục bộ chưa được đồng bộ hóa hoàn tất, có thể gây ra mất tin nhắn. Việc gửi tin nhắn phi tâm bằngClient::send()
thích hợp cho việc gửi những tin nhắn không quan trọng.Mẹo
Client::send()
là phi tâm, nó chỉ có thể được sử dụng trong môi trường chạy của workerman, hãy sử dụng giao diện đồng bộRedis::send()
cho các kịch bản dòng lệnh.
Gửi tin nhắn từ các dự án khác
Đôi khi bạn cần gửi tin nhắn từ các dự án khác và không thể sử dụng webman\redis-queue
, có thể tham khảo hàm dưới đây để gửi tin nhắn vào hàng đợi.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Trong đó, tham số $redis
là thể hiện của redis. Ví dụ cách sử dụng mở rộng redis như sau:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data = ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Tiêu thụ
Tệp cấu hình tiến trình tiêu thụ nằm tại {dự án chính}/config/plugin/webman/redis-queue/process.php
.
Thư mục người tiêu thụ nằm tại {dự án chính}/app/queue/redis/
.
Thực hiện lệnh php webman redis-queue:consumer my-send-mail
sẽ tạo ra tệp {dự án chính}/app/queue/redis/MyMailSend.php
Mẹo
Nếu lệnh không tồn tại cũng có thể tự tạo thủ công
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Tên hàng đợi cần tiêu thụ
public $queue = 'send-mail';
// Tên kết nối, tương ứng với plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Tiêu thụ
public function consume($data)
{
// Không cần phải giải mã
var_export($data); // Xuất ra ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Gọi lại khi tiêu thụ thất bại
/*
$package = [
'id' => 1357277951, // ID tin nhắn
'time' => 1709170510, // Thời gian tin nhắn
'delay' => 0, // Thời gian trì hoãn
'attempts' => 2, // Số lần tiêu thụ
'queue' => 'send-mail', // Tên hàng đợi
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Nội dung tin nhắn
'max_attempts' => 5, // Số lần thử lại tối đa
'error' => 'Thông tin lỗi' // Thông tin lỗi
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "Tiêu thụ thất bại\n";
echo $e->getMessage() . "\n";
// Không cần phải giải mã
var_export($package);
}
}
Lưu ý
Trong quá trình tiêu thụ không phát sinh ngoại lệ và Error sẽ được coi là thành công, nếu không sẽ bị coi là thất bại, và vào hàng đợi thử lại.
redis-queue không có cơ chế xác nhận, bạn có thể xem đây như một xác nhận tự động (nếu không phát sinh ngoại lệ hoặc Error). Nếu trong quá trình tiêu thụ muốn đánh dấu tin nhắn hiện tại không thành công, có thể ném ra ngoại lệ để cho tin nhắn hiện tại vào hàng đợi thử lại. Điều này thực tế cũng giống như cơ chế xác nhận.Mẹo
Các người tiêu thụ hỗ trợ nhiều máy chủ đa tiến trình, và cùng một tin nhắn sẽ không bị tiêu thụ lại. Các tin nhắn đã được tiêu thụ sẽ tự động bị xóa khỏi hàng đợi, không cần phải xóa thủ công.Mẹo
Tiến trình tiêu thụ có thể tiêu thụ cùng lúc nhiều loại hàng đợi khác nhau, việc thêm hàng đợi mới không cần phải sửa đổi cấu hình trongprocess.php
, chỉ cần thêm các lớpConsumer
tương ứng trongapp/queue/redis
và sử dụng thuộc tính lớp$queue
để chỉ định tên hàng đợi cần tiêu thụ.Mẹo
Người dùng windows cần thực hiệnphp windows.php
để khởi động webman, nếu không sẽ không khởi động tiến trình tiêu thụ.Mẹo
Gọi lại onConsumeFailure sẽ được kích hoạt mỗi khi tiêu thụ thất bại, bạn có thể xử lý logic sau khi thất bại ở đây. (Tính năng này yêu cầuwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
Thiết lập tiến trình tiêu thụ khác nhau cho các hàng đợi khác nhau
Mặc định, tất cả người tiêu thụ chia sẻ cùng một tiến trình tiêu thụ. Nhưng đôi khi chúng ta cần tách riêng việc tiêu thụ của một số hàng đợi, chẳng hạn như việc tiêu thụ chậm được đưa vào một nhóm tiến trình, trong khi việc tiêu thụ nhanh được đưa vào một nhóm tiến trình khác. Để làm điều này, chúng ta có thể chia các người tiêu thụ thành hai thư mục, chẳng hạn như app_path() . '/queue/redis/fast'
và app_path() . '/queue/redis/slow'
(Lưu ý rằng không gian tên của các lớp tiêu thụ cần được thay đổi tương ứng), cấu hình như sau:
return [
...bỏ qua các cấu hình khác...
'redis_consumer_fast' => [ // key là tùy chỉnh, không có giới hạn định dạng, ở đây đặt tên là redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Thư mục lớp tiêu thụ
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // key là tùy chỉnh, không có giới hạn định dạng, ở đây đặt tên là redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Thư mục lớp tiêu thụ
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Như vậy, người tiêu thụ cho công việc nhanh sẽ được đưa vào thư mục queue/redis/fast
, trong khi người tiêu thụ cho công việc chậm sẽ được đưa vào thư mục queue/redis/slow
, đạt được mục đích chỉ định tiến trình tiêu thụ cho hàng đợi.
Cấu hình nhiều redis
Cấu hình
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Mật khẩu, kiểu chuỗi, tham số tùy chọn
'db' => 0, // Cơ sở dữ liệu
'max_attempts' => 5, // Số lần thử lại sau khi tiêu thụ thất bại
'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Mật khẩu, kiểu chuỗi, tham số tùy chọn
'db' => 0, // Cơ sở dữ liệu
'max_attempts' => 5, // Số lần thử lại sau khi tiêu thụ thất bại
'retry_seconds' => 5, // Khoảng thời gian thử lại, đơn vị giây
]
],
];
Lưu ý rằng cấu hình đã thêm một cấu hình redis với key là other
Gửi tin nhắn đến nhiều redis
// Gửi tin nhắn đến hàng đợi có key là `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Tương đương
Client::send($queue, $data);
Redis::send($queue, $data);
// Gửi tin nhắn đến hàng đợi có key là `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Tiêu thụ nhiều redis
Trong cấu hình tiêu thụ, hàng đợi có key là other
sẽ gửi tin nhắn
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Tên hàng đợi cần tiêu thụ
public $queue = 'send-mail';
// === Thiết lập ở đây là other, đại diện cho hàng đợi có key là other trong cấu hình tiêu thụ ===
public $connection = 'other';
// Tiêu thụ
public function consume($data)
{
// Không cần phải giải mã
var_export($data);
}
}
Các vấn đề thường gặp
Tại sao lại có lỗi Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
Lỗi này chỉ tồn tại trong giao diện gửi phi tâm Client::send()
. Giao diện gửi phi tâm sẽ đầu tiên lưu tin nhắn trong bộ nhớ cục bộ, khi tiến trình rảnh sẽ gửi tin nhắn đến Redis. Nếu tốc độ tiếp nhận của Redis chậm hơn tốc độ sản xuất tin nhắn, hoặc tiến trình bận rộn với công việc khác không có đủ thời gian để đồng bộ hóa tin nhắn trong bộ nhớ, sẽ dẫn đến tình trạng tắc nghẽn tin nhắn. Nếu có tin nhắn tắc nghẽn trên 600 giây, lỗi này sẽ được kích hoạt.
Giải pháp: Sử dụng giao diện gửi đồng bộ Redis::send()
để gửi tin nhắn.