Redisキュー

Redisに基づくメッセージキューで、メッセージの遅延処理をサポートしています。

インストール

composer require webman/redis-queue

設定ファイル

Redisの設定ファイルは自動生成され、{メインプロジェクト}/config/plugin/webman/redis-queue/redis.phpに保存されます。内容は以下のようになります:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // パスワード(オプション)
            'db' => 0,            // データベース
            'max_attempts'  => 5, // 消費失敗後の再試行回数
            'retry_seconds' => 5, // 再試行の間隔(秒単位)
        ]
    ],
];

消費失敗の再試行

消費が失敗した場合(例外が発生した場合)、メッセージは遅延キューに投入され、次回の再試行を待機します。再試行回数はmax_attemptsパラメータで制御され、再試行の間隔はretry_secondsmax_attemptsによって共同で制御されます。例えば、max_attemptsが5で、retry_secondsが10の場合、第1回の再試行の間隔は1*10秒、第2回の再試行の間隔は2*10秒、第3回の再試行の間隔は3*10秒、というように続き、5回の再試行に達するまで続きます。max_attemptsで設定された再試行回数を超えると、メッセージはキーが{redis-queue}-failedの失敗ク queue に保存されます。

メッセージの投げる(同期)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // キュー名
        $queue = 'send-mail';
        // データ:配列をそのまま渡すことができ、シリアライズは不要
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // メッセージを投げる
        Redis::send($queue, $data);
        // 遅延メッセージを投げる:メッセージは60秒後に処理されます
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

投げるのが成功したらRedis::send()はtrueを返し、そうでなければfalseまたは例外がスローされます。

ヒント
遅延キューの消費時間には誤差が生じる可能性があります。たとえば、消費速度が生産速度より遅くなると、キューが詰まり、消費が遅延する原因となります。この問題を軽減する方法は、消費プロセスをいくつか増やすことです。

メッセージの投げる(非同期)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // キュー名
        $queue = 'send-mail';
        // データ:配列をそのまま渡すことができ、シリアライズは不要
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // メッセージを投げる
        Client::send($queue, $data);
        // 遅延メッセージを投げる:メッセージは60秒後に処理されます
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send()は返り値がなく、非同期プッシュに属し、メッセージが100%Redisに届く保証はありません。

ヒント
Client::send()の原理は、ローカルメモリにメモリキューを構築し、非同期でメッセージをRedisに同期することです(同期速度は非常に速く、毎秒約1万件のメッセージを処理します)。プロセスが再起動すると、ローカルメモリ内のキューのデータが同期されていない場合、メッセージが失われる可能性があります。Client::send()の非同期投げは、重要でないメッセージの投げに適しています。

ヒント
Client::send()は非同期で、workermanの実行環境内でのみ使用できます。コマンドラインスクリプトでは同期インターフェイスRedis::send()を使用してください。

他のプロジェクトでメッセージを投げる

時には、他のプロジェクトでメッセージを投げる必要があり、webman\redis-queueを使用できない場合、次の関数を参照してキューにメッセージを投げることができます。

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

ここで、パラメータ$redisはredisインスタンスを示します。たとえば、redis拡張の使用法は以下のようになります:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

消費

消費プロセス設定ファイルは{メインプロジェクト}/config/plugin/webman/redis-queue/process.phpにあります。
消費者のディレクトリは{メインプロジェクト}/app/queue/redis/の下にあります。

コマンドphp webman redis-queue:consumer my-send-mailを実行すると、ファイル{メインプロジェクト}/app/queue/redis/MyMailSend.phpが生成されます。

