日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

【Laravel系列7.7】隊(duì)列系統(tǒng)

 硬核項(xiàng)目經(jīng)理 2022-06-16 發(fā)布于湖南

隊(duì)列系統(tǒng)

隊(duì)列相關(guān)的應(yīng)用對(duì)于現(xiàn)在的系統(tǒng)開(kāi)發(fā)來(lái)說(shuō)非常常見(jiàn),不管你是發(fā)消息還是應(yīng)對(duì)大流量,隊(duì)列都是一個(gè)非常常用而且非常好用的解決方案。我們自己寫(xiě)隊(duì)列去實(shí)現(xiàn)很多功能其實(shí)已經(jīng)非常方便了,不過(guò) Laravel 也為我們準(zhǔn)備好了一套現(xiàn)成的隊(duì)列系統(tǒng),直接配置一下就能夠方便地使用了。今天,我們就來(lái)學(xué)習(xí)了解一下 Laravel 中隊(duì)列系統(tǒng)相關(guān)的內(nèi)容。

配置

隊(duì)列的配置非常簡(jiǎn)單,在 config 目錄下就有一個(gè)名為 queue.php 的文件,這個(gè)文件就是隊(duì)列的配置文件。

'default' => env('QUEUE_CONNECTION''sync'),

第一行的這個(gè) default 就是一個(gè)默認(rèn)隊(duì)列系統(tǒng)的連接配置,在默認(rèn)情況下,它使用的是 sync 。意思就是同步的,也就是說(shuō),只要調(diào)用了隊(duì)列分發(fā),馬上就執(zhí)行隊(duì)列的內(nèi)容。顯然,這個(gè)和普通的順序編寫(xiě)代碼沒(méi)什么區(qū)別,它也不是我們的重點(diǎn)。我們可以通過(guò)修改 .env 配置文件中的 QUEUE_CONNECTION 來(lái)修改默認(rèn)的連接配置,它所能接受的值就是這個(gè)配置文件中下方 connections 中的內(nèi)容。

'connections' => [
    // ………………
    // ………………
    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE''default'),
        'retry_after' => 90,
        'block_for' => null,
        'after_commit' => false,
    ],

],

在這些連接配置中,我們可以看到 database、beanstalkd、sqs、redis 等相關(guān)隊(duì)列系統(tǒng)的配置。database 其實(shí)就是使用數(shù)據(jù)庫(kù)來(lái)作為隊(duì)列系統(tǒng)。相應(yīng)的你需要建立對(duì)應(yīng)的數(shù)據(jù)表,不過(guò)數(shù)據(jù)庫(kù)當(dāng)隊(duì)列的效率很一般。另外 beanstalkd 和 sqs 大家可能也接觸得不多,所以我們主要還是使用 redis 這個(gè)數(shù)據(jù)連接驅(qū)動(dòng)?,F(xiàn)在大家直接把 .env 中的 QUEUE_CONNECTION 改成 redis 就好了。至于更復(fù)雜的 RabbitMQ 和 Kafka 之類的隊(duì)列系統(tǒng),在 Laravel 框架中并沒(méi)有給出直接的集成應(yīng)用方案,這些還是建議大家找找其它的 Composer 包吧。

使用隊(duì)列

在默認(rèn)情況下,我們所使用的 任務(wù)類 都被存放在了 app/Jobs 目錄中。沒(méi)錯(cuò),在 Laravel 中,隊(duì)列被表示為一個(gè)一個(gè)的任務(wù)。我們可以使用下面這個(gè)命令來(lái)創(chuàng)建一個(gè)任務(wù)類。如果你的目錄中沒(méi)有 Jobs 目錄也沒(méi)關(guān)系,命令行會(huì)自動(dòng)創(chuàng)建這個(gè)目錄。

php artisan make:job Test

生成后的任務(wù)類有一個(gè)構(gòu)造函數(shù),還有一個(gè) handle() 方法。相信大家對(duì)這種類已經(jīng)不陌生了,handle() 方法肯定是用來(lái)處理隊(duì)列任務(wù)的。那么我們就給 handle() 方法中增加一些內(nèi)容。

// app/Jobs/Test.php
// ………………
public function handle()
{
    //
    echo date("Y-m-d H:i:s");
    sleep(10);
}
// ………………

打印一下日期,然后再睡個(gè) 10 秒鐘,這樣一會(huì)我們測(cè)試的時(shí)候可以看得更清楚。

