LaravelSupportCommand/src/QueueWorkCommand.php

85 lines
3.0 KiB
PHP

<?php
namespace LaravelSupportCommand;
use Illuminate\Console\Command;
/**
 * Artisan command that supervises a pool of `queue:work` child processes.
 *
 * It launches the requested number of workers, relays each worker's stdout and
 * stderr to the console prefixed with the worker number, and starts a
 * replacement whenever a worker exits. The command runs until it is killed.
 */
class QueueWorkCommand extends Command
{
    /** Pause between monitor passes, in microseconds (0.2s), to avoid a busy loop. */
    private const POLL_INTERVAL_MICROSECONDS = 200000;

    /** Console signature: command name plus the `--workers` option (default 3). */
    protected $signature = 'QueueWork {--workers=3 : Number of workers to run}';

    /** Description shown in the Artisan command list. */
    protected $description = 'LaravelSupportCommand - Run multiple artisan queue:work processes, restart them if they exit.';

    /**
     * Start the workers and monitor them forever.
     *
     * @throws \InvalidArgumentException When `--workers` is not a positive integer.
     */
    public function handle(): never
    {
        $workers = (int) $this->option('workers');
        if ($workers < 1) {
            // Without this guard the monitor loop would spin forever with nothing to supervise.
            throw new \InvalidArgumentException('The --workers option must be a positive integer.');
        }

        // pkill is not available on Windows, so stale workers are only cleaned up elsewhere.
        if (PHP_OS_FAMILY !== 'Windows') {
            $this->warn('Killing any existing queue:work processes...');
            exec('pkill -f "artisan queue:work"');
        }

        $this->info("Starting {$workers} queue:work processes...");

        $processes = [];
        $pipesList = [];
        for ($i = 1; $i <= $workers; $i++) {
            $this->info("Launching worker {$i}...");
            [$processes[$i], $pipesList[$i]] = $this->startWorker($i);
        }

        // Monitor loop
        while (true) {
            foreach ($processes as $i => $process) {
                if (!is_resource($process)) {
                    // The last launch attempt for this slot failed; try again on every pass.
                    [$processes[$i], $pipesList[$i]] = $this->startWorker($i);
                    continue;
                }

                // Take the status BEFORE draining the pipes so that output written
                // just before the worker exited is still relayed below.
                $status = proc_get_status($process);

                $this->relayOutput($i, $pipesList[$i][1], false);
                $this->relayOutput($i, $pipesList[$i][2], true);

                if (!$status['running']) {
                    $this->warn("Worker {$i} stopped. Restarting...");
                    $this->closeWorker($process, $pipesList[$i]);
                    [$processes[$i], $pipesList[$i]] = $this->startWorker($i);
                }
            }

            usleep(self::POLL_INTERVAL_MICROSECONDS);
        }
    }

    /**
     * Spawn one `artisan queue:work` child with non-blocking stdout/stderr pipes.
     *
     * The command is passed as an array so no intermediate shell is spawned and
     * the tracked process is the worker itself. `artisan` is a relative path, so
     * it is resolved against the current working directory.
     *
     * @param int $worker Worker number, used only in the failure message.
     *
     * @return array{0: resource|null, 1: array<int, resource>} Process handle (null if the
     *                                                          spawn failed) and its pipes.
     */
    private function startWorker(int $worker): array
    {
        $descriptors = [
            1 => ['pipe', 'w'], // stdout
            2 => ['pipe', 'w'], // stderr
        ];

        // Use the interpreter running this command; fall back to PATH lookup if unknown.
        $phpBinary = PHP_BINARY !== '' ? PHP_BINARY : 'php';

        $pipes = [];
        $process = proc_open([$phpBinary, 'artisan', 'queue:work'], $descriptors, $pipes);

        if (!is_resource($process)) {
            $this->error("Worker {$worker} could not be started.");

            return [null, []];
        }

        stream_set_blocking($pipes[1], false);
        stream_set_blocking($pipes[2], false);

        return [$process, $pipes];
    }

    /**
     * Print whatever is currently readable on a worker pipe, one console line per output line.
     *
     * @param int      $worker  Worker number used as the line prefix.
     * @param resource $stream  Non-blocking pipe to drain.
     * @param bool     $isError True to print with error styling (stderr), false for info (stdout).
     */
    private function relayOutput(int $worker, $stream, bool $isError): void
    {
        $chunk = stream_get_contents($stream);
        if (!is_string($chunk) || $chunk === '') {
            return;
        }

        foreach (explode("\n", trim($chunk)) as $line) {
            if ($line === '') {
                continue;
            }

            $message = "[Worker {$worker}] " . $line;
            if ($isError) {
                $this->error($message);
            } else {
                $this->info($message);
            }
        }
    }

    /**
     * Release a finished worker: close its pipes first, then the process handle.
     *
     * @param resource             $process Handle returned by proc_open().
     * @param array<int, resource> $pipes   Pipes belonging to that process.
     */
    private function closeWorker($process, array $pipes): void
    {
        foreach ($pipes as $pipe) {
            if (is_resource($pipe)) {
                fclose($pipe);
            }
        }

        proc_close($process);
    }
}