Redis-Warteschlange
Eine Nachrichtenwarteschlange basierend auf Redis, die die verzögerte Verarbeitung von Nachrichten unterstützt.
Installation
composer require webman/redis-queue
Konfigurationsdatei
Die Redis-Konfigurationsdatei wird automatisch unter config/plugin/webman/redis-queue/redis.php
generiert. Der Inhalt ähnelt dem folgenden Beispiel:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Passwort, optionales Feld
'db' => 0, // Datenbank
'max_attempts' => 5, // Anzahl der Wiederholungsversuche nach fehlgeschlagener Verarbeitung
'retry_seconds' => 5, // Wartezeit zwischen den Wiederholungsversuchen in Sekunden
]
],
];
Wiederholungsversuch bei fehlgeschlagener Verarbeitung
Wenn die Verarbeitung fehlschlägt (z. B. aufgrund einer Ausnahme), wird die Nachricht in eine verzögerte Warteschlange verschoben und auf den nächsten erneuten Versuch gewartet. Die Anzahl der Versuche wird durch den Parameter max_attempts
gesteuert, während das Intervall zwischen den Versuchen von retry_seconds
und max_attempts
gemeinsam gesteuert wird. Zum Beispiel, wenn max_attempts
5 und retry_seconds
10 beträgt, beträgt das Intervall für den ersten Wiederholungsversuch 1*10
Sekunden, für den zweiten Wiederholungsversuch 2*10
Sekunden, für den dritten Wiederholungsversuch 3*10
Sekunden, und so weiter, bis zu 5 Wiederholungsversuchen. Wenn die Anzahl der Wiederholungsversuche, die in max_attempts
festgelegt ist, überschritten wird, wird die Nachricht in die fehlgeschlagene Warteschlange mit dem Namen {redis-queue}-failed
verschoben.
Senden von Nachrichten (synchron)
Hinweis
Erfordert webman/redis >= 1.2.0 und abhängige Redis-Erweiterung
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Warteschlangenname
$queue = 'send-mail';
// Daten können direkt als Array übergeben werden, ohne Serialisierung
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Nachricht senden
Redis::send($queue, $data);
// Verzögerte Nachricht senden, die nach 60 Sekunden verarbeitet wird
Redis::send($queue, $data, 60);
return response('Redis-Warteschlangentest');
}
}
Redis::send()
gibt true
zurück, wenn das Senden erfolgreich war. Andernfalls wird false
zurückgegeben oder eine Ausnahme ausgelöst.
Hinweis
Die Verzögerung bei der Verarbeitung von Warteschlangen kann zu Ungenauigkeiten führen, beispielsweise wenn die Verarbeitungsgeschwindigkeit geringer ist als die Produktionsgeschwindigkeit, was zu Warteschlangenüberlastung und damit zu Verzögerungen bei der Verarbeitung führen kann. Ein möglicher Ansatz zur Linderung besteht darin, mehr Verarbeitungsprozesse zu starten.
Senden von Nachrichten (asynchron)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Warteschlangenname
$queue = 'send-mail';
// Daten können direkt als Array übergeben werden, ohne Serialisierung
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Nachricht senden
Client::send($queue, $data);
// Verzögerte Nachricht senden, die nach 60 Sekunden verarbeitet wird
Client::send($queue, $data, 60);
return response('Redis-Warteschlangentest');
}
}
Client::send()
gibt keine Rückgabewerte zurück, da es sich um einen asynchronen Aufruf handelt, der nicht garantiert, dass die Nachricht zu 100 Prozent an Redis übermittelt wird.
Hinweis
Das Funktionsprinzip vonClient::send()
besteht darin, eine interne Speicherwarteschlange zu erstellen und die Nachrichten asynchron zu Redis zu übertragen (die Übertragungsgeschwindigkeit ist sehr hoch, etwa 10.000 Nachrichten pro Sekunde). Wenn der Prozess neu gestartet wird und die Nachrichten in der internen Speicherwarteschlange noch nicht vollständig übertragen wurden, geht die Nachricht verloren.Client::send()
eignet sich daher für die asynchrone Übermittlung von unwichtigen Nachrichten.Hinweis
Client::send()
ist asynchron und kann nur in der Workerman-Ausführungsumgebung verwendet werden. Für Befehlszeilenskripte sollte die synchrone SchnittstelleRedis::send()
verwendet werden.
Senden von Nachrichten in anderen Projekten
Manchmal müssen Sie Nachrichten in anderen Projekten senden und können webman/redis-queue
nicht verwenden. In diesem Fall können Sie die folgende Funktion als Beispiel verwenden, um Nachrichten an die Warteschlange zu senden.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Hierbei ist $redis
die Redis-Instanz. Ein Beispiel für die Verwendung der Redis-Erweiterung sieht wie folgt aus:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Verbrauch
Die Konfigurationsdatei des Verbraucherprozesses befindet sich unter config/plugin/webman/redis-queue/process.php
. Die Verbraucherklassen befinden sich im Verzeichnis app/queue/redis/
.
Wenn der Befehl php webman redis-queue:consumer my-send-mail
ausgeführt wird, wird die Datei app/queue/redis/MyMailSend.php
generiert.
Hinweis
Wenn der Befehl nicht vorhanden ist, können Sie die Verbraucherklasse auch manuell erstellen.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Name der zu verarbeitenden Warteschlange
public $queue = 'send-mail';
// Name der Verbindung, die der Konfiguration in `plugin/webman/redis-queue/redis.php` entspricht
public $connection = 'default';
// Verarbeitung
public function consume($data)
{
// Keine Notwendigkeit das zu deserialisieren
var_export($data); // Gibt ['to' => 'tom@gmail.com', 'content' => 'hello'] aus
}
}
Hinweis
Für den Verbrauchsvorgang gilt: Wenn keine Ausnahme oder Fehler aufgetreten sind, gilt der Verbrauch als erfolgreich. Andernfalls wird der Verbrauch als fehlgeschlagen betrachtet und die Nachricht wird in die Wiederholungswarteschlange verschoben.
Redis-Queue hat keine ACK-Mechanismus. Sie können es als automatisches ACK (bei Abwesenheit von Ausnahme oder Fehler) betrachten. Wenn Sie während des Verbrauchsvorgangs markieren möchten, dass die aktuelle Nachricht nicht erfolgreich verarbeitet wurde, können Sie manuell eine Ausnahme werfen, um die aktuelle Nachricht in die Wiederholungswarteschlange zu verschieben. Dies unterscheidet sich faktisch nicht von einem ACK-Mechanismus.Hinweis
Der Verbraucher unterstützt mehrere Server und mehrere Prozesse, und dieselbe Nachricht wird nicht doppelt verbraucht. Verarbeitete Nachrichten werden automatisch aus der Warteschlange entfernt und müssen nicht manuell gelöscht werden.Hinweis
Der Verbraucherprozess kann gleichzeitig verschiedene Warteschlangen verbrauchen. Das Hinzufügen einer neuen Warteschlange erfordert keine Änderung in der Konfiguration vonprocess.php
. Es muss lediglich eine entsprechendeConsumer
-Klasse im Verzeichnisapp/queue/redis
hinzugefügt werden und der Name der zu verarbeitenden Warteschlange mit der Klassenvariable$queue
angegeben werden.Hinweis
Windows-Benutzer müssenphp windows.php
ausführen, um webman zu starten, anderfalls wird der Verbraucherprozess nicht gestartet
Separates Zuweisen von Verarbeitungsprozessen für verschiedene Warteschlangen
Standardmäßig teilen sich alle Verbraucher denselben Verbraucherprozess. Manchmal möchten wir jedoch einige Warteschlangen in getrennten Verarbeitungsprozessen verarbeiten, z. B. langsame Geschäftsvorgänge in einer Gruppe von Prozessen und schnelle Geschäftsvorfälle in einer anderen Gruppe. Hierfür können Verbraucher in zwei Verzeichnissen unterteilt werden, z. B. app_path() . '/queue/redis/fast'
und app_path() . '/queue/redis/slow'
(beachten Sie, dass der Namensraum der Verbrauchsklasse entsprechend angepasst werden muss). Die Konfiguration sieht dann wie folgt aus:
return [
...Andere Konfigurationen hier ausgelassen...
'redis_consumer_fast' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Verzeichnis für Verbraucherklassen
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Verzeichnis für Verbraucherklassen
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Durch die Kategorisierung der Verbraucherklassen und die entsprechende Konfiguration können wir leicht unterschiedliche Verarbeitungsprozesse für verschiedene Verbraucher festlegen.
Mehrfach Redis-Konfiguration
Konfiguration
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Passwort, Zeichenfolgentyp, optionaler Parameter
'db' => 0, // Datenbank
'max_attempts' => 5, // Nach dem Verbrauchsfehler Wiederholungsversuche
'retry_seconds' => 5, // Wiederholungsintervall in Sekunden
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Passwort, Zeichenfolgentyp, optionaler Parameter
'db' => 0, // Datenbank
'max_attempts' => 5, // Nach dem Verbrauchsfehler Wiederholungsversuche
'retry_seconds' => 5, // Wiederholungsintervall in Sekunden
]
],
];
Beachten Sie, dass der Konfiguration ein zusätzliches other
als Schlüssel für die Redis-Konfiguration hinzugefügt wurde.
Mehrfaches Redis Message Publishing
// Nachrichten an die Warteschlange mit dem Schlüssel `default` senden
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Entsprechend
Client::send($queue, $data);
Redis::send($queue, $data);
// Nachrichten an die Warteschlange mit dem Schlüssel `other` senden
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Mehrfacher Redis-Verbrauch
Verbrauchen Sie Nachrichten aus der Warteschlange, die mit dem Schlüssel other
in der Konfiguration definiert ist.
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Zu konsumierende Warteschlangenname
public $queue = 'send-mail';
// === Hier wird `other` eingestellt, um die Warteschlange zu konsumieren, die in der Konfiguration mit `other` als Schlüssel definiert ist ===
public $connection = 'other';
// Konsumieren
public function consume($data)
{
// Keine Deserialisierung erforderlich
var_export($data);
}
}
Häufig gestellte Fragen
Warum erhalte ich den Fehler "Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)"?
Dieser Fehler tritt nur in der asynchronen Sendeschnittstelle Client::send()
auf. Bei asynchronem Senden wird die Nachricht zunächst im lokalen Speicher gespeichert und bei Leerlauf des Prozesses an Redis gesendet. Wenn die Geschwindigkeit, mit der Redis die Nachrichten empfängt, langsamer ist als die Produktionsgeschwindigkeit der Nachrichten, oder wenn der Prozess kontinuierlich mit anderen Geschäften beschäftigt ist und nicht genügend Zeit hat, um die im Speicher befindlichen Nachrichten mit Redis zu synchronisieren, kann es zu einer Verstopfung kommen. Wenn eine Verstopfung länger als 600 Sekunden besteht, wird dieser Fehler ausgelöst.
Lösungsansatz: Verwenden Sie die synchrone Sendeschnittstelle Redis::send()
zum Senden von Nachrichten.