Skip to content

Commit

Permalink
Merge pull request #4 from versh23/add-emiter
Browse files Browse the repository at this point in the history
add observers config
  • Loading branch information
versh23 authored Oct 14, 2019
2 parents 6df1959 + 886ea7c commit 0e7bbb1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
1 change: 0 additions & 1 deletion .php_cs.cache

This file was deleted.

33 changes: 32 additions & 1 deletion src/Messenger/Transport/Stomp/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use Interop\Queue\Exception\InvalidDestinationException;
use Interop\Queue\Exception\InvalidMessageException;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\Network\Observer\ServerAliveObserver;

class Connection
{
Expand All @@ -33,6 +35,8 @@ public static function create(string $dsn, array $options = []): self
$destinationName = $options['destination'] ?? null;
$queueName = $options['queue'] ?? null;
$topicName = $options['topic'] ?? null;
$useHeartbeatEmitter = $options['useHeartbeatEmitter'] ?? false;
$useServerAliveObserver = $options['useServerAliveObserver'] ?? false;

if (!$destinationName && $queueName) {
$destinationName = '/queue/'.$queueName;
Expand Down Expand Up @@ -67,8 +71,35 @@ public static function create(string $dsn, array $options = []): self
}

$factory = new StompConnectionFactory($config);
$context = $factory->createContext();

return new self($factory->createContext(), $destinationName);
if ($useHeartbeatEmitter || $useServerAliveObserver) {
$stomp = $context->getStomp();
$stomp->disconnect();
$connection = $stomp->getConnection();

$sendBeat = 0;
$receiveBeat = 0;

if ($useHeartbeatEmitter) {
$sendBeat = 3000;
$emitter = new HeartbeatEmitter($connection);
$connection->getObservers()->addObserver($emitter);
}

if ($useServerAliveObserver) {
$receiveBeat = 3000;
$connection->getObservers()->addObserver(new ServerAliveObserver());
}

$stomp->setHeartbeat($sendBeat, $receiveBeat);

$connection->setReadTimeout(0, 3 * 500000);

$stomp->connect();
}

return new self($context, $destinationName);
}

/**
Expand Down
7 changes: 0 additions & 7 deletions src/Messenger/Transport/Stomp/StompReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Versh23\Messenger\Transport\Stomp;

use Enqueue\Stomp\StompMessage;
use Stomp\Exception\ConnectionException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -49,12 +48,6 @@ public function __construct(Connection $connection, SerializerInterface $seriali
*/
public function get(): iterable
{
try {
$this->connection->ping();
} catch (ConnectionException $e) {
throw new TransportException($e->getMessage());
}

$stompMessage = $this->connection->get();

if (!$stompMessage) {
Expand Down

0 comments on commit 0e7bbb1

Please sign in to comment.