ヒント
コマンドが存在しない場合は、手動で生成することもできます。

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // 消費するキュー名
    public $queue = 'send-mail';

    // 接続名(plugin/webman/redis-queue/redis.php内の接続に対応)
    public $connection = 'default';

    // 消費
    public function consume($data)
    {
        // シリアライズは不要
        var_export($data); // 出力 ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // 消費失敗のコールバック
    /* 
    $package = [
        'id' => 1357277951, // メッセージID
        'time' => 1709170510, // メッセージ時間
        'delay' => 0, // 遅延時間
        'attempts' => 2, // 消費回数
        'queue' => 'send-mail', // キュー名
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // メッセージ内容
        'max_attempts' => 5, // 最大再試行回数
        'error' => 'エラーメッセージ' // エラーメッセージ
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "consume failure\n";
        echo $e->getMessage() . "\n";
        // シリアライズは不要
        var_export($package); 
    }
}

注意
消費の過程で例外やエラーが発生しなかった場合は消費成功と見なされます。それ以外は消費失敗と見なされ、再試行キューに入ります。
redis-queueにはackメカニズムが存在しません。これを自動ackと見なすことができます(例外やエラーが発生しなかった場合)。消費過程で現在のメッセージを消費不成功としてマークしたい場合は、手動で例外をスローし、現在のメッセージを再試行キューに入れてください。これは実際にはackメカニズムとは異なりません。

ヒント
消費者は複数のサーバーおよびプロセスをサポートし、同一のメッセージは重複して消費されません。消費されたメッセージはキューから自動的に削除され、手動で削除する必要はありません。

ヒント
消費プロセスは同時に複数の異なるキューを消費できます。新しいキューを追加する際、process.php内の設定を変更する必要はなく、app/queue/redisの下に対応するConsumerクラスを新たに追加し、クラスプロパティ$queueを使用して消費するキュー名を指定するだけです。

ヒント
Windowsユーザーは、php windows.phpを実行してwebmanを起動する必要があります。そうしないと、消費プロセスは起動しません。

ヒント
onConsumeFailureコールバックは、消費が失敗するたびにトリガーされます。ここで失敗後のロジックを処理することができます。(この機能はwebman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1が必要です)

異なるキューに異なる消費プロセスを設定する

デフォルトでは、すべての消費者は同じ消費プロセスを共有します。しかし、あるキューの消費を独立させたいことがあります。例えば、消費が遅いビジネスを1セットのプロセスで消費し、消費が早いビジネスを別のセットのプロセスで消費するようにします。このために、消費者を2つのディレクトリに分けることができます。例:app_path() . '/queue/redis/fast'app_path() . '/queue/redis/slow'(消費者クラスの名前空間も適宜変更する必要があります)。設定は以下のようになります:

return [
    ...他の設定は省略...

    'redis_consumer_fast'  => [ // キーはカスタムで、フォーマット制限はありません。ここではredis_consumer_fastと名付けました
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // 消費者クラスディレクトリ
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // キーはカスタムで、フォーマット制限はありません。ここではredis_consumer_slowと名付けました
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // 消費者クラスディレクトリ
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

これにより、高速ビジネスの消費者はqueue/redis/fastディレクトリに置かれ、低速ビジネスの消費者はqueue/redis/slowディレクトリに置かれることで、キューに指定された消費プロセスの目的が達成されます。

複数のRedis設定

設定

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // パスワード(文字列型、オプション)
            'db' => 0,            // データベース
            'max_attempts'  => 5, // 消費失敗後の再試行回数
            'retry_seconds' => 5, // 再試行の間隔(秒単位)
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // パスワード(文字列型、オプション)
            'db' => 0,             // データベース
            'max_attempts'  => 5, // 消費失敗後の再試行回数
            'retry_seconds' => 5, // 再試行の間隔(秒単位)
        ]
    ],
];

ここで、設定にotherというキーのRedis設定が追加されました。

複数のRedisにメッセージを投げる

// `default`をキーとするキューにメッセージを投げる
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// 同様に
Client::send($queue, $data);
Redis::send($queue, $data);

// `other`をキーとするキューにメッセージを投げる
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

複数のRedisで消費

消費設定の中でotherをキーとするキューのメッセージを消費します。

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // 消費するキュー名
    public $queue = 'send-mail';

    // === ここでotherを設定し、設定内でotherをキーとするキューを消費します ===
    public $connection = 'other';

    // 消費
    public function consume($data)
    {
        // シリアライズは不要
        var_export($data);
    }
}

よくある質問

なぜ Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds) というエラーが表示されるのか?

このエラーは非同期投与インターフェースClient::send()にのみ存在します。非同期投与は、まずメッセージをローカルメモリに保存し、プロセスが空いている時にメッセージをRedisに送信します。もしRedisの受信速度がメッセージ生産速度より遅い場合、またはプロセスが他の作業で常に忙しく、メモリのメッセージをRedisに同期させる十分な時間がない場合、メッセージが圧迫されることになります。もし圧迫されたメッセージが600秒を超えると、このエラーがトリガーされます。

解決策:メッセージを投げる際に同期投与インターフェースRedis::send()を使用してください。