Skip to content

Commit

Permalink
Simplify binding code
Browse files Browse the repository at this point in the history
This also removes the capability of choosing the network interface by its primary IP address, since we've never needed it and the code got too confusing.
  • Loading branch information
ltrzesniewski committed Nov 8, 2023
1 parent 726ab01 commit cb72758
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 22 deletions.
11 changes: 5 additions & 6 deletions src/Abc.Zebus.Tests/Transport/ZmqTransportTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,15 @@ public void should_support_peer_endpoint_modifications()
var senderTransport = CreateAndStartZmqTransport();

var receivedMessages = new ConcurrentBag<TransportMessage>();
var receiverTransport = CreateAndStartZmqTransport(onMessageReceived: receivedMessages.Add);
var receiver = receiverTransport.GetPeer();
var receiverTransport1 = CreateAndStartZmqTransport(onMessageReceived: receivedMessages.Add);
var receiverTransport2 = CreateAndStartZmqTransport(onMessageReceived: receivedMessages.Add);
var receiver = receiverTransport1.GetPeer();

senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());

var newEndPoint = "tcp://127.0.0.1:" + TcpUtil.GetRandomUnusedPort();
receiverTransport.Stop();
receiverTransport = CreateAndStartZmqTransport(newEndPoint, receivedMessages.Add);
receiver.EndPoint = receiverTransport.InboundEndPoint;
receiverTransport1.Stop();
receiver.EndPoint = receiverTransport2.InboundEndPoint;

senderTransport.Send(new FakeCommand(0).ToTransportMessage(), new[] { receiver });
Wait.Until(() => receivedMessages.Count == 2, 2.Seconds(), "unable to receive message");
Expand Down
15 changes: 14 additions & 1 deletion src/Abc.Zebus/Transport/ZmqEndPoint.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
namespace Abc.Zebus.Transport;
using System;
using System.Text.RegularExpressions;

namespace Abc.Zebus.Transport;

internal readonly struct ZmqEndPoint
{
private static readonly Regex _endpointRegex = new(@"^tcp://(?<host>\*|[0-9a-zA-Z_.-]+):(?<port>\*|[0-9]+)/?$", RegexOptions.IgnoreCase);

private readonly string? _value;

public ZmqEndPoint(string? value)
=> _value = value;

public override string ToString()
=> _value ?? "tcp://*:*";

public static (string host, string port) Parse(string? endpoint)
{
var match = _endpointRegex.Match(endpoint ?? string.Empty);
return match.Success
? (match.Groups["host"].Value, match.Groups["port"].Value)
: throw new InvalidOperationException($"Invalid endpoint: {endpoint}");
}
}
19 changes: 4 additions & 15 deletions src/Abc.Zebus/Transport/ZmqInboundSocket.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Net;
using System.Text.RegularExpressions;
using Abc.Zebus.Serialization.Protobuf;
using Abc.Zebus.Transport.Zmq;
using Microsoft.Extensions.Logging;
Expand All @@ -11,9 +10,6 @@ internal class ZmqInboundSocket : IDisposable
{
private static readonly ILogger _logger = ZebusLogManager.GetLogger(typeof(ZmqInboundSocket));

private static readonly Regex _endpointRegex = new(@"^tcp://(?<host>\*|[0-9a-zA-Z_.-]+):(?<port>\*|[0-9]+)/?$", RegexOptions.IgnoreCase);
private static readonly Regex _ipRegex = new(@"^(?:[0-9]+\.){3}[0-9]+$");

private readonly ZmqContext _context;
private readonly ZmqEndPoint _configuredEndPoint;
private readonly ZmqSocketOptions _options;
Expand All @@ -32,11 +28,12 @@ public ZmqEndPoint Bind()
{
_socket = CreateSocket();

var (configuredHost, configuredPort) = ParseEndpoint(_configuredEndPoint.ToString());
var (configuredHost, configuredPort) = ZmqEndPoint.Parse(_configuredEndPoint.ToString());

_socket.Bind($"tcp://*:{configuredPort}");

_socket.Bind($"tcp://{(_ipRegex.IsMatch(configuredHost) ? configuredHost : "*")}:{configuredPort}");
var (boundHost, boundPort) = ZmqEndPoint.Parse(_socket.GetOptionString(ZmqSocketOption.LAST_ENDPOINT));

var (boundHost, boundPort) = ParseEndpoint(_socket.GetOptionString(ZmqSocketOption.LAST_ENDPOINT));
if (boundHost == "0.0.0.0")
{
// Use the hostname from the config when one is provided, or the FQDN otherwise
Expand All @@ -49,14 +46,6 @@ public ZmqEndPoint Bind()
_logger.LogInformation($"Socket bound, Inbound EndPoint: {socketEndPoint}");

return socketEndPoint;

static (string host, string port) ParseEndpoint(string endpoint)
{
var match = _endpointRegex.Match(endpoint);
return match.Success
? (match.Groups["host"].Value, match.Groups["port"].Value)
: throw new InvalidOperationException($"Invalid endpoint: {endpoint}");
}
}

public void Dispose()
Expand Down

0 comments on commit cb72758

Please sign in to comment.