Redis队列
基于Redis的消息队列,支持消息延迟处理。
التثبيت
composer require webman/redis-queue
ملف الإعداد
يتم إنشاء ملف إعدادات redis تلقائيًا في {主项目}/config/plugin/webman/redis-queue/redis.php
، ومحتواه مشابه لما يلي:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // كلمة المرور، خيار اختياري
'db' => 0, // قاعدة البيانات
'max_attempts' => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
]
],
];
إعادة محاولات الفشل في الاستخراج
إذا فشل الاستخراج (حدث استثناء)، فستتم إضافة الرسالة إلى قائمة الانتظار المتأخرة، في انتظار إعادة المحاولة التالية. يتم التحكم في عدد المحاولات بواسطة المعامل max_attempts
، وتتحكم كل من retry_seconds
و max_attempts
في فترة إعادة المحاولة. على سبيل المثال، إذا كانت max_attempts
هي 5، و retry_seconds
هي 10، فإن فترة إعادة المحاولة الأولى ستكون 1*10
ثوانٍ، وفترة إعادة المحاولة الثانية ستكون 2*10
ثوانٍ، وهكذا حتى يتم إعادة المحاولة 5 مرات. إذا تم تجاوز عدد المحاولات المحدد في max_attempts
، سيتم وضع الرسالة في قائمة الفشل ذات المفتاح {redis-queue}-failed
.
إرسال الرسائل (متزامن)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// اسم قائمة الانتظار
$queue = 'send-mail';
// البيانات، يمكن تمرير المصفوفة مباشرةً دون الحاجة إلى تسلسل
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// إرسال الرسالة
Redis::send($queue, $data);
// إرسال رسالة متأخرة، سيتم معالجة الرسالة بعد 60 ثانية
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
إذا كان إرسال الرسالة ناجحًا، فإن Redis::send()
ستعيد true
، وإلا ستعيد false
أو ترمي استثناء.
تلميح
قد تظهر أخطاء في توقيت استهلاك قائمة الانتظار المتأخرة، مثل أن تكون سرعة الاستهلاك أقل من سرعة الإنتاج مما يؤدي إلى تراكم الرسائل، وبالتالي تأخير الاستهلاك. الحل هو تشغيل بعض عمليات الاستهلاك الإضافية.
إرسال الرسائل (غير متزامن)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// اسم قائمة الانتظار
$queue = 'send-mail';
// البيانات، يمكن تمرير المصفوفة مباشرةً دون الحاجة إلى تسلسل
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// إرسال الرسالة
Client::send($queue, $data);
// إرسال رسالة متأخرة، سيتم معالجة الرسالة بعد 60 ثانية
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
ليس له قيمة عائدة، إنه ينتمي إلى الدفع غير المتزامن، ولا يضمن وصول الرسالة إلى redis بنسبة 100%.
تلميح
مبدأ عملClient::send()
هو إنشاء قائمة انتظار في الذاكرة المحلية، ودفع الرسائل بشكل غير متزامن إلى redis (سرعة الدفع سريعة جدًا، حوالي 10,000 رسالة في الثانية). إذا تم إعادة تشغيل العملية، وفي حال كان بيانات قائمة الانتظار في الذاكرة المحلية لم تتم مزامنتها بالكامل، فقد يؤدي ذلك إلى فقدان الرسائل. إن الإرسال غير المتزامن باستخدامClient::send()
هو الأنسب لإرسال الرسائل غير المهمة.تلميح
Client::send()
هو غير متزامن، ويجب استخدامه فقط في بيئة تشغيل workerman، أما سكربتات سطر الأوامر يجب أن تستخدم الواجهة المتزامنةRedis::send()
.
إرسال الرسائل من مشاريع أخرى
في بعض الأحيان تحتاج إلى إرسال رسائل من مشاريع أخرى ولا يمكنك استخدام webman\redis-queue
، يمكنك الرجوع إلى الدالة التالية لإرسال الرسائل إلى قائمة الانتظار.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
حيث أن المعامل $redis
هو مثيل redis. على سبيل المثال، استخدام تمديد redis مشابه لما يلي:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
الاستهلاك
ملف إعدادات عملية الاستهلاك موجود في {主项目}/config/plugin/webman/redis-queue/process.php
。
دليل المستهلك تحت {主项目}/app/queue/redis/
.
عند تنفيذ الأمر php webman redis-queue:consumer my-send-mail
سيتم إنشاء الملف {主项目}/app/queue/redis/MyMailSend.php
تلميح
إذا كان الأمر غير موجود، يمكنك أيضًا إنشاؤه يدويًا.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// اسم قائمة الانتظار المستهلكة
public $queue = 'send-mail';
// اسم الاتصال، يتوافق مع plugin/webman/redis-queue/redis.php
public $connection = 'default';
// الاستهلاك
public function consume($data)
{
// لا حاجة إلى إعادة التسلسل
var_export($data); // يقوم بعرض ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// رد الاستدعاء عند فشل الاستهلاك
/*
$package = [
'id' => 1357277951, // معرف الرسالة
'time' => 1709170510, // وقت الرسالة
'delay' => 0, // فترة التأخير
'attempts' => 2, // عدد مرات الاستهلاك
'queue' => 'send-mail', // اسم قائمة الانتظار
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // محتوى الرسالة
'max_attempts' => 5, // الحد الأقصى لعدد المحاولات
'error' => 'رسالة خطأ' // معلومات الخطأ
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "فشل الاستهلاك\n";
echo $e->getMessage() . "\n";
// لا حاجة إلى إعادة التسلسل
var_export($package);
}
}
ملاحظة
إذا لم يتم طرح استثناءات أو أخطاء أثناء عملية الاستهلاك، يعتبر الاستهلاك ناجحًا، وإلا يعتبر فاشلاً ويدخل في قائمة إعادة المحاولة.
لا تحتوي redis-queue على آلية ack، يمكنك اعتبارها كـ ack تلقائي (إذا لم يحدث استثناء أو خطأ). إذا كنت ترغب في وضع علامة على أن الرسالة الحالية لم يتم استهلاكها بنجاح، يمكنك طرح استثناء يدويًا لجعل الرسالة الحالية تدخل قائمة إعادة المحاولة. وهذا في الواقع لا يختلف عن آلية ack.تلميح
يدعم المستهلكون خوادم متعددة وعمليات متعددة، ولن يتم استهلاك نفس الرسالة مرتين. الرسائل التي تم استهلاكها ستتم إزالتها تلقائيًا من قائمة الانتظار، ولا حاجة لحذفها يدويًا.تلميح
يمكن لعملية الاستهلاك استهلاك أنواع مختلفة من القوائم في نفس الوقت، ولا تحتاج إلى تعديل الإعدادات فيprocess.php
عند إضافة قائمة جديدة، كل ما عليك فعله هو إضافة فئةConsumer
المقابلة فيapp/queue/redis
وتحديد اسم قائمة الانتظار المراد استهلاكها باستخدام خاصية الفئة$queue
.تلميح
يحتاج مستخدمو windows إلى تنفيذ php windows.php لبدء webman، وإلا فلن يتم تشغيل عمليات الاستهلاك.تلميح
سيتم تفعيل رد الاستدعاء onConsumeFailure في كل مرة يفشل فيها الاستهلاك، يمكنك معالجة منطق الفشل هنا. (تتطلب هذه الميزةwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
).
إعداد عمليات استهلاك مختلفة لقوائم مختلفة
بشكل افتراضي، يشترك جميع المستهلكين في نفس عمليات الاستهلاك. لكن في بعض الأحيان نحتاج إلى عزل استهلاك بعض القوائم، مثل وضع الأعمال البطيئة في مجموعة من العمليات، ووضع الأعمال السريعة في مجموعة أخرى من العمليات. لهذا يمكننا تقسيم المستهلكين إلى دليلين، مثل app_path() . '/queue/redis/fast'
و app_path() . '/queue/redis/slow'
(لاحظ أنه يجب تعديل مساحة أسماء فئات الاستهلاك بشكل مناسب)، وبالتالي التكوين كما يلي:
return [
...هنا تم حذف إعدادات أخرى...
'redis_consumer_fast' => [ // المفتاح هو اسم مخصص، بدون قيود في التنسيق، هنا يسمى redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// دليل فئة المستهلك
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // المفتاح هو اسم مخصص، بدون قيود في التنسيق، هنا يسمى redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// دليل فئة المستهلك
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
بهذه الطريقة، يتم وضع مستهلك الأعمال السريعة في دليل queue/redis/fast
، بينما يتم وضع مستهلك الأعمال البطيئة في دليل queue/redis/slow
لتحقيق الهدف من تخصيص عمليات استهلاك للقوائم.
تكوين متعدد redis
التكوين
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // كلمة المرور، نوع سلسلة، خيار اختياري
'db' => 0, // قاعدة البيانات
'max_attempts' => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // كلمة المرور، نوع سلسلة، خيار اختياري
'db' => 0, // قاعدة البيانات
'max_attempts' => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
]
],
];
لاحظ أنه تمت إضافة إعداد redis ذات المفتاح other
.
إرسال الرسائل إلى redis المتعددة
// لإرسال رسالة إلى قائمة ذات المفتاح `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// تساوي
Client::send($queue, $data);
Redis::send($queue, $data);
// لإرسال رسالة إلى قائمة ذات المفتاح `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
استهلاك redis المتعدد
في ملفات التكوين الخاصة بالاستنساخ، فإن إرسال الرسائل إلى قائمة ذات المفتاح other
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// اسم قائمة الانتظار المراد استهلاكها
public $queue = 'send-mail';
// === هنا نحدد أنها `other`، تمثل قائمة الانتظار ذات المفتاح other في التكوين ===
public $connection = 'other';
// الاستهلاك
public function consume($data)
{
// لا حاجة إلى إعادة التسلسل
var_export($data);
}
}
الأسئلة الشائعة
لماذا يظهر خطأ Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
هذا الخطأ سيظهر فقط في واجهة إرسال غير المتزامنة Client::send()
. تقوم عملية الإرسال غير المتزامنة أولاً بحفظ الرسائل في الذاكرة المحلية، وعندما تكون العملية خالية، يتم إرسال الرسائل إلى redis. إذا كانت سرعة استقبال redis أبطأ من سرعة إنتاج الرسائل، أو كانت العملية مشغولة باستمرار بأعمال أخرى ولا تتوفر لديها وقت كافٍ للمزامنة، فسيحدث ضغط على الرسائل. إذا زاد ضغط الرسائل عن 600 ثانية، فسيتم تفعيل هذا الخطأ.
الحل: استخدم واجهة الإرسال المتزامنة Redis::send()
لإرسال الرسائل.