Redis Kuyruğu

Redis tabanlı bir mesaj kuyruğu, mesajların gecikmeli işlenmesini destekler.

Kurulum

composer require webman/redis-queue

Konfigürasyon Dosyası

Redis konfigürasyon dosyası otomatik olarak {ana proje}/config/plugin/webman/redis-queue/redis.php yolunda oluşturulur, içerik aşağıdaki gibidir:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Şifre, isteğe bağlı parametre
            'db' => 0,            // Veritabanı
            'max_attempts'  => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
            'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
        ]
    ],
];

Tüketim Başarısızlığı Tekrar Deneme

Eğer tüketim başarısız olursa (bir hata meydana gelirse), mesaj gecikmeli kuyruğa alınır ve bir sonraki deneme için bekletilir. Tekrar deneme sayısı max_attempts parametresiyle kontrol edilir, tekrar deneme aralığı retry_seconds ve max_attempts ile birlikte kontrol edilir. Örneğin max_attempts 5 ise, retry_seconds 10 olduğunda, 1. tekrar deneme aralığı 1*10 saniye, 2. tekrar deneme aralığı 2*10 saniye, 3. tekrar deneme aralığı 3*10 saniye şeklinde devam eder ve bu şekilde 5 kez tekrar deneme yapılır. Eğer max_attempts sınırını aşarsa, mesaj {redis-queue}-failed anahtarına sahip başarısızlık kuyruğuna yerleştirilir.

Mesaj Gönderimi (Senkron)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Kuyruğun adı
        $queue = 'send-mail';
        // Veri, diziyi doğrudan geçirebilirsiniz, serileştirmenize gerek yok
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Mesaj gönderimi
        Redis::send($queue, $data);
        // Gecikmeli mesaj gönderimi, mesaj 60 saniye sonra işlenecek
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Başarıyla gönderim gerçekleştirildiğinde Redis::send() true döner, aksi takdirde false veya bir istisna fırlatır.

İpucu
Gecikmeli kuyruk tüketim süresi hata verebilir, örneğin tüketim hızı üretim hızından düşük olduğunda kuyruk birikir ve bu da tüketim gecikmesine neden olur, bunu hafifletmek için daha fazla tüketim süreci başlatabilirsiniz.

Mesaj Gönderimi (Asenkron)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Kuyruğun adı
        $queue = 'send-mail';
        // Veri, diziyi doğrudan geçirebilirsiniz, serileştirmenize gerek yok
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Mesaj gönderimi
        Client::send($queue, $data);
        // Gecikmeli mesaj gönderimi, mesaj 60 saniye sonra işlenecek
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() herhangi bir değer döndürmez, asenkron bir gönderimdir ve mesajın %100 redis'e ulaşmasını garanti etmez.

İpucu
Client::send() çalışma prensibi, yerel bellek içinde bir bellek kuyruğu oluşturmak ve mesajları asenkron olarak redis'e senkronize etmektir (senkronizasyon hızı oldukça hızlıdır, saniyede yaklaşık 10.000 mesaj). Eğer süreç yeniden başlatılırsa ve yerel bellek kuyruğundaki veriler tamamlanmadan senkronize edilirse, mesaj kaybı meydana gelebilir. Client::send() asenkron gönderimi, önemli olmayan mesajlar için uygundur.

İpucu
Client::send() asenkron bir yöntemdir, sadece workerman çalışma ortamında kullanılmalıdır, komut satırı betikleri için senkron arayüz olan Redis::send() kullanın.

Diğer Projelerde Mesaj Gönderimi

Bazen başka projelerde mesaj göndermeniz gerekebilir ve webman\redis-queue kullanamazsanız, kuyruğa mesaj göndermek için aşağıdaki fonksiyonu kullanabilirsiniz.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Burada, $redis parametresi bir redis örneğidir. Örneğin redis uzantısının kullanımı aşağıdaki gibidir:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Tüketim

Tüketim süreci konfigürasyon dosyası {ana proje}/config/plugin/webman/redis-queue/process.php yolundadır. Tüketici dizini {ana proje}/app/queue/redis/ altında bulunur.

php webman redis-queue:consumer my-send-mail komutunu çalıştırarak {ana proje}/app/queue/redis/MyMailSend.php dosyası oluşturulur.

