[BUG] Async ObservableChangeSet.Create<int, int>() does not publish changes until factory function completes. #383

Closed
thargy opened this issue Jul 9, 2020 · 7 comments

thargy commented Jul 9, 2020

Describe the bug

Consider the following code using Observable.Create from Reactive Extensions:

var random = new Random(1234);
var observable = Observable.Create<int>(
    async (observer, token) =>
    {
        Console.WriteLine("Started");
        while (!token.IsCancellationRequested)
        {
            var next = random.Next(0, 4);
            Console.WriteLine($"Pushing {next}");
            observer.OnNext(next);

            await Task.Delay(1000, token);
        }

        Console.WriteLine("Stopped");
    });
    
Console.WriteLine("Here");
using (var sub1 = observable.Subscribe(o => $"Sub1 received {o}".Dump()))
{
    Console.WriteLine("Here 2");
    await Task.Delay(5000);
}
Console.WriteLine("Finished");

This is a simplification of a common use case, allowing for an on-demand triggering of an asynchronous method that produces changes over time (for example listening to a USB device, or a network port). It is particularly powerful when combined with RefCount(), or similar, to allow multiple subscribers to share a single connection, which is disposed (by token being cancelled) when everyone stops listening.

The above code produces:

Here
Started
Pushing 0
Sub1 received 0
Here 2
Pushing 2
Sub1 received 2
Pushing 0
Sub1 received 0
Pushing 0
Sub1 received 0
Pushing 3
Sub1 received 3
Stopped
Finished

The equivalent in Dynamic Data would be:

var random = new Random(1234);
var observable = ObservableChangeSet.Create<int, int>(
    async (changeSet, token) =>
    {
        Console.WriteLine("Started");
        while (!token.IsCancellationRequested)
        {
            var next = random.Next(0, 4);
            Console.WriteLine($"Pushing {next}");
            changeSet.AddOrUpdate(next);

            await Task.Delay(1000, token);
        }

        Console.WriteLine("Stopped");
    },
    i => i)
    .Select(cs => cs.Select(c => c.Current).ToList());

Console.WriteLine("Here");
using (var sub1 = observable.Subscribe(o => $"Sub1 received {o}".Dump()))
{
    Console.WriteLine("Here 2");
    await Task.Delay(5000);
}
Console.WriteLine("Finished");

Which should have identical output to above, but instead produces:

Here
Started
Pushing 1
Here 2
Pushing 3
Pushing 1
Pushing 3
Pushing 1
Finished

As you can see, the subscriber never receives a notification, nor is the token cancelled directly (the factory function does appear to stop, however, possibly because the task is terminated). The latter is less of an issue, as token cancellation is not 'guaranteed' by the Reactive Extensions specs anyway (but is a nice-to-have).

thargy added the bug label Jul 9, 2020
@geometrikal

I have a similar problem in a Blazor WASM app.

If I change

await subscribe(cache, ct).ConfigureAwait(false);
to simply subscribe(cache, ct); it works.

However, when OperationCanceledException (or any other Exception) is thrown, the exception is not caught and thus OnError is not called. Changing the code to this enables the exceptions to be caught:

return Observable.Create<IChangeSet<TObject, TKey>>(
            async (observer, ct) =>
            {
                var cache = new SourceCache<TObject, TKey>(keySelector);

                _ = Task.Run(async () =>
                {
                    try
                    {
                        await subscribe(cache, ct);
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                });

                return new CompositeDisposable(cache.Connect().SubscribeSafe(observer), cache, Disposable.Create(observer.OnCompleted));
            });

@glennawatson
Member

If you're in a position to do so, opening a PR would help.

If you can, include a unit test that reproduces the broken functionality.

@thargy
Author

thargy commented May 17, 2022

@geometrikal good find!

Removing the ConfigureAwait forces the code to schedule the continuation back to the calling context, whereas with the ConfigureAwait present it is likely to continue, synchronously, on the same thread as the subscribe concluded, which is probably different from the original caller thread.

The Task.Run change 'works' because it forces the entire subscribe(cache,ct) block to be deferred, allowing the SubscribeSafe code to be called first, and so the OnError(e) is not called prior to the function returning.

I suspect it isn't 'correct' though as it is just delaying the subscription arbitrarily, and will be impacted by what happens after the method returns.

We need to understand what it is that needs to occur before the subscription does, and then schedule the subscribe call after that explicitly.

@RolandPheasant
Collaborator

This seems to do it.

        return Observable.Create<IChangeSet<TObject, TKey>>(
            async (observer, ct) =>
            {
                var cache = new SourceCache<TObject, TKey>(keySelector);
                var responder = cache.Connect().SubscribeSafe(observer);

                try
                {
                    await subscribe(cache, ct);
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }

                return new CompositeDisposable(responder, cache, Disposable.Create(observer.OnCompleted));
            });

The key is to subscribe to the cache changes before awaiting the subscription.
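
To illustrate why the ordering matters, here is a minimal sketch using plain Rx rather than DynamicData. It rests on assumptions: `ReplaySubject<int>` is only a stand-in for `SourceCache` (chosen because, like `Connect()`, it hands existing state to a late subscriber), and `SubscribeOrderingSketch`, `RunAsync`, and `ProduceAsync` are hypothetical names that appear nowhere in the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SubscribeOrderingSketch
{
    // Runs a small async producer and records the order of events.
    // subscribeFirst = true  : subscribe, then await the producer.
    // subscribeFirst = false : await the producer, then subscribe.
    public static async Task<List<string>> RunAsync(bool subscribeFirst)
    {
        var log = new List<string>();

        // Stand-in for SourceCache (assumption): replays earlier items to late subscribers.
        using var source = new ReplaySubject<int>();

        // Hypothetical producer, playing the role of the user's factory function.
        async Task ProduceAsync()
        {
            for (var i = 1; i <= 2; i++)
            {
                log.Add($"push {i}");
                source.OnNext(i);
                await Task.Delay(10);
            }

            log.Add("producer done");
        }

        IDisposable subscription;
        if (subscribeFirst)
        {
            // The observer is attached before the producer runs,
            // so each item is delivered as it is pushed.
            subscription = source.Subscribe(i => log.Add($"received {i}"));
            await ProduceAsync();
        }
        else
        {
            // The observer is attached only after the producer has finished,
            // so it sees nothing until then.
            await ProduceAsync();
            subscription = source.Subscribe(i => log.Add($"received {i}"));
        }

        subscription.Dispose();
        return log;
    }
}
```

With `subscribeFirst: true` the log should interleave each push with its notification; with `false` the notifications should only appear after "producer done", which mirrors the behaviour originally reported in this issue.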

Now I need to work out how to make a proper test.

@RolandPheasant
Collaborator

I've checked the fix in and will deploy it later today or tomorrow. Please let me know whether the issue has been correctly fixed.

thargy added a commit to DevDecoder/HIDDevices that referenced this issue May 24, 2022
* When a controller name contains "XBox", the sample application will
issue a warning and not connect it, avoiding confusion.

* Updated NuGet packages, this is particularly important as
[DynamicData](https://github.com/reactivemarbles/DynamicData) has some
bug fixes (notably
[#383](reactivemarbles/DynamicData#383 (comment))).
@thargy
Author

thargy commented May 24, 2022

I can confirm that it was broken in 7.8.1 and fixed in 7.8.5 (i.e. you've fixed the initial bug as raised).

Honestly, this is actually quite an important fix to a major use case, congratulations! 🍾

@github-actions

github-actions bot commented Jun 8, 2022

This issue has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jun 8, 2022