Redis-Warteschlange
Eine auf Redis basierende Nachrichtenwarteschlange, die die verzögerte Verarbeitung von Nachrichten unterstützt.
Installation
composer require webman/redis-queue
Konfigurationsdatei
Die Redis-Konfigurationsdatei wird automatisch in {Hauptprojekt}/config/plugin/webman/redis-queue/redis.php
generiert. Der Inhalt sieht ungefähr so aus:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Passwort, optional
'db' => 0, // Datenbank
'max_attempts' => 5, // Anzahl der Wiederholungsversuche bei einem Verbrauchsfehler
'retry_seconds' => 5, // Wiederholungsintervall in Sekunden
]
],
];
Wiederholungsversuche bei Verbrauchsfehlern
Wenn der Verbrauch fehlschlägt (eine Ausnahme auftritt), wird die Nachricht in eine Verzögerungswarteschlange gelegt, um auf die nächste Wiederholung zu warten. Die Anzahl der Wiederholungsversuche wird durch den Parameter max_attempts
gesteuert, das Wiederholungsintervall wird von retry_seconds
und max_attempts
gemeinsam gesteuert. Beispielsweise beträgt bei max_attempts
5 und retry_seconds
10, das Intervall für den 1. Wiederholungsversuch 1*10
Sekunden, für den 2. Wiederholungsversuch 2*10
Sekunden, für den 3. Wiederholungsversuch 3*10
Sekunden usw., bis zu 5 Versuchen. Wenn die Anzahl der Wiederholungsversuche die Einstellung von max_attempts
überschreitet, wird die Nachricht in der Warteschlange mit dem Schlüssel {redis-queue}-failed
abgelegt.
Nachricht senden (synchron)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Warteschlangenname
$queue = 'send-mail';
// Daten, können direkt als Array übergeben werden, keine Serialisierung erforderlich
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Nachricht senden
Redis::send($queue, $data);
// Verzögerte Nachricht senden, die Nachricht wird in 60 Sekunden verarbeitet
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Bei erfolgreichem Senden gibt Redis::send()
true zurück, andernfalls false oder es wird eine Ausnahme geworfen.
Hinweis
Die Verarbeitungszeit der verzögerten Warteschlange kann variieren, z.B. kann eine niedrigere Verarbeitungsgeschwindigkeit als die Produktionsgeschwindigkeit zu einer Stauung führen, was wiederum zu Verzögerungen bei der Verarbeitung führt. Eine Lösung besteht darin, mehrere Verarbeitungsprozesse zu starten.
Nachricht senden (asynchron)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Warteschlangenname
$queue = 'send-mail';
// Daten, können direkt als Array übergeben werden, keine Serialisierung erforderlich
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Nachricht senden
Client::send($queue, $data);
// Verzögerte Nachricht senden, die Nachricht wird in 60 Sekunden verarbeitet
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
hat keinen Rückgabewert. Es handelt sich um eine asynchrone Übertragung, die nicht garantiert, dass die Nachricht zu 100 % an Redis gesendet wird.
Hinweis
Der Grundsatz vonClient::send()
besteht darin, eine lokale In-Memory-Warteschlange zu erstellen, um die Nachrichten asynchron an Redis zu übertragen (die Übertragung ist sehr schnell, etwa 10.000 Nachrichten pro Sekunde). Wenn der Prozess neu gestartet wird und die Daten in der lokalen In-Memory-Warteschlange noch nicht vollständig synchronisiert sind, können Nachrichten verloren gehen. Die asynchrone Übertragung vonClient::send()
eignet sich für die Übertragung unwichtiger Nachrichten.Hinweis
Client::send()
ist asynchron und kann nur in der Ausführungsumgebung von Workerman verwendet werden. Für Befehlszeilenskripte verwenden Sie bitte die synchrone SchnittstelleRedis::send()
.
Nachrichten in anderen Projekten senden
Manchmal müssen Sie Nachrichten in anderen Projekten senden, können jedoch webman\redis-queue
nicht verwenden. In diesem Fall können Sie die folgende Funktion verwenden, um Nachrichten an die Warteschlange zu senden.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Hierbei ist der Parameter $redis
eine Redis-Instanz. Zum Beispiel, wie die Redis-Erweiterung verwendet wird:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Verbrauch
Die Konfigurationsdatei für den Verbrauchsprozess befindet sich in {Hauptprojekt}/config/plugin/webman/redis-queue/process.php
.
Der Verbraucherordner befindet sich in {Hauptprojekt}/app/queue/redis/
.
Führen Sie den Befehl php webman redis-queue:consumer my-send-mail
aus, um die Datei {Hauptprojekt}/app/queue/redis/MyMailSend.php
zu generieren.
Hinweis
Wenn der Befehl nicht vorhanden ist, kann die Datei auch manuell generiert werden.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Der zu konsumierende Warteschlangenname
public $queue = 'send-mail';
// Verbindungsname, entspricht der Verbindung in plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Verbrauch
public function consume($data)
{
// Keine Deserialisierung erforderlich
var_export($data); // Ausgabe ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Callback bei Verbrauchsfehler
/*
$package = [
'id' => 1357277951, // Nachrichten-ID
'time' => 1709170510, // Nachrichtenzeit
'delay' => 0, // Verzögerungszeit
'attempts' => 2, // Anzahl der Verbräuche
'queue' => 'send-mail', // Warteschlangenname
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Nachrichteninhalt
'max_attempts' => 5, // Maximale Anzahl der Wiederholungsversuche
'error' => 'Fehlerinformation' // Fehlerinformation
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "Verbrauchsfehler\n";
echo $e->getMessage() . "\n";
// Keine Deserialisierung erforderlich
var_export($package);
}
}
Achtung
Ein Verbrauch wird als erfolgreich betrachtet, wenn während des Prozesses keine Ausnahmen oder Fehler auftreten, andernfalls gilt der Verbrauch als fehlgeschlagen und die Nachricht gelangt in die Wiederholungswarteschlange.
redis-queue hat keinen Bestätigungsmechanismus; Sie können es als automatische Bestätigung betrachten (wenn keine Ausnahme oder kein Fehler auftritt). Wenn Sie während des Verbrauchs eine Nachricht als nicht erfolgreich kennzeichnen möchten, können Sie manuell eine Ausnahme auslösen, damit die aktuelle Nachricht in die Wiederholungswarteschlange gelangt. Dies unterscheidet sich in der Tat nicht vom Bestätigungsmechanismus.Hinweis
Verbraucher unterstützen mehrere Server und mehrere Prozesse, und die gleiche Nachricht wird nicht mehrfach verarbeitet. Bereits verarbeitete Nachrichten werden automatisch aus der Warteschlange entfernt, eine manuelle Entfernung ist nicht erforderlich.Hinweis
Verbrauchsprozesse können gleichzeitig mehrere verschiedene Warteschlangen konsumieren. Für neue Warteschlangen ist keine Änderung in derprocess.php
-Konfiguration erforderlich. Wenn neue Verbraucher für Warteschlangen hinzugefügt werden sollen, muss lediglich eine entsprechendeConsumer
-Klasse imapp/queue/redis
-Ordner hinzugefügt werden, und die Klassenattribute$queue
müssen den Namen der zu konsumierenden Warteschlange angeben.Hinweis
Windows-Benutzer müssenphp windows.php
ausführen, um Webman zu starten, da sonst der Verbrauchsprozess nicht gestartet wird.Hinweis
Der CallbackonConsumeFailure
wird bei jedem Verbrauchsfehler ausgelöst, Sie können hier die Logik für das Fehlermanagement verarbeiten. (Dieses Feature erfordertwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
Verschiedene Verbrauchsprozesse für verschiedene Warteschlangen festlegen
Standardmäßig teilen sich alle Verbraucher denselben Verbrauchsprozess. Manchmal müssen wir jedoch den Verbrauch bestimmter Warteschlangen separat behandeln, z.B. langsame Geschäftsprozesse in einer Gruppe von Prozessen konsumieren und schnelle Geschäftsprozesse in einer anderen Gruppe von Prozessen konsumieren. Zu diesem Zweck können wir die Verbraucher in zwei Ordner unterteilen, z.B. app_path() . '/queue/redis/fast'
und app_path() . '/queue/redis/slow'
(beachten Sie, dass der Namensraum der Verbrauchsklassen entsprechend geändert werden muss), die Konfiguration sieht wie folgt aus:
return [
...hier wurden andere Konfigurationen weggelassen...
'redis_consumer_fast' => [ // key ist benutzerdefiniert, es gibt kein Formatlimit, hier heißt es redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Verbraucherklassendirectory
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // key ist benutzerdefiniert, es gibt kein Formatlimit, hier heißt es redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Verbraucherklassendirectory
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Auf diese Weise werden die schnellen Geschäftsverbraucher in das Verzeichnis queue/redis/fast
und die langsamen Geschäftsverbraucher in das Verzeichnis queue/redis/slow
gelegt, um den Warteschlangen spezifische Verbrauchsprozesse zuzuweisen.
Mehrere Redis-Konfigurationen
Konfiguration
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Passwort, vom Typ String, optional
'db' => 0, // Datenbank
'max_attempts' => 5, // Anzahl der Wiederholungsversuche bei einem Verbrauchsfehler
'retry_seconds' => 5, // Wiederholungsintervall in Sekunden
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Passwort, vom Typ String, optional
'db' => 0, // Datenbank
'max_attempts' => 5, // Anzahl der Wiederholungsversuche bei einem Verbrauchsfehler
'retry_seconds' => 5, // Wiederholungsintervall in Sekunden
]
],
];
Beachten Sie, dass eine Redis-Konfiguration mit dem Schlüssel other
hinzugefügt wurde.
Mehrere Redis-Nachrichten senden
// Nachrichten an die Warteschlange mit dem Schlüssel `default` senden
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Entspricht dem
Client::send($queue, $data);
Redis::send($queue, $data);
// Nachrichten an die Warteschlange mit dem Schlüssel `other` senden
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Verbraucher für mehrere Redis
Die Verbraucherkonfiguration sendet Nachrichten an die Warteschlange mit dem Schlüssel other
.
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Der zu konsumierende Warteschlangenname
public $queue = 'send-mail';
// === Hier auf other setzen, was bedeutet, dass die Warteschlange mit dem Schlüssel other in der Konfiguration konsumiert wird ===
public $connection = 'other';
// Verbrauch
public function consume($data)
{
// Keine Deserialisierung erforderlich
var_export($data);
}
}
Häufige Fragen
Warum gibt es den Fehler Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
?
Dieser Fehler tritt nur bei der asynchronen Sendeschnittstelle Client::send()
auf. Bei der asynchronen Übertragung werden die Nachrichten zunächst im lokalen Speicher gespeichert und bei Leerlauf des Prozesses an Redis gesendet. Wenn Redis langsamer als die Produktionsgeschwindigkeit Nachrichten empfängt oder der Prozess die gesamte Zeit mit anderen Aufgaben beschäftigt ist und nicht genügend Zeit hat, um die Nachrichten aus dem Speicher auf Redis zu synchronisieren, kommt es zu einer Stauung. Wenn die Stauung 600 Sekunden überschreitet, wird dieser Fehler ausgelöst.
Lösung: Verwenden Sie die synchrone Sendeschnittstelle Redis::send()
zum Senden von Nachrichten.