Redis क्यू

Redis आधारित संदेश क्यू, संदेशों के लिए निलंबित प्रसंस्करण का समर्थन करता है।

स्थापना

composer require webman/redis-queue

कॉन्फ़िगरेशन फ़ाइल

Redis कॉन्फ़िगरेशन फ़ाइल स्वचालित रूप से {मुख्य प्रोजेक्ट}/config/plugin/webman/redis-queue/redis.php में生成 होती है, इसका कंटेंट निम्नानुसार होगा:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // पासवर्ड, वैकल्पिक पैरामीटर
            'db' => 0,            // डेटाबेस
            'max_attempts'  => 5, // उपभोक्ता विफल होने पर पुन: प्रयास की संख्या
            'retry_seconds' => 5, // पुन: प्रयास का अंतराल, इकाई सेकंड
        ]
    ],
];

उपभोक्ता विफलता पुन: प्रयास

यदि उपभोक्ता विफल हो जाता है (कोई अपवाद उत्पन्न होता है), तो संदेश को निलंबित क्यू में डाल दिया जाएगा, अगली बार पुन: प्रयास के लिए इंतजार करेगा। पुन: प्रयास की संख्या पैरामीटर max_attempts द्वारा नियंत्रित की जाती है, पुन: प्रयास का अंतराल retry_seconds और max_attempts के द्वारा समान रूप से नियंत्रित होता है। उदाहरण के लिए, यदि max_attempts 5 है और retry_seconds 10 है, तो पहले पुन: प्रयास के लिए अंतराल होगा 1*10 सेकंड, दूसरे पुन: प्रयास के लिए 2*10 सेकंड, तीसरे पुन: प्रयास के लिए 3*10 सेकंड, और इसी तरह से 5 बार पुन: प्रयास होने तक। यदि max_attempts द्वारा निर्दिष्ट पुन: प्रयास संख्या से अधिक हो जाता है, तो संदेश को की के साथ {redis-queue}-failed विफल क्यू में डाल दिया जाएगा।

संदेश भेजना (समकालिक)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // क्यू का नाम
        $queue = 'send-mail';
        // डेटा, इसे सीधे एरे के रूप में पास किया जा सकता है, संचयीकरण की आवश्यकता नहीं है
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // संदेश भेजना
        Redis::send($queue, $data);
        // निलंबित संदेश भेजना, संदेश 60 सेकंड बाद संसाधित होगा
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

संदेश भेजने में सफल होने पर Redis::send() true लौटाएगा, अन्यथा false लौटाएगा या अपवाद उठाएगा।

सुझाव
निलंबित क्यू की उपभोक्ता समय में भिन्नता हो सकती है, जैसे कि उपभोक्ता गति उत्पादन गति से कम हो जाती है, जिससे क्यू में वृद्धि होती है और उपभोक्ता में देरी होती है। इसे हल करने के लिए कुछ उपभोक्ता प्रक्रियाएँ अधिक प्रारंभ करें।

संदेश भेजना (असामयिक)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // क्यू का नाम
        $queue = 'send-mail';
        // डेटा, इसे सीधे एरे के रूप में पास किया जा सकता है, संचयीकरण की आवश्यकता नहीं है
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // संदेश भेजना
        Client::send($queue, $data);
        // निलंबित संदेश भेजना, संदेश 60 सेकंड बाद संसाधित होगा
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() में कोई लौटाई नहीं है, यह असामयिक धक्का है, यह संदेश का 100% Redis तक पहुँचने की गारंटी नहीं देता है।

सुझाव
Client::send() का तंत्र स्थानीय मेमोरी में एक मेमोरी क्यू बनाने पर आधारित है, असामयिक रूप से संदेश को Redis तक पहुँचाता है (संकट की गति बहुत तेज है, लगभग प्रति सेकंड 10,000 संदेश)। यदि प्रक्रिया पुनः प्रारंभ होती है, जबकि स्थानीय मेमोरी क्यू में डेटा पूरी तरह से सिंक्रनाइज़ नहीं हो पाता है, तो संदेश खो सकते हैं। Client::send() असामयिक धक्का उन संदेशों के लिए उपयुक्त है जो महत्वपूर्ण नहीं हैं।

सुझाव
Client::send() असामयिक है, इसे केवल workerman के चलने के पर्यावरण में उपयोग किया जा सकता है, कमांड लाइन स्क्रिप्ट के लिए समकालिक इंटरफ़ेस Redis::send() का उपयोग करें।

अन्य परियोजनाओं में संदेश भेजना