接下來(lái)我們定義一個(gè)路由,并且實(shí)現(xiàn)隊(duì)列的分發(fā)。

Route::get('queue/test1'function(){
    \App\Jobs\Test::dispatch();
    \App\Jobs\Test::dispatch();
    \App\Jobs\Test::dispatch();

    dispatch(function(){
        echo 'callback queue';
        sleep(10);
    });
    dispatch(function(){
        echo 'callback queue';
        sleep(10);
    });
});

在這個(gè)測(cè)試路由中,我們將 Test 任務(wù)分發(fā)了三次。分發(fā)?沒(méi)錯(cuò),相信你又發(fā)現(xiàn)了一個(gè)問(wèn)題,這是不是和事件有關(guān)???是的,Laravel 中的隊(duì)列也是以事件的形式實(shí)現(xiàn)的。另外我們還分發(fā)了兩條回調(diào)函數(shù)形式的隊(duì)列任務(wù),也就是說(shuō),隊(duì)列任務(wù)是支持兩種形式的,要么我們定義的 Jobs 任務(wù)類,要么就是回調(diào)函數(shù)形式的任務(wù)。

好了,訪問(wèn)這個(gè)路由,貌似沒(méi)什么效果,但你可以在 redis 中看到一條 laravel_database_queues:default 數(shù)據(jù)。type laravel_database_queues:default 可以看到它是一個(gè) list 類型的數(shù)據(jù)。我們直接 LPOP 彈一條出來(lái)看看。

"{\"uuid\":\"568c9f5f-062f-4f16-b0df-a9711406332d\",\"displayName\":\"App\\\\Jobs\\\\Test\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"maxExceptions\":null,\"backoff\":null,\"timeout\":null,\"retryUntil\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\Test\",\"command\":\"O:13:\\\"App\\\\Jobs\\\\Test\\\":10:{s:3:\\\"job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:19:\\\"chainCatchCallbacks\\\";N;s:5:\\\"delay\\\";N;s:11:\\\"afterCommit\\\";N;s:10:\\\"middleware\\\";a:0:{}s:7:\\\"chained\\\";a:0:{}}\"},\"id\":\"xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F\",\"attempts\":0}"

{
 "uuid""568c9f5f-062f-4f16-b0df-a9711406332d",
 "displayName""App\\Jobs\\Test",
 "job""Illuminate\\Queue\\CallQueuedHandler@call",
 "maxTries"null,
 "maxExceptions"null,
 "backoff"null,
 "timeout"null,
 "retryUntil"null,
 "data": {
  "commandName""App\\Jobs\\Test",
  "command""O:13:\"App\\Jobs\\Test\":10:{s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
 },
 "id""xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F",
 "attempts"0
}

將它格式化之后就看得比較清楚了。在這里面我們看到了 App\Jobs\Test 的存在,也看到了許多其它參數(shù)。很明顯,從這里我們可以猜測(cè)出來(lái) Laravel 也是通過(guò) POP 一條隊(duì)列數(shù)據(jù),然后再去根據(jù)這個(gè) json 內(nèi)容實(shí)例化 Test 對(duì)象并執(zhí)行里面的 handle() 方法來(lái)實(shí)現(xiàn)隊(duì)列的處理。后面我們?cè)诜治鲈创a的時(shí)候再深入地看一看是不是這樣。

隊(duì)列插入是成功了,redis 中也有了數(shù)據(jù)了,接下來(lái)要怎么執(zhí)行隊(duì)列里面的內(nèi)容呢?也就是異步地去執(zhí)行隊(duì)列操作。相信不少同學(xué)已經(jīng)想到了,肯定得有一個(gè)命令行在后端持續(xù)運(yùn)行嘛。

隊(duì)列處理

對(duì)于隊(duì)列的處理,我們有兩個(gè)命令可以使用,這兩個(gè)命令都會(huì)掛起一個(gè)監(jiān)聽(tīng),也就是監(jiān)聽(tīng)隊(duì)列內(nèi)是否有內(nèi)容,如果有的話就 pop 出來(lái)進(jìn)行處理。

php artisan queue:work

php artisan queue:listen

那么他們兩個(gè)有什么區(qū)別呢?work 是工作的意思,也就是讓隊(duì)列開(kāi)始工作,它比較適合線上使用,效率更高。另外如果修改 job 類或者修改代碼,它也是需要先手動(dòng)停止然后再次啟動(dòng)才能看到效果的。

