From 5148d1400de3457dfc2f29696ff4dc4297cfc8cf Mon Sep 17 00:00:00 2001 From: Roland Pheasant Date: Mon, 22 Jan 2024 07:24:27 +0000 Subject: [PATCH] TransformAsync enhancements (#819) Improve implementation of TransformAsync + add max concurrency and transform on refresh overloads --- ...ts.DynamicDataTests.DotNet8_0.verified.txt | 31 ++ .../Cache/TransformAsyncFixture.cs | 66 +++- .../Cache/TransformSafeAsyncFixture.cs | 370 ++++++++++-------- .../Domain/PersonWithGender.cs | 1 + .../DynamicData.Tests.csproj | 2 +- .../Cache/Internal/TransformAsync.cs | 111 +++--- src/DynamicData/Cache/ObservableCacheEx.cs | 168 ++++++++ .../Cache/TransformAsyncOptions.cs | 18 + 8 files changed, 540 insertions(+), 227 deletions(-) create mode 100644 src/DynamicData/Cache/TransformAsyncOptions.cs diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 492b4274f..93e985429 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -1823,14 +1823,26 @@ namespace DynamicData where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func> transformFactory, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func> transformFactory, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func> transformFactory, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func> transformFactory, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func, TKey, System.Threading.Tasks.Task> transformFactory, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformAsync(this System.IObservable> source, System.Func, TKey, System.Threading.Tasks.Task> transformFactory, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull @@ -1879,14 +1891,26 @@ namespace DynamicData where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func> transformFactory, System.Action> errorHandler, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func> transformFactory, System.Action> errorHandler, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func> transformFactory, System.Action> errorHandler, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func> transformFactory, System.Action> errorHandler, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull where TKey : notnull { } + public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func, TKey, System.Threading.Tasks.Task> transformFactory, System.Action> errorHandler, DynamicData.TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull { } public static System.IObservable> TransformSafeAsync(this System.IObservable> source, System.Func, TKey, System.Threading.Tasks.Task> transformFactory, System.Action> errorHandler, System.IObservable>? forceTransform = null) where TDestination : notnull where TSource : notnull @@ -2420,6 +2444,13 @@ namespace DynamicData public void Edit(System.Action> updateAction) { } public System.IObservable> Preview(System.Func? predicate = null) { } } + public struct TransformAsyncOptions : System.IEquatable + { + public static readonly DynamicData.TransformAsyncOptions Default; + public TransformAsyncOptions(int? MaximumConcurrency, bool TransformOnRefresh) { } + public int? MaximumConcurrency { get; set; } + public bool TransformOnRefresh { get; set; } + } [System.Serializable] public class UnspecifiedIndexException : System.Exception { diff --git a/src/DynamicData.Tests/Cache/TransformAsyncFixture.cs b/src/DynamicData.Tests/Cache/TransformAsyncFixture.cs index 81f666aa6..a8c4c3b0b 100644 --- a/src/DynamicData.Tests/Cache/TransformAsyncFixture.cs +++ b/src/DynamicData.Tests/Cache/TransformAsyncFixture.cs @@ -5,11 +5,9 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading.Tasks; - +using DynamicData.Binding; using DynamicData.Tests.Domain; - using FluentAssertions; - using Xunit; namespace DynamicData.Tests.Cache; @@ -93,7 +91,7 @@ public void Remove() public async Task RemoveFlowsToTheEnd() { var transform = 0; - var count = 500; + var count = 100; ReadOnlyObservableCollection collection; var cache = new SourceCache(p => p.Name); @@ -121,9 +119,8 @@ public async Task RemoveFlowsToTheEnd() cache.RemoveKey(p.Name); } - while (transform != count) - await Task.Delay(100); - await Task.Delay(3000); + await collection.ToObservableChangeSet().Take(count * 2); + collection.Count.Should().Be(0); } @@ -207,6 +204,61 @@ public void Update() stub.Results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); } + + + + [Theory, InlineData(true), InlineData(false)] + public void TransformOnRefresh(bool transformOnRefresh) + { + using var source = new SourceCache(p => p.Name); + using var results = source.Connect() + .AutoRefresh() + .TransformAsync((p, key) => Task.FromResult(new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult")), TransformAsyncOptions.Default with { TransformOnRefresh = transformOnRefresh }).AsAggregator(); + + var person = new Person("SomeOne", 16); + source.AddOrUpdate(person); + + results.Data.Count.Should().Be(1); + results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be("Child"); + + person.Age = 21; + + + results.Data.Count.Should().Be(1); + results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be(transformOnRefresh ? "Adult": "Child"); + + } + + + [Theory, InlineData(10), InlineData(100)] + + public async Task WithMaxConcurrency(int maxConcurrency) + { + /* We need to test whether the max concurrency has any effect. + + If maxConcurrency == 100, this test takes a little more than 100 ms + If maxConcurrency = 10, this test takes a little more than 1s + + So it works, but how can it be tested in a scientific way ?? + */ + + + const int transformCount = 100; + + using var source = new SourceCache(p => p.Name); + using var results = source.Connect() + .TransformAsync(async (p, key) => + { + await Task.Delay(100); + + return new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult"); + }, TransformAsyncOptions.Default with { MaximumConcurrency = maxConcurrency }).AsAggregator(); + + source.AddOrUpdate(Enumerable.Range(1, transformCount).Select(l => new Person("Person" + l, l))); + + await results.Data.CountChanged.Where(c => c == transformCount).Take(1); + } + private class TransformStub : IDisposable { public TransformStub() diff --git a/src/DynamicData.Tests/Cache/TransformSafeAsyncFixture.cs b/src/DynamicData.Tests/Cache/TransformSafeAsyncFixture.cs index 415c9a65b..9221bd1b8 100644 --- a/src/DynamicData.Tests/Cache/TransformSafeAsyncFixture.cs +++ b/src/DynamicData.Tests/Cache/TransformSafeAsyncFixture.cs @@ -5,17 +5,14 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading.Tasks; - +using DynamicData.Cache; using DynamicData.Kernel; using DynamicData.Tests.Domain; - using FluentAssertions; - using Xunit; namespace DynamicData.Tests.Cache; -[Obsolete("Not obsolete - test commented out due to test run freezing on Appveyor")] public class TransformSafeAsyncFixture { [Fact] @@ -41,165 +38,212 @@ public void ReTransformAll() } } - //[Fact] - //public void ReTransformSelected() - //{ - // var people = Enumerable.Range(1, 10).Select(i => new Person("Name" + i, i)).ToArray(); - // var forceTransform = new Subject>(); - - // using (var stub = new TransformStub(forceTransform)) - // { - // stub.Source.AddOrUpdate(people); - // forceTransform.OnNext(person => person.Age <= 5); - - // stub.Results.Messages.Count.Should().Be(2); - // stub.Results.Messages[1].Updates.Should().Be(5); - - // for (int i = 1; i <= 5; i++) - // { - // var original = stub.Results.Messages[0].ElementAt(i - 1).Current; - // var updated = stub.Results.Messages[1].ElementAt(i - 1).Current; - // updated.Should().Be(original); - // ReferenceEquals(original, updated).Should().BeFalse(); - // } - // } - //} - - //[Fact] - //public async Task Add() - //{ - // using (var stub = new TransformStub()) - // { - // var person = new Person("Adult1", 50); - // stub.Source.AddOrUpdate(person); - - // stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); - // stub.Results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - - // var firstPerson = await stub.TransformFactory(person); - - // stub.Results.Data.Items.First().Should().Be(firstPerson, "Should be same person"); - // } - //} - - //[Fact] - //public void Remove() - //{ - // const string key = "Adult1"; - // var person = new Person(key, 50); - - // using (var stub = new TransformStub()) - // { - // stub.Source.AddOrUpdate(person); - // stub.Source.Remove(key); - - // stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); - // stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); - // stub.Results.Messages[0].Adds.Should().Be(1, "Should be 80 addes"); - // stub.Results.Messages[1].Removes.Should().Be(1, "Should be 80 removes"); - // stub.Results.Data.Count.Should().Be(0, "Should be nothing cached"); - // } - //} - - //[Fact] - //public void Update() - //{ - // const string key = "Adult1"; - // var newperson = new Person(key, 50); - // var updated = new Person(key, 51); - - // using (var stub = new TransformStub()) - // { - // stub.Source.AddOrUpdate(newperson); - // stub.Source.AddOrUpdate(updated); - - // stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); - // stub.Results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); - // stub.Results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); - // } - //} - - //[Fact] - //public async Task BatchOfUniqueUpdates() - //{ - // var people = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray(); - // using (var stub = new TransformStub()) - // { - // stub.Source.AddOrUpdate(people); - - // stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); - // stub.Results.Messages[0].Adds.Should().Be(100, "Should return 100 adds"); - - // var result = await Task.WhenAll(people.Select(stub.TransformFactory)); - // var transformed = result.OrderBy(p => p.Age).ToArray(); - // stub.Results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(stub.Results.Data.Items.OrderBy(p => p.Age), "Incorrect transform result"); - // } - //} - - //[Fact] - //public async Task SameKeyChanges() - //{ - // using (var stub = new TransformStub()) - // { - // var people = Enumerable.Range(1, 10).Select(i => new Person("Name", i)).ToArray(); - - // stub.Source.AddOrUpdate(people); - - // stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); - // stub.Results.Messages[0].Adds.Should().Be(1, "Should return 1 adds"); - // stub.Results.Messages[0].Updates.Should().Be(9, "Should return 9 adds"); - // stub.Results.Data.Count.Should().Be(1, "Should result in 1 record"); - - // var lastTransformed = await stub.TransformFactory(people.Last()); - // var onlyItemInCache = stub.Results.Data.Items.First(); - - // onlyItemInCache.Should().Be(lastTransformed, "Incorrect transform result"); - // } - //} - - //[Fact] - //public void Clear() - //{ - // using (var stub = new TransformStub()) - // { - // var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - - // stub.Source.AddOrUpdate(people); - // stub.Source.Clear(); - - // stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); - // stub.Results.Messages[0].Adds.Should().Be(100, "Should be 80 addes"); - // stub.Results.Messages[1].Removes.Should().Be(100, "Should be 80 removes"); - // stub.Results.Data.Count.Should().Be(0, "Should be nothing cached"); - // } - //} - - //[Fact] - //public void HandleError() - //{ - // using (var stub = new TransformStub(p => - // { - // if (p.Age <= 50) - // return new PersonWithGender(p, p.Age % 2 == 0 ? "M" : "F"); - - // throw new Exception("Broken"); - // })) - // { - // var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); - // stub.Source.AddOrUpdate(people); - - // stub.Results.Error.Should().BeNull(); - - // Exception? error = null; - // stub.Source.Connect() - // .Subscribe(changes => { }, ex => error = ex); - - // error.Should().BeNull(); - - // stub.HandledErrors.Count.Should().Be(50); - // stub.Results.Data.Count.Should().Be(50); - // } - //} + [Fact] + public void ReTransformSelected() + { + var people = Enumerable.Range(1, 10).Select(i => new Person("Name" + i, i)).ToArray(); + var forceTransform = new Subject>(); + + using var stub = new TransformStub(forceTransform); + stub.Source.AddOrUpdate(people); + forceTransform.OnNext(person => person.Age <= 5); + + stub.Results.Messages.Count.Should().Be(2); + stub.Results.Messages[1].Updates.Should().Be(5); + + for (int i = 1; i <= 5; i++) + { + var original = stub.Results.Messages[0].ElementAt(i - 1).Current; + var updated = stub.Results.Messages[1].ElementAt(i - 1).Current; + updated.Should().Be(original); + ReferenceEquals(original, updated).Should().BeFalse(); + } + } + + [Fact] + public async Task Add() + { + using var stub = new TransformStub(); + var person = new Person("Adult1", 50); + stub.Source.AddOrUpdate(person); + + stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); + stub.Results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); + + var firstPerson = await stub.TransformFactory(person); + + stub.Results.Data.Items.First().Should().Be(firstPerson, "Should be same person"); + } + + [Fact] + public void Remove() + { + const string key = "Adult1"; + var person = new Person(key, 50); + + using var stub = new TransformStub(); + stub.Source.AddOrUpdate(person); + stub.Source.Remove(key); + + stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); + stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); + stub.Results.Messages[0].Adds.Should().Be(1, "Should be 80 adds"); + stub.Results.Messages[1].Removes.Should().Be(1, "Should be 80 removes"); + stub.Results.Data.Count.Should().Be(0, "Should be nothing cached"); + } + + [Fact] + public void Update() + { + const string key = "Adult1"; + var newperson = new Person(key, 50); + var updated = new Person(key, 51); + + using (var stub = new TransformStub()) + { + stub.Source.AddOrUpdate(newperson); + stub.Source.AddOrUpdate(updated); + + stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); + stub.Results.Messages[0].Adds.Should().Be(1, "Should be 1 adds"); + stub.Results.Messages[1].Updates.Should().Be(1, "Should be 1 update"); + } + } + + [Fact] + public async Task BatchOfUniqueUpdates() + { + var people = Enumerable.Range(1, 100).Select(i => new Person("Name" + i, i)).ToArray(); + using var stub = new TransformStub(); + stub.Source.AddOrUpdate(people); + + stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); + stub.Results.Messages[0].Adds.Should().Be(100, "Should return 100 adds"); + + var result = await Task.WhenAll(people.Select(stub.TransformFactory)); + var transformed = result.OrderBy(p => p.Age).ToArray(); + stub.Results.Data.Items.OrderBy(p => p.Age).Should().BeEquivalentTo(stub.Results.Data.Items.OrderBy(p => p.Age), "Incorrect transform result"); + } + + [Fact] + public async Task SameKeyChanges() + { + using var stub = new TransformStub(); + var people = Enumerable.Range(1, 10).Select(i => new Person("Name", i)).ToArray(); + + stub.Source.AddOrUpdate(people); + + stub.Results.Messages.Count.Should().Be(1, "Should be 1 updates"); + stub.Results.Messages[0].Adds.Should().Be(1, "Should return 1 adds"); + stub.Results.Messages[0].Updates.Should().Be(9, "Should return 9 adds"); + stub.Results.Data.Count.Should().Be(1, "Should result in 1 record"); + + var lastTransformed = await stub.TransformFactory(people.Last()); + var onlyItemInCache = stub.Results.Data.Items.First(); + + onlyItemInCache.Should().Be(lastTransformed, "Incorrect transform result"); + } + + [Fact] + public void Clear() + { + using var stub = new TransformStub(); + var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); + + stub.Source.AddOrUpdate(people); + stub.Source.Clear(); + + stub.Results.Messages.Count.Should().Be(2, "Should be 2 updates"); + stub.Results.Messages[0].Adds.Should().Be(100, "Should be 80 addes"); + stub.Results.Messages[1].Removes.Should().Be(100, "Should be 80 removes"); + stub.Results.Data.Count.Should().Be(0, "Should be nothing cached"); + } + + [Fact] + public void HandleError() + { + using var stub = new TransformStub(p => + { + if (p.Age <= 50) + return new PersonWithGender(p, p.Age % 2 == 0 ? "M" : "F"); + + throw new Exception("Broken"); + }); + var people = Enumerable.Range(1, 100).Select(l => new Person("Name" + l, l)).ToArray(); + stub.Source.AddOrUpdate(people); + + stub.Results.Error.Should().BeNull(); + + Exception? error = null; + stub.Source.Connect() + .Subscribe(changes => { }, ex => error = ex); + + error.Should().BeNull(); + + stub.HandledErrors.Count.Should().Be(50); + stub.Results.Data.Count.Should().Be(50); + } + + [Theory, InlineData(true), InlineData(false)] + public void TransformOnRefresh(bool transformOnRefresh) + { + int errorCount = 0; + using var source = new SourceCache(p => p.Name); + using var results = source.Connect() + .AutoRefresh() + .TransformAsync((p, key) => Task.FromResult(new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult")), TransformAsyncOptions.Default with { TransformOnRefresh = transformOnRefresh }).AsAggregator(); + + var person = new Person("SomeOne", 16); + source.AddOrUpdate(person); + + results.Data.Count.Should().Be(1); + results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be("Child"); + + person.Age = 21; + + + results.Data.Count.Should().Be(1); + results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be(transformOnRefresh ? "Adult" : "Child"); + errorCount.Should().Be(0); + } + + + [Theory, InlineData(10), InlineData(100)] + + public async Task WithMaxConcurrency(int maxConcurrency) + { + /* We need to test whether the max concurrency has any effect. + + If maxConcurrency == 100, this test takes a little more than 100 ms + If maxConcurrency = 10, this test takes a little more than 1s + + So it works, but how can it be tested in a scientific way ?? + */ + + int errorCount = 0; + const int transformCount = 100; + + using var source = new SourceCache(p => p.Name); + using var results = source.Connect() + .TransformSafeAsync(async (p, key) => + { + await Task.Delay(100); + + return new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult"); + } + , error => { errorCount++; } + , TransformAsyncOptions.Default with { MaximumConcurrency = maxConcurrency }) + .AsAggregator(); + + + source.AddOrUpdate(Enumerable.Range(1, transformCount).Select(l => new Person("Person" + l, l))); + + + await results.Data.CountChanged.Where(c => c == transformCount).Take(1); + + errorCount.Should().Be(0); + } + private class TransformStub : IDisposable { diff --git a/src/DynamicData.Tests/Domain/PersonWithGender.cs b/src/DynamicData.Tests/Domain/PersonWithGender.cs index 7f6f8203e..dc4ac6c09 100644 --- a/src/DynamicData.Tests/Domain/PersonWithGender.cs +++ b/src/DynamicData.Tests/Domain/PersonWithGender.cs @@ -1,6 +1,7 @@ using System; namespace DynamicData.Tests.Domain; +public record PersonWithAgeGroup(Person Person, string AgeGroup); public class PersonWithGender : IEquatable { diff --git a/src/DynamicData.Tests/DynamicData.Tests.csproj b/src/DynamicData.Tests/DynamicData.Tests.csproj index 1713b9081..202c0f2b1 100644 --- a/src/DynamicData.Tests/DynamicData.Tests.csproj +++ b/src/DynamicData.Tests/DynamicData.Tests.csproj @@ -1,6 +1,6 @@  - net6.0;net7.0;net8.0 + net8.0 $(NoWarn);CS0618;CA1801;CA1812;CA1816;CA1063;CS8767;CS8602;CS8618;IDE1006 enable latest diff --git a/src/DynamicData/Cache/Internal/TransformAsync.cs b/src/DynamicData/Cache/Internal/TransformAsync.cs index 2b0b5788a..0e0a4f4b0 100644 --- a/src/DynamicData/Cache/Internal/TransformAsync.cs +++ b/src/DynamicData/Cache/Internal/TransformAsync.cs @@ -3,71 +3,59 @@ // See the LICENSE file in the project root for full license information. using System.Reactive.Linq; - +using System.Reactive.Threading.Tasks; using DynamicData.Kernel; namespace DynamicData.Cache.Internal; -internal sealed class TransformAsync(IObservable> source, Func, TKey, Task> transformFactory, Action>? exceptionCallback, IObservable>? forceTransform = null) +internal class TransformAsync( + IObservable> source, + Func, TKey, Task> transformFactory, + Action>? exceptionCallback, + IObservable>? forceTransform = null, + int? maximumConcurrency = null, + bool transformOnRefresh = false) where TDestination : notnull where TSource : notnull where TKey : notnull { - public IObservable> Run() => Observable.Create>(observer => - { - var cache = new ChangeAwareCache(); - var asyncLock = new SemaphoreSlim(1, 1); - - var transformer = source.Select(async changes => - { - try - { - await asyncLock.WaitAsync(); - return await DoTransform(cache, changes).ConfigureAwait(false); - } - finally - { - asyncLock.Release(); - } - }).Concat(); - - if (forceTransform is not null) - { - var locker = new object(); - var forced = forceTransform.Synchronize(locker) - .Select(async shouldTransform => - { - try - { - await asyncLock.WaitAsync(); - return await DoTransform(cache, shouldTransform).ConfigureAwait(false); - } - finally - { - asyncLock.Release(); - } - }).Concat(); - - transformer = transformer.Synchronize(locker).Merge(forced); - } - - return transformer.SubscribeSafe(observer); - }); - - private async Task> DoTransform(ChangeAwareCache cache, Func shouldTransform) - { - var toTransform = cache.KeyValues.Where(kvp => shouldTransform(kvp.Value.Source, kvp.Key)).Select(kvp => new Change(ChangeReason.Update, kvp.Key, kvp.Value.Source, kvp.Value.Source)).ToArray(); + public IObservable> Run() => + Observable.Create>(observer => + { + var cache = new ChangeAwareCache(); - var transformed = await Task.WhenAll(toTransform.Select(Transform)).ConfigureAwait(false); + var transformer = source.Select(changes => DoTransform(cache, changes)).Concat(); - return ProcessUpdates(cache, transformed); - } + if (forceTransform is not null) + { + var locker = new object(); + var forced = forceTransform.Synchronize(locker) + .Select(shouldTransform => DoTransform(cache, shouldTransform)).Concat(); + + transformer = transformer.Synchronize(locker).Merge(forced); + } - private async Task> DoTransform(ChangeAwareCache cache, IChangeSet changes) + return transformer.SubscribeSafe(observer); + }); + + private IObservable> DoTransform(ChangeAwareCache cache, Func shouldTransform) { - var transformed = await Task.WhenAll(changes.Select(Transform)).ConfigureAwait(false); + var toTransform = cache.KeyValues.Where(kvp => shouldTransform(kvp.Value.Source, kvp.Key)).Select(kvp => + new Change(ChangeReason.Update, kvp.Key, kvp.Value.Source, kvp.Value.Source)).ToArray(); + + return toTransform.Select(change => Observable.Defer(() => Transform(change).ToObservable())) + .Merge(maximumConcurrency ?? int.MaxValue) + .ToArray() + .Select(transformed => ProcessUpdates(cache, transformed)); + } - return ProcessUpdates(cache, transformed); + private IObservable> DoTransform( + ChangeAwareCache cache, IChangeSet changes) + { + return changes.Select(change => Observable.FromAsync(() => Transform(change))) + .Merge(maximumConcurrency ?? int.MaxValue) + .ToArray() + .Select(transformed => ProcessUpdates(cache, transformed)); } private ChangeSet ProcessUpdates(ChangeAwareCache cache, TransformResult[] transformedItems) @@ -76,7 +64,8 @@ private ChangeSet ProcessUpdates(ChangeAwareCache !t.Success).ToArray(); if (errors.Length > 0) { - errors.ForEach(t => exceptionCallback?.Invoke(new Error(t.Error, t.Change.Current, t.Change.Key))); + errors.ForEach(t => + exceptionCallback?.Invoke(new Error(t.Error, t.Change.Current, t.Change.Key))); } foreach (var result in transformedItems.Where(t => t.Success)) @@ -94,12 +83,21 @@ private ChangeSet ProcessUpdates(ChangeAwareCache new Change(change.Reason, change.Key, change.Current.Destination, change.Previous.Convert(x => x.Destination), change.CurrentIndex, change.PreviousIndex)); return new ChangeSet(transformed); @@ -109,9 +107,10 @@ private async Task Transform(Change change) { try { - if (change.Reason == ChangeReason.Add || change.Reason == ChangeReason.Update) + if (change.Reason is ChangeReason.Add or ChangeReason.Update || (change.Reason is ChangeReason.Refresh && transformOnRefresh)) { - var destination = await transformFactory(change.Current, change.Previous, change.Key).ConfigureAwait(false); + var destination = await transformFactory(change.Current, change.Previous, change.Key) + .ConfigureAwait(false); return new TransformResult(change, new TransformedItemContainer(change.Current, destination)); } diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 06676ef97..726dc1743 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -4764,6 +4764,87 @@ public static IObservable> TransformAsync(source, transformFactory, null, forceTransform).Run(); } + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformAsync(this IObservable> source, Func> transformFactory, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + + return source.TransformAsync((current, _, _) => transformFactory(current), options); + } + + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformAsync(this IObservable> source, Func> transformFactory, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + + return source.TransformAsync((current, _, key) => transformFactory(current, key), options); + } + + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformAsync(this IObservable> source, Func, TKey, Task> transformFactory, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + + return new TransformAsync(source, transformFactory, null, null, options.MaximumConcurrency, options.TransformOnRefresh).Run(); + } + /// /// Equivalent to a select many transform. To work, the key must individually identify each child. /// @@ -5090,6 +5171,93 @@ public static IObservable> TransformSafeAsync(source, transformFactory, errorHandler, forceTransform).Run(); } + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The error handler. + /// Additional transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformSafeAsync(this IObservable> source, Func> transformFactory, Action> errorHandler, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + errorHandler.ThrowArgumentNullExceptionIfNull(nameof(errorHandler)); + + return source.TransformSafeAsync((current, _, _) => transformFactory(current), errorHandler, options); + } + + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The error handler. + /// Additional transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformSafeAsync(this IObservable> source, Func> transformFactory, Action> errorHandler, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + errorHandler.ThrowArgumentNullExceptionIfNull(nameof(errorHandler)); + + return source.TransformSafeAsync((current, _, key) => transformFactory(current, key), errorHandler, options); + } + + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// The type of the destination. + /// The type of the source. + /// The type of the key. + /// The source. + /// The transform factory. + /// The error handler. + /// Additional transform options. + /// + /// A transformed update collection. + /// + /// source + /// or + /// transformFactory. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "By Design.")] + public static IObservable> TransformSafeAsync(this IObservable> source, Func, TKey, Task> transformFactory, Action> errorHandler, TransformAsyncOptions options) + where TDestination : notnull + where TSource : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + errorHandler.ThrowArgumentNullExceptionIfNull(nameof(errorHandler)); + + return new TransformAsync(source, transformFactory, errorHandler, null, options.MaximumConcurrency, options.TransformOnRefresh).Run(); + } + /// /// Transforms the object to a fully recursive tree, create a hierarchy based on the pivot function. /// diff --git a/src/DynamicData/Cache/TransformAsyncOptions.cs b/src/DynamicData/Cache/TransformAsyncOptions.cs new file mode 100644 index 000000000..e9b43aeba --- /dev/null +++ b/src/DynamicData/Cache/TransformAsyncOptions.cs @@ -0,0 +1,18 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. +namespace DynamicData; + +/// +/// Options for TransformAsync and TransformSafeAsync. +/// +/// The maximum number of tasks in flight at once. +/// Should a new transform be applied when a refresh event is received. +public record struct TransformAsyncOptions(int? MaximumConcurrency, bool TransformOnRefresh) +{ + /// + /// The default transform async option values, with is unlimited concurrency and do not transform on reset. + /// + /// A TransformAsyncOptions object. + public static readonly TransformAsyncOptions Default = new(null, false); +}