Redis คิว
คิวข้อความที่ใช้ Redis เป็นตัวจัดการ สนับสนุนการจัดการข้อความแบบล่าช้า
การติดตั้ง
composer require webman/redis-queue
ไฟล์การกำหนดค่า
ไฟล์การกำหนดค่า Redis จะถูกสร้างขึ้นอัตโนมัติที่ {โปรเจกต์หลัก}/config/plugin/webman/redis-queue/redis.php
เนื้อหาจะคล้ายกับดังนี้:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // รหัสผ่าน (ตัวเลือก)
'db' => 0, // ฐานข้อมูล
'max_attempts' => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
]
],
];
การลองใหม่เมื่อการบริโภคล้มเหลว
หากการบริโภคล้มเหลว (เกิดข้อยกเว้น) ข้อความจะถูกนำไปไว้ในคิวล่าช้า รอการลองใหม่ครั้งถัดไป จำนวนการลองใหม่ควบคุมโดยพารามิเตอร์ max_attempts
และระยะเวลาในการลองใหม่จะควบคุมร่วมกันโดย retry_seconds
และ max_attempts
ตัวอย่างเช่น หาก max_attempts
เป็น 5 และ retry_seconds
เป็น 10 ระยะเวลาในการลองใหม่ครั้งที่ 1 จะเป็น 1*10
วินาที ระยะเวลาในการลองใหม่ครั้งที่ 2 จะเป็น 2*10
วินาที ระยะเวลาในการลองใหม่ครั้งที่ 3 จะเป็น 3*10
วินาที และต่อ ๆ ไปจนกว่าจะลองใหม่ครบ 5 ครั้ง หากเกินการตั้งค่าจำนวนการลองใหม่ max_attempts
ข้อความจะถูกนำไปไว้ในคิวความล้มเหลวที่มีคีย์เป็น {redis-queue}-failed
ส่งข้อความ (ซิงโครนัส)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// ชื่อคิว
$queue = 'send-mail';
// ข้อมูล สามารถส่งอาเรย์ได้โดยตรง โดยไม่ต้องทำการสุ่ม
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// ส่งข้อความ
Redis::send($queue, $data);
// ส่งข้อความแบบล่าช้า ข้อความจะถูกประมวลผลภายใน 60 วินาที
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
หากส่งข้อความสำเร็จ Redis::send()
จะส่งค่ากลับเป็น true หากไม่สำเร็จจะส่งค่ากลับ false หรือโยนข้อยกเว้น
คำแนะนำ
เวลาการบริโภคจากคิวล่าช้าอาจเกิดความคลาดเคลื่อน เช่น ความเร็วในการบริโภคน้อยกว่าความเร็วในการผลิตทำให้คิวล่าช้า วิธีบรรเทาคือการเปิดกระบวนการบริโภคหลาย ๆ ตัว
ส่งข้อความ (อสมซิง)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// ชื่อคิว
$queue = 'send-mail';
// ข้อมูล สามารถส่งอาเรย์ได้โดยตรง โดยไม่ต้องทำการสุ่ม
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// ส่งข้อความ
Client::send($queue, $data);
// ส่งข้อความแบบล่าช้า ข้อความจะถูกประมวลผลภายใน 60 วินาที
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
ไม่มีค่าที่ส่งกลับ มันเป็นการส่งแบบอสมซิง ซึ่งไม่รับประกันว่าข้อความจะถูกส่งไปยัง redis อย่างเต็มที่ 100%
คำแนะนำ
หลักการของClient::send()
คือการสร้างคิวในหน่วยความจำภายในและส่งข้อความไปยัง redis แบบอสมซิง (ความเร็วในการส่งซิงค์ อย่างรวดเร็ว ประมาณ 10,000 ข้อความต่อวินาที) หากรีสตราซิสทำงานใหม่และข้อมูลในคิวภายในในหน่วยความจำยังไม่ได้ซิงค์ จะทำให้ข้อความสูญหาย การส่งข้อความแบบอสมซิงของClient::send()
เหมาะสำหรับการส่งข้อความที่ไม่สำคัญคำแนะนำ
Client::send()
เป็นแบบอสมซิง ซึ่งสามารถใช้ได้เฉพาะในสภาพแวดล้อมการทำงานของ workerman สำหรับสคริปต์ที่รันใน command line โปรดใช้การเชื่อมต่อซิงโครนัสRedis::send()
การส่งข้อความในโปรเจกต์อื่น
บางครั้งคุณต้องการส่งข้อความในโปรเจกต์อื่นและไม่สามารถใช้ webman\redis-queue
ได้ สามารถดูฟังก์ชันต่อไปนี้เพื่อนส่งข้อความไปยังคิว
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
ในนั้น พารามิเตอร์ $redis
คืออินสแตนซ์ของ redis ตัวอย่างการใช้สำหรับการขยาย redis มีดังนี้:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
การบริโภค
ไฟล์การกำหนดค่ากระบวนการบริโภคอยู่ที่ {โปรเจกต์หลัก}/config/plugin/webman/redis-queue/process.php
ไดเร็กทอรีบริโภคอยู่ที่ {โปรเจกต์หลัก}/app/queue/redis/
การรันคำสั่ง php webman redis-queue:consumer my-send-mail
จะสร้างไฟล์ {โปรเจกต์หลัก}/app/queue/redis/MyMailSend.php
คำแนะนำ
หากคำสั่งไม่สามารถใช้งานได้สามารถสร้างไฟล์ด้วยตนเองได้
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// ชื่อคิวที่ต้องการบริโภค
public $queue = 'send-mail';
// ชื่อการเชื่อมต่อ ตรงกับการเชื่อมต่อในไฟล์ plugin/webman/redis-queue/redis.php
public $connection = 'default';
// การบริโภค
public function consume($data)
{
// ไม่ต้องทำการสุ่ม
var_export($data); // แสดง ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// การเรียกกลับเมื่อการบริโภคล้มเหลว
/*
$package = [
'id' => 1357277951, // ID ข้อความ
'time' => 1709170510, // เวลาข้อความ
'delay' => 0, // เวลาในการล่าช้า
'attempts' => 2, // จำนวนครั้งที่บริโภค
'queue' => 'send-mail', // ชื่อคิว
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // ข้อมูลข้อความ
'max_attempts' => 5, // จำนวนการลองสูงสุด
'error' => 'ข้อความผิดพลาด' // ข้อความผิดพลาด
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "บริโภคล้มเหลว\n";
echo $e->getMessage() . "\n";
// ไม่ต้องทำการสุ่ม
var_export($package);
}
}
หมายเหตุ
หากไม่มีข้อยกเว้นหรือ Error ในระหว่างการบริโภคจะถือว่าการบริโภคสำเร็จ มิฉะนั้นการบริโภคจะล้มเหลวและเข้าสู่คิวลองใหม่
redis-queue ไม่มีกลไก ack คุณสามารถมองว่ามันเป็นการรับรองขั้นต่ำ (ไม่มีข้อยกเว้นหรือ Error) หากคุณต้องการทำเครื่องหมายว่าข้อความปัจจุบันไม่สำเร็จในการบริโภค คุณสามารถโยนข้อยกเว้นแบบแมนนวลเพื่อให้ข้อความปัจจุบันเข้าสู่คิวลองใหม่ จริง ๆ แล้วมันไม่มีความแตกต่างจากกลไก ackคำแนะนำ
ผู้บริโภคสนับสนุนการทำงานกับเซิร์ฟเวอร์และกระบวนการหลาย ๆ ตัว และข้อความเดียวกัน จะไม่ ถูกบริโภคซ้ำ ข้อความที่บริโภคแล้วจะถูกลบออกจากคิวโดยอัตโนมัติ ไม่จำเป็นต้องลบด้วยตนเองคำแนะนำ
กระบวนการบริโภคสามารถบริโภคคิวที่แตกต่างกันหลาย ๆ คิวพร้อมกัน การเพิ่มคิวใหม่ไม่จำเป็นต้องแก้ไขการกำหนดค่าในprocess.php
เพียงแค่เพิ่มคลาสConsumer
ที่สอดคล้องในapp/queue/redis
และใช้คุณสมบัติคลาส$queue
เพื่อระบุชื่อคิวที่จะบริโภคคำแนะนำ
ผู้ใช้ Windows ควรรันphp windows.php
เพื่อเริ่มต้น webman มิฉะนั้นจะไม่สามารถเริ่มกระบวนการบริโภคได้คำแนะนำ
การเรียกกลับ onConsumeFailure จะถูกเรียกใช้ทุกครั้งที่บริโภคล้มเหลว คุณสามารถจัดการตรรกะหลังจากล้มเหลวที่นี่ (คุณสมบัตินี้ต้องการwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
การกำหนดกระบวนการบริโภคที่แตกต่างกันสำหรับคิวที่แตกต่างกัน
ตามค่าเริ่มต้น ผู้บริโภคทั้งหมดใช้กระบวนการบริโภคเดียวกัน แต่บางครั้งเราจำเป็นต้องทำให้การบริโภคของคิวบางอย่างแยกออกจากกัน เช่น ลดความเร็วในการบริโภคของธุรกิจบางอย่างแบบแยกกลุ่ม โดยการจัดกลุ่มธุรกิจที่รวดเร็วกับธุรกิจที่ช้า เราสามารถแยกผู้บริโภคเป็นสองไดเร็กทอรี เช่น app_path() . '/queue/redis/fast'
และ app_path() . '/queue/redis/slow'
(หมายเหตุ: ต้องทำการเปลี่ยนแปลง namespace ของคลาสบริโภคตามความเหมาะสม) โดยการกำหนดค่าจะมีลักษณะดังนี้:
return [
... ที่นี่เป็นการละเว้นการกำหนดค่าหอื่น ๆ ...
'redis_consumer_fast' => [ // key นี้เป็นชื่อที่กำหนดเอง ไม่มีการจำกัดรูปแบบ เช่น naming as redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// ไดเร็กทอรีคลาสบริโภค
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // key นี้เป็นชื่อที่กำหนดเอง ไม่มีการจำกัดรูปแบบ เช่น naming as redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// ไดเร็กทอรีคลาสบริโภค
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
เช่นนี้ผู้บริโภคสำหรับธุรกิจที่รวดเร็วจะถูกเก็บไว้ในไดเร็กทอรี queue/redis/fast
และผู้บริโภคสำหรับธุรกิจที่ช้าจะถูกเก็บไว้ในไดเร็กทอรี queue/redis/slow
เพื่อบรรลุวัตถุประสงค์ในการกำหนดกระบวนการบริโภคให้กับคิว
การกำหนดค่าหลาย Redis
การกำหนดค่า
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // รหัสผ่าน (ประเภทสตริง ตัวเลือก)
'db' => 0, // ฐานข้อมูล
'max_attempts' => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // รหัสผ่าน (ประเภทสตริง ตัวเลือก)
'db' => 0, // ฐานข้อมูล
'max_attempts' => 5, // จำนวนการลองใหม่เมื่อการบริโภคล้มเหลว
'retry_seconds' => 5, // ระยะเวลาในการลองใหม่ (หน่วยเป็นวินาที)
]
],
];
หมายเหตุว่าการกำหนดค่าได้เพิ่มค่า other
เป็นคีย์สำหรับการกำหนดค่า redis
การส่งข้อความหลาย Redis
// ส่งข้อความไปยังคิวที่มีคีย์เป็น `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// เท่ากับ
Client::send($queue, $data);
Redis::send($queue, $data);
// ส่งข้อความไปยังคิวที่มีคีย์เป็น `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
การบริโภคหลาย Redis
การตั้งค่าบริโภคควรมีการส่งข้อความไปยังคิวที่มีคีย์เป็น other
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// ชื่อคิวที่ต้องการบริโภค
public $queue = 'send-mail';
// === ที่นี่ตั้งเป็น other เพื่อบริโภคคิวที่มีคีย์เป็น other ===
public $connection = 'other';
// การบริโภค
public function consume($data)
{
// ไม่ต้องทำการสุ่ม
var_export($data);
}
}
คำถามที่พบบ่อย
ทำไมถึงมีข้อผิดพลาด Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
ข้อผิดพลาดนี้จะเกิดขึ้นเฉพาะในหน้าที่ส่งแบบอสมซิง Client::send()
การส่งแบบอสมซิงจะบันทึกข้อความไว้ในหน่วยความจำภายในก่อน เมื่อกระบวนการว่าง ข้อความจะถูกส่งไปยัง redis หากความเร็วในการรับข้อความจาก redis ช้ากว่าความเร็วในการผลิตข้อความ หรือหากกระบวนการวุ่นอยู่กับธุรกิจอื่น ๆ ไม่สามารถซิงค์ข้อความในหน่วยความจำเข้าสู่ redis ได้ จะทำให้เกิดการหนาแน่นของข้อความ หากมีข้อความที่หนาแน่นเกิน 600 วินาที จะเกิดข้อผิดพลาดนี้ขึ้น
วิธีแก้ไข: ส่งข้อความโดยใช้ฟังก์ชันส่งแบบซิงโครนัส Redis::send()