141 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			PHP
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			141 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			PHP
		
	
	
		
			Executable File
		
	
	
<?php
 | 
						|
/**
 | 
						|
 * Copyright © Magento, Inc. All rights reserved.
 | 
						|
 * See COPYING.txt for license details.
 | 
						|
 */
 | 
						|
declare(strict_types=1);
 | 
						|
 | 
						|
namespace Magento\MysqlMq\Model;
 | 
						|
 | 
						|
use Magento\Framework\MessageQueue\UseCase\QueueTestCaseAbstract;
 | 
						|
use Magento\MysqlMq\Model\ResourceModel\MessageCollection;
 | 
						|
use Magento\MysqlMq\Model\ResourceModel\MessageStatusCollection;
 | 
						|
 | 
						|
/**
 | 
						|
 * Test for MySQL publisher class.
 | 
						|
 *
 | 
						|
 * @magentoDbIsolation disabled
 | 
						|
 */
 | 
						|
class PublisherConsumerTest extends QueueTestCaseAbstract
 | 
						|
{
 | 
						|
    const MAX_NUMBER_OF_TRIALS = 3;
 | 
						|
 | 
						|
    /**
 | 
						|
     * @var string[]
 | 
						|
     */
 | 
						|
    protected $consumers = [
 | 
						|
        'demoConsumerQueueOne',
 | 
						|
        'demoConsumerQueueTwo',
 | 
						|
        'demoConsumerQueueThree',
 | 
						|
        'delayedOperationConsumer',
 | 
						|
        'demoConsumerWithException'
 | 
						|
    ];
 | 
						|
 | 
						|
    /**
 | 
						|
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
 | 
						|
     */
 | 
						|
    public function testPublishConsumeFlow()
 | 
						|
    {
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObjectFactory $objectFactory */
 | 
						|
        $objectFactory = $this->objectManager->create(\Magento\TestModuleMysqlMq\Model\DataObjectFactory::class);
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObject $object */
 | 
						|
        $object = $objectFactory->create();
 | 
						|
        $object->setOutputPath($this->logFilePath);
 | 
						|
        file_put_contents($this->logFilePath, '');
 | 
						|
        for ($i = 0; $i < 10; $i++) {
 | 
						|
            $object->setName('Object name ' . $i)->setEntityId($i);
 | 
						|
            $this->publisher->publish('demo.object.created', $object);
 | 
						|
        }
 | 
						|
        for ($i = 0; $i < 5; $i++) {
 | 
						|
            $object->setName('Object name ' . $i)->setEntityId($i);
 | 
						|
            $this->publisher->publish('demo.object.updated', $object);
 | 
						|
        }
 | 
						|
        for ($i = 0; $i < 3; $i++) {
 | 
						|
            $object->setName('Object name ' . $i)->setEntityId($i);
 | 
						|
            $this->publisher->publish('demo.object.custom.created', $object);
 | 
						|
        }
 | 
						|
        $this->waitForAsynchronousResult(18, $this->logFilePath);
 | 
						|
 | 
						|
        //Check lines in file
 | 
						|
        $createdPattern = '/Processed object created \d+/';
 | 
						|
        $updatedPattern = '/Processed object updated \d+/';
 | 
						|
        $customCreatedPattern = '/Processed custom object created \d+/';
 | 
						|
        $logFileContents = file_get_contents($this->logFilePath);
 | 
						|
 | 
						|
        preg_match_all($createdPattern, $logFileContents, $createdMatches);
 | 
						|
        $this->assertCount(10, $createdMatches[0]);
 | 
						|
        preg_match_all($updatedPattern, $logFileContents, $updatedMatches);
 | 
						|
        $this->assertCount(5, $updatedMatches[0]);
 | 
						|
        preg_match_all($customCreatedPattern, $logFileContents, $customCreatedMatches);
 | 
						|
        $this->assertCount(3, $customCreatedMatches[0]);
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
 | 
						|
     */
 | 
						|
    public function testPublishAndConsumeSchemaDefinedByMethod()
 | 
						|
    {
 | 
						|
        $topic = 'test.schema.defined.by.method';
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObjectFactory $objectFactory */
 | 
						|
        $objectFactory = $this->objectManager->create(\Magento\TestModuleMysqlMq\Model\DataObjectFactory::class);
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObject $object */
 | 
						|
        $object = $objectFactory->create();
 | 
						|
        $id = 33;
 | 
						|
        $object->setName('Object name ' . $id)->setEntityId($id);
 | 
						|
        $object->setOutputPath($this->logFilePath);
 | 
						|
        $requiredStringParam = 'Required value';
 | 
						|
        $optionalIntParam = 44;
 | 
						|
        $this->publisher->publish($topic, [$object, $requiredStringParam, $optionalIntParam]);
 | 
						|
 | 
						|
        $expectedOutput = "Processed '{$object->getEntityId()}'; "
 | 
						|
            . "Required param '{$requiredStringParam}'; Optional param '{$optionalIntParam}'";
 | 
						|
 | 
						|
        $this->waitForAsynchronousResult(1, $this->logFilePath);
 | 
						|
 | 
						|
        $this->assertEquals($expectedOutput, trim(file_get_contents($this->logFilePath)));
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @magentoDataFixture Magento/MysqlMq/_files/queues.php
 | 
						|
     */
 | 
						|
    public function testConsumeWithException()
 | 
						|
    {
 | 
						|
        $topic = 'demo.exception';
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObjectFactory $objectFactory */
 | 
						|
        $objectFactory = $this->objectManager->create(\Magento\TestModuleMysqlMq\Model\DataObjectFactory::class);
 | 
						|
        /** @var \Magento\TestModuleMysqlMq\Model\DataObject $object */
 | 
						|
        $object = $objectFactory->create();
 | 
						|
        $id = 99;
 | 
						|
 | 
						|
        $object->setName('Object name ' . $id)->setEntityId($id);
 | 
						|
        $object->setOutputPath($this->logFilePath);
 | 
						|
        $this->publisher->publish($topic, $object);
 | 
						|
        $expectedOutput = "Exception processing {$id}";
 | 
						|
        $this->waitForAsynchronousResult(1, $this->logFilePath);
 | 
						|
        $message = $this->getTopicLatestMessage($topic);
 | 
						|
        $this->assertEquals($expectedOutput, trim(file_get_contents($this->logFilePath)));
 | 
						|
        $this->assertEquals(QueueManagement::MESSAGE_STATUS_ERROR, $message->getStatus());
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param string $topic
 | 
						|
     * @return Message
 | 
						|
     */
 | 
						|
    private function getTopicLatestMessage(string $topic) : Message
 | 
						|
    {
 | 
						|
        // Assert message status is error
 | 
						|
        $messageCollection = $this->objectManager->create(MessageCollection::class);
 | 
						|
        $messageStatusCollection = $this->objectManager->create(MessageStatusCollection::class);
 | 
						|
 | 
						|
        $messageCollection->addFilter('topic_name', $topic);
 | 
						|
        $messageCollection->join(
 | 
						|
            ['status' => $messageStatusCollection->getMainTable()],
 | 
						|
            "status.message_id = main_table.id"
 | 
						|
        );
 | 
						|
        $messageCollection->addOrder('updated_at', MessageCollection::SORT_ORDER_DESC);
 | 
						|
 | 
						|
        $message = $messageCollection->getFirstItem();
 | 
						|
        return $message;
 | 
						|
    }
 | 
						|
}
 |