# 队列任务

队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。

# 相关配置

队列配置文件存放在 config/queue.php 文件中。每一种队列驱动的配置都可以在该文件中找到,包括数据库, Beanstalkd, Amazon SQS, Redis,以及同步(本地使用)驱动。其中还包含了一个 null 队列驱动用于那些放弃队列的任务。

config/queue.php 文件中,需要注意某些配置项。


return [
    .
    .
    .

    'default' => env('QUEUE_CONNECTION', 'sync'),// <=== 默认队列的链接

    'connections' => [

        'sync' => [
            'driver' => 'sync',
        ],

        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
            'after_commit' => false,
        ],

        'beanstalkd' => [
            'driver' => 'beanstalkd',
            'host' => 'localhost',
            'queue' => 'default',
            'retry_after' => 90,
            'block_for' => 0,
            'after_commit' => false,
        ],

        'sqs' => [
            'driver' => 'sqs',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'default'),
            'suffix' => env('SQS_SUFFIX'),
            'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
            'after_commit' => false,
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
            'queue' => env('REDIS_QUEUE', 'default'), // <=== 默认队列的名称
            'retry_after' => 90, // <=== 重试时间 此处的时间单位为秒,而且需要比 timeout 长
            'block_for' => null, // <=== 阻塞时间 将这个值设置为 5 来表明这个驱动应该在等待任务可用时阻塞 5 秒
            'after_commit' => false,
        ],
    ],

    .
    .
    .
];

要注意的是,queue 配置文件中每个连接的配置示例中都包含一个 queue 属性。队列任务被发给指定连接的时候会被分发到 queue 属性和指定连接相同的队列中。换句话说,如果你分发任务的时候没有定义分配到哪个队列,那么它就会被放到连接配置中 queue 属性所定义的队列中.有些应用可能不需要把任务发到不同的队列,而只发到一个简单的队列中就行了。但是把任务推到不同的队列仍然是非常有用的,因为 Laravel 队列处理器允许你定义队列的优先级,所以你能给不同的队列划分不同的优先级或者区分不同任务的不同处理方式了。比如说,如果你把任务推到 high 队列中,你就能让队列处理器优先处理这些任务了。

以下举例两种方式:一种是在任务类中通过设置 queue 属性来指定任务的队列名称,二是在分发任务时,显示调用 onQueue('high') 方法来指定任务的队列名称。


// app/Jobs/TranslateSlugJob.php
class TranslateSlugJob implements ShouldQueue
{
    .
    .
    .

    public $queue = 'high'; # <=== 指定队列名称

    .
    .
    .
}

// app/Http/Controllers/PostController.php
class PostController extends Controller
{
    public function store(Request $request)
    {
        $job = new TranslateSlugJob($request->title);

        dispatch($job)->onQueue('high'); // <=== 指定队列名称

        return redirect()->route('posts.index');
    }
}

// 使用方式

php artisan queue:work --queue=high,default

另外 redis 的相关配置文件在 config/database.php 中,其中需要注意的是,redis 的客户端配置,以及默认的库,和缓存库。

return [
    .
    .
    .

    'redis' => [

            'client' => env('REDIS_CLIENT', 'phpredis'), // phpredis, predis, redis  <=== redis 客户端

            'options' => [
                'cluster' => env('REDIS_CLUSTER', 'redis'),
                'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_') . '_database_'),
            ],

            'default' => [
                'url' => env('REDIS_URL'),
                'host' => env('REDIS_HOST', '127.0.0.1'),
                'username' => env('REDIS_USERNAME'),
                'password' => env('REDIS_PASSWORD'),
                'port' => env('REDIS_PORT', '6379'),
                'database' => env('REDIS_DB', '0'), // <=== 默认库
            ],

            'cache' => [
                'url' => env('REDIS_URL'),
                'host' => env('REDIS_HOST', '127.0.0.1'),
                'username' => env('REDIS_USERNAME'),
                'password' => env('REDIS_PASSWORD'),
                'port' => env('REDIS_PORT', '6379'),
                'database' => env('REDIS_CACHE_DB', '1'), // <=== 缓存库
            ],

        ],

    ];

