option('workers'); // connection is now an optional argument, use 'database' as a typical fallback if not provided $connection = $this->argument('connection') ?? $this->option('connection') ?? 'database'; $queue = $this->option('queue'); // Collect other relevant options to pass to queue:work $workerOptions = collect($this->options()) ->only([ 'max-jobs', 'max-time', 'memory', 'sleep', 'rest', 'timeout', 'tries', 'force' ]) ->map(function ($value, $key) { // Format options for the command string $option = "--{$key}"; // Only append value if it's not a boolean flag (like --force) or if the value is not null/empty if ($value !== false && $value !== null) { return $option . '=' . escapeshellarg((string) $value); } return $value === true ? $option : null; }) ->filter() ->implode(' '); if (strtoupper(substr(PHP_OS, 0, 3)) !== 'WIN') { $this->warn('Killing any existing queue:work processes...'); // Improved pkill to target only workers for the specific connection/queue if possible, but keeping simple for general safety exec('pkill -f "artisan queue:work"'); } $this->line("Starting {$workers} queue:work processes on connection '{$connection}' and queue '{$queue}' with options: '{$workerOptions}'..."); $processes = []; $pipesList = []; for ($i = 1; $i <= $workers; $i++) { $this->line("Lastring: unching worker {$i}..."); $descriptors = [ 1 => ['pipe', 'w'], // stdout 2 => ['pipe', 'w'], // stderr ]; // Construct the full command string $cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}"; $process = proc_open($cmd, $descriptors, $pipes); if (is_resource($process)) { stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); $processes[$i] = $process; $pipesList[$i] = $pipes; } else { $this->error("Failed to launch worker {$i}."); } } // Monitor loop while (true) { foreach ($processes as $i => $process) { if (is_resource($process)) { $status = proc_get_status($process); // Read stdout and stderr $out = stream_get_contents($pipesList[$i][1]); if ($out !== '') { foreach (explode("\n", trim($out)) as $line) { if ($line !== '') { $this->line("[Worker {$i}] " . $line); } } } $err = stream_get_contents($pipesList[$i][2]); if ($err !== '') { foreach (explode("\n", trim($err)) as $line) { if ($line !== '') { $this->error("[Worker {$i}] " . $line); } } } // Restart if stopped (exited or not running) if (!$status['running']) { $this->warn("Worker {$i} stopped (Exit Code: {$status['exitcode']}). Restarting..."); proc_close($process); // Re-construct the command with the same options $cmd = "php artisan queue:work " . escapeshellarg($connection) . " --queue=" . escapeshellarg($queue) . " {$workerOptions}"; $process = proc_open($cmd, [ 1 => ['pipe', 'w'], 2 => ['pipe', 'w'], ], $pipes); if (is_resource($process)) { stream_set_blocking($pipes[1], false); stream_set_blocking($pipes[2], false); $processes[$i] = $process; $pipesList[$i] = $pipes; } else { $this->error("Failed to restart worker {$i}. Stopping monitoring for this worker."); unset($processes[$i], $pipesList[$i]); // Remove failed process } } } } usleep(200000); // 0.2s pause } } }