Redis 큐
Redis 기반의 메시지 큐로, 메시지 지연 처리를 지원합니다.
설치
composer require webman/redis-queue
구성 파일
Redis 구성 파일은 {주 프로젝트}/config/plugin/webman/redis-queue/redis.php
에 자동으로 생성되며, 내용은 다음과 유사합니다:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // 비밀번호, 선택 사항
'db' => 0, // 데이터베이스
'max_attempts' => 5, // 소비 실패 후, 재시도 횟수
'retry_seconds' => 5, // 재시도 간격, 초 단위
]
],
];
소비 실패 재시도
소비에 실패하면(예외가 발생한 경우) 메시지는 지연 큐에 배치되어 다음 재시도를 기다립니다. 재시도 횟수는 max_attempts
매개변수로 제어되며, 재시도 간격은 retry_seconds
와 max_attempts
가 함께 제어합니다. 예를 들어, max_attempts
가 5이고, retry_seconds
가 10인 경우, 첫 번째 재시도 간격은 1*10
초, 두 번째 재시도 간격은 2*10
초, 세 번째 재시도 간격은 3*10
초로 설정되며, 이렇게 계속해서 5회 재시도하게 됩니다. max_attempts
에서 설정한 재시도 횟수를 초과하면 메시지는 키 {redis-queue}-failed
의 실패 큐에 저장됩니다.
메시지 발송(동기)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// 큐 이름
$queue = 'send-mail';
// 데이터, 배열을 직접 전달할 수 있으며, 직렬화 필요 없음
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// 메시지 발송
Redis::send($queue, $data);
// 지연 메시지 발송, 메시지는 60초 후에 처리됨
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
발송 성공 시 Redis::send()
는 true를 반환하며, 그렇지 않으면 false를 반환하거나 예외를 발생시킵니다.
팁
지연 큐 소비 시간에 차이가 발생할 수 있습니다. 소비 속도가 생산 속도보다 느릴 경우 큐가 쌓이고, 이로 인해 소비 지연이 발생할 수 있습니다. 완화 방법은 소비 프로세스를 여러 개 실행하는 것입니다.
메시지 발송(비동기)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// 큐 이름
$queue = 'send-mail';
// 데이터, 배열을 직접 전달할 수 있으며, 직렬화 필요 없음
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// 메시지 발송
Client::send($queue, $data);
// 지연 메시지 발송, 메시지는 60초 후에 처리됨
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
는 반환값이 없으며, 비동기 푸시입니다. 이 방법은 메시지가 100% Redis에 도착한다는 보장을 하지 않습니다.
팁
Client::send()
의 원리는 로컬 메모리에 메모리 큐를 설정하고, 비동기적으로 메시지를 Redis에 동기화하는 것입니다(동기화 속도가 매우 빠르며, 초당 약 10,000건의 메시지를 처리할 수 있습니다). 만약 프로세스가 재시작되고, 로컬 메모리 큐에 데이터가 동기화되지 않은 경우 메시지가 손실될 수 있습니다.Client::send()
비동기 발송은 중요하지 않은 메시지를 발송하는 데 적합합니다.팁
Client::send()
는 비동기 방식으로, workerman 실행 환경에서만 사용할 수 있으며, 명령줄 스크립트에서는 동기 인터페이스Redis::send()
를 사용해야 합니다.
다른 프로젝트에서 메시지 발송
가끔 다른 프로젝트에서 메시지를 발송해야 하지만 webman\redis-queue
를 사용할 수 없는 경우, 다음 함수를 참조하여 큐에 메시지를 발송할 수 있습니다.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
매개변수 $redis
는 Redis 인스턴스를 의미합니다. 예를 들어 Redis 확장 사용법은 다음과 유사합니다:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
소비
소비 프로세스 구성 파일은 {주 프로젝트}/config/plugin/webman/redis-queue/process.php
에 있습니다.
소비자 디렉토리는 {주 프로젝트}/app/queue/redis/
아래에 있습니다.
명령 php webman redis-queue:consumer my-send-mail
를 실행하면 {주 프로젝트}/app/queue/redis/MyMailSend.php
파일이 생성됩니다.
팁
명령이 존재하지 않더라도 수동으로 생성할 수 있습니다.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// 소비할 큐 이름
public $queue = 'send-mail';
// 연결명, plugin/webman/redis-queue/redis.php의 연결에 해당
public $connection = 'default';
// 소비
public function consume($data)
{
// 반직렬화 필요 없음
var_export($data); // 출력 ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// 소비 실패 콜백
/*
$package = [
'id' => 1357277951, // 메시지 ID
'time' => 1709170510, // 메시지 시간
'delay' => 0, // 지연 시간
'attempts' => 2, // 소비 횟수
'queue' => 'send-mail', // 큐 이름
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // 메시지 내용
'max_attempts' => 5, // 최대 재시도 횟수
'error' => '오류 정보' // 오류 정보
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "소비 실패\n";
echo $e->getMessage() . "\n";
// 반직렬화 필요 없음
var_export($package);
}
}
주의
소비 과정에서 예외나 Error가 발생하지 않으면 소비 성공으로 간주됩니다. 그렇지 않으면 소비 실패로 간주되어 재시도 큐로 들어갑니다.
redis-queue는 ack 메커니즘이 없으며 자동 ack(예외나 Error가 발생하지 않음)으로 간주할 수 있습니다. 소비 중 현재 메시지를 소비 실패로 표시하고자 한다면 수동으로 예외를 발생시켜야 하며, 그렇게 되면 현재 메시지는 재시도 큐로 들어갑니다. 이는 사실상 ack 메커니즘과 동일합니다.팁
소비자는 다중 서버 및 다중 프로세스를 지원하며, 동일한 메시지는 중복 소비되지 않습니다. 소비된 메시지는 자동으로 큐에서 삭제되므로 수동으로 삭제할 필요가 없습니다.팁
소비 프로세스는 동시에 여러 다른 큐를 소비할 수 있으며, 큐를 새로 추가할 때process.php
의 구성을 수정할 필요가 없고,app/queue/redis
아래에 해당Consumer
클래스를 새로 추가한 후, 클래스 속성$queue
에서 소비할 큐 이름을 지정하기만 하면 됩니다.팁
Windows 사용자는php windows.php
를 실행하여 webman을 시작해야 하며, 그렇지 않으면 소비 프로세스가 시작되지 않습니다.팁
onConsumeFailure
콜백은 매번 소비 실패 시 트리거되며, 이곳에서 실패 후의 로직을 처리할 수 있습니다. (이 기능은webman/redis-queue>=1.3.2
및workerman/redis-queue>=1.2.1
필요)
다른 큐를 위한 서로 다른 소비 프로세스 설정
기본적으로 모든 소비자는 동일한 소비 프로세스를 공유합니다. 하지만 특정 큐의 소비를 독립적으로 분리해야 할 때가 있습니다. 예를 들어, 소비 속도가 느린 비즈니스를 한 그룹의 프로세스에서 소비하고, 소비 속도가 빠른 비즈니스를 다른 그룹의 프로세스에서 소비하도록 하고 싶을 때입니다. 이를 위해 소비자를 두 개의 디렉토리로 분리할 수 있습니다. 예를 들어 app_path() . '/queue/redis/fast'
및 app_path() . '/queue/redis/slow'
로 구성하면(소비 클래스의 네임스페이스명을 적절히 변경해야 합니다), 구성은 다음과 같습니다:
return [
... 여기에 다른 구성 생략 ...
'redis_consumer_fast' => [ // key는 사용자 정의 이름으로 형식에 제약 없음, 여기서는 redis_consumer_fast로 명명
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// 소비자 클래스 디렉토리
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // key는 사용자 정의 이름으로 형식에 제약 없음, 여기서는 redis_consumer_slow로 명명
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// 소비자 클래스 디렉토리
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
이렇게 하면 빠른 비즈니스 소비자는 queue/redis/fast
디렉토리에 배치되고 느린 비즈니스 소비자는 queue/redis/slow
디렉토리에 배치되어 특정 큐에 소비 프로세스를 지정할 수 있습니다.
다중 Redis 구성
구성
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // 비밀번호, 문자열 타입, 선택 사항
'db' => 0, // 데이터베이스
'max_attempts' => 5, // 소비 실패 후, 재시도 횟수
'retry_seconds' => 5, // 재시도 간격, 초 단위
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // 비밀번호, 문자열 타입, 선택 사항
'db' => 0, // 데이터베이스
'max_attempts' => 5, // 소비 실패 후, 재시도 횟수
'retry_seconds' => 5, // 재시도 간격, 초 단위
]
],
];
구성에 other
라는 키 값이 추가된 Redis 구성이 있습니다.
다중 Redis 메시지 발송
// `default`라는 키의 큐에 메시지 발송
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// 동등함
Client::send($queue, $data);
Redis::send($queue, $data);
// `other`라는 키의 큐에 메시지 발송
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
다중 Redis 소비
소비 설정에서 other
라는 키의 큐에 메시지를 소비합니다.
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// 소비할 큐 이름
public $queue = 'send-mail';
// === 여기를 other로 설정하여 소비 설정에서 other라는 키의 큐를 소비합니다 ===
public $connection = 'other';
// 소비
public function consume($data)
{
// 반직렬화 필요 없음
var_export($data);
}
}
자주 묻는 질문
왜 Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
라는 오류가 발생합니까?
이 오류는 비동기 메시지 발송 인터페이스 Client::send()
에서만 발생합니다. 비동기 발송은 먼저 메시지를 로컬 메모리에 저장한 후, 프로세스가 비어 있을 때 메시지를 Redis에 전송합니다. Redis 수신 속도가 메시지 생산 속도보다 느리거나, 프로세스가 항상 다른 업무로 바빠서 로컬의 메시지를 Redis에 동기화할 충분한 시간이 없으면 메시지 압착이 발생할 수 있습니다. 만약 메시지 압착이 600초를 초과하면 이 오류가 발생합니다.
해결 방법: 메시지 발송에 동기 발송 인터페이스 Redis::send()
를 사용하십시오.