From 200ff63324cf8eebdb1f19b101243ea3e0eca8a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 11:56:29 +0300 Subject: [PATCH 01/25] add stomp driver --- composer.json | 6 +- src/drivers/stomp/Command.php | 42 ++++++++ src/drivers/stomp/Queue.php | 177 ++++++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 src/drivers/stomp/Command.php create mode 100644 src/drivers/stomp/Queue.php diff --git a/composer.json b/composer.json index 032db57c8b..94155495f4 100644 --- a/composer.json +++ b/composer.json @@ -38,7 +38,8 @@ "php-amqplib/php-amqplib": "Need for AMQP queue.", "enqueue/amqp-lib": "Need for AMQP interop queue.", "ext-gearman": "Need for Gearman queue.", - "aws/aws-sdk-php": "Need for aws SQS." + "aws/aws-sdk-php": "Need for aws SQS.", + "enqueue/stomp": "Need for Stomp queue." }, "autoload": { "psr-4": { @@ -51,7 +52,8 @@ "yii\\queue\\gearman\\": "src/drivers/gearman", "yii\\queue\\redis\\": "src/drivers/redis", "yii\\queue\\sync\\": "src/drivers/sync", - "yii\\queue\\sqs\\": "src/drivers/sqs" + "yii\\queue\\sqs\\": "src/drivers/sqs", + "yii\\queue\\stomp\\": "src/drivers/stomp" } }, "autoload-dev": { diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php new file mode 100644 index 0000000000..cc5936d6a6 --- /dev/null +++ b/src/drivers/stomp/Command.php @@ -0,0 +1,42 @@ +queue->run(false); + } + + + public function actionListen($timeout = 3) + { + if (!is_numeric($timeout)) { + throw new Exception('Timeout must be numeric.'); + } + if ($timeout < 1) { + throw new Exception('Timeout must be greater that zero.'); + } + + return $this->queue->run(true, $timeout); + } +} diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php new file mode 100644 index 0000000000..b2dd5a610c --- /dev/null +++ b/src/drivers/stomp/Queue.php @@ -0,0 +1,177 @@ +close(); + }); + } + + + protected function open() + { + if ($this->context) { + return; + } + + $config = [ + 'host' => $this->host, + 'port' => $this->port, + 'login' => $this->user, + 'password' => $this->password, + 'vhost' => $this->vhost, + 'buffer_size' => $this->bufferSize, + 'connection_timeout' => $this->connectionTimeout, + 'sync' => $this->sync, + 'lazy' => $this->lazy, + 'ssl_on' => $this->sslOn, + ]; + + $config = array_filter($config, function ($value) { + return null !== $value; + }); + + $factory = new StompConnectionFactory($config); + + $this->context = $factory->createContext(); + } + + public function run($repeat, $timeout = 0) + { + return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { + + $this->open(); + $queue = $this->context->createQueue($this->queueName); + $consumer = $this->context->createConsumer($queue); + + while ($canContinue()) { + + if ($message = $consumer->receive()) { + + if ($message->isRedelivered()) { + $consumer->acknowledge($message); + + $this->redeliver($message); + + continue; + } + + $ttr = $message->getProperty(self::TTR); + $attempt = $message->getProperty(self::ATTEMPT, 1); + + if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { + $consumer->acknowledge($message); + + } else { + $consumer->acknowledge($message); + + $this->redeliver($message); + } + + } elseif (!$repeat) { + break; + } elseif ($timeout) { + sleep($timeout); + } + } + }); + } + + /** + * @inheritdoc + */ + protected function pushMessage($message, $ttr, $delay, $priority) + { + $this->open(); + + $queue = $this->context->createQueue($this->queueName); + $message = $this->context->createMessage($message); + $message->setMessageId(uniqid('', true)); + $message->setPersistent(true); + $message->setProperty(self::ATTEMPT, 1); + $message->setProperty(self::TTR, $ttr); + + $producer = $this->context->createProducer(); + + if ($delay) { + $message->setProperty('AMQ_SCHEDULED_DELAY', $delay * 1000); + } + + if ($priority) { + $message->setHeader('priority', $priority); + } + + $producer->send($queue, $message); + + return $message->getMessageId(); + + } + + protected function close() + { + if (!$this->context) { + return; + } + + $this->context->close(); + $this->context = null; + } + + /** + * @inheritdoc + */ + public function status($id) + { + throw new NotSupportedException('Status is not supported in the driver.'); + } + + protected function redeliver(StompMessage $message) + { + $attempt = $message->getProperty(self::ATTEMPT, 1); + + $newMessage = $this->context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); + $newMessage->setProperty(self::ATTEMPT, ++$attempt); + + $this->context->createProducer()->send( + $this->context->createQueue($this->queueName), + $newMessage + ); + } +} From c2f6e044a016fc46abd35ee60dc760c6063f0858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 11:58:26 +0300 Subject: [PATCH 02/25] php cs --- src/drivers/stomp/Command.php | 7 ++++++- src/drivers/stomp/Queue.php | 16 +++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index cc5936d6a6..6dc549219d 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -1,9 +1,14 @@ runWorker(function (callable $canContinue) use ($repeat, $timeout) { - $this->open(); $queue = $this->context->createQueue($this->queueName); $consumer = $this->context->createConsumer($queue); while ($canContinue()) { - if ($message = $consumer->receive()) { - if ($message->isRedelivered()) { $consumer->acknowledge($message); @@ -98,13 +99,11 @@ public function run($repeat, $timeout = 0) if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { $consumer->acknowledge($message); - } else { $consumer->acknowledge($message); $this->redeliver($message); } - } elseif (!$repeat) { break; } elseif ($timeout) { @@ -141,7 +140,6 @@ protected function pushMessage($message, $ttr, $delay, $priority) $producer->send($queue, $message); return $message->getMessageId(); - } protected function close() From bb92a946dd46d86ce884f4da08b2cf584ce9a81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 12:22:23 +0300 Subject: [PATCH 03/25] add test --- composer.json | 3 +- tests/app/benchmark/waiting/Action.php | 2 + tests/app/config/main.php | 6 +++ tests/bootstrap.php | 1 + tests/docker-compose.yml | 10 ++++ tests/drivers/stomp/QueueTest.php | 69 ++++++++++++++++++++++++++ tests/yii | 1 + 7 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 tests/drivers/stomp/QueueTest.php diff --git a/composer.json b/composer.json index 94155495f4..eb04dd9476 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,8 @@ "yiisoft/yii2-debug": "*", "yiisoft/yii2-gii": "*", "phpunit/phpunit": "~4.4", - "aws/aws-sdk-php": ">=2.4" + "aws/aws-sdk-php": ">=2.4", + "enqueue/stomp": "^0.8.27" }, "suggest": { "ext-pcntl": "Need for process signals.", diff --git a/tests/app/benchmark/waiting/Action.php b/tests/app/benchmark/waiting/Action.php index 53f841737c..101c55783a 100644 --- a/tests/app/benchmark/waiting/Action.php +++ b/tests/app/benchmark/waiting/Action.php @@ -33,6 +33,7 @@ class Action extends \yii\base\Action 'amqpInteropQueue' => 'amqp-interop-queue/listen --isolate=0', 'mysqlQueue' => 'mysql-queue/listen 1 --isolate=0', 'fileQueue' => 'file-queue/listen 1 --isolate=0', + 'stompQueue' => 'stomp-queue/listen --isolate=0', ], // Worker will be run in isolate mode 'isolate' => [ @@ -43,6 +44,7 @@ class Action extends \yii\base\Action 'amqpInteropQueue' => 'amqp-interop-queue/listen --isolate=1', 'mysqlQueue' => 'mysql-queue/listen 1 --isolate=1', 'fileQueue' => 'file-queue/listen 1 --isolate=1', + 'stompQueue' => 'stomp-queue/listen 1 --isolate=1', ], ]; /** diff --git a/tests/app/config/main.php b/tests/app/config/main.php index f77aa000b8..f355f6e2b1 100644 --- a/tests/app/config/main.php +++ b/tests/app/config/main.php @@ -13,6 +13,7 @@ 'amqpQueue', 'amqpInteropQueue', 'beanstalkQueue', + 'stompQueue', ], 'components' => [ 'syncQueue' => [ @@ -100,6 +101,11 @@ 'class' => \yii\queue\beanstalk\Queue::class, 'host' => getenv('BEANSTALK_HOST') ?: 'localhost', ], + 'stompQueue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => getenv('ACTIVEMQ_HOST') ?: 'localhost', + 'port' => getenv('ACTIVEMQ_PORT') ?: 61613, + ], ], ]; diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 7b44c2e912..64cfda50f5 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -17,6 +17,7 @@ Yii::setAlias('@yii/queue/redis', dirname(__DIR__) . '/src/drivers/redis'); Yii::setAlias('@yii/queue/sync', dirname(__DIR__) . '/src/drivers/sync'); Yii::setAlias('@yii/queue/sqs', dirname(__DIR__) . '/src/drivers/sqs'); +Yii::setAlias('@yii/queue/stomp', dirname(__DIR__) . '/src/drivers/stomp'); Yii::setAlias('@tests', __DIR__); $config = require(__DIR__ . '/app/config/main.php'); diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 2306786e70..68c210a9e7 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -34,6 +34,7 @@ services: AWS_SECRET: ${AWS_SECRET} AWS_REGION: ${AWS_REGION} AWS_SQS_URL: ${AWS_SQS_URL} + ACTIVEMQ_HOST: activemq AWS_SQS_FIFO_ENABLED: ${AWS_SQS_FIFO_ENABLED} AWS_SQS_FIFO_URL: ${AWS_SQS_FIFO_URL} AWS_SQS_FIFO_MESSAGE_GROUP_ID: ${AWS_SQS_FIFO_MESSAGE_GROUP_ID} @@ -44,6 +45,7 @@ services: - rabbitmq - beanstalk - gearmand + - activemq networks: net: {} php72: @@ -148,6 +150,14 @@ services: networks: net: {} + # https://hub.docker.com/r/webcenter/activemq/ + activemq: + image: webcenter/activemq + ports: + - 61613:61613 + networks: + net: {} + networks: net: name: yii2_queue_net diff --git a/tests/drivers/stomp/QueueTest.php b/tests/drivers/stomp/QueueTest.php new file mode 100644 index 0000000000..4ec9433640 --- /dev/null +++ b/tests/drivers/stomp/QueueTest.php @@ -0,0 +1,69 @@ +startProcess('php yii queue/listen'); + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + + $this->assertSimpleJobDone($job); + } + + public function testLater() + { + $this->startProcess('php yii queue/listen'); + $job = $this->createSimpleJob(); + $this->getQueue()->delay(2)->push($job); + + $this->assertSimpleJobLaterDone($job, 2); + } + + public function testRetry() + { + $this->startProcess('php yii queue/listen'); + $job = new RetryJob(['uid' => uniqid()]); + $this->getQueue()->push($job); + sleep(6); + + $this->assertFileExists($job->getFileName()); + $this->assertEquals('aa', file_get_contents($job->getFileName())); + } + + public function testPriority() + { + $this->getQueue()->priority(3)->push(new PriorityJob(['number' => 1])); + $this->getQueue()->priority(1)->push(new PriorityJob(['number' => 5])); + $this->getQueue()->priority(2)->push(new PriorityJob(['number' => 3])); + $this->getQueue()->priority(2)->push(new PriorityJob(['number' => 4])); + $this->getQueue()->priority(3)->push(new PriorityJob(['number' => 2])); + $this->startProcess('php yii queue/listen'); + sleep(3); + + $this->assertEquals('12345', file_get_contents(PriorityJob::getFileName())); + } + + /** + * @return Queue + */ + protected function getQueue() + { + return Yii::$app->amqpInteropQueue; + } + +} diff --git a/tests/yii b/tests/yii index 72945f7668..8a69da60a6 100755 --- a/tests/yii +++ b/tests/yii @@ -18,6 +18,7 @@ Yii::setAlias('@yii/queue/gearman', dirname(__DIR__) . '/src/drivers/gearman'); Yii::setAlias('@yii/queue/redis', dirname(__DIR__) . '/src/drivers/redis'); Yii::setAlias('@yii/queue/sync', dirname(__DIR__) . '/src/drivers/sync'); Yii::setAlias('@yii/queue/sqs', dirname(__DIR__) . '/src/drivers/sqs'); +Yii::setAlias('@yii/queue/stomp', dirname(__DIR__) . '/src/drivers/stomp'); Yii::setAlias('@tests', __DIR__); $config = \yii\helpers\ArrayHelper::merge( From 2b0147363794fe581e4e47e024ac5b6187e2caa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 12:38:07 +0300 Subject: [PATCH 04/25] exclude stomp from php5.5 test --- .travis.yml | 5 ++++- tests/drivers/amqp_interop/QueueTest.php | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4874175322..89a52051cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,9 @@ php: matrix: include: - php: 5.5 - env: EXCLUDE_AMQP_INTEROP=true + env: + - EXCLUDE_AMQP_INTEROP=true + - EXCLUDE_STOMP=true fast_finish: true @@ -36,6 +38,7 @@ install: - travis_retry composer self-update && composer --version - export PATH="$HOME/.composer/vendor/bin:$PATH" - if [ "$EXCLUDE_AMQP_INTEROP" = true ]; then travis_retry composer remove "enqueue/amqp-lib" "enqueue/amqp-tools" --dev --no-interaction --no-update; fi + - if [ "$EXCLUDE_STOMP" = true ]; then travis_retry composer remove "enqueue/stomp" --dev --no-interaction --no-update; fi - travis_retry composer install --prefer-dist --no-interaction before_script: diff --git a/tests/drivers/amqp_interop/QueueTest.php b/tests/drivers/amqp_interop/QueueTest.php index f5953b1261..86095f1137 100644 --- a/tests/drivers/amqp_interop/QueueTest.php +++ b/tests/drivers/amqp_interop/QueueTest.php @@ -76,6 +76,10 @@ protected function setUp() $this->markTestSkipped('Amqp tests are disabled for php 5.5'); } + if ('true' == getenv('EXCLUDE_STOMP')) { + $this->markTestSkipped('Stomp tests are disabled for php 5.5'); + } + parent::setUp(); } } From 10d8ad382070c57f13bbb3ccd8e31ddad75ed728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 12:43:43 +0300 Subject: [PATCH 05/25] fix component name --- tests/drivers/stomp/QueueTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/drivers/stomp/QueueTest.php b/tests/drivers/stomp/QueueTest.php index 4ec9433640..356078b414 100644 --- a/tests/drivers/stomp/QueueTest.php +++ b/tests/drivers/stomp/QueueTest.php @@ -63,7 +63,7 @@ public function testPriority() */ protected function getQueue() { - return Yii::$app->amqpInteropQueue; + return Yii::$app->stompQueue; } } From da944b77b2cd35a0bdf311aa626fbb4428392dc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 12:57:40 +0300 Subject: [PATCH 06/25] add activemq --- .travis.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.travis.yml b/.travis.yml index 89a52051cc..7e0fa5eb78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ services: - postgresql - redis-server - rabbitmq + - docker # cache vendor dirs cache: @@ -32,6 +33,9 @@ before_install: - sudo apt-get install -qq beanstalkd - sudo beanstalkd -v - sudo service beanstalkd start + - pecl install igbinary + - docker pull webcenter/activemq + - docker run -d -p 61613:61613 webcenter/activemq - if [[ ${TRAVIS_PHP_VERSION:0:1} == "5" ]]; then pecl install igbinary-2.0.8; else pecl install igbinary; fi install: From 30d93e1789bf02a1fcc5a20fca8466eccd62a7d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 13:08:52 +0300 Subject: [PATCH 07/25] delayed and priority does not support --- src/drivers/stomp/Queue.php | 4 +-- tests/drivers/amqp_interop/QueueTest.php | 4 --- tests/drivers/stomp/QueueTest.php | 32 ++++++++---------------- 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 7436f6a96c..ffd924af1c 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -130,11 +130,11 @@ protected function pushMessage($message, $ttr, $delay, $priority) $producer = $this->context->createProducer(); if ($delay) { - $message->setProperty('AMQ_SCHEDULED_DELAY', $delay * 1000); + throw new NotSupportedException('Delayed work is not supported in the driver.'); } if ($priority) { - $message->setHeader('priority', $priority); + throw new NotSupportedException('Job priority is not supported in the driver.'); } $producer->send($queue, $message); diff --git a/tests/drivers/amqp_interop/QueueTest.php b/tests/drivers/amqp_interop/QueueTest.php index 86095f1137..0e57102d39 100644 --- a/tests/drivers/amqp_interop/QueueTest.php +++ b/tests/drivers/amqp_interop/QueueTest.php @@ -75,10 +75,6 @@ protected function setUp() if ('true' == getenv('EXCLUDE_AMQP_INTEROP')) { $this->markTestSkipped('Amqp tests are disabled for php 5.5'); } - - if ('true' == getenv('EXCLUDE_STOMP')) { - $this->markTestSkipped('Stomp tests are disabled for php 5.5'); - } parent::setUp(); } diff --git a/tests/drivers/stomp/QueueTest.php b/tests/drivers/stomp/QueueTest.php index 356078b414..af41e193ca 100644 --- a/tests/drivers/stomp/QueueTest.php +++ b/tests/drivers/stomp/QueueTest.php @@ -25,15 +25,6 @@ public function testListen() $this->assertSimpleJobDone($job); } - public function testLater() - { - $this->startProcess('php yii queue/listen'); - $job = $this->createSimpleJob(); - $this->getQueue()->delay(2)->push($job); - - $this->assertSimpleJobLaterDone($job, 2); - } - public function testRetry() { $this->startProcess('php yii queue/listen'); @@ -45,19 +36,6 @@ public function testRetry() $this->assertEquals('aa', file_get_contents($job->getFileName())); } - public function testPriority() - { - $this->getQueue()->priority(3)->push(new PriorityJob(['number' => 1])); - $this->getQueue()->priority(1)->push(new PriorityJob(['number' => 5])); - $this->getQueue()->priority(2)->push(new PriorityJob(['number' => 3])); - $this->getQueue()->priority(2)->push(new PriorityJob(['number' => 4])); - $this->getQueue()->priority(3)->push(new PriorityJob(['number' => 2])); - $this->startProcess('php yii queue/listen'); - sleep(3); - - $this->assertEquals('12345', file_get_contents(PriorityJob::getFileName())); - } - /** * @return Queue */ @@ -66,4 +44,14 @@ protected function getQueue() return Yii::$app->stompQueue; } + + protected function setUp() + { + if ('true' == getenv('EXCLUDE_STOMP')) { + $this->markTestSkipped('Stomp tests are disabled for php 5.5'); + } + + parent::setUp(); + } + } From 1b702e86718b8a95f46c8da3098f022c09abcd67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 24 Aug 2018 14:34:12 +0300 Subject: [PATCH 08/25] update readme --- README.md | 2 +- docs/guide-ru/README.md | 1 + docs/guide-ru/driver-stomp.md | 43 +++++++++++++++++++++++++++++++++++ docs/guide/README.md | 1 + docs/guide/driver-stomp.md | 39 +++++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 docs/guide-ru/driver-stomp.md create mode 100644 docs/guide/driver-stomp.md diff --git a/README.md b/README.md index 74a974e262..83f5d26319 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ An extension for running tasks asynchronously via queues. -It supports queues based on **DB**, **Redis**, **RabbitMQ**, **AMQP**, **Beanstalk** and **Gearman**. +It supports queues based on **DB**, **Redis**, **RabbitMQ**, **AMQP**, **Beanstalk**, **ActiveMQ** and **Gearman**. Documentation is at [docs/guide/README.md](docs/guide/README.md). diff --git a/docs/guide-ru/README.md b/docs/guide-ru/README.md index 30bd03b218..f14100c17f 100644 --- a/docs/guide-ru/README.md +++ b/docs/guide-ru/README.md @@ -22,6 +22,7 @@ * [Beanstalk драйвер](driver-beanstalk.md) * [Gearman драйвер](driver-gearman.md) * [AWS SQS драйвер](driver-sqs.md) +* [Stomp драйвер](driver-stomp.md) Инструменты разработчика ------------------------ diff --git a/docs/guide-ru/driver-stomp.md b/docs/guide-ru/driver-stomp.md new file mode 100644 index 0000000000..a813b2a305 --- /dev/null +++ b/docs/guide-ru/driver-stomp.md @@ -0,0 +1,43 @@ +Stomp драйвер +================ + +Драйвер работает с очередью на базе ActiveMQ. + +В приложении должно быть установлено расширение `enqueue/stomp`. + +Пример настройки: + +```php +return [ + 'bootstrap' => [ + 'queue', // Компонент регистрирует свои консольные команды + ], + 'components' => [ + 'queue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => 'localhost', + 'port' => 61613, + 'queueName' => 'queue', + ], + ], +]; +``` + +Консоль +------- + +Для обработки очереди используются консольные команды. + +```sh +yii queue/listen +``` + +Команда `listen` запускает обработку очереди в режиме демона. Очередь опрашивается непрерывно. +Если добавляются новые задания, то они сразу же извлекаются и выполняются. Способ наиболее эфективен +если запускать команду через [supervisor](worker.md#supervisor) или [systemd](worker.md#systemd). + +Для команды `listen` доступны следующие опции: + +- `--verbose`, `-v`: состояние обработки заданий выводится в консоль. +- `--isolate`: каждое задание выполняется в отдельном дочернем процессе. +- `--color`: подсветка вывода в режиме `--verbose`. diff --git a/docs/guide/README.md b/docs/guide/README.md index c48c99898a..0811c2dd66 100644 --- a/docs/guide/README.md +++ b/docs/guide/README.md @@ -22,6 +22,7 @@ Queue Drivers * [Beanstalk](driver-beanstalk.md) * [Gearman](driver-gearman.md) * [AWS SQS](driver-sqs.md) +* [Stomp](driver-stomp.md) Developer tools --------------- diff --git a/docs/guide/driver-stomp.md b/docs/guide/driver-stomp.md new file mode 100644 index 0000000000..6d33983c71 --- /dev/null +++ b/docs/guide/driver-stomp.md @@ -0,0 +1,39 @@ +Stomp Driver +=============== + + +This driver works with ActiveMQ queues. + +It requires the `enqueue/stomp` package. + +Configuration example: + +```php +return [ + 'bootstrap' => [ + 'queue', // The component registers its own console commands + ], + 'components' => [ + 'queue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => 'localhost', + 'port' => 61613, + 'queueName' => 'queue', + ], + ], +]; +``` + +Console +------- + +A console command is used to execute queued jobs. + +```sh +yii queue/listen [timeout] +``` + +The `listen` command launches a daemon which infinitely queries the queue. If there are new tasks +they're immediately obtained and executed. The `timeout` parameter specifies the number of seconds to sleep between +querying the queue. This method is most efficient when the command is properly daemonized via +[supervisor](worker.md#supervisor) or [systemd](worker.md#systemd). From e3b00de4c10239917565dcc3938d984b20ce3bc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 27 Aug 2018 10:18:50 +0300 Subject: [PATCH 09/25] add php-doc --- src/drivers/stomp/Command.php | 20 ++++++- src/drivers/stomp/Queue.php | 109 +++++++++++++++++++++++++++++++--- 2 files changed, 121 insertions(+), 8 deletions(-) diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index 6dc549219d..722a120311 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -10,6 +10,11 @@ use yii\console\Exception; use yii\queue\cli\Command as CliCommand; +/** + * Manages application stomp-queue. + * + * @author Sergey Vershinin + */ class Command extends CliCommand { /** @@ -27,12 +32,25 @@ protected function isWorkerAction($actionID) } + /** + * Runs all jobs from stomp-queue. + * It can be used as cron job. + * + * @return null|int exit code. + */ public function actionRun() { return $this->queue->run(false); } - + /** + * Listens stomp-queue and runs new jobs. + * It can be used as daemon process. + * + * @param int $timeout number of seconds to wait a job. + * @throws Exception when params are invalid. + * @return null|int exit code. + */ public function actionListen($timeout = 3) { if (!is_numeric($timeout)) { diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index ffd924af1c..e005c47eee 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -15,31 +15,91 @@ use yii\base\NotSupportedException; use yii\queue\cli\Queue as CliQueue; +/** + * Stomp Queue. + * @author Sergey Vershinin + */ class Queue extends CliQueue { const ATTEMPT = 'yii-attempt'; const TTR = 'yii-ttr'; + /** + * The message queue broker's host. + * + * @var string|null + */ public $host; + /** + * The message queue broker's port. + * + * @var string|null + */ public $port; + /** + * This is user which is used to login on the broker. + * + * @var string|null + */ public $user; + /** + * This is password which is used to login on the broker. + * + * @var string|null + */ public $password; - + /** + * Sets an fixed vhostname, which will be passed on connect as header['host']. + * + * @var string|null + */ public $vhost; + /** + * @var int + */ public $bufferSize; + /** + * @var int + */ public $connectionTimeout; + /** + * Perform request synchronously. + * @var bool + */ public $sync; + /** + * The connection will be established as later as possible if set true. + * + * @var bool|null + */ public $lazy; + /** + * Defines whether secure connection should be used or not. + * + * @var bool|null + */ public $sslOn; - + /** + * The queue used to consume messages from. + * + * @var string + */ public $queueName = 'stomp_queue'; - + /** + * The property contains a command class which used in cli. + * + * @var string command class name + */ public $commandClass = Command::class; + /** * @var StompContext */ protected $context; + /** + * @inheritdoc + */ public function init() { parent::init(); @@ -48,7 +108,9 @@ public function init() }); } - + /** + * Opens connection. + */ protected function open() { if ($this->context) { @@ -77,11 +139,18 @@ protected function open() $this->context = $factory->createContext(); } + /** + * Listens queue and runs each job. + * + * @param $repeat + * @param int $timeout + * @return int|null + */ public function run($repeat, $timeout = 0) { return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { $this->open(); - $queue = $this->context->createQueue($this->queueName); + $queue = $this->createQueue($this->queueName); $consumer = $this->context->createConsumer($queue); while ($canContinue()) { @@ -115,12 +184,14 @@ public function run($repeat, $timeout = 0) /** * @inheritdoc + * @throws \Interop\Queue\Exception + * @throws NotSupportedException */ protected function pushMessage($message, $ttr, $delay, $priority) { $this->open(); - $queue = $this->context->createQueue($this->queueName); + $queue = $this->createQueue($this->queueName); $message = $this->context->createMessage($message); $message->setMessageId(uniqid('', true)); $message->setPersistent(true); @@ -142,6 +213,9 @@ protected function pushMessage($message, $ttr, $delay, $priority) return $message->getMessageId(); } + /** + * Closes connection. + */ protected function close() { if (!$this->context) { @@ -154,12 +228,19 @@ protected function close() /** * @inheritdoc + * @throws NotSupportedException */ public function status($id) { throw new NotSupportedException('Status is not supported in the driver.'); } + /** + * @param StompMessage $message + * @throws \Interop\Queue\Exception + * @throws \Interop\Queue\InvalidDestinationException + * @throws \Interop\Queue\InvalidMessageException + */ protected function redeliver(StompMessage $message) { $attempt = $message->getProperty(self::ATTEMPT, 1); @@ -168,8 +249,22 @@ protected function redeliver(StompMessage $message) $newMessage->setProperty(self::ATTEMPT, ++$attempt); $this->context->createProducer()->send( - $this->context->createQueue($this->queueName), + $this->createQueue($this->queueName), $newMessage ); } + + /** + * @param $name + * @return \Enqueue\Stomp\StompDestination + */ + private function createQueue($name) + { + $queue = $this->context->createQueue($name); + $queue->setDurable(true); + $queue->setAutoDelete(false); + $queue->setExclusive(false); + + return $queue; + } } From aeffa42c47db818e17dfbd658cf8f6008f179f0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Thu, 6 Sep 2018 09:39:53 +0300 Subject: [PATCH 10/25] update to last changes --- tests/app/config/main.php | 1 - tests/docker/php/entrypoint.sh | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/app/config/main.php b/tests/app/config/main.php index f355f6e2b1..942f080ce1 100644 --- a/tests/app/config/main.php +++ b/tests/app/config/main.php @@ -104,7 +104,6 @@ 'stompQueue' => [ 'class' => \yii\queue\stomp\Queue::class, 'host' => getenv('ACTIVEMQ_HOST') ?: 'localhost', - 'port' => getenv('ACTIVEMQ_PORT') ?: 61613, ], ], ]; diff --git a/tests/docker/php/entrypoint.sh b/tests/docker/php/entrypoint.sh index b82c35f3b2..641ee5b4d9 100755 --- a/tests/docker/php/entrypoint.sh +++ b/tests/docker/php/entrypoint.sh @@ -20,6 +20,8 @@ tests/docker/wait-for-it.sh beanstalk:11300 -t 180 tests/docker/wait-for-it.sh gearmand:4730 -t 180 +tests/docker/wait-for-it.sh activemq:61613 -t 180 + php --version set -x exec "$@" From ed187303b18c18b76253f2f291c00a16fb96c671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 7 Sep 2018 15:41:40 +0300 Subject: [PATCH 11/25] fix no wait receiver --- src/drivers/stomp/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index e005c47eee..2f89a3e5a5 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -154,7 +154,7 @@ public function run($repeat, $timeout = 0) $consumer = $this->context->createConsumer($queue); while ($canContinue()) { - if ($message = $consumer->receive()) { + if ($message = $consumer->receiveNoWait()) { if ($message->isRedelivered()) { $consumer->acknowledge($message); From 2002948442ed3a7417561622d0100e84f5fffb16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 10 Sep 2018 11:40:13 +0300 Subject: [PATCH 12/25] This changes need if we use separate queue exchange ( other application use queue without some headers ) * use default value ttr if not exists * generate messageId if not exists in headers --- src/drivers/stomp/Queue.php | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 2f89a3e5a5..8158fbd204 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -10,6 +10,7 @@ use Enqueue\Stomp\StompConnectionFactory; use Enqueue\Stomp\StompContext; use Enqueue\Stomp\StompMessage; +use Interop\Queue\PsrMessage; use yii\base\Application as BaseApp; use yii\base\Event; use yii\base\NotSupportedException; @@ -155,6 +156,11 @@ public function run($repeat, $timeout = 0) while ($canContinue()) { if ($message = $consumer->receiveNoWait()) { + $messageId = $message->getMessageId(); + if (!$messageId) { + $message = $this->setMessageId($message); + } + if ($message->isRedelivered()) { $consumer->acknowledge($message); @@ -163,7 +169,7 @@ public function run($repeat, $timeout = 0) continue; } - $ttr = $message->getProperty(self::TTR); + $ttr = $message->getProperty(self::TTR, $this->ttr); $attempt = $message->getProperty(self::ATTEMPT, 1); if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { @@ -182,6 +188,12 @@ public function run($repeat, $timeout = 0) }); } + protected function setMessageId(PsrMessage $message) + { + $message->setMessageId(uniqid('', true)); + return $message; + } + /** * @inheritdoc * @throws \Interop\Queue\Exception @@ -193,7 +205,7 @@ protected function pushMessage($message, $ttr, $delay, $priority) $queue = $this->createQueue($this->queueName); $message = $this->context->createMessage($message); - $message->setMessageId(uniqid('', true)); + $message = $this->setMessageId($message); $message->setPersistent(true); $message->setProperty(self::ATTEMPT, 1); $message->setProperty(self::TTR, $ttr); From fe2d6517784fb43931e9892da0880150c542166d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 10 Sep 2018 11:55:23 +0300 Subject: [PATCH 13/25] add receive timeout bug with empty buffer --- src/drivers/stomp/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 8158fbd204..9f67ee21a5 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -155,7 +155,7 @@ public function run($repeat, $timeout = 0) $consumer = $this->context->createConsumer($queue); while ($canContinue()) { - if ($message = $consumer->receiveNoWait()) { + if ($message = $consumer->receive(0.1)) { $messageId = $message->getMessageId(); if (!$messageId) { $message = $this->setMessageId($message); From 6255f669b519ad2ad44625271e9906f5af88b9c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Tue, 11 Sep 2018 09:29:23 +0300 Subject: [PATCH 14/25] readtimeout configurable --- src/drivers/stomp/Queue.php | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 9f67ee21a5..a6d0c67f8d 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -93,6 +93,12 @@ class Queue extends CliQueue */ public $commandClass = Command::class; + /** + * Set the read timeout. + * @var float + */ + public $readTimeOut = 0.1; + /** * @var StompContext */ @@ -146,16 +152,22 @@ protected function open() * @param $repeat * @param int $timeout * @return int|null + * @throws NotSupportedException */ public function run($repeat, $timeout = 0) { + if ($this->readTimeOut <= 0) { + //prevent infinite loop + throw new NotSupportedException('readTimeOut must be greater than 0'); + } + return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { $this->open(); $queue = $this->createQueue($this->queueName); $consumer = $this->context->createConsumer($queue); while ($canContinue()) { - if ($message = $consumer->receive(0.1)) { + if ($message = $consumer->receive($this->readTimeOut)) { $messageId = $message->getMessageId(); if (!$messageId) { $message = $this->setMessageId($message); From f555fadcbf45224e55475ddb3ae2ca938f29518c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Tue, 11 Sep 2018 12:15:20 +0300 Subject: [PATCH 15/25] readTimeOut in integer --- src/drivers/stomp/Queue.php | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index a6d0c67f8d..71f9ed31f5 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -95,9 +95,9 @@ class Queue extends CliQueue /** * Set the read timeout. - * @var float + * @var int */ - public $readTimeOut = 0.1; + public $readTimeOut = 0; /** * @var StompContext @@ -152,22 +152,16 @@ protected function open() * @param $repeat * @param int $timeout * @return int|null - * @throws NotSupportedException */ public function run($repeat, $timeout = 0) { - if ($this->readTimeOut <= 0) { - //prevent infinite loop - throw new NotSupportedException('readTimeOut must be greater than 0'); - } - return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { $this->open(); $queue = $this->createQueue($this->queueName); $consumer = $this->context->createConsumer($queue); while ($canContinue()) { - if ($message = $consumer->receive($this->readTimeOut)) { + if ($message = ($this->readTimeOut > 0 ? $consumer->receive($this->readTimeOut) : $consumer->receiveNoWait())) { $messageId = $message->getMessageId(); if (!$messageId) { $message = $this->setMessageId($message); From 614a886e6b53ebe6aba8662440ef3f9a0c1836d2 Mon Sep 17 00:00:00 2001 From: Carsten Brandt Date: Wed, 10 Oct 2018 21:02:13 +0200 Subject: [PATCH 16/25] Update Command.php --- src/drivers/stomp/Command.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index 722a120311..9f9bfb0d88 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -14,6 +14,7 @@ * Manages application stomp-queue. * * @author Sergey Vershinin + * @since 2.1.1 */ class Command extends CliCommand { From a1c2691842658acacc014b93ffbf656b965e1d41 Mon Sep 17 00:00:00 2001 From: Carsten Brandt Date: Wed, 10 Oct 2018 21:02:43 +0200 Subject: [PATCH 17/25] Update Queue.php --- src/drivers/stomp/Queue.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 71f9ed31f5..d94f3c0d5c 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -19,6 +19,7 @@ /** * Stomp Queue. * @author Sergey Vershinin + * @since 2.1.1 */ class Queue extends CliQueue { From 6d9bf8314506759f545fc1022cddc09671006ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 15 Oct 2018 11:00:15 +0300 Subject: [PATCH 18/25] add alive package --- src/drivers/stomp/Queue.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index d94f3c0d5c..e08bde6e9e 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -190,6 +190,7 @@ public function run($repeat, $timeout = 0) break; } elseif ($timeout) { sleep($timeout); + $this->context->getStomp()->getConnection()->sendAlive(); } } }); From 1acc9889c4f1e9c3a01046b04b24760764bed883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 15 Oct 2018 11:52:37 +0300 Subject: [PATCH 19/25] fix --- src/drivers/stomp/Queue.php | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index e08bde6e9e..ea9a635e1a 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -10,7 +10,6 @@ use Enqueue\Stomp\StompConnectionFactory; use Enqueue\Stomp\StompContext; use Enqueue\Stomp\StompMessage; -use Interop\Queue\PsrMessage; use yii\base\Application as BaseApp; use yii\base\Event; use yii\base\NotSupportedException; @@ -196,7 +195,7 @@ public function run($repeat, $timeout = 0) }); } - protected function setMessageId(PsrMessage $message) + protected function setMessageId(StompMessage $message) { $message->setMessageId(uniqid('', true)); return $message; @@ -258,8 +257,8 @@ public function status($id) /** * @param StompMessage $message * @throws \Interop\Queue\Exception - * @throws \Interop\Queue\InvalidDestinationException - * @throws \Interop\Queue\InvalidMessageException + * @throws \Interop\Queue\Exception\InvalidDestinationException + * @throws \Interop\Queue\Exception\InvalidMessageException */ protected function redeliver(StompMessage $message) { From f8dc591d10fac805ae37f7296629266294f5bd99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 19 Oct 2018 12:20:57 +0300 Subject: [PATCH 20/25] use stable version --- src/drivers/stomp/Queue.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index ea9a635e1a..87e203aac1 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -195,6 +195,11 @@ public function run($repeat, $timeout = 0) }); } + /** + * @param StompMessage $message + * @return StompMessage + * @throws \Interop\Queue\Exception + */ protected function setMessageId(StompMessage $message) { $message->setMessageId(uniqid('', true)); @@ -257,8 +262,6 @@ public function status($id) /** * @param StompMessage $message * @throws \Interop\Queue\Exception - * @throws \Interop\Queue\Exception\InvalidDestinationException - * @throws \Interop\Queue\Exception\InvalidMessageException */ protected function redeliver(StompMessage $message) { From db319c923f8bc963aad8924de775fe30cd6ababb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B5=D1=80=D1=88=D0=B8=D0=BD=D0=B8=D0=BD=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9E=D0=BB=D0=B5=D0=B3=D0=BE?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Fri, 19 Oct 2018 12:30:05 +0300 Subject: [PATCH 21/25] version with public method --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index eb04dd9476..f38e0f6b6d 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,7 @@ "yiisoft/yii2-gii": "*", "phpunit/phpunit": "~4.4", "aws/aws-sdk-php": ">=2.4", - "enqueue/stomp": "^0.8.27" + "enqueue/stomp": "^0.8.39" }, "suggest": { "ext-pcntl": "Need for process signals.", From 28775471ee27e812c11374ef03ab288c5930a4e4 Mon Sep 17 00:00:00 2001 From: vershinin_so Date: Fri, 22 Mar 2019 17:08:18 +0300 Subject: [PATCH 22/25] fix travis --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 7e0fa5eb78..47a30d0912 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,7 +33,6 @@ before_install: - sudo apt-get install -qq beanstalkd - sudo beanstalkd -v - sudo service beanstalkd start - - pecl install igbinary - docker pull webcenter/activemq - docker run -d -p 61613:61613 webcenter/activemq - if [[ ${TRAVIS_PHP_VERSION:0:1} == "5" ]]; then pecl install igbinary-2.0.8; else pecl install igbinary; fi From fc18acb040fe9221748f6ba3dc30b5f9773ee7ee Mon Sep 17 00:00:00 2001 From: Alexander Makarov Date: Tue, 4 Jun 2019 21:51:50 +0300 Subject: [PATCH 23/25] Update Command.php --- src/drivers/stomp/Command.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index 9f9bfb0d88..63844279b7 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -14,7 +14,7 @@ * Manages application stomp-queue. * * @author Sergey Vershinin - * @since 2.1.1 + * @since 2.3.0 */ class Command extends CliCommand { From 4db56a8c9f62fc1844cd1eb7017059f78a5464d8 Mon Sep 17 00:00:00 2001 From: Alexander Makarov Date: Tue, 4 Jun 2019 21:52:15 +0300 Subject: [PATCH 24/25] Update Queue.php --- src/drivers/stomp/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index 87e203aac1..67a6167314 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -18,7 +18,7 @@ /** * Stomp Queue. * @author Sergey Vershinin - * @since 2.1.1 + * @since 2.3.0 */ class Queue extends CliQueue { From 6a161fd025ff9f34a3be5376513c23279d831ad5 Mon Sep 17 00:00:00 2001 From: Alexander Makarov Date: Tue, 4 Jun 2019 21:55:27 +0300 Subject: [PATCH 25/25] Update CHANGELOG.md --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 485ba78c72..5e60e2d8b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,10 @@ Yii2 Queue Extension Change Log =============================== -2.2.2 under development +2.3.0 under development ----------------------- -- no changes in this release. +- Enh #260: Added STOMP driver (versh23) 2.2.1 May 21, 2019