Redis队列

基于Redis的消息队列,支持消息延迟处理。

التثبيت

composer require webman/redis-queue

ملف الإعداد

يتم إنشاء ملف إعدادات redis تلقائيًا في {主项目}/config/plugin/webman/redis-queue/redis.php، ومحتواه مشابه لما يلي:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // كلمة المرور، خيار اختياري
            'db' => 0,            // قاعدة البيانات
            'max_attempts'  => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
            'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
        ]
    ],
];

إعادة محاولات الفشل في الاستخراج

إذا فشل الاستخراج (حدث استثناء)، فستتم إضافة الرسالة إلى قائمة الانتظار المتأخرة، في انتظار إعادة المحاولة التالية. يتم التحكم في عدد المحاولات بواسطة المعامل max_attempts، وتتحكم كل من retry_seconds و max_attempts في فترة إعادة المحاولة. على سبيل المثال، إذا كانت max_attempts هي 5، و retry_seconds هي 10، فإن فترة إعادة المحاولة الأولى ستكون 1*10 ثوانٍ، وفترة إعادة المحاولة الثانية ستكون 2*10 ثوانٍ، وهكذا حتى يتم إعادة المحاولة 5 مرات. إذا تم تجاوز عدد المحاولات المحدد في max_attempts، سيتم وضع الرسالة في قائمة الفشل ذات المفتاح {redis-queue}-failed.

إرسال الرسائل (متزامن)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // اسم قائمة الانتظار
        $queue = 'send-mail';
        // البيانات، يمكن تمرير المصفوفة مباشرةً دون الحاجة إلى تسلسل
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // إرسال الرسالة
        Redis::send($queue, $data);
        // إرسال رسالة متأخرة، سيتم معالجة الرسالة بعد 60 ثانية
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

إذا كان إرسال الرسالة ناجحًا، فإن Redis::send() ستعيد true، وإلا ستعيد false أو ترمي استثناء.

تلميح
قد تظهر أخطاء في توقيت استهلاك قائمة الانتظار المتأخرة، مثل أن تكون سرعة الاستهلاك أقل من سرعة الإنتاج مما يؤدي إلى تراكم الرسائل، وبالتالي تأخير الاستهلاك. الحل هو تشغيل بعض عمليات الاستهلاك الإضافية.

إرسال الرسائل (غير متزامن)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // اسم قائمة الانتظار
        $queue = 'send-mail';
        // البيانات، يمكن تمرير المصفوفة مباشرةً دون الحاجة إلى تسلسل
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // إرسال الرسالة
        Client::send($queue, $data);
        // إرسال رسالة متأخرة، سيتم معالجة الرسالة بعد 60 ثانية
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() ليس له قيمة عائدة، إنه ينتمي إلى الدفع غير المتزامن، ولا يضمن وصول الرسالة إلى redis بنسبة 100%.

تلميح
مبدأ عمل Client::send() هو إنشاء قائمة انتظار في الذاكرة المحلية، ودفع الرسائل بشكل غير متزامن إلى redis (سرعة الدفع سريعة جدًا، حوالي 10,000 رسالة في الثانية). إذا تم إعادة تشغيل العملية، وفي حال كان بيانات قائمة الانتظار في الذاكرة المحلية لم تتم مزامنتها بالكامل، فقد يؤدي ذلك إلى فقدان الرسائل. إن الإرسال غير المتزامن باستخدام Client::send() هو الأنسب لإرسال الرسائل غير المهمة.

تلميح
Client::send() هو غير متزامن، ويجب استخدامه فقط في بيئة تشغيل workerman، أما سكربتات سطر الأوامر يجب أن تستخدم الواجهة المتزامنة Redis::send().

إرسال الرسائل من مشاريع أخرى

في بعض الأحيان تحتاج إلى إرسال رسائل من مشاريع أخرى ولا يمكنك استخدام webman\redis-queue، يمكنك الرجوع إلى الدالة التالية لإرسال الرسائل إلى قائمة الانتظار.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

حيث أن المعامل $redis هو مثيل redis. على سبيل المثال، استخدام تمديد redis مشابه لما يلي:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

الاستهلاك

ملف إعدادات عملية الاستهلاك موجود في {主项目}/config/plugin/webman/redis-queue/process.php
دليل المستهلك تحت {主项目}/app/queue/redis/ .

عند تنفيذ الأمر php webman redis-queue:consumer my-send-mail سيتم إنشاء الملف {主项目}/app/queue/redis/MyMailSend.php

