Redis คิว

คิวข้อความที่ใช้ Redis เป็นตัวจัดการ สนับสนุนการจัดการข้อความแบบล่าช้า

การติดตั้ง

composer require webman/redis-queue

ไฟล์การกำหนดค่า

ไฟล์การกำหนดค่า Redis จะถูกสร้างขึ้นอัตโนมัติที่ {โปรเจกต์หลัก}/config/plugin/webman/redis-queue/redis.php เนื้อหาจะคล้ายกับดังนี้:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // รหัสผ่าน (ตัวเลือก)
            'db' => 0,            // ฐานข้อมูล
            'max_attempts'  => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
            'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
        ]
    ],
];

การลองใหม่เมื่อการบริโภคล้มเหลว

หากการบริโภคล้มเหลว (เกิดข้อยกเว้น) ข้อความจะถูกนำไปไว้ในคิวล่าช้า รอการลองใหม่ครั้งถัดไป จำนวนการลองใหม่ควบคุมโดยพารามิเตอร์ max_attempts และระยะเวลาในการลองใหม่จะควบคุมร่วมกันโดย retry_seconds และ max_attempts ตัวอย่างเช่น หาก max_attempts เป็น 5 และ retry_seconds เป็น 10 ระยะเวลาในการลองใหม่ครั้งที่ 1 จะเป็น 1*10 วินาที ระยะเวลาในการลองใหม่ครั้งที่ 2 จะเป็น 2*10 วินาที ระยะเวลาในการลองใหม่ครั้งที่ 3 จะเป็น 3*10 วินาที และต่อ ๆ ไปจนกว่าจะลองใหม่ครบ 5 ครั้ง หากเกินการตั้งค่าจำนวนการลองใหม่ max_attempts ข้อความจะถูกนำไปไว้ในคิวความล้มเหลวที่มีคีย์เป็น {redis-queue}-failed

ส่งข้อความ (ซิงโครนัส)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // ชื่อคิว
        $queue = 'send-mail';
        // ข้อมูล สามารถส่งอาเรย์ได้โดยตรง โดยไม่ต้องทำการสุ่ม
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // ส่งข้อความ
        Redis::send($queue, $data);
        // ส่งข้อความแบบล่าช้า ข้อความจะถูกประมวลผลภายใน 60 วินาที
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }
}

หากส่งข้อความสำเร็จ Redis::send() จะส่งค่ากลับเป็น true หากไม่สำเร็จจะส่งค่ากลับ false หรือโยนข้อยกเว้น

คำแนะนำ
เวลาการบริโภคจากคิวล่าช้าอาจเกิดความคลาดเคลื่อน เช่น ความเร็วในการบริโภคน้อยกว่าความเร็วในการผลิตทำให้คิวล่าช้า วิธีบรรเทาคือการเปิดกระบวนการบริโภคหลาย ๆ ตัว

ส่งข้อความ (อสมซิง)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // ชื่อคิว
        $queue = 'send-mail';
        // ข้อมูล สามารถส่งอาเรย์ได้โดยตรง โดยไม่ต้องทำการสุ่ม
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // ส่งข้อความ
        Client::send($queue, $data);
        // ส่งข้อความแบบล่าช้า ข้อความจะถูกประมวลผลภายใน 60 วินาที
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }
}

Client::send() ไม่มีค่าที่ส่งกลับ มันเป็นการส่งแบบอสมซิง ซึ่งไม่รับประกันว่าข้อความจะถูกส่งไปยัง redis อย่างเต็มที่ 100%

คำแนะนำ
หลักการของ Client::send() คือการสร้างคิวในหน่วยความจำภายในและส่งข้อความไปยัง redis แบบอสมซิง (ความเร็วในการส่งซิงค์ อย่างรวดเร็ว ประมาณ 10,000 ข้อความต่อวินาที) หากรีสตราซิสทำงานใหม่และข้อมูลในคิวภายในในหน่วยความจำยังไม่ได้ซิงค์ จะทำให้ข้อความสูญหาย การส่งข้อความแบบอสมซิงของ Client::send() เหมาะสำหรับการส่งข้อความที่ไม่สำคัญ

คำแนะนำ
Client::send() เป็นแบบอสมซิง ซึ่งสามารถใช้ได้เฉพาะในสภาพแวดล้อมการทำงานของ workerman สำหรับสคริปต์ที่รันใน command line โปรดใช้การเชื่อมต่อซิงโครนัส Redis::send()

