隊(duì)列系統(tǒng)
隊(duì)列相關(guān)的應(yīng)用對(duì)于現(xiàn)在的系統(tǒng)開(kāi)發(fā)來(lái)說(shuō)非常常見(jiàn),不管你是發(fā)消息還是應(yīng)對(duì)大流量,隊(duì)列都是一個(gè)非常常用而且非常好用的解決方案。我們自己寫(xiě)隊(duì)列去實(shí)現(xiàn)很多功能其實(shí)已經(jīng)非常方便了,不過(guò) Laravel 也為我們準(zhǔn)備好了一套現(xiàn)成的隊(duì)列系統(tǒng),直接配置一下就能夠方便地使用了。今天,我們就來(lái)學(xué)習(xí)了解一下 Laravel 中隊(duì)列系統(tǒng)相關(guān)的內(nèi)容。
配置
隊(duì)列的配置非常簡(jiǎn)單,在 config 目錄下就有一個(gè)名為 queue.php 的文件,這個(gè)文件就是隊(duì)列的配置文件。
'default' => env('QUEUE_CONNECTION', 'sync'),
第一行的這個(gè) default 就是一個(gè)默認(rèn)隊(duì)列系統(tǒng)的連接配置,在默認(rèn)情況下,它使用的是 sync 。意思就是同步的,也就是說(shuō),只要調(diào)用了隊(duì)列分發(fā),馬上就執(zhí)行隊(duì)列的內(nèi)容。顯然,這個(gè)和普通的順序編寫(xiě)代碼沒(méi)什么區(qū)別,它也不是我們的重點(diǎn)。我們可以通過(guò)修改 .env 配置文件中的 QUEUE_CONNECTION 來(lái)修改默認(rèn)的連接配置,它所能接受的值就是這個(gè)配置文件中下方 connections 中的內(nèi)容。
'connections' => [
// ………………
// ………………
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
'after_commit' => false,
],
],
在這些連接配置中,我們可以看到 database、beanstalkd、sqs、redis 等相關(guān)隊(duì)列系統(tǒng)的配置。database 其實(shí)就是使用數(shù)據(jù)庫(kù)來(lái)作為隊(duì)列系統(tǒng)。相應(yīng)的你需要建立對(duì)應(yīng)的數(shù)據(jù)表,不過(guò)數(shù)據(jù)庫(kù)當(dāng)隊(duì)列的效率很一般。另外 beanstalkd 和 sqs 大家可能也接觸得不多,所以我們主要還是使用 redis 這個(gè)數(shù)據(jù)連接驅(qū)動(dòng)?,F(xiàn)在大家直接把 .env 中的 QUEUE_CONNECTION 改成 redis 就好了。至于更復(fù)雜的 RabbitMQ 和 Kafka 之類的隊(duì)列系統(tǒng),在 Laravel 框架中并沒(méi)有給出直接的集成應(yīng)用方案,這些還是建議大家找找其它的 Composer 包吧。
使用隊(duì)列
在默認(rèn)情況下,我們所使用的 任務(wù)類 都被存放在了 app/Jobs 目錄中。沒(méi)錯(cuò),在 Laravel 中,隊(duì)列被表示為一個(gè)一個(gè)的任務(wù)。我們可以使用下面這個(gè)命令來(lái)創(chuàng)建一個(gè)任務(wù)類。如果你的目錄中沒(méi)有 Jobs 目錄也沒(méi)關(guān)系,命令行會(huì)自動(dòng)創(chuàng)建這個(gè)目錄。
php artisan make:job Test
生成后的任務(wù)類有一個(gè)構(gòu)造函數(shù),還有一個(gè) handle() 方法。相信大家對(duì)這種類已經(jīng)不陌生了,handle() 方法肯定是用來(lái)處理隊(duì)列任務(wù)的。那么我們就給 handle() 方法中增加一些內(nèi)容。
// app/Jobs/Test.php
// ………………
public function handle()
{
//
echo date("Y-m-d H:i:s");
sleep(10);
}
// ………………
打印一下日期,然后再睡個(gè) 10 秒鐘,這樣一會(huì)我們測(cè)試的時(shí)候可以看得更清楚。
接下來(lái)我們定義一個(gè)路由,并且實(shí)現(xiàn)隊(duì)列的分發(fā)。
Route::get('queue/test1', function(){
\App\Jobs\Test::dispatch();
\App\Jobs\Test::dispatch();
\App\Jobs\Test::dispatch();
dispatch(function(){
echo 'callback queue';
sleep(10);
});
dispatch(function(){
echo 'callback queue';
sleep(10);
});
});
在這個(gè)測(cè)試路由中,我們將 Test 任務(wù)分發(fā)了三次。分發(fā)?沒(méi)錯(cuò),相信你又發(fā)現(xiàn)了一個(gè)問(wèn)題,這是不是和事件有關(guān)???是的,Laravel 中的隊(duì)列也是以事件的形式實(shí)現(xiàn)的。另外我們還分發(fā)了兩條回調(diào)函數(shù)形式的隊(duì)列任務(wù),也就是說(shuō),隊(duì)列任務(wù)是支持兩種形式的,要么我們定義的 Jobs 任務(wù)類,要么就是回調(diào)函數(shù)形式的任務(wù)。
好了,訪問(wèn)這個(gè)路由,貌似沒(méi)什么效果,但你可以在 redis 中看到一條 laravel_database_queues:default 數(shù)據(jù)。type laravel_database_queues:default 可以看到它是一個(gè) list 類型的數(shù)據(jù)。我們直接 LPOP 彈一條出來(lái)看看。
"{\"uuid\":\"568c9f5f-062f-4f16-b0df-a9711406332d\",\"displayName\":\"App\\\\Jobs\\\\Test\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"maxExceptions\":null,\"backoff\":null,\"timeout\":null,\"retryUntil\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\Test\",\"command\":\"O:13:\\\"App\\\\Jobs\\\\Test\\\":10:{s:3:\\\"job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:19:\\\"chainCatchCallbacks\\\";N;s:5:\\\"delay\\\";N;s:11:\\\"afterCommit\\\";N;s:10:\\\"middleware\\\";a:0:{}s:7:\\\"chained\\\";a:0:{}}\"},\"id\":\"xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F\",\"attempts\":0}"
{
"uuid": "568c9f5f-062f-4f16-b0df-a9711406332d",
"displayName": "App\\Jobs\\Test",
"job": "Illuminate\\Queue\\CallQueuedHandler@call",
"maxTries": null,
"maxExceptions": null,
"backoff": null,
"timeout": null,
"retryUntil": null,
"data": {
"commandName": "App\\Jobs\\Test",
"command": "O:13:\"App\\Jobs\\Test\":10:{s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
},
"id": "xLG4ZUsds6uWibV3sxOXxY9ESwWFni8F",
"attempts": 0
}
將它格式化之后就看得比較清楚了。在這里面我們看到了 App\Jobs\Test 的存在,也看到了許多其它參數(shù)。很明顯,從這里我們可以猜測(cè)出來(lái) Laravel 也是通過(guò) POP 一條隊(duì)列數(shù)據(jù),然后再去根據(jù)這個(gè) json 內(nèi)容實(shí)例化 Test 對(duì)象并執(zhí)行里面的 handle() 方法來(lái)實(shí)現(xiàn)隊(duì)列的處理。后面我們?cè)诜治鲈创a的時(shí)候再深入地看一看是不是這樣。
隊(duì)列插入是成功了,redis 中也有了數(shù)據(jù)了,接下來(lái)要怎么執(zhí)行隊(duì)列里面的內(nèi)容呢?也就是異步地去執(zhí)行隊(duì)列操作。相信不少同學(xué)已經(jīng)想到了,肯定得有一個(gè)命令行在后端持續(xù)運(yùn)行嘛。
隊(duì)列處理
對(duì)于隊(duì)列的處理,我們有兩個(gè)命令可以使用,這兩個(gè)命令都會(huì)掛起一個(gè)監(jiān)聽(tīng),也就是監(jiān)聽(tīng)隊(duì)列內(nèi)是否有內(nèi)容,如果有的話就 pop 出來(lái)進(jìn)行處理。
php artisan queue:work
php artisan queue:listen
那么他們兩個(gè)有什么區(qū)別呢?work 是工作的意思,也就是讓隊(duì)列開(kāi)始工作,它比較適合線上使用,效率更高。另外如果修改 job 類或者修改代碼,它也是需要先手動(dòng)停止然后再次啟動(dòng)才能看到效果的。
而對(duì)于我們現(xiàn)在的測(cè)試來(lái)說(shuō),使用 listen 更好一些,它是監(jiān)聽(tīng)的意思。這種運(yùn)行方式的效率差一些,但可以實(shí)時(shí)監(jiān)聽(tīng) job 任務(wù)類的變化。
現(xiàn)在你可以隨便運(yùn)行這兩個(gè)命令中的任何一個(gè),前面我們?cè)诼酚芍刑砑拥疥?duì)列中應(yīng)該有 5 條隊(duì)列任務(wù),但是我們?cè)诓榭?redis 的時(shí)候手動(dòng) lpop 出來(lái)的一條,那么現(xiàn)在應(yīng)該是輸出四條任務(wù),就像下面一樣:
[2021-11-18 08:49:56][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processing: App\Jobs\Test
2021-11-18 08:49:56
[2021-11-18 08:50:06][YluIVplfkScY3lqFhCr5bICzzlX0Yx6H] Processed: App\Jobs\Test
[2021-11-18 08:50:07][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processing: App\Jobs\Test
2021-11-18 08:50:07
[2021-11-18 08:50:17][yUZIA8g0xoJGLZd3EML3PTmuWr3VlVgs] Processed: App\Jobs\Test
[2021-11-18 08:50:17][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processing: Closure (web.php:834)
callback queue[2021-11-18 08:50:27][q1eG5yZfRBLRHJArn9z43jgX9iYe7KNH] Processed: Closure (web.php:834)
[2021-11-18 08:50:28][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processing: Closure (web.php:838)
callback queue[2021-11-18 08:50:38][j7SPjzF0WNxGKsA32icXovT433BWDHCJ] Processed: Closure (web.php:838)
可能數(shù)據(jù)會(huì)比較亂,但是應(yīng)該也能清晰地看出我們輸出的結(jié)果是四條隊(duì)列信息中的輸出內(nèi)容。前兩個(gè)是我們?cè)?app/Jobs/Test 類中輸出的時(shí)間信息,后兩條是打印的回調(diào)函數(shù)輸出的 callback queue 內(nèi)容。
隊(duì)列參數(shù)
隊(duì)列的定義、分發(fā)、執(zhí)行都沒(méi)問(wèn)題了,但是小伙伴們肯定要問(wèn)了,光這樣不行呀,不管是發(fā)送短信、郵件還是處理訂單信息,我們肯定是要傳值給任務(wù)處理對(duì)象的嘛,就像是手機(jī)號(hào)、郵箱地址或者訂單號(hào)之類的,這個(gè)要怎么傳給任務(wù)對(duì)象或者回調(diào)函數(shù)呢?
class Test2 implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
private $obj;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($obj)
{
//
$this->obj = $obj;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
//
echo date("Y-m-d H:i:s");
print_r($this->obj);
sleep(10);
}
}
其實(shí)非常簡(jiǎn)單,就像上面的這個(gè)新定義的 Test2 類一樣,我們直接在構(gòu)造函數(shù)中接參就可以了。
Route::get('queue/test2', function(){
$obj = new stdClass();
$obj->a = 111;
\App\Jobs\Test2::dispatch($obj);
\App\Jobs\Test2::dispatch($obj);
\App\Jobs\Test2::dispatch($obj);
dispatch(function() use ($obj){
echo 'callback queue';
print_r($obj);
sleep(10);
});
dispatch(function() use($obj){
echo 'callback queue';
print_r($obj);
sleep(10);
});
});
在路由分發(fā)的時(shí)候,直接就把參數(shù)放到 Test2::dispatch() 方法的參數(shù)中就可以了?;卣{(diào)函數(shù)形式的則直接使用 use 判斷字將參數(shù)傳遞進(jìn)去就可以了。Test2::dispatch() 方法的實(shí)現(xiàn)其實(shí)就是實(shí)例化自己并將所有接收到的參數(shù)傳給自己的構(gòu)造函數(shù)。
// vendor/laravel/framework/src/Illuminate/Foundation/Bus/Dispatchable.php
public static function dispatch()
{
return new PendingDispatch(new static(...func_get_args()));
}
這個(gè) trait 是我們生成的任務(wù)類都會(huì)調(diào)用的,它是靜態(tài)的方法,所以在這個(gè)方法中,使用的是 new static() ,也就是實(shí)例化當(dāng)前這個(gè) Test2 類自己,將這個(gè)實(shí)例化對(duì)象再當(dāng)做參數(shù)傳遞給新 new 出來(lái)的 PendingDispatch() 對(duì)象。
任務(wù)鏈
任務(wù)鏈?zhǔn)莻€(gè)什么鬼?它其實(shí)就是讓你能夠指定一級(jí)在主任務(wù)成功執(zhí)行后按順序運(yùn)行的排隊(duì)任務(wù)。也就是說(shuō),它是一個(gè)大隊(duì)列任務(wù),然后在這個(gè)隊(duì)列任務(wù)中我們可以再指定一系列小的隊(duì)列任務(wù),讓他們?cè)谶@個(gè)大任務(wù)中有序執(zhí)行。
\Illuminate\Support\Facades\Bus::chain([
function(){
echo 'first';
},
new \App\Jobs\Test(),
function(){
echo 'third';
}
])->dispatch();
執(zhí)行這段任務(wù)鏈接之后,輸出的結(jié)果是 first Test third 這樣的效果。如果中間有任務(wù)出現(xiàn)問(wèn)題了,那么我們可以通過(guò) catch() 來(lái)捕獲異常。
\Illuminate\Support\Facades\Bus::chain([
function(){
echo 'first';
throw new \Exception("第一個(gè)就錯(cuò)了");
},
new \App\Jobs\Test(),
function(){
echo 'third';
}
])->catch(function(Throwable $e){
echo "Error:", $e->getMessage();
})->dispatch();
執(zhí)行分析
對(duì)于隊(duì)列的執(zhí)行分析,我們要從兩個(gè)方向上來(lái)看,一個(gè)是分發(fā)也就是入隊(duì),另一個(gè)是腳本 queue:work 或者 queue:listen ,也就是出隊(duì)。
分發(fā)入隊(duì)
前面我們已經(jīng)看到了,在執(zhí)行 dispatch() 方法時(shí)會(huì) new 一個(gè) PendingDispatch() 對(duì)象,然后將 Test 這種 Job 對(duì)象當(dāng)做參數(shù)放到它的 job 屬性中。通過(guò)構(gòu)造函數(shù)賦值完 job 之后,直接會(huì)進(jìn)入它的 __destruct() 析構(gòu)函數(shù)。
public function __destruct()
{
if (! $this->shouldDispatch()) {
return;
} elseif ($this->afterResponse) {
app(Dispatcher::class)->dispatchAfterResponse($this->job);
} else {
app(Dispatcher::class)->dispatch($this->job);
}
}
在這個(gè)函數(shù)中會(huì)進(jìn)行判斷,如果我們沒(méi)有別的操作,那么它會(huì)進(jìn)入到 app(Dispatcher::class)->dispatch($this->job); 中。
public function dispatch($command)
{
return $this->queueResolver && $this->commandShouldBeQueued($command)
? $this->dispatchToQueue($command)
: $this->dispatchNow($command);
}
接著在判斷完成后進(jìn)入到 dispatchToQueue() 。
public function dispatchToQueue($command)
{
$connection = $command->connection ?? null;
$queue = call_user_func($this->queueResolver, $connection);
if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}
return $this->pushCommandToQueue($queue, $command);
}
這里會(huì)組織我們的隊(duì)列連接內(nèi)容,也就是使用哪種隊(duì)列驅(qū)動(dòng)。如果你設(shè)置了斷點(diǎn)調(diào)試的話,最后傳遞到 pushCommandToQueue() 的 $queue 屬性實(shí)際上已經(jīng)是一個(gè) Illuminate\Queue\RedisQueue 對(duì)象。
protected function pushCommandToQueue($queue, $command)
{
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}
if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}
if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
return $queue->push($command);
}
pushCommandToQueue() 方法繼續(xù)組織數(shù)據(jù),最后將命令 push 到我們指定的 redis 隊(duì)列中。這個(gè) $command 又是什么呢?其實(shí)就是我們的 Test 對(duì)象。它最終會(huì)和其它一些參數(shù)組成一個(gè) payload 并進(jìn)行 json_encode() 之后保存在 redis 中。
腳本出隊(duì)執(zhí)行
接下來(lái)的腳本出隊(duì)操作就是在命令行了,我們需要找到 queue:work 的位置,這個(gè)也比較好找,它就在 vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php 。直接看它的 handle() 方法。
public function handle()
{
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}
$this->listenForEvents();
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
$queue = $this->getQueue($connection);
return $this->runWorker(
$connection, $queue
);
}
從這段代碼中,我們可以看出,最后返回的 runWorker() 肯定是在運(yùn)行工作腳本,而前面的 $connection 就是獲取使用的隊(duì)列連接配置,它返回的是 redis ,而 $queue 則是隊(duì)列的名稱配置,也就是在 redis 中的 list 名稱的定義,這里返回的是 default 默認(rèn)的隊(duì)列名稱。
protected function runWorker($connection, $queue)
{
return $this->worker->setName($this->option('name'))
->setCache($this->cache)
->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
在 runWorker() 方法中,我們使用的是當(dāng)前命令類中的 worker 屬性,它是在構(gòu)造函數(shù)中通過(guò)服務(wù)容器依賴注入進(jìn)來(lái)的一個(gè) vendor/laravel/framework/src/Illuminate/Queue/Worker.php 對(duì)象。我們的 work 默認(rèn)走的是 daemon 模式,所以會(huì)進(jìn)入 Worker 的 daemon() 方法。
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
while (true) {
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$status = $this->pauseWorker($options, $lastRestart);
if (! is_null($status)) {
return $this->stop($status);
}
continue;
}
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
}
if ($job) {
$jobsProcessed++;
$this->runJob($job, $connectionName, $options);
if ($options->rest > 0) {
$this->sleep($options->rest);
}
} else {
$this->sleep($options->sleep);
}
if ($this->supportsAsyncSignals()) {
$this->resetTimeoutHandler();
}
$status = $this->stopIfNecessary(
$options, $lastRestart, $startTime, $jobsProcessed, $job
);
if (! is_null($status)) {
return $this->stop($status);
}
}
}
在這個(gè)方法的 getNextJob() 中,$this->manager->connection() 是通過(guò) vendor/laravel/framework/src/Illuminate/Queue/QueueManager.php 的 connection() 方法獲得一個(gè)驅(qū)動(dòng)實(shí)例,如果你在斷點(diǎn)調(diào)試的話,它返回的就是一個(gè) redis 連接實(shí)例 vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php ,然后在 getNextJob() 方法中會(huì)使用 RedisJob 對(duì)象的 pop() 方法彈出隊(duì)列中的一條信息并封裝成 vendor/laravel/framework/src/Illuminate/Queue/Jobs/RedisJob.php 對(duì)象。
接下來(lái)就是使用 runJob() 方法來(lái)執(zhí)行這個(gè) RedisJob 對(duì)象中的內(nèi)容。runJob() 方法繼續(xù)向下調(diào)用 process() 方法,在這個(gè)方法中會(huì)執(zhí)行 $job->fire() 方法,這個(gè)方法是 RedisJob 繼承的 vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php 。
public function fire()
{
$payload = $this->payload();
[$class, $method] = JobName::parse($payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
payload() 方法用于將隊(duì)列中的 job 內(nèi)容取出來(lái),其實(shí)也就將我們上面保存在 redis 中的 json 字符串轉(zhuǎn)換為數(shù)組。然后再取出這個(gè)數(shù)組信息中的 job 字段的內(nèi)容。在我這里,它顯示的是一個(gè) Illuminate\Queue\CallQueuedHandler@call 信息,其實(shí)也就是框架默認(rèn)將使用這樣一個(gè)回調(diào)類來(lái)處理我們的隊(duì)列對(duì)象信息。而 data 中的信息則是我們的任務(wù)類 App\Jobs\Test 。那么我們就再進(jìn)入到 CallQueuedHandler 的 call() 方法中看一下。
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, $this->getCommand($data)
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
if ($command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
}
$this->dispatchThroughMiddleware($job, $command);
if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
}
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
$this->ensureSuccessfulBatchJobIsRecorded($command);
}
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
在這個(gè) call() 方法中,首先獲得 $command 對(duì)象,它就是通過(guò)容器實(shí)例化之后的 App\Jobs\Test 對(duì)象。然后轉(zhuǎn)入 dispatchThroughMiddleware() 方法中。
protected function dispatchThroughMiddleware(Job $job, $command)
{
return (new Pipeline($this->container))->send($command)
->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
->then(function ($command) use ($job) {
return $this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
);
});
}
dispatchThroughMiddleware() 方法會(huì)再封裝成一個(gè) 管道 繼續(xù)向下執(zhí)行到 then() 方法里面的 dispatchNow() 方法。這個(gè)方法是實(shí)例 vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php 中的方法,
public function dispatchNow($command, $handler = null)
{
$uses = class_uses_recursive($command);
if (in_array(InteractsWithQueue::class, $uses) &&
in_array(Queueable::class, $uses) &&
! $command->job) {
$command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
}
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
$method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
return $handler->{$method}($command);
};
} else {
$callback = function ($command) {
$method = method_exists($command, 'handle') ? 'handle' : '__invoke';
return $this->container->call([$command, $method]);
};
}
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}
可以看到在這個(gè)方法中,通過(guò)不同的判斷分別生成了兩個(gè)回調(diào)方法,它們的內(nèi)容略有不同,但都可以看到 handle 和 __invoke 的影子,而且他們是調(diào)用 $command 也就是我們之前已經(jīng)實(shí)例化的 App\Jobs\Test 里面對(duì)應(yīng)的 handle() 或 __invoke() 方法了。這下相信大家就比較清楚了。最后還是返回了一個(gè) 管道 操作,不過(guò)管道操作的最終都會(huì)進(jìn)入到 then() 方法,其實(shí)也就是調(diào)用了 $callback 里面的內(nèi)容。之后就是進(jìn)入到我們定義的 App\Jobs\Test 對(duì)象的 handle() 方法中執(zhí)行我們寫(xiě)好的隊(duì)列處理操作了。
看出來(lái)了吧,整個(gè)隊(duì)列的調(diào)用執(zhí)行過(guò)程非常長(zhǎng),也非常復(fù)雜。這里我們也只是將最核心的步驟摘取了出來(lái)。在這其中,我們見(jiàn)到了事件分發(fā)使用,也見(jiàn)到了管道操作的使用,至于服務(wù)容器更是不用多說(shuō)了。從這里也可以看出,隊(duì)列系統(tǒng)就是建立在之前我們已經(jīng)學(xué)習(xí)過(guò)的這些內(nèi)容的基礎(chǔ)上實(shí)現(xiàn)的。更多具體的內(nèi)容大家可以再繼續(xù)深入的自行調(diào)試,配置好斷點(diǎn),學(xué)會(huì)斷點(diǎn)調(diào)試真的非常重要哦。
總結(jié)
看似一個(gè)小小的隊(duì)列系統(tǒng),內(nèi)部實(shí)現(xiàn)并沒(méi)有我們想像中的簡(jiǎn)單吧。當(dāng)然,如果只是使用的話這套隊(duì)列系統(tǒng)還是非常簡(jiǎn)單方便的。如果不想那么復(fù)雜,其實(shí)你自己去使用 redis 的 lpop 、lpush 之類的功能也是沒(méi)問(wèn)題的。還是那句話,具體業(yè)務(wù)具體分析。
另外,整個(gè)隊(duì)列系統(tǒng)還有很多其它的功能,比如說(shuō)任務(wù)中間件、延遲分發(fā)、任務(wù)批處理、優(yōu)先隊(duì)列、Supervisor?;钆渲玫龋蠹铱梢岳^續(xù)根據(jù)官方文檔進(jìn)行深入的學(xué)習(xí)哦!
參考文檔:
https:///docs/laravel/8.5/queues/10395