Skip to content

Commit

Permalink
Async subscription fix (#597)
Browse files Browse the repository at this point in the history
Fix async overloads for ObservableChangeSet.Create
  • Loading branch information
RolandPheasant authored May 23, 2022
1 parent 8ce6dd4 commit 9b7f857
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 9 deletions.
61 changes: 61 additions & 0 deletions src/DynamicData.Tests/Cache/ObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
Expand All @@ -15,6 +16,66 @@ namespace DynamicData.Tests.Cache;

public class ObservableChangeSetFixture
{

[Fact]
[Description(" See https://github.com/reactivemarbles/DynamicData/issues/383")]
public async Task AsyncSubscriptionCanReceiveMultipleResults()
{

//the aim of this test is to ensure we can continuously receive subscriptions when we use the async subscribe overloads
var result = new List<int>();


var observable = ObservableChangeSet.Create<int, int>(
async (changeSet, token) =>
{
int i = 0;
while (!token.IsCancellationRequested)
{
changeSet.AddOrUpdate(i++);
/*
* Without ConfigureAwait(false) we get a flakey test which always work when run in isolation
* but periodically fails when all tests are run. WTAF - I have no idea why but can only speculate
* that without it the context is returning to the context of the test runner and it doesn't get back to it
* until after the test session ends
*/
await Task.Delay(5, token).ConfigureAwait(false);
}
},
i => i)
.Select(cs => cs.Select(c => c.Current).ToList());


bool isComplete = false;
Exception? error = null;


//load list of results
var subscriber = observable
.Subscribe(item => result.AddRange(item), ex => error = ex, () => isComplete = true);

//allow some results through
await Task.Delay(100);

isComplete.Should().Be(false);
error.Should().BeNull();

//do not try to be clever with timings because wierd stuff happens in time
result.Take(5).Should().BeEquivalentTo(new List<int>
{
0,
1,
2,
3,
4
});

subscriber.Dispose();
}


[Fact]
public void HandlesAsyncError()
{
Expand Down
27 changes: 18 additions & 9 deletions src/DynamicData/ObservableChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
throw new ArgumentNullException(nameof(keySelector));
}

return Create(async (list, _) => await subscribe(list).ConfigureAwait(false), keySelector);
return Create(async (cache, _) => await subscribe(cache).ConfigureAwait(false), keySelector);
}

/// <summary>
Expand Down Expand Up @@ -135,6 +135,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
{
var cache = new SourceCache<TObject, TKey>(keySelector);
var disposable = new SingleAssignmentDisposable();
var responder = cache.Connect().SubscribeSafe(observer);
try
{
Expand All @@ -145,7 +146,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
observer.OnError(e);
}
return new CompositeDisposable(cache.Connect().SubscribeSafe(observer), cache, disposable, Disposable.Create(observer.OnCompleted));
return new CompositeDisposable(responder, cache, disposable, Disposable.Create(observer.OnCompleted));
});
}

Expand All @@ -170,7 +171,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
throw new ArgumentNullException(nameof(keySelector));
}

return Create((list, _) => subscribe(list), keySelector);
return Create((cache, _) => subscribe(cache), keySelector);
}

/// <summary>
Expand Down Expand Up @@ -198,6 +199,8 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
async (observer, ct) =>
{
var cache = new SourceCache<TObject, TKey>(keySelector);
var responder = cache.Connect().SubscribeSafe(observer);
Action? disposeAction = null;
try
Expand All @@ -210,7 +213,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
}
return new CompositeDisposable(
cache.Connect().SubscribeSafe(observer),
responder,
cache,
Disposable.Create(
() =>
Expand Down Expand Up @@ -246,6 +249,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
async observer =>
{
var cache = new SourceCache<TObject, TKey>(keySelector);
var responder = cache.Connect().SubscribeSafe(observer);
try
{
Expand All @@ -256,7 +260,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
observer.OnError(e);
}
return new CompositeDisposable(cache.Connect().SubscribeSafe(observer), cache, Disposable.Create(observer.OnCompleted));
return new CompositeDisposable(responder, cache, Disposable.Create(observer.OnCompleted));
});
}

Expand Down Expand Up @@ -285,6 +289,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
async (observer, ct) =>
{
var cache = new SourceCache<TObject, TKey>(keySelector);
var responder = cache.Connect().SubscribeSafe(observer);
try
{
Expand All @@ -295,7 +300,7 @@ public static IObservable<IChangeSet<TObject, TKey>> Create<TObject, TKey>(Func<
observer.OnError(e);
}
return new CompositeDisposable(cache.Connect().SubscribeSafe(observer), cache, Disposable.Create(observer.OnCompleted));
return new CompositeDisposable(responder, cache, Disposable.Create(observer.OnCompleted));
});
}

Expand Down Expand Up @@ -395,6 +400,7 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Cancella
var list = new SourceList<T>();
IDisposable? disposeAction = null;
SingleAssignmentDisposable actionDisposable = new();
var responder = list.Connect().SubscribeSafe(observer);
try
{
Expand All @@ -406,7 +412,7 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Cancella
}
return new CompositeDisposable(
list.Connect().SubscribeSafe(observer),
responder,
list,
actionDisposable,
Disposable.Create(
Expand Down Expand Up @@ -451,6 +457,8 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Cancella
async (observer, ct) =>
{
var list = new SourceList<T>();
var responder = list.Connect().SubscribeSafe(observer);
Action? disposeAction = null;
try
Expand All @@ -463,7 +471,7 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Cancella
}
return new CompositeDisposable(
list.Connect().SubscribeSafe(observer),
responder,
list,
Disposable.Create(
() =>
Expand Down Expand Up @@ -491,6 +499,7 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Task> su
async observer =>
{
var list = new SourceList<T>();
var responder = list.Connect().SubscribeSafe(observer);
try
{
Expand All @@ -501,7 +510,7 @@ public static IObservable<IChangeSet<T>> Create<T>(Func<ISourceList<T>, Task> su
observer.OnError(e);
}
return new CompositeDisposable(list.Connect().SubscribeSafe(observer), list, Disposable.Create(observer.OnCompleted));
return new CompositeDisposable(responder, list, Disposable.Create(observer.OnCompleted));
});
}

Expand Down

0 comments on commit 9b7f857

Please sign in to comment.