Skip to content

Commit

Permalink
Fix threading issue in observable cache. Fixes #538 (#539)
Browse files Browse the repository at this point in the history
Co-authored-by: Glenn <5834289+glennawatson@users.noreply.github.com>
  • Loading branch information
RolandPheasant and glennawatson authored Dec 18, 2021
1 parent 5ac9fb9 commit aa32432
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/DynamicData/Cache/ObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,22 @@ public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool>? predi
Observable.Create<IChangeSet<TObject, TKey>>(
observer =>
{
var initial = InternalEx.Return(() =>
lock (_locker)
{
// lock getting initial changes and rely on a combination of Concat
// + _changes being synchronized to produce thread safety (I hope!)
lock (_locker)
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
var changes = initial.Concat(_changes);
if (predicate != null)
{
return (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate);
changes = changes.Filter(predicate, suppressEmptyChangeSets);
}
else if (suppressEmptyChangeSets)
{
changes = changes.NotEmpty();
}
});
var changes = Observable.Defer(() => initial).Concat(_changes);
if (predicate != null)
{
changes = changes.Filter(predicate, suppressEmptyChangeSets);
}
else if (suppressEmptyChangeSets)
{
changes = changes.NotEmpty();
return changes.SubscribeSafe(observer);
}
return changes.SubscribeSafe(observer);
});

public void Dispose() => _cleanUp.Dispose();
Expand Down

0 comments on commit aa32432

Please sign in to comment.