कभी-कभी आपको अन्य परियोजनाओं में संदेश भेजने की आवश्यकता होती है और आप webman\redis-queue का उपयोग नहीं कर सकते हैं, तो आप क्यू में संदेश भेजने के लिए निम्नलिखित फ़ंक्शन का संदर्भ ले सकते हैं।

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

यहाँ, पैरामीटर $redis Redis उदाहरण है। उदाहरण के लिए Redis एक्सटेंशन का उपयोग कुछ इस तरह से किया जा सकता है:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

उपभोक्ता

उपभोक्ता प्रक्रिया कॉन्फ़िगरेशन फ़ाइल {मुख्य प्रोजेक्ट}/config/plugin/webman/redis-queue/process.php में है।
उपभोक्ता निदेशिका {मुख्य प्रोजेक्ट}/app/queue/redis/ के तहत है।

कमांड चलाने पर php webman redis-queue:consumer my-send-mail एक फ़ाइल बनाएगा {मुख्य प्रोजेक्ट}/app/queue/redis/MyMailSend.php

सुझाव
यदि कमांड मौजूद नहीं है तो आप मैन्युअल रूप से भी बना सकते हैं।

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // उपभोक्ताओं के लिए क्यू का नाम
    public $queue = 'send-mail';

    // कनेक्शन का नाम, plugin/webman/redis-queue/redis.php में कनेक्शन के अनुरूप`
    public $connection = 'default';

    // उपभोक्ता
    public function consume($data)
    {
        // संचयीकरण की आवश्यकता नहीं है
        var_export($data); // बाहर निकलें ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // उपभोक्ता विफलता कॉलबैक
    /* 
    $package = [
        'id' => 1357277951, // संदेश ID
        'time' => 1709170510, // संदेश समय
        'delay' => 0, // देरी का समय
        'attempts' => 2, // उपभोक्ता次数
        'queue' => 'send-mail', // क्यू का नाम
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // संदेश सामग्री
        'max_attempts' => 5, // अधिकतम पुन: प्रयास次数
        'error' => 'त्रुटि जानकारी' // त्रुटि जानकारी
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "उपभोक्ता विफलता\n";
        echo $e->getMessage() . "\n";
        // संचयीकरण की आवश्यकता नहीं है
        var_export($package); 
    }
}

ध्यान दें
उपभोक्ताओं की प्रक्रिया में किसी अपवाद या त्रुटि का फेंकना उपभोक्ता की सफलता की स्थिति मानी जाएगी, अन्यथा उपभोक्ता विफल रहेगा, पुन: प्रयास क्यू में जाएगा।
redis-queue में कोई ack तंत्र नहीं है, आप इसे स्वचालित ack के रूप में देख सकते हैं (अपवाद या त्रुटियों का उत्पन्न ना होने पर)। यदि आप उपभोक्ता प्रक्रिया में वर्तमान संदेश को उपभोक्ता में विफलता का चिह्नित करना चाहें, तो आप मैन्युअल रूप से अपवाद उठाकर वर्तमान संदेश को पुन: प्रयास क्यू में डाल सकते हैं। यह वास्तव में ack तंत्र के साथ समान है।

सुझाव
उपभोक्ताओं को कई सर्वरों और प्रक्रियाओं के लिए समर्थन है, और एक ही संदेश दोबारा उपभोक्ताओं द्वारा उपभोक्त नहीं किया जाएगा। उपभोक्त किए गए संदेशों को स्वतः क्यू से हटा दिया जाएगा, मैन्युअल रूप से हटाने की आवश्यकता नहीं होगी।

सुझाव
उपभोक्ता प्रक्रिया एक ही समय में विभिन्न क्यूज़ का उपभोक्ता कर सकती है, अतिरिक्त क्यूज़ जोड़ने की आवश्यकता नहीं होगी process.php में कॉन्फ़िगरेशन को संशोधित करने की, नए क्यू उपभोक्ताओं को बस app/queue/redis के तहत संबंधित Consumer वर्ग को जोड़ना होगा और वर्ग गुण $queue का उपयोग कर उपभोक्ता क्यू नाम को निर्दिष्ट करना होगा।

सुझाव
विंडोज उपयोगकर्ताओं को php windows.php चलाकर webman प्रारंभ करना होगा, अन्यथा उपभोक्ता प्रक्रिया शुरू नहीं होगी।

सुझाव
onConsumeFailure कॉलबैक हर बार उपभोक्ता विफल होने पर सक्रिय होगा, आप यहाँ विफलता के बाद की लॉजिक को संभाल सकते हैं। (यह विशेषता के लिए webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1 की आवश्यकता है।)

विभिन्न क्यू के लिए विभिन्न उपभोक्ता प्रक्रियाओं को सेट करना

डिफ़ॉल्ट रूप से, सभी उपभोक्ता समान उपभोक्ता प्रक्रियाओं को साझा करते हैं। लेकिन कभी-कभी हमें कुछ क्यू के उपभोक्ताओं को अलग करना पड़ता है, जैसे कि धीमी व्यवसाय प्रक्रियाओं को एक समूह में उपभोक्त करना, तेज़ व्यवसाय प्रक्रियाओं को दूसरे समूह में उपभोक्त करना। इसके लिए हम उपभोक्ताओं को दो निदेशिकाओं में विभाजित कर सकते हैं, जैसे app_path() . '/queue/redis/fast' और app_path() . '/queue/redis/slow' (ध्यान दें कि उपभोक्ता वर्ग का नामकरण स्थान आवश्यक रूप से संशोधित किया जाना चाहिए), तो कॉन्फ़िगरेशन इस तरह होगा:

return [
    ...यहाँ अन्य कॉन्फ़िगरेशन समाप्त किया गया है...

    'redis_consumer_fast'  => [ // key कस्टम है, यहाँ redis_consumer_fast नाम रखा गया है
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // उपभोक्ता वर्ग निदेशिका
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // key कस्टम है, यहाँ redis_consumer_slow नाम रखा गया है
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // उपभोक्ता वर्ग निदेशिका
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

इससे तेज़ व्यवसाय उपभोक्ता को queue/redis/fast निदेशिका में रखा जाता है, धीमे व्यवसाय उपभोक्ता को queue/redis/slow निदेशिका में रखा जाता है ताकि क्यू को विशेष उपभोक्ता प्रक्रियाओं के लिए निर्दिष्ट किया जा सके।

कई Redis कॉन्फ़िगरेशन

कॉन्फ़िगरेशन

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // पासवर्ड, स्ट्रिंग प्रकार, वैकल्पिक पैरामीटर
            'db' => 0,            // डेटाबेस
            'max_attempts'  => 5, // उपभोक्ता विफल होने पर पुन: प्रयास की संख्या
            'retry_seconds' => 5, // पुन: प्रयास का अंतराल, इकाई सेकंड
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // पासवर्ड, स्ट्रिंग प्रकार, वैकल्पिक पैरामीटर
            'db' => 0,             // डेटाबेस
            'max_attempts'  => 5, // उपभोक्ता विफल होने पर पुन: प्रयास की संख्या
            'retry_seconds' => 5, // पुन: प्रयास का अंतराल, इकाई सेकंड
        ]
    ],
];

ध्यान दें कि कॉन्फ़िगरेशन में एक other कुंजी जोड़ी गई है।

कई Redis के लिए संदेश भेजना

// `default` कुंजी की क्यू में संदेश भेजना
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  यह समान है
Client::send($queue, $data);
Redis::send($queue, $data);

// `other` कुंजी की क्यू में संदेश भेजना
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

कई Redis उपभोक्ताओं

उपभोक्ता कॉन्फ़िगरेशन में other कुंजी की क्यू में संदेश भेजें

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // उपभोक्ताओं के लिए क्यू का नाम
    public $queue = 'send-mail';

    // === यहाँ इसे other पर सेट करें, जो उपभोक्ता कॉन्फ़िगरेशन में other की कुंजी की क्यू को संदर्भित करता है ===
    public $connection = 'other';

    // उपभोक्ता
    public function consume($data)
    {
        // संचयीकरण की आवश्यकता नहीं है
        var_export($data);
    }
}

सामान्य प्रश्न

Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds) त्रुटि क्यों आती है?

यह त्रुटि केवल असामयिक संदेश भेजने के इंटरफ़ेस Client::send() में होती है। असामयिक भेजने में सबसे पहले संदेशों को स्थानीय मेमोरी में सहेजा जाता है, जब प्रक्रिया खाली होती है तो संदेशों को Redis को भेजा जाता है। यदि Redis की अधिग्रहण गति संदेश उत्पादन गति से धीमी है, या प्रक्रिया अन्य कार्यों में व्यस्त होती है और मेमोरी में मौजूद संदेशों को Redis के साथ समकालिक करने का पर्याप्त समय नहीं मिलता है, तो संदेशों की भीड़ लग सकती है। यदि कोई संदेश भीड़ 600 सेकंड से अधिक हो जाती है, तो यह त्रुटि उत्पन्न कर देगी।

समाधान: संदेश भेजने के लिए समकालिक भेजने का इंटरफ़ेस Redis::send() का उपयोग करें।