Skip to content

Commit

Permalink
v2.6.2024.1004 支持RocketMQ v5.3,在公网测试通过。默认内网broker地址替换为公网地址。
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Oct 4, 2024
1 parent a5dbb18 commit 006817e
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 86 deletions.
22 changes: 20 additions & 2 deletions NewLife.RocketMQ/MqBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Reflection;
using NewLife.Log;
using NewLife.Net;
using NewLife.RocketMQ.Protocol;
using NewLife.Serialization;

Expand Down Expand Up @@ -267,8 +268,21 @@ protected BrokerClient GetBroker(String name)
{
if (_Brokers.TryGetValue(name, out client)) return client;

// broker可能在内网,转为公网地址
var uri = new NetUri(NameServerAddress.Split(";").FirstOrDefault());
var addrs = bk.Addresses.ToArray();
for (var i = 0; i < addrs.Length; i++)
{
var addr = addrs[i];
if (addr.StartsWithIgnoreCase("10.", "192.", "172."))
{
var p = addr.IndexOf(':');
addrs[i] = p > 0 ? uri.Host + addr[p..] : uri.Host;
}
}

// 实例化客户端
client = CreateBroker(bk.Name, bk.Addresses);
client = CreateBroker(bk.Name, addrs);

client.Start();

Expand Down Expand Up @@ -316,7 +330,7 @@ protected virtual BrokerClient CreateBroker(String name, String[] addrs)
/// <param name="topic">主题</param>
/// <param name="queueNum">队列数</param>
/// <param name="topicSysFlag"></param>
public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
public virtual Int32 CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
{
var header = new
{
Expand All @@ -330,6 +344,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag
order = false,
};

var count = 0;
using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
try
{
Expand All @@ -341,6 +356,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag
{
var bk = GetBroker(item.Name);
var rs = bk.Invoke(RequestCode.UPDATE_AND_CREATE_TOPIC, null, header);
if (rs != null && rs.Header.Code == (Int32)ResponseCode.SUCCESS) count++;
}
catch (Exception ex)
{
Expand All @@ -354,6 +370,8 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag

throw;
}

return count;
}
#endregion

Expand Down
2 changes: 1 addition & 1 deletion NewLife.RocketMQ/NewLife.RocketMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="11.0.2024.917-beta0004" />
<PackageReference Include="NewLife.Core" Version="11.0.2024.1001" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions NewLife.RocketMQ/Protocol/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ public IPacket ToPacket()
{
var ms = new MemoryStream();
Write(ms, null);
ms.Position = 0;

return new Packet(ms);
ms.Position = 0;
return new ArrayPacket(ms);
}

/// <summary>创建响应</summary>
Expand Down
2 changes: 1 addition & 1 deletion NewLife.RocketMQ/Protocol/MessageExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public Boolean Read(Stream stream, Object context = null)
ms.Write(port.GetBytes(false));
ms.Write(CommitLogOffset.GetBytes(false));

MsgId = ms.Put(true).ToHex(0, 16);
MsgId = ms.Return(true).ToHex(0, 16);

return true;
}
Expand Down
48 changes: 37 additions & 11 deletions Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,70 @@ static void Main(String[] args)
{
XTrace.UseConsole();

//Test5();
TestAliyun();
Test1();
//TestAliyun();

Console.WriteLine("OK!");
Console.ReadKey();
}

static void Test1()
{
var mq = new Producer
XTrace.WriteLine("");
XTrace.WriteLine("创建生产者……");
var producer = new Producer
{
Topic = "nx_test",
NameServerAddress = "127.0.0.1:9876",
NameServerAddress = "rocketmq.newlifex.com:9876",

Log = XTrace.Log,
};

mq.Configure(MqSetting.Current);
mq.Start();
producer.Configure(MqSetting.Current);
producer.Start();

//mq.CreateTopic("nx_test", 2);

for (var i = 0; i < 1000_000; i++)
XTrace.WriteLine("");
XTrace.WriteLine("创建消费者……");
var consumer = new Consumer
{
Topic = producer.Topic,
Group = "test",
NameServerAddress = producer.NameServerAddress,

FromLastOffset = false,
//SkipOverStoredMsgCount = 0,
//BatchSize = 20,

Log = XTrace.Log,
ClientLog = XTrace.Log,
};

consumer.OnConsume = OnConsume;

consumer.Configure(MqSetting.Current);
consumer.Start();
Thread.Sleep(1000);

XTrace.WriteLine("");
XTrace.WriteLine("发布测试消息……");
for (var i = 0; i < 10; i++)
{
var str = "学无先后达者为师" + i;
//var str = Rand.NextString(1337);

var sr = mq.Publish(str, "TagA", null);
var sr = producer.Publish(str, "TagA", null);

//Console.WriteLine("[{0}] {1} {2} {3}", sr.Queue.BrokerName, sr.Queue.QueueId, sr.MsgId, sr.QueueOffset);

// 阿里云发送消息不能过快,否则报错“服务不可用”
Thread.Sleep(100);
Thread.Sleep(500);
}

Console.WriteLine("完成");

mq.Dispose();
producer.Dispose();
}

private static Consumer _consumer;
Expand Down Expand Up @@ -121,7 +147,7 @@ private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)