而對(duì)于我們現(xiàn)在的測(cè)試來(lái)說(shuō),使用 listen 更好一些,它是監(jiān)聽(tīng)的意思。這種運(yùn)行方式的效率差一些,但可以實(shí)時(shí)監(jiān)聽(tīng) job 任務(wù)類的變化。

現(xiàn)在你可以隨便運(yùn)行這兩個(gè)命令中的任何一個(gè),前面我們?cè)诼酚芍刑砑拥疥?duì)列中應(yīng)該有 5 條隊(duì)列任務(wù),但是我們?cè)诓榭?redis 的時(shí)候手動(dòng) lpop 出來(lái)的一條,那么現(xiàn)在應(yīng)該是輸出四條任務(wù),就像下面一樣:

[2021-11-18 08:49:56][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processing: App\Jobs\Test
2021-11-18 08:49:56
[2021-11-18 08:50:06][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processed:  App\Jobs\Test

[2021-11-18 08:50:07][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processing: App\Jobs\Test
2021-11-18 08:50:07
[2021-11-18 08:50:17][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processed:  App\Jobs\Test

[2021-11-18 08:50:17][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processing: Closure (web.php:834)
callback queue[2021-11-18 08:50:27][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processed:  Closure (web.php:834)

[2021-11-18 08:50:28][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processing: Closure (web.php:838)
callback queue[2021-11-18 08:50:38][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processed:  Closure (web.php:838)

可能數(shù)據(jù)會(huì)比較亂,但是應(yīng)該也能清晰地看出我們輸出的結(jié)果是四條隊(duì)列信息中的輸出內(nèi)容。前兩個(gè)是我們?cè)?app/Jobs/Test 類中輸出的時(shí)間信息,后兩條是打印的回調(diào)函數(shù)輸出的 callback queue 內(nèi)容。

隊(duì)列參數(shù)

隊(duì)列的定義、分發(fā)、執(zhí)行都沒(méi)問(wèn)題了,但是小伙伴們肯定要問(wèn)了,光這樣不行呀,不管是發(fā)送短信、郵件還是處理訂單信息,我們肯定是要傳值給任務(wù)處理對(duì)象的嘛,就像是手機(jī)號(hào)、郵箱地址或者訂單號(hào)之類的,這個(gè)要怎么傳給任務(wù)對(duì)象或者回調(diào)函數(shù)呢?

class Test2 implements ShouldQueue
{
    use DispatchableInteractsWithQueueQueueableSerializesModels;

    private $obj;

    /**
     * Create a new job instance.
     *
     * @return void
     */

    public function __construct($obj)
    
{
        //
        $this->obj = $obj;
    }

    /**
     * Execute the job.
     *
     * @return void
     */

    public function handle()
    
{
        //
        echo date("Y-m-d H:i:s");
        print_r($this->obj);
        sleep(10);
    }
}

其實(shí)非常簡(jiǎn)單,就像上面的這個(gè)新定義的 Test2 類一樣,我們直接在構(gòu)造函數(shù)中接參就可以了。

Route::get('queue/test2'function(){
    $obj = new stdClass();
    $obj->a = 111;
    \App\Jobs\Test2::dispatch($obj);
    \App\Jobs\Test2::dispatch($obj);
    \App\Jobs\Test2::dispatch($obj);

    dispatch(function() use ($obj){
        echo 'callback queue';
        print_r($obj);
        sleep(10);
    });
    dispatch(function() use($obj){
        echo 'callback queue';
        print_r($obj);
        sleep(10);
    });
});

在路由分發(fā)的時(shí)候,直接就把參數(shù)放到 Test2::dispatch() 方法的參數(shù)中就可以了?;卣{(diào)函數(shù)形式的則直接使用 use 判斷字將參數(shù)傳遞進(jìn)去就可以了。Test2::dispatch() 方法的實(shí)現(xiàn)其實(shí)就是實(shí)例化自己并將所有接收到的參數(shù)傳給自己的構(gòu)造函數(shù)。

// vendor/laravel/framework/src/Illuminate/Foundation/Bus/Dispatchable.php
public static function dispatch()
{
    return new PendingDispatch(new static(...func_get_args()));
}

這個(gè) trait 是我們生成的任務(wù)類都會(huì)調(diào)用的,它是靜態(tài)的方法,所以在這個(gè)方法中,使用的是 new static() ,也就是實(shí)例化當(dāng)前這個(gè) Test2 類自己,將這個(gè)實(shí)例化對(duì)象再當(dāng)做參數(shù)傳遞給新 new 出來(lái)的 PendingDispatch() 對(duì)象。

任務(wù)鏈

任務(wù)鏈?zhǔn)莻€(gè)什么鬼?它其實(shí)就是讓你能夠指定一級(jí)在主任務(wù)成功執(zhí)行后按順序運(yùn)行的排隊(duì)任務(wù)。也就是說(shuō),它是一個(gè)大隊(duì)列任務(wù),然后在這個(gè)隊(duì)列任務(wù)中我們可以再指定一系列小的隊(duì)列任務(wù),讓他們?cè)谶@個(gè)大任務(wù)中有序執(zhí)行。

\Illuminate\Support\Facades\Bus::chain([
    function(){
        echo 'first';
    },
    new \App\Jobs\Test(),
    function(){
        echo 'third';
    }
])->dispatch();

執(zhí)行這段任務(wù)鏈接之后,輸出的結(jié)果是 first Test third 這樣的效果。如果中間有任務(wù)出現(xiàn)問(wèn)題了,那么我們可以通過(guò) catch() 來(lái)捕獲異常。

\Illuminate\Support\Facades\Bus::chain([
    function(){
        echo 'first';
        throw new \Exception("第一個(gè)就錯(cuò)了");
    },
    new \App\Jobs\Test(),
    function(){
        echo 'third';
    }
])->catch(function(Throwable $e){
    echo "Error:", $e->getMessage();
})->dispatch();

執(zhí)行分析

對(duì)于隊(duì)列的執(zhí)行分析,我們要從兩個(gè)方向上來(lái)看,一個(gè)是分發(fā)也就是入隊(duì),另一個(gè)是腳本 queue:work 或者 queue:listen ,也就是出隊(duì)。

分發(fā)入隊(duì)

前面我們已經(jīng)看到了,在執(zhí)行 dispatch() 方法時(shí)會(huì) new 一個(gè) PendingDispatch() 對(duì)象,然后將 Test 這種 Job 對(duì)象當(dāng)做參數(shù)放到它的 job 屬性中。通過(guò)構(gòu)造函數(shù)賦值完 job 之后,直接會(huì)進(jìn)入它的 __destruct() 析構(gòu)函數(shù)。

public function __destruct()
{
    if (! $this->shouldDispatch()) {
        return;
    } elseif ($this->afterResponse) {
        app(Dispatcher::class)->dispatchAfterResponse($this->job);
    } else {
        app(Dispatcher::class)->dispatch($this->job);
    }
}

在這個(gè)函數(shù)中會(huì)進(jìn)行判斷,如果我們沒(méi)有別的操作,那么它會(huì)進(jìn)入到 app(Dispatcher::class)->dispatch($this->job); 中。

public function dispatch($command)
{
    return $this->queueResolver && $this->commandShouldBeQueued($command)
                    ? $this->dispatchToQueue($command)
                    : $this->dispatchNow($command);
}

接著在判斷完成后進(jìn)入到 dispatchToQueue() 。

public function dispatchToQueue($command)
{
    $connection = $command->connection ?? null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }

    return $this->pushCommandToQueue($queue, $command);
}

這里會(huì)組織我們的隊(duì)列連接內(nèi)容,也就是使用哪種隊(duì)列驅(qū)動(dòng)。如果你設(shè)置了斷點(diǎn)調(diào)試的話,最后傳遞到 pushCommandToQueue() 的 $queue 屬性實(shí)際上已經(jīng)是一個(gè) Illuminate\Queue\RedisQueue 對(duì)象。

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    return $queue->push($command);
}

pushCommandToQueue() 方法繼續(xù)組織數(shù)據(jù),最后將命令 push 到我們指定的 redis 隊(duì)列中。這個(gè) $command 又是什么呢?其實(shí)就是我們的 Test 對(duì)象。它最終會(huì)和其它一些參數(shù)組成一個(gè) payload 并進(jìn)行 json_encode() 之后保存在 redis 中。

腳本出隊(duì)執(zhí)行

接下來(lái)的腳本出隊(duì)操作就是在命令行了,我們需要找到 queue:work 的位置,這個(gè)也比較好找,它就在 vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php 。直接看它的 handle() 方法。

public function handle()
{
    if ($this->downForMaintenance() && $this->option('once')) {
        return $this->worker->sleep($this->option('sleep'));
    }

    $this->listenForEvents();

    $connection = $this->argument('connection')
                    ?: $this->laravel['config']['queue.default'];


    $queue = $this->getQueue($connection);

    return $this->runWorker(
        $connection, $queue
    );
}

從這段代碼中,我們可以看出,最后返回的 runWorker() 肯定是在運(yùn)行工作腳本,而前面的 $connection 就是獲取使用的隊(duì)列連接配置,它返回的是 redis ,而 $queue 則是隊(duì)列的名稱配置,也就是在 redis 中的 list 名稱的定義,這里返回的是 default 默認(rèn)的隊(duì)列名稱。

protected function runWorker($connection, $queue)
{
    return $this->worker->setName($this->option('name'))
                    ->setCache($this->cache)
                    ->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

在 runWorker() 方法中,我們使用的是當(dāng)前命令類中的 worker 屬性,它是在構(gòu)造函數(shù)中通過(guò)服務(wù)容器依賴注入進(jìn)來(lái)的一個(gè) vendor/laravel/framework/src/Illuminate/Queue/Worker.php 對(duì)象。我們的 work 默認(rèn)走的是 daemon 模式,所以會(huì)進(jìn)入 Worker 的 daemon() 方法。

public function daemon($connectionName, $queue, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        $this->listenForSignals();
    }

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    [$startTime, $jobsProcessed] = [hrtime(true) / 1e90];

    while (true) {
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $status = $this->pauseWorker($options, $lastRestart);

            if (! is_null($status)) {
                return $this->stop($status);
            }

            continue;
        }

        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        if ($this->supportsAsyncSignals()) {
            $this->registerTimeoutHandler($job, $options);
        }

        if ($job) {
            $jobsProcessed++;

            $this->runJob($job, $connectionName, $options);

            if ($options->rest > 0) {
                $this->sleep($options->rest);
            }
        } else {
            $this->sleep($options->sleep);
        }

        if ($this->supportsAsyncSignals()) {
            $this->resetTimeoutHandler();
        }

        $status = $this->stopIfNecessary(
            $options, $lastRestart, $startTime, $jobsProcessed, $job
        );

        if (! is_null($status)) {
            return $this->stop($status);
        }
    }
}

在這個(gè)方法的 getNextJob() 中,$this->manager->connection() 是通過(guò) vendor/laravel/framework/src/Illuminate/Queue/QueueManager.php 的 connection() 方法獲得一個(gè)驅(qū)動(dòng)實(shí)例,如果你在斷點(diǎn)調(diào)試的話,它返回的就是一個(gè) redis 連接實(shí)例 vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php ,然后在 getNextJob() 方法中會(huì)使用 RedisJob 對(duì)象的 pop() 方法彈出隊(duì)列中的一條信息并封裝成 vendor/laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php 對(duì)象。

接下來(lái)就是使用 runJob() 方法來(lái)執(zhí)行這個(gè) RedisJob 對(duì)象中的內(nèi)容。runJob() 方法繼續(xù)向下調(diào)用 process() 方法,在這個(gè)方法中會(huì)執(zhí)行 $job->fire() 方法,這個(gè)方法是 RedisJob 繼承的 vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php 。

public function fire()
{
    $payload = $this->payload();

    [$class, $method] = JobName::parse($payload['job']);

    ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}

payload() 方法用于將隊(duì)列中的 job 內(nèi)容取出來(lái),其實(shí)也就將我們上面保存在 redis 中的 json 字符串轉(zhuǎn)換為數(shù)組。然后再取出這個(gè)數(shù)組信息中的 job 字段的內(nèi)容。在我這里,它顯示的是一個(gè) Illuminate\Queue\CallQueuedHandler@call 信息,其實(shí)也就是框架默認(rèn)將使用這樣一個(gè)回調(diào)類來(lái)處理我們的隊(duì)列對(duì)象信息。而 data 中的信息則是我們的任務(wù)類 App\Jobs\Test 。那么我們就再進(jìn)入到 CallQueuedHandler 的 call() 方法中看一下。

public function call(Job $job, array $data)
{
    try {
        $command = $this->setJobInstanceIfNecessary(
            $job, $this->getCommand($data)
        );
    } catch (ModelNotFoundException $e) {
        return $this->handleModelNotFound($job, $e);
    }

    if ($command instanceof ShouldBeUniqueUntilProcessing) {
        $this->ensureUniqueJobLockIsReleased($command);
    }

    $this->dispatchThroughMiddleware($job, $command);

    if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
        $this->ensureUniqueJobLockIsReleased($command);
    }

    if (! $job->hasFailed() && ! $job->isReleased()) {
        $this->ensureNextJobInChainIsDispatched($command);
        $this->ensureSuccessfulBatchJobIsRecorded($command);
    }

    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}

在這個(gè) call() 方法中,首先獲得 $command 對(duì)象,它就是通過(guò)容器實(shí)例化之后的 App\Jobs\Test 對(duì)象。然后轉(zhuǎn)入 dispatchThroughMiddleware() 方法中。

protected function dispatchThroughMiddleware(Job $job, $command)
{
    return (new Pipeline($this->container))->send($command)
            ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
            ->then(function ($command) use ($job) {
                return $this->dispatcher->dispatchNow(
                    $command, $this->resolveHandler($job, $command)
                );
            });
}

dispatchThroughMiddleware() 方法會(huì)再封裝成一個(gè) 管道 繼續(xù)向下執(zhí)行到 then() 方法里面的 dispatchNow() 方法。這個(gè)方法是實(shí)例 vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php 中的方法,

public function dispatchNow($command, $handler = null)
{
    $uses = class_uses_recursive($command);

    if (in_array(InteractsWithQueue::class, $uses) &&
        in_array(Queueable::class, $uses) &&
        ! $command->job) {
        $command->setJob(new SyncJob($this->container, json_encode([]), 'sync''sync'));
    }

    if ($handler || $handler = $this->getCommandHandler($command)) {
        $callback = function ($command) use ($handler) {
            $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';

            return $handler->{$method}($command);
        };
    } else {
        $callback = function ($command) {
            $method = method_exists($command, 'handle') ? 'handle' : '__invoke';

            return $this->container->call([$command, $method]);
        };
    }

    return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}

可以看到在這個(gè)方法中,通過(guò)不同的判斷分別生成了兩個(gè)回調(diào)方法,它們的內(nèi)容略有不同,但都可以看到 handle 和 __invoke 的影子,而且他們是調(diào)用 $command 也就是我們之前已經(jīng)實(shí)例化的 App\Jobs\Test 里面對(duì)應(yīng)的 handle() 或 __invoke() 方法了。這下相信大家就比較清楚了。最后還是返回了一個(gè) 管道 操作,不過(guò)管道操作的最終都會(huì)進(jìn)入到 then() 方法,其實(shí)也就是調(diào)用了 $callback 里面的內(nèi)容。之后就是進(jìn)入到我們定義的 App\Jobs\Test 對(duì)象的 handle() 方法中執(zhí)行我們寫(xiě)好的隊(duì)列處理操作了。

看出來(lái)了吧,整個(gè)隊(duì)列的調(diào)用執(zhí)行過(guò)程非常長(zhǎng),也非常復(fù)雜。這里我們也只是將最核心的步驟摘取了出來(lái)。在這其中,我們見(jiàn)到了事件分發(fā)使用,也見(jiàn)到了管道操作的使用,至于服務(wù)容器更是不用多說(shuō)了。從這里也可以看出,隊(duì)列系統(tǒng)就是建立在之前我們已經(jīng)學(xué)習(xí)過(guò)的這些內(nèi)容的基礎(chǔ)上實(shí)現(xiàn)的。更多具體的內(nèi)容大家可以再繼續(xù)深入的自行調(diào)試,配置好斷點(diǎn),學(xué)會(huì)斷點(diǎn)調(diào)試真的非常重要哦。

總結(jié)

看似一個(gè)小小的隊(duì)列系統(tǒng),內(nèi)部實(shí)現(xiàn)并沒(méi)有我們想像中的簡(jiǎn)單吧。當(dāng)然,如果只是使用的話這套隊(duì)列系統(tǒng)還是非常簡(jiǎn)單方便的。如果不想那么復(fù)雜,其實(shí)你自己去使用 redis 的 lpop 、lpush 之類的功能也是沒(méi)問(wèn)題的。還是那句話,具體業(yè)務(wù)具體分析。

另外,整個(gè)隊(duì)列系統(tǒng)還有很多其它的功能,比如說(shuō)任務(wù)中間件、延遲分發(fā)、任務(wù)批處理、優(yōu)先隊(duì)列、Supervisor?;钆渲玫龋蠹铱梢岳^續(xù)根據(jù)官方文檔進(jìn)行深入的學(xué)習(xí)哦!

參考文檔:

https:///docs/laravel/8.5/queues/10395

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多