تلميح
إذا كان الأمر غير موجود، يمكنك أيضًا إنشاؤه يدويًا.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // اسم قائمة الانتظار المستهلكة
    public $queue = 'send-mail';

    // اسم الاتصال، يتوافق مع plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // الاستهلاك
    public function consume($data)
    {
        // لا حاجة إلى إعادة التسلسل
        var_export($data); // يقوم بعرض ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // رد الاستدعاء عند فشل الاستهلاك
    /* 
    $package = [
        'id' => 1357277951, // معرف الرسالة
        'time' => 1709170510, // وقت الرسالة
        'delay' => 0, // فترة التأخير
        'attempts' => 2, // عدد مرات الاستهلاك
        'queue' => 'send-mail', // اسم قائمة الانتظار
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // محتوى الرسالة
        'max_attempts' => 5, // الحد الأقصى لعدد المحاولات
        'error' => 'رسالة خطأ' // معلومات الخطأ
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "فشل الاستهلاك\n";
        echo $e->getMessage() . "\n";
        // لا حاجة إلى إعادة التسلسل
        var_export($package); 
    }
}

ملاحظة
إذا لم يتم طرح استثناءات أو أخطاء أثناء عملية الاستهلاك، يعتبر الاستهلاك ناجحًا، وإلا يعتبر فاشلاً ويدخل في قائمة إعادة المحاولة.
لا تحتوي redis-queue على آلية ack، يمكنك اعتبارها كـ ack تلقائي (إذا لم يحدث استثناء أو خطأ). إذا كنت ترغب في وضع علامة على أن الرسالة الحالية لم يتم استهلاكها بنجاح، يمكنك طرح استثناء يدويًا لجعل الرسالة الحالية تدخل قائمة إعادة المحاولة. وهذا في الواقع لا يختلف عن آلية ack.

تلميح
يدعم المستهلكون خوادم متعددة وعمليات متعددة، ولن يتم استهلاك نفس الرسالة مرتين. الرسائل التي تم استهلاكها ستتم إزالتها تلقائيًا من قائمة الانتظار، ولا حاجة لحذفها يدويًا.

تلميح
يمكن لعملية الاستهلاك استهلاك أنواع مختلفة من القوائم في نفس الوقت، ولا تحتاج إلى تعديل الإعدادات في process.php عند إضافة قائمة جديدة، كل ما عليك فعله هو إضافة فئة Consumer المقابلة في app/queue/redis وتحديد اسم قائمة الانتظار المراد استهلاكها باستخدام خاصية الفئة $queue.

تلميح
يحتاج مستخدمو windows إلى تنفيذ php windows.php لبدء webman، وإلا فلن يتم تشغيل عمليات الاستهلاك.

تلميح
سيتم تفعيل رد الاستدعاء onConsumeFailure في كل مرة يفشل فيها الاستهلاك، يمكنك معالجة منطق الفشل هنا. (تتطلب هذه الميزة webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1).

إعداد عمليات استهلاك مختلفة لقوائم مختلفة

بشكل افتراضي، يشترك جميع المستهلكين في نفس عمليات الاستهلاك. لكن في بعض الأحيان نحتاج إلى عزل استهلاك بعض القوائم، مثل وضع الأعمال البطيئة في مجموعة من العمليات، ووضع الأعمال السريعة في مجموعة أخرى من العمليات. لهذا يمكننا تقسيم المستهلكين إلى دليلين، مثل app_path() . '/queue/redis/fast' و app_path() . '/queue/redis/slow' (لاحظ أنه يجب تعديل مساحة أسماء فئات الاستهلاك بشكل مناسب)، وبالتالي التكوين كما يلي:

return [
    ...هنا تم حذف إعدادات أخرى...

    'redis_consumer_fast'  => [ // المفتاح هو اسم مخصص، بدون قيود في التنسيق، هنا يسمى redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // دليل فئة المستهلك
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // المفتاح هو اسم مخصص، بدون قيود في التنسيق، هنا يسمى redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // دليل فئة المستهلك
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

بهذه الطريقة، يتم وضع مستهلك الأعمال السريعة في دليل queue/redis/fast، بينما يتم وضع مستهلك الأعمال البطيئة في دليل queue/redis/slow لتحقيق الهدف من تخصيص عمليات استهلاك للقوائم.

تكوين متعدد redis

التكوين

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // كلمة المرور، نوع سلسلة، خيار اختياري
            'db' => 0,            // قاعدة البيانات
            'max_attempts'  => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
            'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // كلمة المرور، نوع سلسلة، خيار اختياري
            'db' => 0,            // قاعدة البيانات
            'max_attempts'  => 5, // عدد محاولات إعادة الاستخراج بعد الفشل
            'retry_seconds' => 5, // فترة إعادة الاستخراج، بوحدات الثواني
        ]
    ],
];

لاحظ أنه تمت إضافة إعداد redis ذات المفتاح other.

إرسال الرسائل إلى redis المتعددة

// لإرسال رسالة إلى قائمة ذات المفتاح `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  تساوي
Client::send($queue, $data);
Redis::send($queue, $data);

// لإرسال رسالة إلى قائمة ذات المفتاح `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

استهلاك redis المتعدد

في ملفات التكوين الخاصة بالاستنساخ، فإن إرسال الرسائل إلى قائمة ذات المفتاح other

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // اسم قائمة الانتظار المراد استهلاكها
    public $queue = 'send-mail';

    // === هنا نحدد أنها `other`، تمثل قائمة الانتظار ذات المفتاح other في التكوين ===
    public $connection = 'other';

    // الاستهلاك
    public function consume($data)
    {
        // لا حاجة إلى إعادة التسلسل
        var_export($data);
    }
}

الأسئلة الشائعة

لماذا يظهر خطأ Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)

هذا الخطأ سيظهر فقط في واجهة إرسال غير المتزامنة Client::send(). تقوم عملية الإرسال غير المتزامنة أولاً بحفظ الرسائل في الذاكرة المحلية، وعندما تكون العملية خالية، يتم إرسال الرسائل إلى redis. إذا كانت سرعة استقبال redis أبطأ من سرعة إنتاج الرسائل، أو كانت العملية مشغولة باستمرار بأعمال أخرى ولا تتوفر لديها وقت كافٍ للمزامنة، فسيحدث ضغط على الرسائل. إذا زاد ضغط الرسائل عن 600 ثانية، فسيتم تفعيل هذا الخطأ.

الحل: استخدم واجهة الإرسال المتزامنة Redis::send() لإرسال الرسائل.