İpucu
Komut yoksa manuel olarak da oluşturabilirsiniz

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Tüketilecek kuyruğun adı
    public $queue = 'send-mail';

    // Bağlantı adı, plugin/webman/redis-queue/redis.php dosyasındaki bağlantıya karşılık gelir
    public $connection = 'default';

    // Tüketim gerçekleştirme
    public function consume($data)
    {
        // Serileştirmeye gerek yok
        var_export($data); // Çıktı ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Tüketim başarısızlığı geri çağrısı
    /* 
    $package = [
        'id' => 1357277951, // Mesaj ID'si
        'time' => 1709170510, // Mesaj zamanı
        'delay' => 0, // Gecikme süresi
        'attempts' => 2, // Tüketim sayısı
        'queue' => 'send-mail', // Kuyruğun adı
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Mesaj içeriği
        'max_attempts' => 5, // Maksimum tekrar deneme sayısı
        'error' => 'Hata bilgisi' // Hata bilgisi
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "tüketim başarısızlığı\n";
        echo $e->getMessage() . "\n";
        // Serileştirmeye gerek yok
        var_export($package); 
    }
}

Dikkat
Tüketim işlemi sırasında istisna veya Error fırlatılmadığında işlemin başarılı olduğu kabul edilir, aksi takdirde tüketim başarısız olur ve tekrar deneme kuyruğuna girer.
redis-queue'da ack mekanizması yoktur, bunu otomatik ack olarak düşünebilirsiniz (istisna veya Error meydana gelmediğinde). Eğer tüketim işlemi sırasında mevcut mesajın tüketim başarılı olmadığını işaretlemek isterseniz, manuel olarak bir hata fırlatabilir ve mevcut mesajı tekrar deneme kuyruğuna alabilirsiniz. Bu aslında ack mekanizması ile aynı değildir.

İpucu
Tüketiciler çoklu sunucu ve çoklu süreç desteği sağlar ve aynı mesaj tekrar tüketilmez. Tüketilen mesajlar otomatik olarak kuyruktan silinir, manuel silme gerektirmez.

İpucu
Tüketim süreçleri farklı kuyrukları aynı anda tüketebilir, yeni bir kuyruk eklemek için process.php dosyasındaki konfigürasyonu değiştirmeye gerek yoktur, sadece app/queue/redis altında yeni bir karşılık gelen Consumer sınıfı oluşturarak sınıf özelliği $queue ile tüketilecek kuyruk adını belirtmek yeterlidir.

İpucu
Windows kullanıcıları php windows.php komutunu çalıştırarak webman'ı başlatmalıdır, aksi takdirde tüketim süreci başlatılmaz.

İpucu
onConsumeFailure geri çağrısı, her tüketim başarısız olduğunda tetiklenir, burada başarısızlık sonrası mantığı işleyebilirsiniz. (Bu özellik için webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1 gereklidir.)

Farklı Kuyruklar için Farklı Tüketim Süreçleri Ayarlama

Varsayılan olarak, tüm tüketiciler aynı tüketim sürecini paylaşır. Ancak bazen bazı kuyrukların tüketimini ayrı hale getirmek gereklidir, örneğin yavaş tüketilen işler için bir grup süreçte tüketim, hızlı tüketilen işler için başka bir grup süreçte tüketim yapılması. Bu nedenle tüketicileri iki farklı dizine ayırabiliriz, örneğin app_path() . '/queue/redis/fast' ve app_path() . '/queue/redis/slow' (tüketici sınıflarının ad alanlarının uygun şekilde değiştirilmesi gerektiğine dikkat edin), konfigürasyon aşağıdaki gibi olacaktır:

return [
    ...diğer konfigürasyonlar burada yer almaktadır...

    'redis_consumer_fast'  => [ // anahtar özelleştirilebilir, burada redis_consumer_fast olarak adlandırıldı
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Tüketici sınıf dizini
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // anahtar özelleştirilebilir, burada redis_consumer_slow olarak adlandırıldı
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Tüketici sınıf dizini
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

Bu şekilde hızlı iş tüketicileri queue/redis/fast dizinine, yavaş iş tüketicileri queue/redis/slow dizinine yerleştirilerek kuyruklara ait tüketim süreçleri belirlenmiş olur.

Çoklu Redis Konfigürasyonu

Konfigürasyon

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Şifre, string türünde, isteğe bağlı parametre
            'db' => 0,            // Veritabanı
            'max_attempts'  => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
            'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Şifre, string türünde, isteğe bağlı parametre
            'db' => 0,             // Veritabanı
            'max_attempts'  => 5, // Tüketim başarısız olduğunda tekrar deneme sayısı
            'retry_seconds' => 5, // Tekrar deneme aralığı, saniye cinsinden
        ]
    ],
];

Konfigürasyona other anahtarı ile bir redis yapılandırması eklenmiştir.

Çoklu Redis ile Mesaj Gönderimi

// `default` anahtarına sahip kuyruğa mesaj gönderimi
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Bu, aşağıdakine eşittir
Client::send($queue, $data);
Redis::send($queue, $data);

// `other` anahtarına sahip kuyruğa mesaj gönderimi
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Çoklu Redis ile Tüketim

Tüketim konfigürasyonunda other anahtarına sahip kuyruğa mesaj gönderimi

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Tüketilecek kuyruğun adı
    public $queue = 'send-mail';

    // === Burası other olmalı, diğerinde anahtar olarak other'a karşılık gelir ===
    public $connection = 'other';

    // Tüketim
    public function consume($data)
    {
        // Serileştirmeye gerek yok
        var_export($data);
    }
}

Sıkça Sorulan Sorular

Neden Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds) hatası alıyorum?

Bu hata yalnızca asenkron gönderim arayüzü Client::send() içinde meydana gelir. Asenkron gönderim önce mesajı yerel bellekte saklar, süreç boşta olduğunda mesaj redis'e gönderilir. Eğer redis, mesaj üretim hızından yavaş alıyorsa veya süreç her zaman diğer işlerle meşgulse, bellek içindeki mesajların redis'e senkronize edilmesi için yeterli zaman yoksa mesaj sıkışması meydana gelir. Eğer sıkışan mesaj süresi 600 saniyeyi aşarsa, bu hata tetiklenir.

Çözüm: Mesaj gönderimi için senkron gönderim arayüzü Redis::send() kullanın.