From f492247a96d0f032e9290f2b42a300f6a5addbb3 Mon Sep 17 00:00:00 2001 From: Jake Meiergerd Date: Wed, 6 Dec 2023 14:38:06 -0600 Subject: [PATCH] Feature Re-design for `ToObservableChangeSet()` (#771) * Re-designed the `.ToObservableChangeSet()` operator for both caches and lists, for better independence, proper error handling, and improved performance. Resolves #635. * Code Format updates * Formatting/Organization * Updated `.ToObservableChangeSet()` implementations to leverage C#11 features. --------- Co-authored-by: Chris Pulman --- ...ts.DynamicDataTests.DotNet6_0.verified.txt | 1 + ...ts.DynamicDataTests.DotNet7_0.verified.txt | 1 + ...ts.DynamicDataTests.DotNet8_0.verified.txt | 1 + .../Cache/ToObservableChangeSetFixture.cs | 206 ++++++- .../List/ToObservableChangeSetFixture.cs | 193 +++++-- .../Cache/Internal/ToObservableChangeSet.cs | 507 +++++++++++++++--- src/DynamicData/List/ChangeSet.cs | 9 + .../List/Internal/ToObservableChangeSet.cs | 458 ++++++++++++++-- .../List/Internal/TransformAsync.cs | 6 +- src/DynamicData/List/ObservableListEx.cs | 27 +- 10 files changed, 1212 insertions(+), 197 deletions(-) diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt index dc5936118..539e1c744 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet6_0.verified.txt @@ -634,6 +634,7 @@ namespace DynamicData public static readonly DynamicData.IChangeSet Empty; public ChangeSet() { } public ChangeSet(System.Collections.Generic.IEnumerable> items) { } + public ChangeSet(int capacity) { } public int Adds { get; } public int Moves { get; } public int Refreshes { get; } diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt index fde840e28..467ae974f 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet7_0.verified.txt @@ -634,6 +634,7 @@ namespace DynamicData public static readonly DynamicData.IChangeSet Empty; public ChangeSet() { } public ChangeSet(System.Collections.Generic.IEnumerable> items) { } + public ChangeSet(int capacity) { } public int Adds { get; } public int Moves { get; } public int Refreshes { get; } diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 31c0bdc01..046c0b5cf 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -634,6 +634,7 @@ namespace DynamicData public static readonly DynamicData.IChangeSet Empty; public ChangeSet() { } public ChangeSet(System.Collections.Generic.IEnumerable> items) { } + public ChangeSet(int capacity) { } public int Adds { get; } public int Moves { get; } public int Refreshes { get; } diff --git a/src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs b/src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs index 4bd76d1c1..f09198975 100644 --- a/src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs +++ b/src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -15,6 +17,63 @@ namespace DynamicData.Tests.Cache; public class ToObservableChangeSetFixture : ReactiveTest { + [Fact] + public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() + { + using var source = new Subject(); + + var scheduler = new TestScheduler(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + keySelector: static item => item.Id, + expireAfter: static item => item.Lifetime, + scheduler: scheduler)); + + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + source.OnNext(item1); + scheduler.AdvanceBy(1); + + // Extend the expiration to a later time + var item2 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(20) }; + source.OnNext(item2); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(2, "2 items were emitted"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "2 items were emitted, 1 of which was a replacement"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(2, "no changes should have occurred, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred, since the last check"); + + // Shorten the expiration to an earlier time (5ms from now is 15m total) + var item3 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) }; + source.OnNext(item3); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(3, "1 item was emitted, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item3 }, "1 item was replaced, since the last check"); + + // One more update with no changes to the expiration + var item4 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) }; + source.OnNext(item4); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(4, "1 item was emitted, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "1 item was replaced, since the last check"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(15).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(5, "1 expiration should have occurred, since the last check"); + results.Data.Items.Should().BeEmpty("the last item should have expired, since the last check"); + } + [Fact] public void ExpirationIsGiven_RemovalIsScheduled() { @@ -112,6 +171,80 @@ public void ExpirationIsGiven_RemovalIsScheduled() results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); } + [Fact] + public void ItemIsEvictedBeforeExpiration_ExpirationIsCancelled() + { + using var source = new Subject>(); + + var scheduler = new TestScheduler(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + keySelector: static item => item.Id, + expireAfter: static item => item.Lifetime, + limitSizeTo: 3, + scheduler: scheduler)); + + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(10) }; + source.OnNext(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + var item4 = new Item() { Id = 4 }; + source.OnNext(new[] { item4 }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(2, "2 item sets were emitted"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "the size limit of the collection was 3"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(3, "2 items should have expired, at the same time, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "2 items should have expired, since the last check"); + } + + [Fact] + public void ItemExpiresBeforeEviction_EvictionIsSkipped() + { + using var source = new Subject>(); + + var scheduler = new TestScheduler(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + keySelector: static item => item.Id, + expireAfter: static item => item.Lifetime, + limitSizeTo: 3, + scheduler: scheduler)); + + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + source.OnNext(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 item set was emitted"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "the size limit of the collection was 3"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(2, "1 expiration should have occurred, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have expired"); + + var item4 = new Item() { Id = 4 }; + source.OnNext(new[] { item4 }); + scheduler.AdvanceBy(1); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(3, "1 item set was emitted, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "no eviction should have occurred"); + } + [Fact] public void KeySelectorIsNull_ThrowsException() => FluentActions.Invoking(() => ObservableCacheEx.ToObservableChangeSet( @@ -119,8 +252,7 @@ public void KeySelectorIsNull_ThrowsException() keySelector: null!)) .Should().Throw(); - [Fact(Skip = "Outstanding bug, error re-throws, instead of emitting on the stream")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] public void KeySelectorThrows_SubscriptionReceivesError() { using var source = new Subject(); @@ -142,8 +274,31 @@ public void KeySelectorThrows_SubscriptionReceivesError() results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was emitted before an error occurred"); } - [Fact(Skip = "Outstanding bug, completion is not forwarded")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] + public void LimitToSizeIs0_ChangeSetsAreEmpty() + { + using var source = new Subject(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + keySelector: static item => item.Id, + limitSizeTo: 0)); + + var item1 = new Item() { Id = 1 }; + source.OnNext(item1); + + var item2 = new Item() { Id = 2 }; + source.OnNext(item2); + + var item3 = new Item() { Id = 3 }; + source.OnNext(item3); + + results.Error.Should().BeNull(); + results.Messages.Count.Should().Be(3, "3 items were emitted"); + results.Data.Items.Should().BeEmpty("the size limit of the collection was 0"); + } + + [Fact] public void RemovalsArePending_CompletionWaitsForRemovals() { using var source = new Subject>(); @@ -164,19 +319,20 @@ public void RemovalsArePending_CompletionWaitsForRemovals() source.OnCompleted(); + results.Error.Should().BeNull(); results.Completed.Should().BeFalse("item removals have been scheduled, and not completed"); results.Messages.Count.Should().Be(1, "1 item set was emitted"); results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were emitted"); scheduler.AdvanceTo(TimeSpan.FromMilliseconds(30).Ticks); + results.Error.Should().BeNull(); results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain"); results.Messages.Count.Should().Be(3, "2 expirations should have occurred, since the last check"); results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "3 items were emitted, and 2 should have expired"); } - [Fact(Skip = "Outstanding bug, https://github.com/reactivemarbles/DynamicData/issues/635")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] public void SourceCompletesImmediately_SubscriptionCompletes() { var item = new Item() { Id = 1 }; @@ -191,6 +347,7 @@ public void SourceCompletesImmediately_SubscriptionCompletes() using var results = new ChangeSetAggregator(source .ToObservableChangeSet(static item => item.Id)); + results.Error.Should().BeNull(); results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain"); results.Messages.Count.Should().Be(1, "1 item was emitted"); results.Data.Items.Should().BeEquivalentTo(new[] { item }, "1 item was emitted"); @@ -246,12 +403,11 @@ public void SizeLimitIsExceeded_OldestItemsAreRemoved() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(9, "6 item sets were emitted by the source, 3 of which triggered followup evictions"); + results.Messages.Count.Should().Be(6, "6 item sets were emitted by the source"); results.Data.Items.Should().BeEquivalentTo(new[] { item5, item6, item9, item10, item11 }, "the size limit of the collection was 5"); } - [Fact(Skip = "Outstanding bug, notifications are not synchronized, initial item emits after error")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] public void SourceErrorsImmediately_SubscriptionReceivesError() { var item = new Item() { Id = 1 }; @@ -322,6 +478,38 @@ public void SourceIsNull_ThrowsException() keySelector: static item => item)) .Should().Throw(); + [Fact] + public void ThreadPoolSchedulerIsUsed_ExpirationIsThreadSafe() + { + var testDuration = TimeSpan.FromSeconds(1); + var maxItemLifetime = TimeSpan.FromMilliseconds(50); + + using var source = new Subject(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + keySelector: static item => item.Id, + expireAfter: static item => item.Lifetime, + limitSizeTo: 1000, + scheduler: ThreadPoolScheduler.Instance)); + + var nextItemId = 1; + var rng = new Random(Seed: 1234567); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + while (stopwatch.Elapsed < testDuration) + { + source.OnNext(new() + { + Id = nextItemId++, + Lifetime = TimeSpan.FromMilliseconds(rng.Next(maxItemLifetime.Milliseconds + 1)) + }); + } + + results.Error.Should().BeNull(); + } + public class Item { public int Id { get; init; } diff --git a/src/DynamicData.Tests/List/ToObservableChangeSetFixture.cs b/src/DynamicData.Tests/List/ToObservableChangeSetFixture.cs index b2dcd188d..727c0b4b7 100644 --- a/src/DynamicData.Tests/List/ToObservableChangeSetFixture.cs +++ b/src/DynamicData.Tests/List/ToObservableChangeSetFixture.cs @@ -1,11 +1,11 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; -using DynamicData.Tests.Domain; - using FluentAssertions; using Microsoft.Reactive.Testing; @@ -29,27 +29,27 @@ public void ExpirationIsGiven_RemovalIsScheduled() expireAfter: static item => item.Lifetime, scheduler: scheduler)); - var item1 = new Item() { Lifetime = TimeSpan.FromMilliseconds(10) }; - var item2 = new Item() { Lifetime = TimeSpan.FromMilliseconds(20) }; - var item3 = new Item() { Lifetime = TimeSpan.FromMilliseconds(30) }; + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2, Lifetime = TimeSpan.FromMilliseconds(20) }; + var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(30) }; source.OnNext(new[] { item1, item2, item3 }); scheduler.AdvanceBy(1); // Item removals should batch to the closest prior millisecond. // This actually seems wrong to me, that for items to be removed earlier than asked for. // Should this maybe batch to the closest future millisecond, or just round to the nearest? - var item4 = new Item() { Lifetime = TimeSpan.FromMilliseconds(20.1) }; - var item5 = new Item() { Lifetime = TimeSpan.FromMilliseconds(20.9) }; + var item4 = new Item() { Id = 4, Lifetime = TimeSpan.FromMilliseconds(20.1) }; + var item5 = new Item() { Id = 5, Lifetime = TimeSpan.FromMilliseconds(20.9) }; source.OnNext(new[] { item4, item5 }); scheduler.AdvanceBy(1); // Out-of-order removal - var item6 = new Item() { Lifetime = TimeSpan.FromMilliseconds(15) }; + var item6 = new Item() { Id = 6, Lifetime = TimeSpan.FromMilliseconds(15) }; source.OnNext(new[] { item6 }); scheduler.AdvanceBy(1); // Non-expiring item - var item7 = new Item(); + var item7 = new Item() { Id = 7 }; source.OnNext(new[] { item7 }); scheduler.AdvanceBy(1); @@ -83,6 +83,101 @@ public void ExpirationIsGiven_RemovalIsScheduled() results.Data.Items.Should().BeEquivalentTo(new[] { item7 }, "item #3 should have expired"); } + [Fact] + public void ItemIsEvictedBeforeExpiration_ExpirationIsCancelled() + { + using var source = new Subject>(); + + var scheduler = new TestScheduler(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + expireAfter: static item => item.Lifetime, + limitSizeTo: 3, + scheduler: scheduler)); + + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(10) }; + source.OnNext(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + var item4 = new Item() {Id = 4 }; + source.OnNext(new[] { item4 }); + scheduler.AdvanceBy(1); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(2, "2 item sets were emitted"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "the size limit of the collection was 3"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(3, "2 items should have expired, at the same time, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "2 items should have expired, since the last check"); + } + + [Fact] + public void ItemExpiresBeforeEviction_EvictionIsSkipped() + { + using var source = new Subject>(); + + var scheduler = new TestScheduler(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + expireAfter: static item => item.Lifetime, + limitSizeTo: 3, + scheduler: scheduler)); + + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; + source.OnNext(new[] { item1, item2, item3 }); + scheduler.AdvanceBy(1); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(1, "1 item set was emitted"); + results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "the size limit of the collection was 3"); + + scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(2, "1 expiration should have occurred, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have expired"); + + var item4 = new Item() { Id = 4 }; + source.OnNext(new[] { item4 }); + scheduler.AdvanceBy(1); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(3, "1 item set was emitted, since the last check"); + results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "no eviction should have occurred"); + } + + [Fact] + public void LimitToSizeIs0_ChangeSetsAreEmpty() + { + using var source = new Subject(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + limitSizeTo: 0)); + + var item1 = new Item() { Id = 1 }; + source.OnNext(item1); + + var item2 = new Item() { Id = 2 }; + source.OnNext(item2); + + var item3 = new Item() { Id = 3 }; + source.OnNext(item3); + + results.Exception.Should().BeNull(); + results.Messages.Count.Should().Be(3, "3 items were emitted"); + results.Data.Items.Should().BeEmpty("the size limit of the collection was 0"); + } + [Fact] public void RemovalsArePending_CompletionWaitsForRemovals() { @@ -95,27 +190,28 @@ public void RemovalsArePending_CompletionWaitsForRemovals() expireAfter: static item => item.Lifetime, scheduler: scheduler)); - var item1 = new Item() { Lifetime = TimeSpan.FromMilliseconds(10) }; - var item2 = new Item(); - var item3 = new Item() { Lifetime = TimeSpan.FromMilliseconds(30) }; + var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(30) }; source.OnNext(new[] { item1, item2, item3 }); scheduler.AdvanceBy(1); source.OnCompleted(); + results.Exception.Should().BeNull(); results.IsCompleted.Should().BeFalse("item removals have been scheduled, and not completed"); results.Messages.Count.Should().Be(1, "1 item set was emitted"); results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were emitted"); scheduler.AdvanceTo(TimeSpan.FromMilliseconds(30).Ticks); - results.IsCompleted.Should().BeFalse("the source has completed, and no outstanding expirations remain"); + results.Exception.Should().BeNull(); + results.IsCompleted.Should().BeTrue("the source has completed, and no outstanding expirations remain"); results.Messages.Count.Should().Be(3, "2 expirations should have occurred, since the last check"); results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "3 items were emitted, and 2 should have expired"); } - [Fact(Skip = "Outstanding bug, completions are not forwarded")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] public void SourceCompletesImmediately_SubscriptionCompletes() { var item = new Item(); @@ -130,6 +226,7 @@ public void SourceCompletesImmediately_SubscriptionCompletes() using var results = new ChangeSetAggregator(source .ToObservableChangeSet()); + results.Exception.Should().BeNull(); results.IsCompleted.Should().BeTrue("the source has completed, and no outstanding expirations remain"); results.Messages.Count.Should().Be(1, "1 item was emitted"); results.Data.Items.Should().BeEquivalentTo(new[] { item }, "1 item was emitted"); @@ -144,22 +241,22 @@ public void SizeLimitIsExceeded_OldestItemsAreRemoved() .ToObservableChangeSet(limitSizeTo: 4)); // Populate enough initial items so that at least one item at the end never gets evicted - var item1 = new Item(); - var item2 = new Item(); - var item3 = new Item(); + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; + var item3 = new Item() { Id = 3 }; source.OnNext(new[] { item1, item2, item3 }); // Limit is reached - var item4 = new Item(); + var item4 = new Item() { Id = 4 }; source.OnNext(new[] { item4 }); // New item exceeds the limit - var item5 = new Item(); + var item5 = new Item() { Id = 5 }; source.OnNext(new[] { item5 }); // Multiple items exceed the limit - var item6 = new Item(); - var item7 = new Item(); + var item6 = new Item() { Id = 6 }; + var item7 = new Item() { Id = 7 }; source.OnNext(new[] { item6, item7 }); results.Exception.Should().BeNull(); @@ -167,8 +264,7 @@ public void SizeLimitIsExceeded_OldestItemsAreRemoved() results.Data.Items.Should().BeEquivalentTo(new[] { item4, item5, item6, item7 }, "the size limit of the collection was 4"); } - [Fact(Skip = "Outstanding bug, notifications are not synchronized, initial item emits after error")] - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")] + [Fact] public void SourceErrorsImmediately_SubscriptionReceivesError() { var item = new Item(); @@ -197,13 +293,13 @@ public void SourceEmitsSingle_ItemIsAdded() using var results = new ChangeSetAggregator(source .ToObservableChangeSet()); - var item1 = new Item(); + var item1 = new Item() { Id = 1 }; source.OnNext(item1); - var item2 = new Item(); + var item2 = new Item() { Id = 2 }; source.OnNext(item2); - var item3 = new Item(); + var item3 = new Item() { Id = 3 }; source.OnNext(item3); results.Exception.Should().BeNull(); @@ -221,12 +317,12 @@ public void SourceEmitsMany_ItemsAreAddedOrUpdated() using var results = new ChangeSetAggregator(source .ToObservableChangeSet()); - var item1 = new Item(); - var item2 = new Item(); + var item1 = new Item() { Id = 1 }; + var item2 = new Item() { Id = 2 }; source.OnNext(new[] { item1, item2 }); - var item3 = new Item(); - var item4 = new Item(); + var item3 = new Item() { Id = 3 }; + var item4 = new Item() { Id = 4 }; source.OnNext(new[] { item3, item4 }); results.Exception.Should().BeNull(); @@ -239,10 +335,41 @@ public void SourceIsNull_ThrowsException() => FluentActions.Invoking(() => ObservableListEx.ToObservableChangeSet(source: null!)) .Should().Throw(); + [Fact] + public void ThreadPoolSchedulerIsUsed_ExpirationIsThreadSafe() + { + var testDuration = TimeSpan.FromSeconds(1); + var maxItemLifetime = TimeSpan.FromMilliseconds(500); + + using var source = new Subject(); + + using var results = new ChangeSetAggregator(source + .ToObservableChangeSet( + expireAfter: static item => item.Lifetime, + limitSizeTo: 1000, + scheduler: ThreadPoolScheduler.Instance)); + + var nextItemId = 1; + var rng = new Random(Seed: 1234567); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + while (stopwatch.Elapsed < testDuration) + { + source.OnNext(new() + { + Id = nextItemId++, + Lifetime = TimeSpan.FromMilliseconds(rng.Next(maxItemLifetime.Milliseconds + 1)) + }); + } + + results.Exception.Should().BeNull(); + } + public class Item { - public Exception? Error { get; init; } + public int Id { get; set; } - public TimeSpan? Lifetime { get; init; } + public TimeSpan? Lifetime { get; set; } } } diff --git a/src/DynamicData/Cache/Internal/ToObservableChangeSet.cs b/src/DynamicData/Cache/Internal/ToObservableChangeSet.cs index d9d7cdeaf..fa301d8e4 100644 --- a/src/DynamicData/Cache/Internal/ToObservableChangeSet.cs +++ b/src/DynamicData/Cache/Internal/ToObservableChangeSet.cs @@ -2,107 +2,440 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Cache.Internal; -internal class ToObservableChangeSet(IObservable> source, - Func keySelector, - Func? expireAfter, - int limitSizeTo, - IScheduler? scheduler = null) +internal class ToObservableChangeSet where TObject : notnull where TKey : notnull { - private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); - private readonly Func _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); - private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default; + private readonly Func? _expireAfter; + private readonly Func _keySelector; + private readonly int _limitSizeTo; + private readonly IScheduler _scheduler; + private readonly IObservable> _source; - public ToObservableChangeSet(IObservable source, + public ToObservableChangeSet( + IObservable source, Func keySelector, Func? expireAfter, int limitSizeTo, - IScheduler? scheduler = null) - : this(source.Select(t => new[] { t }), keySelector, expireAfter, limitSizeTo, scheduler) + IScheduler? scheduler) { + _expireAfter = expireAfter; + _keySelector = keySelector; + _limitSizeTo = limitSizeTo; + _scheduler = scheduler ?? Scheduler.Default; + + _source = Observable.Create>(observer => + { + // Reusable buffer, to avoid allocating per-item + var buffer = new TObject[1]; + + return source.SubscribeSafe(Observer.Create( + onNext: item => + { + buffer[0] = item; + observer.OnNext(buffer); + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); } - public IObservable> Run() => Observable.Create>(observer => - { - var locker = new object(); - - var dataSource = new SourceCache(_keySelector); - - // load local data source with current items - var populator = _source.Synchronize(locker) - .Subscribe(items => dataSource.AddOrUpdate(items), observer.OnError); - - // handle size expiration - var sizeExpiryDisposer = new CompositeDisposable(); - - if (limitSizeTo > 0) - { - long orderItemWasAdded = -1; - - var transformed = dataSource.Connect() - .Transform(t => (Item: t, Order: Interlocked.Increment(ref orderItemWasAdded))) - .AsObservableCache(); - - var transformedRemoved = transformed.Connect() - .Subscribe(_ => - { - if (transformed.Count <= limitSizeTo) return; - - // remove oldest items - var itemsToRemove = transformed.KeyValues - .OrderBy(exp => exp.Value.Order) - .Take(transformed.Count - limitSizeTo) - .Select(x => x.Key) - .ToArray(); - - // schedule, otherwise we can get a deadlock when removing due to re-entrancey - _scheduler.Schedule(() => dataSource.Remove(itemsToRemove)); - }); - sizeExpiryDisposer.Add(transformed); - sizeExpiryDisposer.Add(transformedRemoved); - } - - // handle time expiration - var timeExpiryDisposer = new CompositeDisposable(); - - DateTime Trim(DateTime date, long ticks) => new(date.Ticks - (date.Ticks % ticks), date.Kind); - - if (expireAfter is not null) - { - var expiry = dataSource.Connect() - .Transform(t => - { - var removeAt = expireAfter?.Invoke(t); - - if (removeAt is null) - return (Item: t, ExpireAt: DateTime.MaxValue); - - // get absolute expiry, and round by milliseconds to we can attempt to batch as many items into a single group - var expireTime = Trim(_scheduler.Now.UtcDateTime.Add(removeAt.Value), TimeSpan.TicksPerMillisecond); - - return (Item: t, ExpireAt: expireTime); - }) - .Filter(ei => ei.ExpireAt != DateTime.MaxValue) - .GroupWithImmutableState(ei => ei.ExpireAt) - .MergeMany(grouping => Observable.Timer(grouping.Key, _scheduler).Select(_ => grouping)) - .Synchronize(locker) - .Subscribe(grouping => dataSource.Remove(grouping.Keys)); - - timeExpiryDisposer.Add(expiry); - } - - return new CompositeDisposable( - dataSource, - populator, - sizeExpiryDisposer, - timeExpiryDisposer, - dataSource.Connect().SubscribeSafe(observer)); - }); + public ToObservableChangeSet( + IObservable> source, + Func keySelector, + Func? expireAfter, + int limitSizeTo, + IScheduler? scheduler) + { + _expireAfter = expireAfter; + _keySelector = keySelector; + _limitSizeTo = limitSizeTo; + _scheduler = scheduler ?? Scheduler.Default; + _source = source; + } + + public IObservable> Run() + => Observable.Create>(observer => new Subscription( + source: _source, + expireAfter: _expireAfter, + keySelector: _keySelector, + limitSizeTo: _limitSizeTo, + observer: observer, + scheduler: _scheduler)); + + private sealed class Subscription + : IDisposable + { + private readonly EvictionState? _evictionState; + private readonly ExpirationState? _expirationState; + private readonly Dictionary _itemStatesByKey; + private readonly Func _keySelector; + private readonly IObserver> _observer; + private readonly IScheduler _scheduler; + private readonly IDisposable _sourceSubscription; + + private bool _hasSourceCompleted; + private ScheduledExpiration? _scheduledExpiration; + + public Subscription( + IObservable> source, + Func? expireAfter, + Func keySelector, + int limitSizeTo, + IObserver> observer, + IScheduler scheduler) + { + _keySelector = keySelector; + _observer = observer; + _scheduler = scheduler; + + if (limitSizeTo >= 0) + { + _evictionState = new() + { + LimitSizeTo = limitSizeTo, + Queue = new(capacity: limitSizeTo) + }; + + _expirationState = (expireAfter is null) + ? null + : new() + { + ChangesBuffer = new(), + ExpireAfter = expireAfter, + Queue = new(capacity: limitSizeTo) + }; + + _itemStatesByKey = new(capacity: limitSizeTo); + } + else + { + _expirationState = (expireAfter is null) + ? null + : new() + { + ChangesBuffer = new(), + ExpireAfter = expireAfter, + Queue = new() + }; + + _itemStatesByKey = []; + } + + _sourceSubscription = source + .Synchronize(SynchronizationGate) + .SubscribeSafe(Observer.Create>( + onNext: items => + { + try + { + var now = _scheduler.Now; + + var hasExpirationQueueChanged = false; + + var itemCount = items switch + { + ICollection itemsCollection => itemsCollection.Count, + IReadOnlyCollection itemsCollection => itemsCollection.Count, + _ => 0 + }; + + var changeSet = new ChangeSet(capacity: (_evictionState is EvictionState evictionState) + ? Math.Max(itemCount + evictionState.Queue.Count - evictionState.LimitSizeTo, 0) + : itemCount); + + if (items is IReadOnlyList itemsList) + { + for (var i = 0; i < itemsList.Count; ++i) + HandleIncomingItem(itemsList[i], now, changeSet, ref hasExpirationQueueChanged); + } + else + { + foreach (var item in items) + HandleIncomingItem(item, now, changeSet, ref hasExpirationQueueChanged); + } + + if (hasExpirationQueueChanged) + OnExpirationQueueChanged(); + + observer.OnNext(changeSet); + } + catch (Exception error) + { + TearDownStates(); + + observer.OnError(error); + } + }, + onError: error => + { + TearDownStates(); + + observer.OnError(error); + }, + onCompleted: () => + { + _hasSourceCompleted = true; + + // If there are pending expirations scheduled, wait to complete the stream until they're done + if (_expirationState is null or { Queue.Count: 0 }) + observer.OnCompleted(); + })); + } + + public void Dispose() + { + lock (SynchronizationGate) + { + _sourceSubscription.Dispose(); + + TearDownStates(); + } + } + + private static int CompareExpireAtToExpiration(DateTimeOffset expireAt, Expiration expiration) + => expireAt.CompareTo(expiration.ExpireAt); + + // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to consumers. + private object SynchronizationGate + => _itemStatesByKey; + + private void HandleIncomingItem( + TObject item, + DateTimeOffset now, + ChangeSet changeSet, + ref bool hasExpirationQueueChanged) + { + var key = _keySelector.Invoke(item); + var previousItemState = _itemStatesByKey.TryGetValue(key, out var existingItemState) + ? existingItemState + : null as ItemState?; + + // Perform processing for eviction behavior, if applicable + if (_evictionState is EvictionState evictionState) + { + // Backwards compatibility + if (evictionState.LimitSizeTo is 0) + return; + + // Eviction is only applicable to adds, not replacements + if (previousItemState is null) + { + // If our size limit has been reached, evict the oldest item before adding a new one. + // Repeat removals until we drop below the limit, since items in the queue might have already expired. + while (_itemStatesByKey.Count >= evictionState.LimitSizeTo) + { + var keyToEvict = evictionState.Queue.Dequeue(); + + if (_itemStatesByKey.TryGetValue(keyToEvict, out var itemStateToEvict)) + { + _itemStatesByKey.Remove(keyToEvict); + changeSet.Add(new( + reason: ChangeReason.Remove, + key: keyToEvict, + current: itemStateToEvict.Item)); + } + } + + evictionState.Queue.Enqueue(key); + } + } + + // Perform processing for expiration behavior, if applicable + var expireAt = null as DateTimeOffset?; + if (_expirationState is ExpirationState expirationState) + { + var previousExpireAt = previousItemState?.ExpireAt; + var expireAfter = expirationState.ExpireAfter.Invoke(item); + if (expireAfter is TimeSpan resolvedExpireAfter) + { + // Truncate to milliseconds to promote batching expirations together. + var expireAtTicks = now.UtcTicks + resolvedExpireAfter.Ticks; + expireAt = new DateTimeOffset(ticks: expireAtTicks - (expireAtTicks % TimeSpan.TicksPerMillisecond), offset: TimeSpan.Zero); + } + + // Queue the item for expiration if it's new and needs to expire, or if it's a replacement with a different expiration time. + if ((expireAt is not null) && (expireAt != previousExpireAt)) + { + var insertionIndex = expirationState.Queue.BinarySearch(expireAt.Value, CompareExpireAtToExpiration); + if (insertionIndex < 0) + insertionIndex = ~insertionIndex; + + expirationState.Queue.Insert( + index: insertionIndex, + item: new() + { + ExpireAt = expireAt.Value, + Key = key + }); + + hasExpirationQueueChanged = true; + } + } + + // Track the item's state, to be able to detect replacements later, and issue either an add or replace change for it. + _itemStatesByKey[key] = new() + { + ExpireAt = expireAt, + Item = item + }; + changeSet.Add((previousItemState is null) + ? new( + reason: ChangeReason.Add, + key: key, + current: item) + : new( + reason: ChangeReason.Update, + key: key, + current: item, + previous: previousItemState.Value.Item)); + } + + private void HandleScheduledExpiration() + { + var expirationState = _expirationState!.Value; + + var now = _scheduler.Now; + + // Buffer removals, so we can optimize the allocation for the final changeset, or skip it entirely. + // Also, so we can optimize removal from the queue as a range removal. + var processedExpirationCount = 0; + foreach (var expiration in expirationState.Queue) + { + if (expiration.ExpireAt > now) + break; + + ++processedExpirationCount; + + // If the item hasn't already been evicted, or had its expiration time change, formally remove it + if (_itemStatesByKey.TryGetValue(expiration.Key, out var itemState) && (itemState.ExpireAt <= now)) + { + _itemStatesByKey.Remove(expiration.Key); + expirationState.ChangesBuffer.Add(new( + reason: ChangeReason.Remove, + key: expiration.Key, + current: itemState.Item)); + } + } + + expirationState.Queue.RemoveRange(0, processedExpirationCount); + + // We can end up with no changes here for a couple of reasons: + // * An item's expiration time can change + // * When items are evicted due to the size limit, it still remains in the expiration queue. + // * The scheduler only promises "best effort" to cancel scheduled operations. + if (expirationState.ChangesBuffer.Count is not 0) + { + _observer.OnNext(new ChangeSet(expirationState.ChangesBuffer)); + + expirationState.ChangesBuffer.Clear(); + } + + OnExpirationQueueChanged(); + } + + private void OnExpirationQueueChanged() + { + var expirationState = _expirationState!.Value; + + // If there aren't any items queued to expire, check to see if the stream should be terminated (I.E. we just expired the last item). + // Otherwise, make sure we have an operation scheduled to handle the next expiration. + if (expirationState.Queue.Count is 0) + { + if (_hasSourceCompleted) + _observer.OnCompleted(); + } + else + { + // If there's already a scheduled operation, and it doesn't match the current next-item-to-expire time, wipe it out and re-schedule it. + var nextExpireAt = expirationState.Queue[0].ExpireAt; + if (_scheduledExpiration is ScheduledExpiration scheduledExpiration) + { + if (scheduledExpiration.DueTime != nextExpireAt) + { + scheduledExpiration.Cancellation.Dispose(); + _scheduledExpiration = null; + } + else + { + return; + } + } + + _scheduledExpiration = new() + { + Cancellation = _scheduler.Schedule( + state: this, + dueTime: nextExpireAt, + action: static (_, @this) => + { + lock (@this.SynchronizationGate) + { + @this._scheduledExpiration = null; + + @this.HandleScheduledExpiration(); + } + + return Disposable.Empty; + }), + DueTime = nextExpireAt + }; + } + } + + private void TearDownStates() + { + _scheduledExpiration?.Cancellation.Dispose(); + _scheduledExpiration = null; + + _evictionState?.Queue.Clear(); + + _expirationState?.Queue.Clear(); + } + + private readonly struct ItemState + { + public required DateTimeOffset? ExpireAt { get; init; } + + public required TObject Item { get; init; } + } + + private readonly struct EvictionState + { + public required int LimitSizeTo { get; init; } + + public required Queue Queue { get; init; } + } + + private readonly struct Expiration + { + public required DateTimeOffset ExpireAt { get; init; } + + public required TKey Key { get; init; } + } + + private readonly struct ExpirationState + { + public required List> ChangesBuffer { get; init; } + + public required Func ExpireAfter { get; init; } + + // Potential performance improvement: Instead of List, use PriorityQueue available in .NET 6+, or an equivalent. + public required List Queue { get; init; } + } + + private readonly struct ScheduledExpiration + { + public required IDisposable Cancellation { get; init; } + + public required DateTimeOffset DueTime { get; init; } + } + } } diff --git a/src/DynamicData/List/ChangeSet.cs b/src/DynamicData/List/ChangeSet.cs index 4a5eb184f..65f7174bd 100644 --- a/src/DynamicData/List/ChangeSet.cs +++ b/src/DynamicData/List/ChangeSet.cs @@ -34,6 +34,15 @@ public ChangeSet(IEnumerable> items) { } + /// + /// Initializes a new instance of the class. + /// + /// The initial capacity of the change set. + public ChangeSet(int capacity) + : base(capacity) + { + } + /// /// Gets the number of additions. /// diff --git a/src/DynamicData/List/Internal/ToObservableChangeSet.cs b/src/DynamicData/List/Internal/ToObservableChangeSet.cs index a614dbbcd..8ef2ff14f 100644 --- a/src/DynamicData/List/Internal/ToObservableChangeSet.cs +++ b/src/DynamicData/List/Internal/ToObservableChangeSet.cs @@ -2,83 +2,449 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.List.Internal; -internal class ToObservableChangeSet(IObservable> source, Func? expireAfter, int limitSizeTo, IScheduler? scheduler = null) - where T : notnull +internal class ToObservableChangeSet + where TObject : notnull { - private readonly IScheduler _scheduler = scheduler ?? Scheduler.Default; + private readonly Func? _expireAfter; + private readonly int _limitSizeTo; + private readonly IScheduler _scheduler; + private readonly IObservable> _source; - public ToObservableChangeSet(IObservable source, Func? expireAfter, int limitSizeTo, IScheduler? scheduler = null) - : this(source.Select(t => new[] { t }), expireAfter, limitSizeTo, scheduler) + public ToObservableChangeSet( + IObservable source, + Func? expireAfter, + int limitSizeTo, + IScheduler? scheduler) { + _expireAfter = expireAfter; + _limitSizeTo = limitSizeTo; + _scheduler = scheduler ?? Scheduler.Default; + + _source = Observable.Create>(observer => + { + // Reusable buffer, to avoid allocating per-item + var buffer = new TObject[1]; + + return source.SubscribeSafe(Observer.Create( + onNext: item => + { + buffer[0] = item; + observer.OnNext(buffer); + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); } - public IObservable> Run() => Observable.Create>( - observer => + public ToObservableChangeSet( + IObservable> source, + Func? expireAfter, + int limitSizeTo, + IScheduler? scheduler) + { + _expireAfter = expireAfter; + _limitSizeTo = limitSizeTo; + _scheduler = scheduler ?? Scheduler.Default; + _source = source; + } + + public IObservable> Run() + => Observable.Create>(observer => new Subscription( + source: _source, + expireAfter: _expireAfter, + limitSizeTo: _limitSizeTo, + observer: observer, + scheduler: _scheduler)); + + private sealed class Subscription + : IDisposable + { + private readonly EvictionState? _evictionState; + private readonly ExpirationState? _expirationState; + private readonly IObserver> _observer; + private readonly IScheduler _scheduler; + private readonly IDisposable _sourceSubscription; + private readonly object _synchronizationGate; + + private int _currentItemCount; + private bool _hasSourceCompleted; + private ScheduledExpiration? _scheduledExpiration; + + public Subscription( + IObservable> source, + Func? expireAfter, + int limitSizeTo, + IObserver> observer, + IScheduler scheduler) + { + _observer = observer; + _scheduler = scheduler; + + if (limitSizeTo >= 0) { - var locker = new object(); + _evictionState = new() + { + LimitSizeTo = limitSizeTo, + Queue = new(capacity: limitSizeTo) + }; - var dataSource = new SourceList(); + _expirationState = (expireAfter is null) + ? null + : new() + { + RemovalsBuffer = new(), + ExpireAfter = expireAfter, + Queue = new(capacity: limitSizeTo) + }; + } + else + { + _expirationState = (expireAfter is null) + ? null + : new() + { + RemovalsBuffer = new(), + ExpireAfter = expireAfter, + Queue = new() + }; + } + + _synchronizationGate = new(); - // load local data source with current items - var populator = source.Synchronize(locker) - .Subscribe(items => + _sourceSubscription = source + .Synchronize(_synchronizationGate) + .SubscribeSafe(Observer.Create>( + onNext: items => { - dataSource.Edit(innerList => + try { - innerList.AddRange(items); + var now = _scheduler.Now; - if (limitSizeTo > 0 && innerList.Count > limitSizeTo) + var hasExpirationQueueChanged = false; + + var itemCount = items switch { - // remove oldest items [these will always be the first x in the list] - var toRemove = innerList.Count - limitSizeTo; - innerList.RemoveRange(0, toRemove); + ICollection itemsCollection => itemsCollection.Count, + IReadOnlyCollection itemsCollection => itemsCollection.Count, + _ => 0 + }; + + var changeSet = new ChangeSet(capacity: (_evictionState is EvictionState evictionState) + ? Math.Max(itemCount + evictionState.Queue.Count - evictionState.LimitSizeTo, 0) + : itemCount); + + if (items is IReadOnlyList itemsList) + { + for (var i = 0; i < itemsList.Count; ++i) + { + HandleIncomingItem(itemsList[i], now, changeSet, ref hasExpirationQueueChanged); + } + } + else + { + foreach (var item in items) + { + HandleIncomingItem(item, now, changeSet, ref hasExpirationQueueChanged); + } } - }); - }, observer.OnError); + if (hasExpirationQueueChanged) + { + OnExpirationQueueChanged(); + } - // handle time expiration - var timeExpiryDisposer = new CompositeDisposable(); + observer.OnNext(changeSet); + } + catch (Exception error) + { + TearDownStates(); - DateTime Trim(DateTime date, long ticks) => new(date.Ticks - (date.Ticks % ticks), date.Kind); + observer.OnError(error); + } + }, + onError: error => + { + TearDownStates(); - if (expireAfter is not null) + observer.OnError(error); + }, + onCompleted: () => + { + _hasSourceCompleted = true; + + // If there are pending expirations scheduled, wait to complete the stream until they're done + if (_expirationState is null or { Queue.Count: 0 }) + { + observer.OnCompleted(); + } + })); + } + + public void Dispose() + { + lock (_synchronizationGate) + { + _sourceSubscription.Dispose(); + + TearDownStates(); + } + } + + private static int CompareExpireAtToExpiration(DateTimeOffset expireAt, Expiration expiration) + => expireAt.CompareTo(expiration.ExpireAt); + + private void HandleIncomingItem( + TObject item, + DateTimeOffset now, + ChangeSet changeSet, + ref bool hasExpirationQueueChanged) + { + // Perform processing for eviction behavior, if applicable + if (_evictionState is EvictionState evictionState) + { + // Backwards compatibility + if (evictionState.LimitSizeTo is 0) + { + return; + } + + // If our size limit has been reached, evict the oldest item before adding a new one. + // Repeat removals until we drop below the limit, since items in the queue might have already expired. + if (evictionState.Queue.Count >= evictionState.LimitSizeTo) { - var expiry = dataSource.Connect() - .Transform(t => + var itemToEvict = evictionState.Queue[0]; + evictionState.Queue.RemoveAt(0); + + // Need to synchronize the expiration queue, if applicable, to keep the indexes stored there correct. + if (_expirationState is { Queue: var expirationQueue }) + { + for (var i = 0; i < expirationQueue.Count;) { - var removeAt = expireAfter?.Invoke(t); + if (expirationQueue[i].Index == 0) + { + expirationQueue.RemoveAt(i); + continue; + } - if (removeAt is null) - return (Item: t, ExpireAt: DateTime.MaxValue); + var expiration = expirationQueue[i]; + --expiration.Index; + expirationQueue[i] = expiration; - // get absolute expiry, and round by milliseconds to we can attempt to batch as many items into a single group - var expireTime = Trim(_scheduler.Now.UtcDateTime.Add(removeAt.Value), TimeSpan.TicksPerMillisecond); + ++i; + } + } - return (Item: t, ExpireAt: expireTime); - }) - .Filter(ei => ei.ExpireAt != DateTime.MaxValue) - .GroupWithImmutableState(ei => ei.ExpireAt) - .MergeMany(grouping => Observable.Timer(grouping.Key, _scheduler).Select(_ => grouping.Items.Select(x => x.Item).ToArray())) - .Synchronize(locker) - .Subscribe(items => + changeSet.Add(new( + reason: ListChangeReason.Remove, + current: itemToEvict, + index: 0)); + --_currentItemCount; + } + + evictionState.Queue.Add(item); + } + + // Perform processing for expiration behavior, if applicable + if (_expirationState is ExpirationState expirationState) + { + var expireAfter = expirationState.ExpireAfter.Invoke(item); + if (expireAfter is TimeSpan resolvedExpireAfter) + { + // Truncate to milliseconds to promote batching expirations together. + var expireAtTicks = now.UtcTicks + resolvedExpireAfter.Ticks; + var expireAt = new DateTimeOffset(ticks: expireAtTicks - (expireAtTicks % TimeSpan.TicksPerMillisecond), offset: TimeSpan.Zero); + + var insertionIndex = expirationState.Queue.BinarySearch(expireAt, CompareExpireAtToExpiration); + if (insertionIndex < 0) + { + insertionIndex = ~insertionIndex; + } + + expirationState.Queue.Insert( + index: insertionIndex, + item: new() { - dataSource.RemoveMany(items); + ExpireAt = expireAt, + Index = _currentItemCount, + Item = item }); - timeExpiryDisposer.Add(expiry); + hasExpirationQueueChanged = true; } + } + + changeSet.Add(new( + reason: ListChangeReason.Add, + current: item, + index: _currentItemCount)); + ++_currentItemCount; + } + + private void HandleScheduledExpiration() + { + var expirationState = _expirationState!.Value; - return new CompositeDisposable( - dataSource, - populator, - timeExpiryDisposer, - dataSource.Connect().SubscribeSafe(observer)); - }); + var now = _scheduler.Now; + + // Buffer removals, so we can sort them and generate adjusted indexes, in the event of many items being removed at once. + // Also, so we can optimize away the changeSet allocation and publication, if possible. + // Also, so we can optimize removal from the queue as a range removal. + foreach (var expiration in expirationState.Queue) + { + if (expiration.ExpireAt > now) + { + break; + } + + expirationState.RemovalsBuffer.Add(new( + key: expiration.Index, + value: expiration.Item)); + } + + // It's theoretically possible to end up with no changes here, + // as the scheduler only promises "best effort" to cancel scheduled operations. + if (expirationState.RemovalsBuffer.Count is not 0) + { + expirationState.Queue.RemoveRange(0, expirationState.RemovalsBuffer.Count); + + expirationState.RemovalsBuffer.Sort(static (x, y) => x.Key.CompareTo(y.Key)); + + var evictionQueue = _evictionState?.Queue; + + var changeSet = new ChangeSet(capacity: expirationState.RemovalsBuffer.Count); + for (var i = 0; i < expirationState.RemovalsBuffer.Count; ++i) + { + var removal = expirationState.RemovalsBuffer[i]; + var indexToRemove = removal.Key - i; + + changeSet.Add(new( + reason: ListChangeReason.Remove, + current: removal.Value, + index: indexToRemove)); + --_currentItemCount; + + // Adjust indexes for all remaining items in the queue. + for (var j = 0; j < expirationState.Queue.Count; ++j) + { + var expiration = expirationState.Queue[j]; + if (expiration.Index > indexToRemove) + { + --expiration.Index; + } + + expirationState.Queue[j] = expiration; + } + + // Clear expiring items out of the eviction queue as well, if applicable. + evictionQueue?.RemoveAt(indexToRemove); + } + + expirationState.RemovalsBuffer.Clear(); + + _observer.OnNext(changeSet); + } + + OnExpirationQueueChanged(); + } + + private void OnExpirationQueueChanged() + { + var expirationState = _expirationState!.Value; + + // If there aren't any items queued to expire, check to see if the stream should be terminated (I.E. we just expired the last item). + // Otherwise, make sure we have an operation scheduled to handle the next expiration. + if (expirationState.Queue.Count is 0) + { + if (_hasSourceCompleted) + { + _observer.OnCompleted(); + } + } + else + { + // If there's already a scheduled operation, and it doesn't match the current next-item-to-expire time, wipe it out and re-schedule it. + var nextExpireAt = expirationState.Queue[0].ExpireAt; + if (_scheduledExpiration is ScheduledExpiration scheduledExpiration) + { + if (scheduledExpiration.DueTime != nextExpireAt) + { + scheduledExpiration.Cancellation.Dispose(); + _scheduledExpiration = null; + } + else + { + return; + } + } + + _scheduledExpiration = new() + { + Cancellation = _scheduler.Schedule( + state: this, + dueTime: nextExpireAt, + action: static (_, @this) => + { + lock (@this._synchronizationGate) + { + @this._scheduledExpiration = null; + + @this.HandleScheduledExpiration(); + } + + return Disposable.Empty; + }), + DueTime = nextExpireAt + }; + } + } + + private void TearDownStates() + { + _scheduledExpiration?.Cancellation.Dispose(); + _scheduledExpiration = null; + + _evictionState?.Queue.Clear(); + + _expirationState?.Queue.Clear(); + } + + private readonly struct EvictionState + { + public required int LimitSizeTo { get; init; } + + public required List Queue { get; init; } + } + + private struct Expiration + { + public required DateTimeOffset ExpireAt { get; init; } + + public required int Index { get; set; } + + public required TObject Item { get; init; } + } + + private readonly struct ExpirationState + { + public required List> RemovalsBuffer { get; init; } + + public required Func ExpireAfter { get; init; } + + public required List Queue { get; init; } + } + + private readonly struct ScheduledExpiration + { + public required IDisposable Cancellation { get; init; } + + public required DateTimeOffset DueTime { get; init; } + } + } } diff --git a/src/DynamicData/List/Internal/TransformAsync.cs b/src/DynamicData/List/Internal/TransformAsync.cs index cb6276b74..f18666d49 100644 --- a/src/DynamicData/List/Internal/TransformAsync.cs +++ b/src/DynamicData/List/Internal/TransformAsync.cs @@ -77,7 +77,7 @@ private async Task Transform( case ListChangeReason.Add: { var change = item.Item; - if (change.CurrentIndex < 0 | change.CurrentIndex >= transformed.Count) + if (change.CurrentIndex < 0 || change.CurrentIndex >= transformed.Count) { var container = await _containerFactory(item.Item.Current, Optional.None, @@ -143,7 +143,7 @@ await _containerFactory(item.Item.Current, Optional.None, case ListChangeReason.Remove: { var change = item.Item; - bool hasIndex = change.CurrentIndex >= 0; + var hasIndex = change.CurrentIndex >= 0; if (hasIndex) { @@ -191,7 +191,7 @@ await _containerFactory(item.Item.Current, Optional.None, case ListChangeReason.Moved: { var change = item.Item; - bool hasIndex = change.CurrentIndex >= 0; + var hasIndex = change.CurrentIndex >= 0; if (!hasIndex) { throw new UnspecifiedIndexException("Cannot move as an index was not specified"); diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index 4bac2eb73..1f52117e3 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -1142,10 +1142,7 @@ public static IObservable MergeMany(this IObserva /// Parameter was null. public static IObservable> MergeChangeSets(this IObservableList>> source, IComparer comparer) where TObject : notnull - where TKey : notnull - { - return source.Connect().MergeChangeSets(comparer: comparer); - } + where TKey : notnull => source.Connect().MergeChangeSets(comparer: comparer); /// /// Merges all of the Cache Observable ChangeSets into a single ChangeSets while correctly handling multiple Keys and removal of the parent items. @@ -1159,10 +1156,7 @@ public static IObservable> MergeChangeSetsParameter was null. public static IObservable> MergeChangeSets(this IObservableList>> source, IEqualityComparer? equalityComparer = null, IComparer? comparer = null) where TObject : notnull - where TKey : notnull - { - return source.Connect().MergeChangeSets(equalityComparer, comparer); - } + where TKey : notnull => source.Connect().MergeChangeSets(equalityComparer, comparer); /// /// Merges all of the Cache Observable ChangeSets into a single ChangeSets while correctly handling multiple Keys and removal of the parent items. @@ -1317,7 +1311,7 @@ public static IObservable> OnItemAdded(this IObservable> OnItemRefreshed(this IObservable> source, Action refreshAction) where TObject : notnull { - Action refreshAction2 = refreshAction; + var refreshAction2 = refreshAction; if (source == null) { throw new ArgumentNullException(nameof(source)); @@ -1328,13 +1322,8 @@ public static IObservable> OnItemRefreshed(this IOb throw new ArgumentNullException(nameof(refreshAction)); } - return source.Do(delegate(IChangeSet changes) - { - changes.Where((Change c) => c.Reason == ListChangeReason.Refresh).ForEach(delegate(Change c) - { - refreshAction2(c.Item.Current); - }); - }); + return source.Do((IChangeSet changes) => + changes.Where((Change c) => c.Reason == ListChangeReason.Refresh).ForEach((Change c) => refreshAction2(c.Item.Current))); } /// @@ -1808,7 +1797,7 @@ public static IObservable> ToObservableChangeSet(this IObservab /// /// The type of the object. /// The source. - /// Remove the oldest items when the size has reached this limit. + /// Remove the oldest items when the size has reached this limit. Supply -1 to disable size limiting. /// The scheduler (only used for time expiry). /// An observable which emits a change set. /// source @@ -1832,7 +1821,7 @@ public static IObservable> ToObservableChangeSet(this IObservab /// The type of the object. /// The source. /// Specify on a per object level the maximum time before an object expires from a cache. - /// Remove the oldest items when the size has reached this limit. + /// Remove the oldest items when the size has reached this limit. Supply -1 to disable size limiting. /// The scheduler (only used for time expiry). /// An observable which emits a change set. /// source @@ -1891,7 +1880,7 @@ public static IObservable> ToObservableChangeSet(this IObservab /// or /// keySelector. public static IObservable> ToObservableChangeSet(this IObservable> source, Func expireAfter, IScheduler? scheduler = null) - where T : notnull => ToObservableChangeSet(source, expireAfter, 0, scheduler); + where T : notnull => ToObservableChangeSet(source, expireAfter, -1, scheduler); /// /// Converts the observable to an observable change set, allowing size and time limit to be specified.