Redis 큐

Redis 기반의 메시지 큐로, 메시지 지연 처리를 지원합니다.

설치

composer require webman/redis-queue

구성 파일

Redis 구성 파일은 {주 프로젝트}/config/plugin/webman/redis-queue/redis.php에 자동으로 생성되며, 내용은 다음과 유사합니다:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // 비밀번호, 선택 사항
            'db' => 0,            // 데이터베이스
            'max_attempts'  => 5, // 소비 실패 후, 재시도 횟수
            'retry_seconds' => 5, // 재시도 간격, 초 단위
        ]
    ],
];

소비 실패 재시도

소비에 실패하면(예외가 발생한 경우) 메시지는 지연 큐에 배치되어 다음 재시도를 기다립니다. 재시도 횟수는 max_attempts 매개변수로 제어되며, 재시도 간격은 retry_secondsmax_attempts가 함께 제어합니다. 예를 들어, max_attempts가 5이고, retry_seconds가 10인 경우, 첫 번째 재시도 간격은 1*10초, 두 번째 재시도 간격은 2*10초, 세 번째 재시도 간격은 3*10초로 설정되며, 이렇게 계속해서 5회 재시도하게 됩니다. max_attempts에서 설정한 재시도 횟수를 초과하면 메시지는 키 {redis-queue}-failed의 실패 큐에 저장됩니다.

메시지 발송(동기)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // 큐 이름
        $queue = 'send-mail';
        // 데이터, 배열을 직접 전달할 수 있으며, 직렬화 필요 없음
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // 메시지 발송
        Redis::send($queue, $data);
        // 지연 메시지 발송, 메시지는 60초 후에 처리됨
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

발송 성공 시 Redis::send()는 true를 반환하며, 그렇지 않으면 false를 반환하거나 예외를 발생시킵니다.


지연 큐 소비 시간에 차이가 발생할 수 있습니다. 소비 속도가 생산 속도보다 느릴 경우 큐가 쌓이고, 이로 인해 소비 지연이 발생할 수 있습니다. 완화 방법은 소비 프로세스를 여러 개 실행하는 것입니다.

메시지 발송(비동기)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // 큐 이름
        $queue = 'send-mail';
        // 데이터, 배열을 직접 전달할 수 있으며, 직렬화 필요 없음
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // 메시지 발송
        Client::send($queue, $data);
        // 지연 메시지 발송, 메시지는 60초 후에 처리됨
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send()는 반환값이 없으며, 비동기 푸시입니다. 이 방법은 메시지가 100% Redis에 도착한다는 보장을 하지 않습니다.


Client::send()의 원리는 로컬 메모리에 메모리 큐를 설정하고, 비동기적으로 메시지를 Redis에 동기화하는 것입니다(동기화 속도가 매우 빠르며, 초당 약 10,000건의 메시지를 처리할 수 있습니다). 만약 프로세스가 재시작되고, 로컬 메모리 큐에 데이터가 동기화되지 않은 경우 메시지가 손실될 수 있습니다. Client::send() 비동기 발송은 중요하지 않은 메시지를 발송하는 데 적합합니다.


Client::send()는 비동기 방식으로, workerman 실행 환경에서만 사용할 수 있으며, 명령줄 스크립트에서는 동기 인터페이스 Redis::send()를 사용해야 합니다.

다른 프로젝트에서 메시지 발송

가끔 다른 프로젝트에서 메시지를 발송해야 하지만 webman\redis-queue를 사용할 수 없는 경우, 다음 함수를 참조하여 큐에 메시지를 발송할 수 있습니다.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

매개변수 $redis는 Redis 인스턴스를 의미합니다. 예를 들어 Redis 확장 사용법은 다음과 유사합니다:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

소비

소비 프로세스 구성 파일은 {주 프로젝트}/config/plugin/webman/redis-queue/process.php에 있습니다.
소비자 디렉토리는 {주 프로젝트}/app/queue/redis/ 아래에 있습니다.

명령 php webman redis-queue:consumer my-send-mail를 실행하면 {주 프로젝트}/app/queue/redis/MyMailSend.php 파일이 생성됩니다.


