Redis Queue
A message queue based on Redis, supporting delayed message processing.
Installation
composer require webman/redis-queue
Configuration File
The Redis configuration file is automatically generated at {main_project}/config/plugin/webman/redis-queue/redis.php, with content similar to the following:
<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',          // Password, optional
            'db' => 0,             // Database
            'max_attempts' => 5,   // Number of retries after consumer failure
            'retry_seconds' => 5,  // Retry interval, in seconds
        ]
    ],
];
Retry on Consumer Failure
If consumption fails (an exception is thrown), the message is placed in a delayed queue to await the next retry. The number of retries is controlled by max_attempts, and the retry interval is determined by retry_seconds together with the number of the current retry. For example, if max_attempts is 5 and retry_seconds is 10, the first retry happens after 1*10 seconds, the second after 2*10 seconds, the third after 3*10 seconds, and so on, up to 5 retries. Once the retry count exceeds max_attempts, the message is placed in a failure queue with the key {redis-queue}-failed.
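The retry schedule described above can be sketched as a small calculation. This is only an illustration, assuming the Nth retry waits N * retry_seconds as in the example; retry_schedule() is a hypothetical helper, not part of webman/redis-queue.

```php
<?php
// Hypothetical helper (not part of webman/redis-queue): lists the wait, in
// seconds, before each retry, assuming the Nth retry waits N * retry_seconds.
function retry_schedule(int $maxAttempts, int $retrySeconds): array
{
    $delays = [];
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $delays[] = $attempt * $retrySeconds;
    }
    return $delays;
}

$delays = retry_schedule(5, 10);
echo implode(', ', $delays), "\n"; // 10, 20, 30, 40, 50
echo array_sum($delays), "\n";     // 150
```

Under this assumption, a message that keeps failing with these settings would spend about 150 seconds in retry waits before it reaches the failure queue.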
Sending Messages (Synchronous)
<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Queue name
        $queue = 'send-mail';
        // Data, can be passed as an array directly, no serialization needed
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Send a message
        Redis::send($queue, $data);
        // Send a delayed message, which will be processed after 60 seconds
        Redis::send($queue, $data, 60);
        return response('redis queue test');
    }
}
If successful, Redis::send() returns true; otherwise it returns false or throws an exception.
Tip
Delayed messages may be consumed later than scheduled. For example, if consumption is slower than production, the queue backs up and consumption is delayed. Starting more consumer processes helps alleviate this.
Sending Messages (Asynchronous)
<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Queue name
        $queue = 'send-mail';
        // Data, can be passed as an array directly, no serialization needed
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Send a message
        Client::send($queue, $data);
        // Send a delayed message, which will be processed after 60 seconds
        Client::send($queue, $data, 60);
        return response('redis queue test');
    }
}
Client::send() has no return value. It is an asynchronous push and does not guarantee that the message is delivered to Redis.
Tip
Client::send() works by keeping a queue in local memory and asynchronously synchronizing its messages to Redis (synchronization is very fast, approximately 10,000 messages per second). If the process restarts before the local memory queue has been fully synchronized, messages may be lost. Asynchronous delivery with Client::send() is therefore suitable for less important messages.
Tip
Client::send() is asynchronous and can only be used in the Workerman runtime environment; for command line scripts, use the synchronous interface Redis::send().
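The idea of a local memory queue that is synchronized later can be illustrated with a toy buffer. This is a minimal sketch, not the actual Client implementation: MemoryBuffer and its methods are hypothetical names, and the "sink" callback stands in for the write to Redis.

```php
<?php
// Illustrative only: a toy in-memory buffer, NOT the Client implementation.
final class MemoryBuffer
{
    private SplQueue $pending;
    /** @var callable(string): void stand-in for "write to Redis" */
    private $sink;

    public function __construct(callable $sink)
    {
        $this->pending = new SplQueue();
        $this->sink = $sink;
    }

    // Returns immediately; nothing has reached the sink yet.
    public function send(string $message): void
    {
        $this->pending->enqueue($message);
    }

    // In an event loop this would run whenever the process is idle.
    public function flush(): int
    {
        $flushed = 0;
        while (!$this->pending->isEmpty()) {
            ($this->sink)($this->pending->dequeue());
            $flushed++;
        }
        return $flushed;
    }

    public function pendingCount(): int
    {
        return $this->pending->count();
    }
}

$delivered = [];
$buffer = new MemoryBuffer(function (string $m) use (&$delivered): void {
    $delivered[] = $m;
});
$buffer->send('a');
$buffer->send('b');
echo $buffer->pendingCount(), "\n"; // 2: buffered, not yet delivered
$buffer->flush();
echo count($delivered), "\n";       // 2
```

Anything still counted by pendingCount() when the process exits never reaches the sink, which is the message-loss window the tip describes.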
Sending Messages from Other Projects
Sometimes you need to send messages from another project that cannot use webman/redis-queue. In that case, you can use a function like the following to send messages to the queue.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    // Message package in the format the consumer expects
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    // Delayed messages go into a sorted set, scored by their due time
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    // Immediate messages are pushed onto the waiting list for this queue
    return $redis->lPush($queue_waiting.$queue, $package_str);
}
Here, the $redis parameter is a Redis instance. For example, with the Redis extension the usage is similar to the following:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data = ['some', 'data'];
redis_queue_send($redis, $queue, $data);
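To see which Redis key and command a given call would use without connecting to Redis, the key and payload logic of redis_queue_send() can be mirrored in a dry-run helper. This is a sketch only: redis_queue_plan() and its return shape are hypothetical and exist purely for inspection.

```php
<?php
// Hypothetical dry-run helper: mirrors the key and payload logic of
// redis_queue_send(), but returns the planned command instead of running it.
function redis_queue_plan(string $queue, $data, int $delay = 0, ?int $now = null): array
{
    $now = $now ?? time();
    $payload = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data,
    ]);
    if ($delay) {
        // Delayed: sorted set, scored by the time the message becomes due
        return [
            'command' => 'zAdd',
            'key'     => '{redis-queue}-delayed',
            'score'   => $now + $delay,
            'payload' => $payload,
        ];
    }
    // Immediate: list whose key is the waiting prefix plus the queue name
    return [
        'command' => 'lPush',
        'key'     => '{redis-queue}-waiting' . $queue,
        'payload' => $payload,
    ];
}

$plan = redis_queue_plan('user-1', ['some', 'data']);
echo $plan['command'], ' ', $plan['key'], "\n";
// lPush {redis-queue}-waitinguser-1

$delayed = redis_queue_plan('user-1', ['some', 'data'], 60, 1709170510);
echo $delayed['command'], ' ', $delayed['key'], ' ', $delayed['score'], "\n";
// zAdd {redis-queue}-delayed 1709170570
```

Note that the waiting key is a plain concatenation with no separator, so the queue name directly follows the prefix.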
Consumption
The consumer process configuration file is located at {main_project}/config/plugin/webman/redis-queue/process.php.
The consumer directory is {main_project}/app/queue/redis/.
Executing the command php webman redis-queue:consumer my-send-mail will generate the file {main_project}/app/queue/redis/MyMailSend.php.
Tip
If the command does not exist, you can also create the file manually.
<?php
namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Queue name to consume
    public $queue = 'send-mail';

    // Connection name, corresponding to a connection in plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // Consume
    public function consume($data)
    {
        // No need for deserialization
        var_export($data); // Outputs ['to' => 'tom@gmail.com', 'content' => 'hello']
    }

    // Callback for consumption failure
    /*
    $package = [
        'id' => 1357277951, // Message ID
        'time' => 1709170510, // Message time
        'delay' => 0, // Delay time
        'attempts' => 2, // Number of consumption attempts
        'queue' => 'send-mail', // Queue name
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Message content
        'max_attempts' => 5, // Maximum number of retries
        'error' => 'Error message' // Error message
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "consume failure\n";
        echo $e->getMessage() . "\n";
        // No need for deserialization
        var_export($package);
    }
}
Notice
If no exception or error is thrown during consumption, the message is considered successfully consumed; otherwise, consumption is considered failed and the message enters the retry queue. Redis-queue has no explicit ack mechanism; you can think of it as automatic ack (when no exception or error occurs). To mark the current message as not successfully consumed, throw an exception manually so that the message enters the retry queue. In practice this is no different from an ack mechanism.
Tip
Consumers support multiple servers and multiple processes, and the same message will not be consumed more than once. Consumed messages are removed from the queue automatically; no manual deletion is needed.
Tip
Consumer processes can consume multiple different queues simultaneously. Adding a new queue does not require modifying the configuration in process.php; simply add the corresponding Consumer class in app/queue/redis and specify the queue name to consume with the class property $queue.
Tip
Windows users need to run php windows.php to start Webman; otherwise, the consumer process will not start.
Tip
The onConsumeFailure callback is triggered every time a consumption fails; you can handle post-failure logic there. (This feature requires webman/redis-queue>=1.3.2 and workerman/redis-queue>=1.2.1.)
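The "automatic ack" behaviour from the Notice can be illustrated with a toy simulation of a single consumption attempt. This is not library code: attempt_consume() is a hypothetical function, and the rule for moving a message to the failure queue is an assumption based on the retry description (the message fails once its attempt count exceeds max_attempts).

```php
<?php
// Toy simulation, NOT library code: classifies the outcome of one
// consumption attempt the way the Notice describes.
function attempt_consume(callable $consume, array $package, int $maxAttempts): string
{
    try {
        $consume($package['data']);
        return 'acked'; // no exception or error: treated as success
    } catch (\Throwable $e) {
        $attempts = $package['attempts'] + 1;
        // Assumption: the message goes to the failure queue once the
        // attempt count exceeds the limit, otherwise it is retried.
        return $attempts > $maxAttempts ? 'failed' : 'retry';
    }
}

$package = ['attempts' => 0, 'data' => ['to' => 'tom@gmail.com', 'content' => 'hello']];

$ok = function (array $data): void {
    // work completed without throwing
};
$reject = function (array $data): void {
    // throwing manually is how a consumer signals "not consumed"
    throw new \RuntimeException('mail server unavailable');
};

echo attempt_consume($ok, $package, 5), "\n";                         // acked
echo attempt_consume($reject, $package, 5), "\n";                     // retry
echo attempt_consume($reject, ['attempts' => 5] + $package, 5), "\n"; // failed
```

In a real consumer the same effect comes from throwing inside consume(); there is no separate call to acknowledge or reject a message.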
Setting Different Consumer Processes for Different Queues
By default, all consumers share the same consumer processes. Sometimes, however, we need to isolate the consumption of certain queues; for example, slow business consumers can run in one group of processes and fast ones in another. To do this, split the consumers into two directories, such as app_path() . '/queue/redis/fast' and app_path() . '/queue/redis/slow' (note that the namespace of the consumer classes must be changed accordingly). The configuration is then as follows:
return [
    // ...other configurations omitted...
    'redis_consumer_fast' => [ // The key is custom, with no format restriction, here named redis_consumer_fast
        'handler' => Webman\RedisQueue\Process\Consumer::class,
        'count' => 8,
        'constructor' => [
            // Consumer class directory
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow' => [ // The key is custom, with no format restriction, here named redis_consumer_slow
        'handler' => Webman\RedisQueue\Process\Consumer::class,
        'count' => 8,
        'constructor' => [
            // Consumer class directory
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];
In this way, fast business consumers are placed in the queue/redis/fast directory and slow business consumers in the queue/redis/slow directory, so that each group of queues gets its own consumer processes.
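The note about changing the namespace can be made concrete with a small path-to-class mapping. This is a sketch under the assumption that consumer class names mirror the file path relative to the project root (PSR-4 style); consumer_class_for() and the /srv/webman path are hypothetical and used only for illustration.

```php
<?php
// Hypothetical helper: derives a fully qualified class name from a consumer
// file path, assuming class names mirror the path relative to the project root.
function consumer_class_for(string $basePath, string $file): string
{
    // Strip "<base>/" from the front of the file path
    $relative = substr($file, strlen(rtrim($basePath, '/')) + 1);
    // Drop the ".php" extension
    $withoutExt = substr($relative, 0, -strlen('.php'));
    // Directory separators become namespace separators
    return str_replace('/', '\\', $withoutExt);
}

echo consumer_class_for('/srv/webman', '/srv/webman/app/queue/redis/fast/SendMail.php'), "\n";
// app\queue\redis\fast\SendMail
```

So a consumer moved into queue/redis/fast would declare namespace app\queue\redis\fast; rather than app\queue\redis;.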
Multiple Redis Configurations
Configuration
config/plugin/webman/redis-queue/redis.php
<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,        // Password, string type, optional
            'db' => 0,             // Database
            'max_attempts' => 5,   // Number of retries after consumer failure
            'retry_seconds' => 5,  // Retry interval, in seconds
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,        // Password, string type, optional
            'db' => 0,             // Database
            'max_attempts' => 5,   // Number of retries after consumer failure
            'retry_seconds' => 5,  // Retry interval, in seconds
        ]
    ],
];
Note that an additional Redis configuration with the key other has been added.
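As an illustration of how a named entry in this file can be turned into connection parameters (for example, to connect with the Redis extension from a standalone script), the lookup might be sketched as follows. redis_connection_params() is a hypothetical helper; the plugin resolves connections internally and may do so differently.

```php
<?php
// Hypothetical helper: looks up a named connection in a config array shaped
// like redis.php and splits its 'host' URL into host and port.
function redis_connection_params(array $config, string $name = 'default'): array
{
    if (!isset($config[$name])) {
        throw new InvalidArgumentException("Unknown Redis connection: $name");
    }
    $url = parse_url($config[$name]['host']);
    $options = $config[$name]['options'] ?? [];
    return [
        'host' => $url['host'],
        'port' => $url['port'] ?? 6379, // assumed fallback to the Redis default port
        'auth' => $options['auth'] ?? null,
        'db'   => $options['db'] ?? 0,
    ];
}

$config = [
    'default' => ['host' => 'redis://192.168.0.1:6379', 'options' => ['auth' => null, 'db' => 0]],
    'other'   => ['host' => 'redis://192.168.0.2:6379', 'options' => ['auth' => null, 'db' => 0]],
];

$params = redis_connection_params($config, 'other');
echo $params['host'], ':', $params['port'], "\n"; // 192.168.0.2:6379
```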
Sending Messages to Multiple Redis
// Sending messages to the queue with `default` as the key
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Equivalent to
Client::send($queue, $data);
Redis::send($queue, $data);
// Sending messages to the queue with `other` as the key
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Consuming from Multiple Redis
In the consumer class, set $connection to consume messages from the Redis instance configured under the key other:
namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Queue name to consume
    public $queue = 'send-mail';

    // === Set this to other, representing the connection with the key `other` in the configuration ===
    public $connection = 'other';

    // Consume
    public function consume($data)
    {
        // No need for deserialization
        var_export($data);
    }
}
Common Issues
Why am I getting the error Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)?
This error only occurs with the asynchronous sending interface Client::send(). Asynchronous sending first stores the message in local memory and sends it to Redis when the process is idle. If Redis receives messages more slowly than they are produced, or if the process is constantly busy with other tasks and has no time to synchronize the in-memory messages to Redis, messages back up. If a message is held up for more than 600 seconds, this error is triggered.
Solution: use the synchronous sending interface Redis::send() to send messages.