การส่งข้อความในโปรเจกต์อื่น

บางครั้งคุณต้องการส่งข้อความในโปรเจกต์อื่นและไม่สามารถใช้ webman\redis-queue ได้ สามารถดูฟังก์ชันต่อไปนี้เพื่อนส่งข้อความไปยังคิว

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

ในนั้น พารามิเตอร์ $redis คืออินสแตนซ์ของ redis ตัวอย่างการใช้สำหรับการขยาย redis มีดังนี้:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

การบริโภค

ไฟล์การกำหนดค่ากระบวนการบริโภคอยู่ที่ {โปรเจกต์หลัก}/config/plugin/webman/redis-queue/process.php
ไดเร็กทอรีบริโภคอยู่ที่ {โปรเจกต์หลัก}/app/queue/redis/

การรันคำสั่ง php webman redis-queue:consumer my-send-mail จะสร้างไฟล์ {โปรเจกต์หลัก}/app/queue/redis/MyMailSend.php

คำแนะนำ
หากคำสั่งไม่สามารถใช้งานได้สามารถสร้างไฟล์ด้วยตนเองได้

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // ชื่อคิวที่ต้องการบริโภค
    public $queue = 'send-mail';

    // ชื่อการเชื่อมต่อ ตรงกับการเชื่อมต่อในไฟล์ plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // การบริโภค
    public function consume($data)
    {
        // ไม่ต้องทำการสุ่ม
        var_export($data); // แสดง ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // การเรียกกลับเมื่อการบริโภคล้มเหลว
    /* 
    $package = [
        'id' => 1357277951, // ID ข้อความ
        'time' => 1709170510, // เวลาข้อความ
        'delay' => 0, // เวลาในการล่าช้า
        'attempts' => 2, // จำนวนครั้งที่บริโภค
        'queue' => 'send-mail', // ชื่อคิว
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // ข้อมูลข้อความ
        'max_attempts' => 5, // จำนวนการลองสูงสุด
        'error' => 'ข้อความผิดพลาด' // ข้อความผิดพลาด
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "บริโภคล้มเหลว\n";
        echo $e->getMessage() . "\n";
        // ไม่ต้องทำการสุ่ม
        var_export($package); 
    }
}

หมายเหตุ
หากไม่มีข้อยกเว้นหรือ Error ในระหว่างการบริโภคจะถือว่าการบริโภคสำเร็จ มิฉะนั้นการบริโภคจะล้มเหลวและเข้าสู่คิวลองใหม่
redis-queue ไม่มีกลไก ack คุณสามารถมองว่ามันเป็นการรับรองขั้นต่ำ (ไม่มีข้อยกเว้นหรือ Error) หากคุณต้องการทำเครื่องหมายว่าข้อความปัจจุบันไม่สำเร็จในการบริโภค คุณสามารถโยนข้อยกเว้นแบบแมนนวลเพื่อให้ข้อความปัจจุบันเข้าสู่คิวลองใหม่ จริง ๆ แล้วมันไม่มีความแตกต่างจากกลไก ack

คำแนะนำ
ผู้บริโภคสนับสนุนการทำงานกับเซิร์ฟเวอร์และกระบวนการหลาย ๆ ตัว และข้อความเดียวกัน จะไม่ ถูกบริโภคซ้ำ ข้อความที่บริโภคแล้วจะถูกลบออกจากคิวโดยอัตโนมัติ ไม่จำเป็นต้องลบด้วยตนเอง

คำแนะนำ
กระบวนการบริโภคสามารถบริโภคคิวที่แตกต่างกันหลาย ๆ คิวพร้อมกัน การเพิ่มคิวใหม่ไม่จำเป็นต้องแก้ไขการกำหนดค่าใน process.php เพียงแค่เพิ่มคลาส Consumer ที่สอดคล้องใน app/queue/redis และใช้คุณสมบัติคลาส $queue เพื่อระบุชื่อคิวที่จะบริโภค

คำแนะนำ
ผู้ใช้ Windows ควรรัน php windows.php เพื่อเริ่มต้น webman มิฉะนั้นจะไม่สามารถเริ่มกระบวนการบริโภคได้

คำแนะนำ
การเรียกกลับ onConsumeFailure จะถูกเรียกใช้ทุกครั้งที่บริโภคล้มเหลว คุณสามารถจัดการตรรกะหลังจากล้มเหลวที่นี่ (คุณสมบัตินี้ต้องการ webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1)

การกำหนดกระบวนการบริโภคที่แตกต่างกันสำหรับคิวที่แตกต่างกัน

ตามค่าเริ่มต้น ผู้บริโภคทั้งหมดใช้กระบวนการบริโภคเดียวกัน แต่บางครั้งเราจำเป็นต้องทำให้การบริโภคของคิวบางอย่างแยกออกจากกัน เช่น ลดความเร็วในการบริโภคของธุรกิจบางอย่างแบบแยกกลุ่ม โดยการจัดกลุ่มธุรกิจที่รวดเร็วกับธุรกิจที่ช้า เราสามารถแยกผู้บริโภคเป็นสองไดเร็กทอรี เช่น app_path() . '/queue/redis/fast' และ app_path() . '/queue/redis/slow' (หมายเหตุ: ต้องทำการเปลี่ยนแปลง namespace ของคลาสบริโภคตามความเหมาะสม) โดยการกำหนดค่าจะมีลักษณะดังนี้:

return [
    ... ที่นี่เป็นการละเว้นการกำหนดค่าหอื่น ๆ ...

    'redis_consumer_fast'  => [ // key นี้เป็นชื่อที่กำหนดเอง ไม่มีการจำกัดรูปแบบ เช่น naming as redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // ไดเร็กทอรีคลาสบริโภค
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // key นี้เป็นชื่อที่กำหนดเอง ไม่มีการจำกัดรูปแบบ เช่น naming as redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // ไดเร็กทอรีคลาสบริโภค
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

เช่นนี้ผู้บริโภคสำหรับธุรกิจที่รวดเร็วจะถูกเก็บไว้ในไดเร็กทอรี queue/redis/fast และผู้บริโภคสำหรับธุรกิจที่ช้าจะถูกเก็บไว้ในไดเร็กทอรี queue/redis/slow เพื่อบรรลุวัตถุประสงค์ในการกำหนดกระบวนการบริโภคให้กับคิว

การกำหนดค่าหลาย Redis

การกำหนดค่า

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // รหัสผ่าน (ประเภทสตริง ตัวเลือก)
            'db' => 0,            // ฐานข้อมูล
            'max_attempts'  => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
            'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // รหัสผ่าน (ประเภทสตริง ตัวเลือก)
            'db' => 0,            // ฐานข้อมูล
            'max_attempts'  => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
            'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
        ]
    ],
];

หมายเหตุว่าการกำหนดค่าได้เพิ่มค่า other เป็นคีย์สำหรับการกำหนดค่า redis

การส่งข้อความหลาย Redis

// ส่งข้อความไปยังคิวที่มีคีย์เป็น `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// เท่ากับ
Client::send($queue, $data);
Redis::send($queue, $data);

// ส่งข้อความไปยังคิวที่มีคีย์เป็น `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

การบริโภคหลาย Redis

การตั้งค่าบริโภคควรมีการส่งข้อความไปยังคิวที่มีคีย์เป็น other

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // ชื่อคิวที่ต้องการบริโภค
    public $queue = 'send-mail';

    // === ที่นี่ตั้งเป็น other เพื่อบริโภคคิวที่มีคีย์เป็น other ===
    public $connection = 'other';

    // การบริโภค
    public function consume($data)
    {
        // ไม่ต้องทำการสุ่ม
        var_export($data);
    }
}

คำถามที่พบบ่อย

ทำไมถึงมีข้อผิดพลาด Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)

ข้อผิดพลาดนี้จะเกิดขึ้นเฉพาะในหน้าที่ส่งแบบอสมซิง Client::send() การส่งแบบอสมซิงจะบันทึกข้อความไว้ในหน่วยความจำภายในก่อน เมื่อกระบวนการว่าง ข้อความจะถูกส่งไปยัง redis หากความเร็วในการรับข้อความจาก redis ช้ากว่าความเร็วในการผลิตข้อความ หรือหากกระบวนการวุ่นอยู่กับธุรกิจอื่น ๆ ไม่สามารถซิงค์ข้อความในหน่วยความจำเข้าสู่ redis ได้ จะทำให้เกิดการหนาแน่นของข้อความ หากมีข้อความที่หนาแน่นเกิน 600 วินาที จะเกิดข้อผิดพลาดนี้ขึ้น

วิธีแก้ไข: ส่งข้อความโดยใช้ฟังก์ชันส่งแบบซิงโครนัส Redis::send()