명령이 존재하지 않더라도 수동으로 생성할 수 있습니다.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // 소비할 큐 이름
    public $queue = 'send-mail';

    // 연결명, plugin/webman/redis-queue/redis.php의 연결에 해당
    public $connection = 'default';

    // 소비
    public function consume($data)
    {
        // 반직렬화 필요 없음
        var_export($data); // 출력 ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // 소비 실패 콜백
    /* 
    $package = [
        'id' => 1357277951, // 메시지 ID
        'time' => 1709170510, // 메시지 시간
        'delay' => 0, // 지연 시간
        'attempts' => 2, // 소비 횟수
        'queue' => 'send-mail', // 큐 이름
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // 메시지 내용
        'max_attempts' => 5, // 최대 재시도 횟수
        'error' => '오류 정보' // 오류 정보
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "소비 실패\n";
        echo $e->getMessage() . "\n";
        // 반직렬화 필요 없음
        var_export($package); 
    }
}

주의
소비 과정에서 예외나 Error가 발생하지 않으면 소비 성공으로 간주됩니다. 그렇지 않으면 소비 실패로 간주되어 재시도 큐로 들어갑니다.
redis-queue는 ack 메커니즘이 없으며 자동 ack(예외나 Error가 발생하지 않음)으로 간주할 수 있습니다. 소비 중 현재 메시지를 소비 실패로 표시하고자 한다면 수동으로 예외를 발생시켜야 하며, 그렇게 되면 현재 메시지는 재시도 큐로 들어갑니다. 이는 사실상 ack 메커니즘과 동일합니다.


소비자는 다중 서버 및 다중 프로세스를 지원하며, 동일한 메시지는 중복 소비되지 않습니다. 소비된 메시지는 자동으로 큐에서 삭제되므로 수동으로 삭제할 필요가 없습니다.


소비 프로세스는 동시에 여러 다른 큐를 소비할 수 있으며, 큐를 새로 추가할 때 process.php의 구성을 수정할 필요가 없고, app/queue/redis 아래에 해당 Consumer 클래스를 새로 추가한 후, 클래스 속성 $queue에서 소비할 큐 이름을 지정하기만 하면 됩니다.


Windows 사용자는 php windows.php를 실행하여 webman을 시작해야 하며, 그렇지 않으면 소비 프로세스가 시작되지 않습니다.


onConsumeFailure 콜백은 매번 소비 실패 시 트리거되며, 이곳에서 실패 후의 로직을 처리할 수 있습니다. (이 기능은 webman/redis-queue>=1.3.2workerman/redis-queue>=1.2.1 필요)

다른 큐를 위한 서로 다른 소비 프로세스 설정

기본적으로 모든 소비자는 동일한 소비 프로세스를 공유합니다. 하지만 특정 큐의 소비를 독립적으로 분리해야 할 때가 있습니다. 예를 들어, 소비 속도가 느린 비즈니스를 한 그룹의 프로세스에서 소비하고, 소비 속도가 빠른 비즈니스를 다른 그룹의 프로세스에서 소비하도록 하고 싶을 때입니다. 이를 위해 소비자를 두 개의 디렉토리로 분리할 수 있습니다. 예를 들어 app_path() . '/queue/redis/fast'app_path() . '/queue/redis/slow'로 구성하면(소비 클래스의 네임스페이스명을 적절히 변경해야 합니다), 구성은 다음과 같습니다:

return [
    ... 여기에 다른 구성 생략 ...

    'redis_consumer_fast'  => [ // key는 사용자 정의 이름으로 형식에 제약 없음, 여기서는 redis_consumer_fast로 명명
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // 소비자 클래스 디렉토리
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // key는 사용자 정의 이름으로 형식에 제약 없음, 여기서는 redis_consumer_slow로 명명
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // 소비자 클래스 디렉토리
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

이렇게 하면 빠른 비즈니스 소비자는 queue/redis/fast 디렉토리에 배치되고 느린 비즈니스 소비자는 queue/redis/slow 디렉토리에 배치되어 특정 큐에 소비 프로세스를 지정할 수 있습니다.

다중 Redis 구성

구성

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // 비밀번호, 문자열 타입, 선택 사항
            'db' => 0,            // 데이터베이스
            'max_attempts'  => 5, // 소비 실패 후, 재시도 횟수
            'retry_seconds' => 5, // 재시도 간격, 초 단위
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // 비밀번호, 문자열 타입, 선택 사항
            'db' => 0,             // 데이터베이스
            'max_attempts'  => 5, // 소비 실패 후, 재시도 횟수
            'retry_seconds' => 5, // 재시도 간격, 초 단위
        ]
    ],
];

구성에 other라는 키 값이 추가된 Redis 구성이 있습니다.

다중 Redis 메시지 발송

// `default`라는 키의 큐에 메시지 발송
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  동등함
Client::send($queue, $data);
Redis::send($queue, $data);

// `other`라는 키의 큐에 메시지 발송
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

다중 Redis 소비

소비 설정에서 other라는 키의 큐에 메시지를 소비합니다.

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // 소비할 큐 이름
    public $queue = 'send-mail';

    // === 여기를 other로 설정하여 소비 설정에서 other라는 키의 큐를 소비합니다 ===
    public $connection = 'other';

    // 소비
    public function consume($data)
    {
        // 반직렬화 필요 없음
        var_export($data);
    }
}

자주 묻는 질문

Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)라는 오류가 발생합니까?

이 오류는 비동기 메시지 발송 인터페이스 Client::send()에서만 발생합니다. 비동기 발송은 먼저 메시지를 로컬 메모리에 저장한 후, 프로세스가 비어 있을 때 메시지를 Redis에 전송합니다. Redis 수신 속도가 메시지 생산 속도보다 느리거나, 프로세스가 항상 다른 업무로 바빠서 로컬의 메시지를 Redis에 동기화할 충분한 시간이 없으면 메시지 압착이 발생할 수 있습니다. 만약 메시지 압착이 600초를 초과하면 이 오류가 발생합니다.

해결 방법: 메시지 발송에 동기 발송 인터페이스 Redis::send()를 사용하십시오.