From 147d039d62e6a1bd1bc39f9c9b5305fb4486e7a5 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 1 Aug 2023 07:44:35 +0700 Subject: [PATCH] Expose `Tags` property in `EventEnvelope` (#6862) * Expose `Tags` property in `EventEnvelope` * update API approval list * Change `Tags` default value from `null` to empty array * Update API approval list --- .../InMemoryCurrentEventsByTagSpec.cs | 2 + .../InMemoryEventsByTagSpec.cs | 2 + .../AllEventsPublisher.cs | 4 +- .../EventsByPersistenceIdPublisher.cs | 4 +- .../EventsByTagPublisher.cs | 3 +- .../AllEventsPublisher.cs | 4 +- .../EventsByPersistenceIdPublisher.cs | 4 +- .../EventsByTagPublisher.cs | 3 +- .../Query/SqliteCurrentEventsByTagSpec.cs | 2 + .../Query/SqliteEventsByTagSpec.cs | 2 + ...pprovePersistenceQuery.DotNet.verified.txt | 6 +- ...c.ApprovePersistenceQuery.Net.verified.txt | 6 +- .../Akka.Persistence.Query/EventEnvelope.cs | 23 +++-- .../Query/CurrentEventsByTagSpec.cs | 86 ++++++++++++++----- .../Query/EventsByTagSpec.cs | 35 +++++--- 15 files changed, 137 insertions(+), 49 deletions(-) diff --git a/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryCurrentEventsByTagSpec.cs index 08f7d905b85..1c9762551e5 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryCurrentEventsByTagSpec.cs @@ -32,5 +32,7 @@ public InMemoryCurrentEventsByTagSpec(ITestOutputHelper output) : { ReadJournal = Sys.ReadJournalFor(InMemoryReadJournal.Identifier); } + + protected override bool SupportsTagsInEventEnvelope => true; } } diff --git a/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs index 0e18a0d9fb8..e47176db490 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.InMemory.Tests/InMemoryEventsByTagSpec.cs @@ -32,5 +32,7 @@ public InMemoryEventsByTagSpec(ITestOutputHelper output) : { ReadJournal = Sys.ReadJournalFor(InMemoryReadJournal.Identifier); } + + protected override bool SupportsTagsInEventEnvelope => true; } } diff --git a/src/contrib/persistence/Akka.Persistence.Query.InMemory/AllEventsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.InMemory/AllEventsPublisher.cs index 8063f57db06..c79b627a7a3 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.InMemory/AllEventsPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.InMemory/AllEventsPublisher.cs @@ -108,12 +108,14 @@ protected bool Replaying( object message) if (replayed.Offset > ToOffset) return true; + // NOTES: tags is empty because tags are not retrieved from the database query (as of this writing) Buffer.Add(new EventEnvelope( offset: new Sequence(replayed.Offset), persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: Array.Empty())); CurrentOffset = replayed.Offset + 1; Buffer.DeliverBuffer(TotalDemand); diff --git a/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByPersistenceIdPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByPersistenceIdPublisher.cs index b3be195a336..52c8ec36704 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByPersistenceIdPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByPersistenceIdPublisher.cs @@ -121,12 +121,14 @@ protected Receive Replaying(int limit) { case ReplayedMessage replayed: var seqNr = replayed.Persistent.SequenceNr; + // NOTES: tags is empty because tags are not retrieved from the database query (as of this writing) Buffer.Add(new EventEnvelope( offset: new Sequence(seqNr), persistenceId: PersistenceId, sequenceNr: seqNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: Array.Empty())); CurrentSequenceNr = seqNr + 1; Buffer.DeliverBuffer(TotalDemand); return true; diff --git a/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByTagPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByTagPublisher.cs index b698ef22d19..1ae4e1279fd 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByTagPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.InMemory/EventsByTagPublisher.cs @@ -120,7 +120,8 @@ protected Receive Replaying(int limit) persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: new [] { replayed.Tag })); CurrentOffset = replayed.Offset + 1; Buffer.DeliverBuffer(TotalDemand); diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs index 1f239ec1093..fbfa13c884a 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/AllEventsPublisher.cs @@ -139,12 +139,14 @@ protected bool Replaying( object message ) if (replayed.Offset > ToOffset) return true; + // NOTES: tags is empty because tags are not retrieved from the database query (as of this writing) Buffer.Add(new EventEnvelope( offset: new Sequence(replayed.Offset), persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: Array.Empty())); CurrentOffset = replayed.Offset; Buffer.DeliverBuffer(TotalDemand); diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs index 596b59f17bb..a5d9fe4227e 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByPersistenceIdPublisher.cs @@ -153,12 +153,14 @@ protected bool Replaying(object message) { case ReplayedMessage replayed: var seqNr = replayed.Persistent.SequenceNr; + // NOTES: tags is empty because tags are not retrieved from the database query (as of this writing) Buffer.Add(new EventEnvelope( offset: new Sequence(seqNr), persistenceId: PersistenceId, sequenceNr: seqNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: Array.Empty())); CurrentSequenceNr = seqNr + 1; Buffer.DeliverBuffer(TotalDemand); return true; diff --git a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs index a2469601721..a539c85a240 100644 --- a/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs +++ b/src/contrib/persistence/Akka.Persistence.Query.Sql/EventsByTagPublisher.cs @@ -153,7 +153,8 @@ protected bool Replaying(object message) persistenceId: replayed.Persistent.PersistenceId, sequenceNr: replayed.Persistent.SequenceNr, @event: replayed.Persistent.Payload, - timestamp: replayed.Persistent.Timestamp)); + timestamp: replayed.Persistent.Timestamp, + tags: new [] { replayed.Tag })); CurrentOffset = replayed.Offset; Buffer.DeliverBuffer(TotalDemand); diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs index 2f7585f8668..32d91b3af35 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteCurrentEventsByTagSpec.cs @@ -44,5 +44,7 @@ public SqliteCurrentEventsByTagSpec(ITestOutputHelper output) : base(Config(Coun { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } + + protected override bool SupportsTagsInEventEnvelope => true; } } diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteEventsByTagSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteEventsByTagSpec.cs index 208cd8e5cbf..b022d87ea8c 100644 --- a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteEventsByTagSpec.cs +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/Query/SqliteEventsByTagSpec.cs @@ -44,5 +44,7 @@ public SqliteEventsByTagSpec(ITestOutputHelper output) : base(Config(Counter.Get { ReadJournal = Sys.ReadJournalFor(SqlReadJournal.Identifier); } + + protected override bool SupportsTagsInEventEnvelope => true; } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt index 6d9073534c6..ac9d02f5fdf 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.DotNet.verified.txt @@ -4,15 +4,19 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")] namespace Akka.Persistence.Query { + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class EventEnvelope : System.IEquatable { - [System.ObsoleteAttribute("For binary compatibility with previous releases")] + [System.ObsoleteAttribute("For binary compatibility with previous releases. Since 1.4.14")] public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event) { } + [System.ObsoleteAttribute("For binary compatibility with previous releases. Since 1.5.11")] public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp) { } + public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp, string[] tags) { } public object Event { get; } public Akka.Persistence.Query.Offset Offset { get; } public string PersistenceId { get; } public long SequenceNr { get; } + public string[] Tags { get; } public long Timestamp { get; } public bool Equals(Akka.Persistence.Query.EventEnvelope other) { } public override bool Equals(object obj) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt index d95c2cfff3a..eb4682f9eaa 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistenceQuery.Net.verified.txt @@ -4,15 +4,19 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")] namespace Akka.Persistence.Query { + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class EventEnvelope : System.IEquatable { - [System.ObsoleteAttribute("For binary compatibility with previous releases")] + [System.ObsoleteAttribute("For binary compatibility with previous releases. Since 1.4.14")] public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event) { } + [System.ObsoleteAttribute("For binary compatibility with previous releases. Since 1.5.11")] public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp) { } + public EventEnvelope(Akka.Persistence.Query.Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp, string[] tags) { } public object Event { get; } public Akka.Persistence.Query.Offset Offset { get; } public string PersistenceId { get; } public long SequenceNr { get; } + public string[] Tags { get; } public long Timestamp { get; } public bool Equals(Akka.Persistence.Query.EventEnvelope other) { } public override bool Equals(object obj) { } diff --git a/src/core/Akka.Persistence.Query/EventEnvelope.cs b/src/core/Akka.Persistence.Query/EventEnvelope.cs index bdd047196ca..0aa2fb69802 100644 --- a/src/core/Akka.Persistence.Query/EventEnvelope.cs +++ b/src/core/Akka.Persistence.Query/EventEnvelope.cs @@ -7,6 +7,7 @@ using System; +#nullable enable namespace Akka.Persistence.Query { /// @@ -23,21 +24,27 @@ public sealed class EventEnvelope : IEquatable /// /// Initializes a new instance of the class. /// - [Obsolete("For binary compatibility with previous releases")] + [Obsolete("For binary compatibility with previous releases. Since 1.4.14")] public EventEnvelope(Offset offset, string persistenceId, long sequenceNr, object @event) - : this(offset, persistenceId, sequenceNr, @event, 0L) + : this(offset, persistenceId, sequenceNr, @event, 0L, Array.Empty()) { } /// /// Initializes a new instance of the class. /// + [Obsolete("For binary compatibility with previous releases. Since 1.5.11")] public EventEnvelope(Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp) + : this(offset, persistenceId, sequenceNr, @event, timestamp, Array.Empty()) + { } + + public EventEnvelope(Offset offset, string persistenceId, long sequenceNr, object @event, long timestamp, string[] tags) { Offset = offset; PersistenceId = persistenceId; SequenceNr = sequenceNr; Event = @event; Timestamp = timestamp; + Tags = tags ?? Array.Empty(); } public Offset Offset { get; } @@ -50,28 +57,30 @@ public EventEnvelope(Offset offset, string persistenceId, long sequenceNr, objec public long Timestamp { get; } - public bool Equals(EventEnvelope other) + public string[] Tags { get; } + + public bool Equals(EventEnvelope? other) { if (ReferenceEquals(this, other)) return true; if (ReferenceEquals(other, null)) return false; - // timestamp not included in Equals for backwards compatibility + // Timestamp and Tags not included in Equals for backwards compatibility return Offset == other.Offset && PersistenceId == other.PersistenceId && SequenceNr == other.SequenceNr && Equals(Event, other.Event); } - public override bool Equals(object obj) => obj is EventEnvelope evt && Equals(evt); + public override bool Equals(object? obj) => obj is EventEnvelope evt && Equals(evt); public override int GetHashCode() { unchecked { var hashCode = Offset.GetHashCode(); - hashCode = (hashCode*397) ^ (PersistenceId != null ? PersistenceId.GetHashCode() : 0); + hashCode = (hashCode*397) ^ (PersistenceId?.GetHashCode() ?? 0); hashCode = (hashCode*397) ^ SequenceNr.GetHashCode(); - hashCode = (hashCode*397) ^ (Event != null ? Event.GetHashCode() : 0); + hashCode = (hashCode*397) ^ (Event?.GetHashCode() ?? 0); return hashCode; } } diff --git a/src/core/Akka.Persistence.TCK/Query/CurrentEventsByTagSpec.cs b/src/core/Akka.Persistence.TCK/Query/CurrentEventsByTagSpec.cs index d63c037612c..9a3ee5cd0b2 100644 --- a/src/core/Akka.Persistence.TCK/Query/CurrentEventsByTagSpec.cs +++ b/src/core/Akka.Persistence.TCK/Query/CurrentEventsByTagSpec.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Linq; using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Query; @@ -14,6 +15,7 @@ using FluentAssertions; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; using static Akka.Persistence.Query.Offset; namespace Akka.Persistence.TCK.Query @@ -24,6 +26,8 @@ public abstract class CurrentEventsByTagSpec : Akka.TestKit.Xunit2.TestKit protected IReadJournal ReadJournal { get; set; } + protected virtual bool SupportsTagsInEventEnvelope => false; + protected CurrentEventsByTagSpec(Config config = null, string actorSystemName = null, ITestOutputHelper output = null) : base(config ?? Config.Empty, actorSystemName, output) { @@ -39,7 +43,9 @@ public void ReadJournal_should_implement_ICurrentEventsByTagQuery() [Fact] public virtual void ReadJournal_query_CurrentEventsByTag_should_find_existing_events() { - var queries = ReadJournal as ICurrentEventsByTagQuery; + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + var a = Sys.ActorOf(Query.TestActor.Props("a")); var b = Sys.ActorOf(Query.TestActor.Props("b")); @@ -59,30 +65,32 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_find_existing_ev var greenSrc = queries.CurrentEventsByTag("green", offset: NoOffset()); var probe = greenSrc.RunWith(this.SinkProbe(), Materializer); probe.Request(2); - probe.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("a green apple")); - probe.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 4L && p.Event.Equals("a green banana")); + ExpectEnvelope(probe, "a", 2L, "a green apple", "green"); + ExpectEnvelope(probe, "a", 4L, "a green banana", "green"); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); probe.Request(2); - probe.ExpectNext(p => p.PersistenceId == "b" && p.SequenceNr == 2L && p.Event.Equals("a green leaf")); + ExpectEnvelope(probe, "b", 2L, "a green leaf", "green"); probe.ExpectComplete(); var blackSrc = queries.CurrentEventsByTag("black", offset: NoOffset()); var probe2 = blackSrc.RunWith(this.SinkProbe(), Materializer); probe2.Request(5); - probe2.ExpectNext(p => p.PersistenceId == "b" && p.SequenceNr == 1L && p.Event.Equals("a black car")); + ExpectEnvelope(probe2, "b", 1L, "a black car", "black"); probe2.ExpectComplete(); var appleSrc = queries.CurrentEventsByTag("apple", offset: NoOffset()); var probe3 = appleSrc.RunWith(this.SinkProbe(), Materializer); probe3.Request(5); - probe3.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("a green apple")); + ExpectEnvelope(probe3, "a", 2L, "a green apple", "apple"); probe3.ExpectComplete(); } [Fact] public virtual void ReadJournal_query_CurrentEventsByTag_should_complete_when_no_events() { - var queries = ReadJournal as ICurrentEventsByTagQuery; + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + var a = Sys.ActorOf(Query.TestActor.Props("a")); var b = Sys.ActorOf(Query.TestActor.Props("b")); @@ -100,16 +108,32 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_complete_when_no [Fact] public virtual void ReadJournal_query_CurrentEventsByTag_should_not_see_new_events_after_complete() { - var queries = ReadJournal as ICurrentEventsByTagQuery; - ReadJournal_query_CurrentEventsByTag_should_find_existing_events(); + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + + var a = Sys.ActorOf(Query.TestActor.Props("a")); + var b = Sys.ActorOf(Query.TestActor.Props("b")); + + a.Tell("hello"); + ExpectMsg("hello-done"); + a.Tell("a green apple"); + ExpectMsg("a green apple-done"); + b.Tell("a black car"); + ExpectMsg("a black car-done"); + a.Tell("something else"); + ExpectMsg("something else-done"); + a.Tell("a green banana"); + ExpectMsg("a green banana-done"); + b.Tell("a green leaf"); + ExpectMsg("a green leaf-done"); var c = Sys.ActorOf(Query.TestActor.Props("c")); var greenSrc = queries.CurrentEventsByTag("green", offset: NoOffset()); var probe = greenSrc.RunWith(this.SinkProbe(), Materializer); probe.Request(2); - probe.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("a green apple")); - probe.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 4L && p.Event.Equals("a green banana")); + ExpectEnvelope(probe, "a", 2L, "a green apple", "green"); + ExpectEnvelope(probe, "a", 4L, "a green banana", "green"); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); c.Tell("a green cucumber"); @@ -117,14 +141,15 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_not_see_new_even probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); probe.Request(5); - probe.ExpectNext(p => p.PersistenceId == "b" && p.SequenceNr == 2L && p.Event.Equals("a green leaf")); + ExpectEnvelope(probe, "b", 2L, "a green leaf", "green"); probe.ExpectComplete(); // green cucumber not seen } [Fact] public virtual void ReadJournal_query_CurrentEventsByTag_should_find_events_from_offset_exclusive() { - var queries = ReadJournal as ICurrentEventsByTagQuery; + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); var a = Sys.ActorOf(Query.TestActor.Props("a")); var b = Sys.ActorOf(Query.TestActor.Props("b")); @@ -148,25 +173,27 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_find_events_from var greenSrc1 = queries.CurrentEventsByTag("green", offset: NoOffset()); var probe1 = greenSrc1.RunWith(this.SinkProbe(), Materializer); probe1.Request(2); - probe1.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 2L && p.Event.Equals("a green apple")); - var offs = probe1.ExpectNext(p => p.PersistenceId == "a" && p.SequenceNr == 4L && p.Event.Equals("a green banana")).Offset; + ExpectEnvelope(probe1, "a", 2L, "a green apple", "green"); + var offs = ExpectEnvelope(probe1, "a", 4L, "a green banana", "green").Offset; probe1.Cancel(); var greenSrc = queries.CurrentEventsByTag("green", offset: offs); var probe2 = greenSrc.RunWith(this.SinkProbe(), Materializer); probe2.Request(10); // note that banana is not included, since exclusive offset - probe2.ExpectNext(p => p.PersistenceId == "b" && p.SequenceNr == 2L && p.Event.Equals("a green leaf")); + ExpectEnvelope(probe2, "b", 2L, "a green leaf", "green"); probe2.Cancel(); } [Fact] public virtual void ReadJournal_query_CurrentEventsByTag_should_see_all_150_events() { - var queries = ReadJournal as ICurrentEventsByTagQuery; + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + var a = Sys.ActorOf(Query.TestActor.Props("a")); - for (int i = 0; i < 150; ++i) + foreach (var _ in Enumerable.Range(1, 150)) { a.Tell("a green apple"); ExpectMsg("a green apple-done"); @@ -175,10 +202,9 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_see_all_150_even var greenSrc = queries.CurrentEventsByTag("green", offset: NoOffset()); var probe = greenSrc.RunWith(this.SinkProbe(), Materializer); probe.Request(150); - for (int i = 0; i < 150; ++i) + foreach (var i in Enumerable.Range(1, 150)) { - probe.ExpectNext(p => - p.PersistenceId == "a" && p.SequenceNr == (i + 1) && p.Event.Equals("a green apple")); + ExpectEnvelope(probe, "a", i, "a green apple", "green"); } probe.ExpectComplete(); @@ -188,7 +214,9 @@ public virtual void ReadJournal_query_CurrentEventsByTag_should_see_all_150_even [Fact] public void ReadJournal_query_CurrentEventsByTag_should_include_timestamp_in_EventEnvelope() { - var queries = ReadJournal as ICurrentEventsByTagQuery; + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + var a = Sys.ActorOf(Query.TestActor.Props("testTimestamp")); a.Tell("a green apple"); @@ -205,5 +233,19 @@ public void ReadJournal_query_CurrentEventsByTag_should_include_timestamp_in_Eve probe.ExpectNext().Timestamp.Should().BeGreaterThan(0); probe.Cancel(); } + + private EventEnvelope ExpectEnvelope(TestSubscriber.Probe probe, string persistenceId, long sequenceNr, string @event, string tag) + { + var envelope = probe.ExpectNext(_ => true); + envelope.PersistenceId.Should().Be(persistenceId); + envelope.SequenceNr.Should().Be(sequenceNr); + envelope.Event.Should().Be(@event); + if (SupportsTagsInEventEnvelope) + { + envelope.Tags.Should().NotBeNull(); + envelope.Tags.Should().Contain(tag); + } + return envelope; + } } } diff --git a/src/core/Akka.Persistence.TCK/Query/EventsByTagSpec.cs b/src/core/Akka.Persistence.TCK/Query/EventsByTagSpec.cs index d710770bd33..5e842a8cee9 100644 --- a/src/core/Akka.Persistence.TCK/Query/EventsByTagSpec.cs +++ b/src/core/Akka.Persistence.TCK/Query/EventsByTagSpec.cs @@ -15,17 +15,21 @@ using FluentAssertions; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; using static Akka.Persistence.Query.Offset; +#nullable enable namespace Akka.Persistence.TCK.Query { public abstract class EventsByTagSpec : Akka.TestKit.Xunit2.TestKit { protected ActorMaterializer Materializer { get; } - protected IReadJournal ReadJournal { get; set; } + protected IReadJournal? ReadJournal { get; set; } - protected EventsByTagSpec(Config config = null, string actorSystemName = null, ITestOutputHelper output = null) + protected virtual bool SupportsTagsInEventEnvelope => false; + + protected EventsByTagSpec(Config? config = null, string? actorSystemName = null, ITestOutputHelper? output = null) : base(config ?? Config.Empty, actorSystemName, output) { Materializer = Sys.Materializer(); @@ -40,7 +44,8 @@ public void ReadJournal_should_implement_IEventsByTagQuery() [Fact] public virtual void ReadJournal_live_query_EventsByTag_should_find_new_events() { - var queries = ReadJournal as IEventsByTagQuery; + if (ReadJournal is not IEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); var b = Sys.ActorOf(Query.TestActor.Props("b")); var d = Sys.ActorOf(Query.TestActor.Props("d")); @@ -51,7 +56,7 @@ public virtual void ReadJournal_live_query_EventsByTag_should_find_new_events() var blackSrc = queries.EventsByTag("black", offset: NoOffset()); var probe = blackSrc.RunWith(this.SinkProbe(), Materializer); probe.Request(2); - ExpectEnvelope(probe, "b", 1L, "a black car"); + ExpectEnvelope(probe, "b", 1L, "a black car", "black"); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); d.Tell("a black dog"); @@ -59,17 +64,18 @@ public virtual void ReadJournal_live_query_EventsByTag_should_find_new_events() d.Tell("a black night"); ExpectMsg("a black night-done"); - ExpectEnvelope(probe, "d", 1L, "a black dog"); + ExpectEnvelope(probe, "d", 1L, "a black dog", "black"); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); probe.Request(10); - ExpectEnvelope(probe, "d", 2L, "a black night"); + ExpectEnvelope(probe, "d", 2L, "a black night", "black"); probe.Cancel(); } [Fact] public virtual void ReadJournal_live_query_EventsByTag_should_find_events_from_offset_exclusive() { - var queries = ReadJournal as IEventsByTagQuery; + if (ReadJournal is not IEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); var a = Sys.ActorOf(Query.TestActor.Props("a")); var b = Sys.ActorOf(Query.TestActor.Props("b")); @@ -93,25 +99,30 @@ public virtual void ReadJournal_live_query_EventsByTag_should_find_events_from_o var greenSrc1 = queries.EventsByTag("green", offset: NoOffset()); var probe1 = greenSrc1.RunWith(this.SinkProbe(), Materializer); probe1.Request(2); - ExpectEnvelope(probe1, "a", 2L, "a green apple"); - var offs = ExpectEnvelope(probe1, "a", 4L, "a green banana").Offset; + ExpectEnvelope(probe1, "a", 2L, "a green apple", "green"); + var offs = ExpectEnvelope(probe1, "a", 4L, "a green banana", "green").Offset; probe1.Cancel(); var greenSrc2 = queries.EventsByTag("green", offset: offs); var probe2 = greenSrc2.RunWith(this.SinkProbe(), Materializer); probe2.Request(10); - ExpectEnvelope(probe2, "b", 2L, "a green leaf"); - ExpectEnvelope(probe2, "c", 1L, "a green cucumber"); + ExpectEnvelope(probe2, "b", 2L, "a green leaf", "green"); + ExpectEnvelope(probe2, "c", 1L, "a green cucumber", "green"); probe2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); probe2.Cancel(); } - private EventEnvelope ExpectEnvelope(TestSubscriber.Probe probe, string persistenceId, long sequenceNr, string @event) + private EventEnvelope ExpectEnvelope(TestSubscriber.Probe probe, string persistenceId, long sequenceNr, string @event, string tag) { var envelope = probe.ExpectNext(_ => true); envelope.PersistenceId.Should().Be(persistenceId); envelope.SequenceNr.Should().Be(sequenceNr); envelope.Event.Should().Be(@event); + if (SupportsTagsInEventEnvelope) + { + envelope.Tags.Should().NotBeNull(); + envelope.Tags.Should().Contain(tag); + } return envelope; } }