Redis Kuyruğu
Redis tabanlı bir mesaj kuyruğu, mesajların gecikmeli işlenmesini destekler.
Kurulum
composer require webman/redis-queue
Konfigürasyon Dosyası
Redis konfigürasyon dosyası otomatik olarak {ana proje}/config/plugin/webman/redis-queue/redis.php
yolunda oluşturulur, içerik aşağıdaki gibidir:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Şifre, isteğe bağlı parametre
'db' => 0, // Veritabanı
'max_attempts' => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
]
],
];
Tüketim Başarısızlığı Tekrar Deneme
Eğer tüketim başarısız olursa (bir hata meydana gelirse), mesaj gecikmeli kuyruğa alınır ve bir sonraki deneme için bekletilir. Tekrar deneme sayısı max_attempts
parametresiyle kontrol edilir, tekrar deneme aralığı retry_seconds
ve max_attempts
ile birlikte kontrol edilir. Örneğin max_attempts
5 ise, retry_seconds
10 olduğunda, 1. tekrar deneme aralığı 1*10
saniye, 2. tekrar deneme aralığı 2*10
saniye, 3. tekrar deneme aralığı 3*10
saniye şeklinde devam eder ve bu şekilde 5 kez tekrar deneme yapılır. Eğer max_attempts
sınırını aşarsa, mesaj {redis-queue}-failed
anahtarına sahip başarısızlık kuyruğuna yerleştirilir.
Mesaj Gönderimi (Senkron)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Kuyruğun adı
$queue = 'send-mail';
// Veri, diziyi doğrudan geçirebilirsiniz, serileştirmenize gerek yok
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Mesaj gönderimi
Redis::send($queue, $data);
// Gecikmeli mesaj gönderimi, mesaj 60 saniye sonra işlenecek
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Başarıyla gönderim gerçekleştirildiğinde Redis::send()
true döner, aksi takdirde false veya bir istisna fırlatır.
İpucu
Gecikmeli kuyruk tüketim süresi hata verebilir, örneğin tüketim hızı üretim hızından düşük olduğunda kuyruk birikir ve bu da tüketim gecikmesine neden olur, bunu hafifletmek için daha fazla tüketim süreci başlatabilirsiniz.
Mesaj Gönderimi (Asenkron)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Kuyruğun adı
$queue = 'send-mail';
// Veri, diziyi doğrudan geçirebilirsiniz, serileştirmenize gerek yok
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Mesaj gönderimi
Client::send($queue, $data);
// Gecikmeli mesaj gönderimi, mesaj 60 saniye sonra işlenecek
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
herhangi bir değer döndürmez, asenkron bir gönderimdir ve mesajın %100 redis'e ulaşmasını garanti etmez.
İpucu
Client::send()
çalışma prensibi, yerel bellek içinde bir bellek kuyruğu oluşturmak ve mesajları asenkron olarak redis'e senkronize etmektir (senkronizasyon hızı oldukça hızlıdır, saniyede yaklaşık 10.000 mesaj). Eğer süreç yeniden başlatılırsa ve yerel bellek kuyruğundaki veriler tamamlanmadan senkronize edilirse, mesaj kaybı meydana gelebilir.Client::send()
asenkron gönderimi, önemli olmayan mesajlar için uygundur.İpucu
Client::send()
asenkron bir yöntemdir, sadece workerman çalışma ortamında kullanılmalıdır, komut satırı betikleri için senkron arayüz olanRedis::send()
kullanın.
Diğer Projelerde Mesaj Gönderimi
Bazen başka projelerde mesaj göndermeniz gerekebilir ve webman\redis-queue
kullanamazsanız, kuyruğa mesaj göndermek için aşağıdaki fonksiyonu kullanabilirsiniz.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Burada, $redis
parametresi bir redis örneğidir. Örneğin redis uzantısının kullanımı aşağıdaki gibidir:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Tüketim
Tüketim süreci konfigürasyon dosyası {ana proje}/config/plugin/webman/redis-queue/process.php
yolundadır. Tüketici dizini {ana proje}/app/queue/redis/
altında bulunur.
php webman redis-queue:consumer my-send-mail
komutunu çalıştırarak {ana proje}/app/queue/redis/MyMailSend.php
dosyası oluşturulur.
İpucu
Komut yoksa manuel olarak da oluşturabilirsiniz
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Tüketilecek kuyruğun adı
public $queue = 'send-mail';
// Bağlantı adı, plugin/webman/redis-queue/redis.php dosyasındaki bağlantıya karşılık gelir
public $connection = 'default';
// Tüketim gerçekleştirme
public function consume($data)
{
// Serileştirmeye gerek yok
var_export($data); // Çıktı ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Tüketim başarısızlığı geri çağrısı
/*
$package = [
'id' => 1357277951, // Mesaj ID'si
'time' => 1709170510, // Mesaj zamanı
'delay' => 0, // Gecikme süresi
'attempts' => 2, // Tüketim sayısı
'queue' => 'send-mail', // Kuyruğun adı
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Mesaj içeriği
'max_attempts' => 5, // Maksimum tekrar deneme sayısı
'error' => 'Hata bilgisi' // Hata bilgisi
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "tüketim başarısızlığı\n";
echo $e->getMessage() . "\n";
// Serileştirmeye gerek yok
var_export($package);
}
}
Dikkat
Tüketim işlemi sırasında istisna veya Error fırlatılmadığında işlemin başarılı olduğu kabul edilir, aksi takdirde tüketim başarısız olur ve tekrar deneme kuyruğuna girer.
redis-queue'da ack mekanizması yoktur, bunu otomatik ack olarak düşünebilirsiniz (istisna veya Error meydana gelmediğinde). Eğer tüketim işlemi sırasında mevcut mesajın tüketim başarılı olmadığını işaretlemek isterseniz, manuel olarak bir hata fırlatabilir ve mevcut mesajı tekrar deneme kuyruğuna alabilirsiniz. Bu aslında ack mekanizması ile aynı değildir.İpucu
Tüketiciler çoklu sunucu ve çoklu süreç desteği sağlar ve aynı mesaj tekrar tüketilmez. Tüketilen mesajlar otomatik olarak kuyruktan silinir, manuel silme gerektirmez.İpucu
Tüketim süreçleri farklı kuyrukları aynı anda tüketebilir, yeni bir kuyruk eklemek içinprocess.php
dosyasındaki konfigürasyonu değiştirmeye gerek yoktur, sadeceapp/queue/redis
altında yeni bir karşılık gelenConsumer
sınıfı oluşturarak sınıf özelliği$queue
ile tüketilecek kuyruk adını belirtmek yeterlidir.İpucu
Windows kullanıcılarıphp windows.php
komutunu çalıştırarak webman'ı başlatmalıdır, aksi takdirde tüketim süreci başlatılmaz.İpucu
onConsumeFailure geri çağrısı, her tüketim başarısız olduğunda tetiklenir, burada başarısızlık sonrası mantığı işleyebilirsiniz. (Bu özellik içinwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
gereklidir.)
Farklı Kuyruklar için Farklı Tüketim Süreçleri Ayarlama
Varsayılan olarak, tüm tüketiciler aynı tüketim sürecini paylaşır. Ancak bazen bazı kuyrukların tüketimini ayrı hale getirmek gereklidir, örneğin yavaş tüketilen işler için bir grup süreçte tüketim, hızlı tüketilen işler için başka bir grup süreçte tüketim yapılması. Bu nedenle tüketicileri iki farklı dizine ayırabiliriz, örneğin app_path() . '/queue/redis/fast'
ve app_path() . '/queue/redis/slow'
(tüketici sınıflarının ad alanlarının uygun şekilde değiştirilmesi gerektiğine dikkat edin), konfigürasyon aşağıdaki gibi olacaktır:
return [
...diğer konfigürasyonlar burada yer almaktadır...
'redis_consumer_fast' => [ // anahtar özelleştirilebilir, burada redis_consumer_fast olarak adlandırıldı
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Tüketici sınıf dizini
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // anahtar özelleştirilebilir, burada redis_consumer_slow olarak adlandırıldı
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Tüketici sınıf dizini
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Bu şekilde hızlı iş tüketicileri queue/redis/fast
dizinine, yavaş iş tüketicileri queue/redis/slow
dizinine yerleştirilerek kuyruklara ait tüketim süreçleri belirlenmiş olur.
Çoklu Redis Konfigürasyonu
Konfigürasyon
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Şifre, string türünde, isteğe bağlı parametre
'db' => 0, // Veritabanı
'max_attempts' => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Şifre, string türünde, isteğe bağlı parametre
'db' => 0, // Veritabanı
'max_attempts' => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
]
],
];
Konfigürasyona other
anahtarı ile bir redis yapılandırması eklenmiştir.
Çoklu Redis ile Mesaj Gönderimi
// `default` anahtarına sahip kuyruğa mesaj gönderimi
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Bu, aşağıdakine eşittir
Client::send($queue, $data);
Redis::send($queue, $data);
// `other` anahtarına sahip kuyruğa mesaj gönderimi
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Çoklu Redis ile Tüketim
Tüketim konfigürasyonunda other
anahtarına sahip kuyruğa mesaj gönderimi
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Tüketilecek kuyruğun adı
public $queue = 'send-mail';
// === Burası other olmalı, diğerinde anahtar olarak other'a karşılık gelir ===
public $connection = 'other';
// Tüketim
public function consume($data)
{
// Serileştirmeye gerek yok
var_export($data);
}
}
Sıkça Sorulan Sorular
Neden Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
hatası alıyorum?
Bu hata yalnızca asenkron gönderim arayüzü Client::send()
içinde meydana gelir. Asenkron gönderim önce mesajı yerel bellekte saklar, süreç boşta olduğunda mesaj redis'e gönderilir. Eğer redis, mesaj üretim hızından yavaş alıyorsa veya süreç her zaman diğer işlerle meşgulse, bellek içindeki mesajların redis'e senkronize edilmesi için yeterli zaman yoksa mesaj sıkışması meydana gelir. Eğer sıkışan mesaj süresi 600 saniyeyi aşarsa, bu hata tetiklenir.
Çözüm: Mesaj gönderimi için senkron gönderim arayüzü Redis::send()
kullanın.