update command

This commit is contained in:
root 2025-11-09 06:30:49 +00:00
parent 56771c12fd
commit 5a02bfd830
1 changed files with 89 additions and 16 deletions

View File

@ -3,28 +3,88 @@
namespace LaravelSupportCommand;
use Illuminate\Console\Command;
use function proc_open;
use function proc_close;
use function proc_get_status;
use function is_resource;
use function stream_set_blocking;
use function stream_get_contents;
use function exec;
use function strtoupper;
use function substr;
use function usleep;
class QueueWorkCommand extends Command
{
protected $signature = 'QueueWork
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'queue:run-multiple
{connection? : The name of the queue connection to work}
{--workers=3 : Number of workers to run}
{--connection=database : The queue connection to use}
{--queue=default : The queue name to work on}';
{--queue=default : The names of the queues to work}
{--max-jobs=0 : The number of jobs to process before stopping}
{--max-time=0 : The maximum number of seconds the worker should run}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--rest=0 : Number of seconds to rest between jobs}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : Number of times to attempt a job before logging it failed}
{--force : Force the worker to run even in maintenance mode}'; // Added common options and renamed command for clarity
/**
* The console command description.
*
* @var string
*/
protected $description = 'LaravelSupportCommand - Run multiple artisan queue:work processes, restart them if they exit.';
/**
* Execute the console command.
*
* @return never
*/
public function handle(): never
{
// Get arguments/options, using defaults where argument is optional
$workers = (int) $this->option('workers');
$connection = $this->option('connection');
// connection is now an optional argument, use 'database' as a typical fallback if not provided
$connection = $this->argument('connection') ?? $this->option('connection') ?? 'database';
$queue = $this->option('queue');
// Collect other relevant options to pass to queue:work
$workerOptions = collect($this->options())
->only([
'max-jobs',
'max-time',
'memory',
'sleep',
'rest',
'timeout',
'tries',
'force'
])
->map(function ($value, $key) {
// Format options for the command string
$option = "--{$key}";
// Only append value if it's not a boolean flag (like --force) or if the value is not null/empty
if ($value !== false && $value !== null) {
return $option . '=' . escapeshellarg((string) $value);
}
return $value === true ? $option : null;
})
->filter()
->implode(' ');
if (strtoupper(substr(PHP_OS, 0, 3)) !== 'WIN') {
$this->warn('Killing any existing queue:work processes...');
// Improved pkill to target only workers for the specific connection/queue if possible, but keeping simple for general safety
exec('pkill -f "artisan queue:work"');
}
$this->info("Starting {$workers} queue:work processes on connection '{$connection}' and queue '{$queue}'...");
$this->info("Starting {$workers} queue:work processes on connection '{$connection}' and queue '{$queue}' with options: '{$workerOptions}'...");
$processes = [];
$pipesList = [];
@ -35,13 +95,18 @@ class QueueWorkCommand extends Command
1 => ['pipe', 'w'], // stdout
2 => ['pipe', 'w'], // stderr
];
$cmd = "php artisan queue:work {$connection} --queue={$queue}";
// Construct the full command string
$cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}";
$process = proc_open($cmd, $descriptors, $pipes);
if (is_resource($process)) {
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);
$processes[$i] = $process;
$pipesList[$i] = $pipes;
} else {
$this->error("Failed to launch worker {$i}.");
}
}
@ -51,7 +116,7 @@ class QueueWorkCommand extends Command
if (is_resource($process)) {
$status = proc_get_status($process);
// Read stdout
// Read stdout and stderr
$out = stream_get_contents($pipesList[$i][1]);
if ($out !== '') {
foreach (explode("\n", trim($out)) as $line) {
@ -61,7 +126,6 @@ class QueueWorkCommand extends Command
}
}
// Read stderr
$err = stream_get_contents($pipesList[$i][2]);
if ($err !== '') {
foreach (explode("\n", trim($err)) as $line) {
@ -71,23 +135,32 @@ class QueueWorkCommand extends Command
}
}
// Restart if stopped
// Restart if stopped (exited or not running)
if (!$status['running']) {
$this->warn("Worker {$i} stopped. Restarting...");
$this->warn("Worker {$i} stopped (Exit Code: {$status['exitcode']}). Restarting...");
proc_close($process);
$cmd = "php artisan queue:work {$connection} --queue={$queue}";
// Re-construct the command with the same options
$cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}";
$process = proc_open($cmd, [
1 => ['pipe', 'w'],
2 => ['pipe', 'w'],
], $pipes);
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);
$processes[$i] = $process;
$pipesList[$i] = $pipes;
if (is_resource($process)) {
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);
$processes[$i] = $process;
$pipesList[$i] = $pipes;
} else {
$this->error("Failed to restart worker {$i}. Stopping monitoring for this worker.");
unset($processes[$i], $pipesList[$i]); // Remove failed process
}
}
}
}
usleep(200000); // 0.2s pause to avoid busy loop
usleep(200000); // 0.2s pause
}
}
}