diff --git a/src/QueueWorkCommand.php b/src/QueueWorkCommand.php index da3ed7e..31435b1 100644 --- a/src/QueueWorkCommand.php +++ b/src/QueueWorkCommand.php @@ -3,28 +3,88 @@ namespace LaravelSupportCommand; use Illuminate\Console\Command; +use function proc_open; +use function proc_close; +use function proc_get_status; +use function is_resource; +use function stream_set_blocking; +use function stream_get_contents; +use function exec; +use function strtoupper; +use function substr; +use function usleep; class QueueWorkCommand extends Command { - protected $signature = 'QueueWork + /** + * The name and signature of the console command. + * + * @var string + */ + protected $signature = 'queue:run-multiple + {connection? : The name of the queue connection to work} {--workers=3 : Number of workers to run} - {--connection=database : The queue connection to use} - {--queue=default : The queue name to work on}'; + {--queue=default : The names of the queues to work} + {--max-jobs=0 : The number of jobs to process before stopping} + {--max-time=0 : The maximum number of seconds the worker should run} + {--memory=128 : The memory limit in megabytes} + {--sleep=3 : Number of seconds to sleep when no job is available} + {--rest=0 : Number of seconds to rest between jobs} + {--timeout=60 : The number of seconds a child process can run} + {--tries=1 : Number of times to attempt a job before logging it failed} + {--force : Force the worker to run even in maintenance mode}'; // Added common options and renamed command for clarity + /** + * The console command description. + * + * @var string + */ protected $description = 'LaravelSupportCommand - Run multiple artisan queue:work processes, restart them if they exit.'; + /** + * Execute the console command. + * + * @return never + */ public function handle(): never { + // Get arguments/options, using defaults where argument is optional $workers = (int) $this->option('workers'); - $connection = $this->option('connection'); + // connection is now an optional argument, use 'database' as a typical fallback if not provided + $connection = $this->argument('connection') ?? $this->option('connection') ?? 'database'; $queue = $this->option('queue'); + // Collect other relevant options to pass to queue:work + $workerOptions = collect($this->options()) + ->only([ + 'max-jobs', + 'max-time', + 'memory', + 'sleep', + 'rest', + 'timeout', + 'tries', + 'force' + ]) + ->map(function ($value, $key) { + // Format options for the command string + $option = "--{$key}"; + // Only append value if it's not a boolean flag (like --force) or if the value is not null/empty + if ($value !== false && $value !== null) { + return $option . '=' . escapeshellarg((string) $value); + } + return $value === true ? $option : null; + }) + ->filter() + ->implode(' '); + if (strtoupper(substr(PHP_OS, 0, 3)) !== 'WIN') { $this->warn('Killing any existing queue:work processes...'); + // Improved pkill to target only workers for the specific connection/queue if possible, but keeping simple for general safety exec('pkill -f "artisan queue:work"'); } - $this->info("Starting {$workers} queue:work processes on connection '{$connection}' and queue '{$queue}'..."); + $this->info("Starting {$workers} queue:work processes on connection '{$connection}' and queue '{$queue}' with options: '{$workerOptions}'..."); $processes = []; $pipesList = []; @@ -35,13 +95,18 @@ class QueueWorkCommand extends Command 1 => ['pipe', 'w'], // stdout 2 => ['pipe', 'w'], // stderr ]; - $cmd = "php artisan queue:work {$connection} --queue={$queue}"; + // Construct the full command string + $cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}"; + $process = proc_open($cmd, $descriptors, $pipes); + if (is_resource($process)) { stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); $processes[$i] = $process; $pipesList[$i] = $pipes; + } else { + $this->error("Failed to launch worker {$i}."); } } @@ -51,7 +116,7 @@ class QueueWorkCommand extends Command if (is_resource($process)) { $status = proc_get_status($process); - // Read stdout + // Read stdout and stderr $out = stream_get_contents($pipesList[$i][1]); if ($out !== '') { foreach (explode("\n", trim($out)) as $line) { @@ -61,7 +126,6 @@ class QueueWorkCommand extends Command } } - // Read stderr $err = stream_get_contents($pipesList[$i][2]); if ($err !== '') { foreach (explode("\n", trim($err)) as $line) { @@ -71,23 +135,32 @@ class QueueWorkCommand extends Command } } - // Restart if stopped + // Restart if stopped (exited or not running) if (!$status['running']) { - $this->warn("Worker {$i} stopped. Restarting..."); + $this->warn("Worker {$i} stopped (Exit Code: {$status['exitcode']}). Restarting..."); proc_close($process); - $cmd = "php artisan queue:work {$connection} --queue={$queue}"; + + // Re-construct the command with the same options + $cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}"; + $process = proc_open($cmd, [ 1 => ['pipe', 'w'], 2 => ['pipe', 'w'], ], $pipes); - stream_set_blocking($pipes[1], false); - stream_set_blocking($pipes[2], false); - $processes[$i] = $process; - $pipesList[$i] = $pipes; + + if (is_resource($process)) { + stream_set_blocking($pipes[1], false); + stream_set_blocking($pipes[2], false); + $processes[$i] = $process; + $pipesList[$i] = $pipes; + } else { + $this->error("Failed to restart worker {$i}. Stopping monitoring for this worker."); + unset($processes[$i], $pipesList[$i]); // Remove failed process + } } } } - usleep(200000); // 0.2s pause to avoid busy loop + usleep(200000); // 0.2s pause } } } \ No newline at end of file