foreach (var item in ms.ToList())
{
Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr(null, 0, 64)}");
Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime().ToFullString()}】,内容【{item.Body.ToStr(null, 0, 64)}");
}

return true;
Expand Down
36 changes: 36 additions & 0 deletions XUnitTestRocketMQ/BasicTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Linq;
using NewLife;
using NewLife.Log;
using NewLife.RocketMQ;
using Xunit;

// 所有测试用例放入一个汇编级集合,除非单独指定Collection特性
[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)]

namespace XUnitTestRocketMQ;

[Collection("Basic")]
public class BasicTest
{
private static MqSetting _config;
public static MqSetting GetConfig()
{
if (_config != null) return _config;
lock (typeof(BasicTest))
{
if (_config != null) return _config;

var set = MqSetting.Current;
if (set.IsNew)
{
set.NameServer = "rocketmq.newlifex.com:9876";
set.Save();
}

XTrace.WriteLine("RocketMQ配置:{0}", set.NameServer);

return _config = set;
}
}
}
4 changes: 0 additions & 4 deletions XUnitTestRocketMQ/CommandTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NewLife;
using NewLife.Data;
using NewLife.RocketMQ.Protocol;
Expand Down
66 changes: 33 additions & 33 deletions XUnitTestRocketMQ/ConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,49 @@
using System.Threading;
using Xunit;

namespace XUnitTestRocketMQ
namespace XUnitTestRocketMQ;

public class ConsumerTests
{
public class ConsumerTests
private static Consumer _consumer;
[Fact]
public static void ConsumeTest()
{
private static Consumer _consumer;
[Fact]
static void ConsumeTest()
var set = BasicTest.GetConfig();
var consumer = new Consumer
{
var consumer = new Consumer
{
Topic = "nx_test",
Group = "test",
NameServerAddress = "127.0.0.1:9876",

FromLastOffset = true,
BatchSize = 20,
Topic = "nx_test",
Group = "test",
NameServerAddress = set.NameServer,

Log = XTrace.Log,
};
FromLastOffset = true,
BatchSize = 20,

consumer.OnConsume = OnConsume;
consumer.Start();
Log = XTrace.Log,
};

_consumer = consumer;
consumer.OnConsume = OnConsume;
consumer.Start();

Thread.Sleep(3000);
//foreach (var item in consumer.Clients)
//{
// var rs = item.GetRuntimeInfo();
// Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
//}
}
_consumer = consumer;

private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
{
Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
Thread.Sleep(3000);
//foreach (var item in consumer.Clients)
//{
// var rs = item.GetRuntimeInfo();
// Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
//}
}

foreach (var item in ms.ToList())
{
Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}");
}
private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
{
Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

return true;
foreach (var item in ms.ToList())
{
Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}");
}

return true;
}
}
Loading

0 comments on commit 006817e

Please sign in to comment.