diff --git a/wp-product-importer/includes/class-cron.php b/wp-product-importer/includes/class-cron.php index 325b39a..dc1a1c4 100644 --- a/wp-product-importer/includes/class-cron.php +++ b/wp-product-importer/includes/class-cron.php @@ -5,11 +5,18 @@ class WPI_Cron const HOOK = 'wpi_import_products_cron'; const TRIGGER_HOOK = 'wpi_trigger_import_cron'; const SYNC_CATEGORIES_HOOK = 'wpi_sync_categories_cron'; + const DISPATCH_HOOK = 'wpi_dispatch_import_workers'; public static function init() { add_filter('cron_schedules', [self::class, 'add_schedule']); - add_action(self::HOOK, [self::class, 'run']); + add_action( + self::HOOK, + [self::class, 'run'], + 10, + 1 + ); + add_action(self::DISPATCH_HOOK, [self::class, 'dispatch_workers']); add_action(self::TRIGGER_HOOK, [self::class, 'trigger']); add_action(self::SYNC_CATEGORIES_HOOK, [self::class, 'sync_categories']); } @@ -49,9 +56,11 @@ class WPI_Cron wp_clear_scheduled_hook(self::TRIGGER_HOOK); } - - public static function run() + public static function run(array $args = []) { + + $worker = $args['worker'] ?? 'default'; + if (!class_exists('WooCommerce')) { WPI_Logger::log('WooCommerce not active'); return; @@ -62,13 +71,14 @@ class WPI_Cron return; } - if (WPI_Config_Model::get_config('sync_status')) { + if (WPI_Config_Model::get_config('sync_status_' . $worker)) { WPI_Logger::log('Sync already running'); return; } $import_api = WPI_Config_Model::get_config('import_api'); $authen_key = WPI_Config_Model::get_config('authen_key'); + $import_workers = WPI_Config_Model::get_config('import_workers'); if (!$import_api || !$authen_key) { WPI_Logger::log('Missing API config'); @@ -76,9 +86,14 @@ class WPI_Cron } $limit = (int) WPI_Config_Model::get_config('limit_per_time', 20); - $page = (int) WPI_Config_Model::get_config('current_page', 1); + $page = (int) WPI_Config_Model::claim_next_page(); - WPI_Config_Model::set_config('sync_status', 1); + if ($page < 1) { + WPI_Logger::log('Invalid page claimed'); + return; + } + + WPI_Config_Model::set_config('sync_status_' . $worker, 1, 'Status process sync ' . $worker); try { @@ -87,6 +102,8 @@ class WPI_Cron 'limit' => $limit, ]); + // WPI_Config_Model::set_config('current_page', $page + 1); + if (is_wp_error($response)) { WPI_Logger::log('API Error: ' . $response->get_error_message()); return; @@ -113,9 +130,6 @@ class WPI_Cron $category_map = WPI_Category_Mapper::get_category_map(); - - WPI_Logger::log(json_encode($category_map)); - foreach ($data as $item) { $import_data = WPI_Product_Map::from_api($item, $category_map); @@ -137,22 +151,70 @@ class WPI_Cron } - WPI_Config_Model::set_config('current_page', $page + 1); if (count($data) < $limit) { WPI_Config_Model::set_config('status_cron', 0); - WPI_Config_Model::set_config('current_page', 1); + + if (!WPI_Config_Model::any_worker_running()) { + WPI_Config_Model::set_config('current_page', 0); + } + } + + if ($worker === $import_workers) { + WPI_Config_Model::set_config('status_workers', 0); } WPI_Logger::log("Page {$page} done"); } finally { - WPI_Config_Model::set_config('sync_status', 0); + WPI_Config_Model::set_config('sync_status_' . $worker, 0); } } + + public static function dispatch_workers() + { + if (!WPI_Config_Model::get_config('status_cron')) { + WPI_Logger::log('Dispatcher aborted: status_cron OFF'); + return; + } + + if (!WPI_Config_Model::acquire_lock('status_workers')) { + WPI_Logger::log('Dispatcher already running'); + return; + } + + if (WPI_Config_Model::any_worker_running()) { + WPI_Logger::log('Some workers still running'); + return; + } + + WPI_Config_Model::set_config('status_workers', 1); + + $workers = (int) WPI_Config_Model::get_config('import_workers', 1); + + if ($workers < 1) { + $workers = 1; + } + + WPI_Logger::log("Dispatching {$workers} import workers"); + + for ($i = 1; $i <= $workers; $i++) { + + WPI_Logger::log("Dispatch worker {$i}"); + + $cmd = sprintf( + 'wp eval "do_action(\'wpi_import_products_cron\', [\'worker\' => %d]);" --allow-root > /dev/null 2>&1 &', + $i + ); + + exec($cmd); + } + } + + public static function trigger() { - if (WPI_Config_Model::get_config('sync_status')) { + if (WPI_Config_Model::get_config('status_workers')) { WPI_Logger::log('Trigger skipped: sync is running'); return; } diff --git a/wp-product-importer/includes/models/class-wpi-config-model.php b/wp-product-importer/includes/models/class-wpi-config-model.php index 39e2e9d..7ab1abd 100644 --- a/wp-product-importer/includes/models/class-wpi-config-model.php +++ b/wp-product-importer/includes/models/class-wpi-config-model.php @@ -8,6 +8,23 @@ class WPI_Config_Model return $wpdb->prefix . 'wpi_configs'; } + public static function acquire_lock(string $key): bool + { + global $wpdb; + + $table = self::table(); + + return (bool) $wpdb->query( + $wpdb->prepare( + "UPDATE {$table} + SET config_value = 1 + WHERE config_key = %s AND config_value = 0", + $key + ) + ); + } + + /** * Check config exists */ @@ -81,4 +98,57 @@ class WPI_Config_Model return $value !== null ? json_decode($value, true) : $default; } + + public static function any_worker_running(): bool + { + global $wpdb; + + $table = self::table(); + + $rows = $wpdb->get_results( + "SELECT config_value + FROM {$table} + WHERE config_key LIKE 'sync_status_%'" + ); + + if (empty($rows)) { + // Không có worker nào + return false; + } + + foreach ($rows as $row) { + $value = json_decode($row->config_value, true); + + if ((int) $value !== 1) { + return false; + } + } + + return true; + } + + public static function claim_next_page(): int + { + global $wpdb; + + $table = self::table(); + + // Atomic increment + $updated = $wpdb->query( + "UPDATE {$table} + SET config_value = config_value + 1 + WHERE config_key = 'current_page'" + ); + + if ($updated === false) { + return 0; + } + + // Lấy page vừa claim + $page = (int) $wpdb->get_var( + "SELECT config_value FROM {$table} WHERE config_key = 'current_page'" + ); + + return $page; + } } diff --git a/wp-product-importer/readme.md b/wp-product-importer/readme.md index 9a03e46..6a18e94 100644 --- a/wp-product-importer/readme.md +++ b/wp-product-importer/readme.md @@ -4,6 +4,7 @@ wp cron event run wpi_import_products_cron --allow-root wp eval "do_action('wpi_import_products_cron');" --allow-root + wp eval "do_action('wpi_import_products_cron', ['worker' => 1]);" --allow-root @@ -15,6 +16,11 @@ wp cron event run wpi_trigger_import_cron --allow-root wp eval "do_action('wpi_trigger_import_cron');" --allow-root + + + wp cron event run wpi_dispatch_import_workers --allow-root + wp eval "do_action('wpi_dispatch_import_workers');" --allow-root + - Run Cron tabs 0 2 \* \* \* cd /var/www/html && wp cron event run wpi_trigger_import_cron --allow-root >> /var/log/wp-trigger-import.log 2>&1 @@ -24,12 +30,11 @@ # Xoá toàn bộ products -wp post delete \ -$(wp post list --post_type=product --field=ID --allow-root) \ ---force --allow-root +wp --allow-root post delete $(wp --allow-root post list --post_type=product,product_variation --format=ids) --force -# Xoá toàn bộ product categories +# Cài WP CLI -wp term delete product_cat \ -$(wp term list product_cat --field=term_id --allow-root) \ ---allow-root +curl -O https://raw.githubusercontent.com/wp-cli/builds/gh-pages/phar/wp-cli.phar +php wp-cli.phar --info +chmod +x wp-cli.phar +mv wp-cli.phar /usr/local/bin/wp diff --git a/wp-product-importer/wp-product-importer.php b/wp-product-importer/wp-product-importer.php index 0f9570c..a3b937c 100644 --- a/wp-product-importer/wp-product-importer.php +++ b/wp-product-importer/wp-product-importer.php @@ -107,7 +107,7 @@ register_activation_hook(__FILE__, function () { WPI_Config_Model::set_config( 'current_page', - 1, + 0, 'Current page sync' ); @@ -123,6 +123,18 @@ register_activation_hook(__FILE__, function () { 'Status cron sync' ); + WPI_Config_Model::set_config( + 'status_workers', + 0, + 'Status workers' + ); + + WPI_Config_Model::set_config( + 'import_workers', + 5, + 'Workers process per time' + ); + WPI_Config_Model::set_config( 'categories_sync_status', 0,