最后在 config/horizon.php 文件中,配置相关选项。

return [
   .
   .
   .

    'environments' => [
        'production' => [
           'supervisor-1' => [
                'connection' =>'redis',
                'queue' => ['default', 'high', 'low'],
                'balance' =>'simple',
                'processes' => 1,
                'tries' => 3,
                'timeout' => 60, // <=== 超时时间 此处的时间单位为秒 也就是任务的最长执行时间
           ],
        ],
    ],

   .
   .
   .
];

# Supervisor 配置以及相关命令

supervisor 的相关配置

[program:horizon]
process_name=%(program_name)s
command=php /home/forge/app.com/artisan horizon
autostart=true
autorestart=true
user=forge
redirect_stderr=true
stdout_logfile=/home/forge/app.com/horizon.log
stopwaitsecs=3600

要确保 stopwaitsecs 的值大于运行时间最长的任务所消耗的秒数。否则,Supervisor 可能会在工作完成前终止任务。

supervisor 的相关命令


# 启动 supervisord 服务
supervisord -c /etc/supervisord.conf

# 查看状态
supervisorctl status

# 重新读取 supervisor 的配置文件,如果配置文件发生了更改,可以使用该命令使 supervisor 重新加载配置。
supervisorctl reread

# 更新 supervisor 的状态,它会重新加载配置文件并启动/停止相关进程
supervisorctl update

# 启动进程
supervisorctl start [program_name]

# 停止进程
supervisorctl stop [program_name]

# 重启进程
supervisorctl restart [program_name]

# 重启一组进程
supervisorctl restart group:

# 重启所有进程
supervisorctl restart all

# 显示日志
tail -f /home/forge/app.com/horizon.log

# Fail、Retry 以及 Delay

  • attempts() 它返回尝试次数

  • 当队列 Job 或侦听器由于多种原因排队后不应处理时,可以使用 delete()

  • 在任务失败时,向用户发送警报或恢复任务执行的任何操作。可使用 failed() 方法来实现这个功能。

  • 你可以使用 release() 方法来重新排队任务,并设置延迟时间。

<?php
namespace App\Jobs;
use App\Podcast;
use Transcoder\Transcoder;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use App\Exceptions\PodcastUnretrievable;
use App\Notifications\PodcastTranscoded;
use Illuminate\Queue\InteractsWithQueue;
use App\Notifications\TranscoderHighUsage;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Notifications\RetyingPodcastTranscode;

class TranscodePodcast
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    /**
     * Transcoder Instance
     *
     * @var \App\Podcast
     */
    protected $podcast;
    /**
     * 创建一个新的转码podcast实例。
     *
     * @param \App\Podcast $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }
    /**
     * 执行队列job.
     *
     * @param \Transcoder\Transcoder $podcast
     * @return void
     */
    public function handle(Transcoder $transcoder)
    {
        // 如果发布服务器已被停用,请删除此队列job
        if ($this->podcast->publisher->isDeactivated()) {
            $this->delete();
        }
        // 如果podcast不能从storage存储中检索,我们就会失败。
        if ($this->podcast->fileDoesntExists()) {
            $this->fail(new PodcastUnretrievable($this->podcast));
        }

        // 如果转码器使用率很高,我们将
        // 延迟转码5分钟. 否则我们可能会有拖延转码器进程的危险 
        // 它会把所有的转码子进程都记录下来。
        if ($transcoder->getLoad()->isHigh()) {
            $delay = 60 * 5;
            $this->podcast->publisher->notify(new TranscoderHighUsage($this->podcast, $delay));
            $this->release($delay);
        }

        // 告诉用户我们第几次重试
        if ($this->attempts() > 1) {
            $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
        }

        $transcoded = $this->transcoder->setFile($event->podcast)
                ->format('mp3')
                ->bitrate(192)
                ->start();

        // 将转码podcast与原始podcast关联
        $this->podcast->transcode()->associate($transcoded);

        // 通知podcast的发布者他的podcast已经准备好了
        $this->publisher->notify(new PodcastTranscoded($this->podcast));
    }
}