diff --git a/src/DynamicData.Tests/Cache/InnerJoinFixtureRaceCondition.cs b/src/DynamicData.Tests/Cache/InnerJoinFixtureRaceCondition.cs index 669a077ee..bdad60ee9 100644 --- a/src/DynamicData.Tests/Cache/InnerJoinFixtureRaceCondition.cs +++ b/src/DynamicData.Tests/Cache/InnerJoinFixtureRaceCondition.cs @@ -32,6 +32,46 @@ public void LetsSeeWhetherWeCanRandomlyHitARaceCondition() ids.InnerJoin(itemsCache.Connect(), x => x.Id, (_, thing) => thing).Subscribe((z) => { }, ex => { }, () => { }); } + // See https://github.com/reactivemarbles/DynamicData/issues/787 + [Fact] + public void LetsSeeWhetherWeCanRandomlyHitADifferentRaceCondition() + { + using var leftSource = new SourceCache(thing => thing.Id); + using var rightSource = new SourceCache(thing => thing.Id); + + var resultStream = ObservableCacheEx.InnerJoin( + left: leftSource.Connect(), + right: rightSource.Connect(), + rightKeySelector: rightThing => rightThing.Id, + (keys, leftThing, rightThing) => new Thing() + { + Id = keys.leftKey, + Name = $"{leftThing.Name} x {rightThing.Name}" + }); + + using var leftThingGenerator = BeginGeneratingThings(leftSource, "Left"); + using var rightThingGenerator = BeginGeneratingThings(rightSource, "Left"); + + for (var i = 0; i < 100; ++i) + { + using var subscription = resultStream.Subscribe(); + } + + IDisposable BeginGeneratingThings(SourceCache source, string namePrefix) + // Generate items infinitely. The runtime of the test is limited by the .Subscribe() loop. + => Observable.Range(1, int.MaxValue, ThreadPoolScheduler.Instance) + .Subscribe(id => + { + source.AddOrUpdate(new Thing() + { + Id = id, + Name = $"{namePrefix}Thing #{id}" + }); + // Start removing items after the first 100, to keep the overhead of calling .Subscribe() down. + source.RemoveKey(id - 100); + }); + } + public class Thing { public long Id { get; set; } diff --git a/src/DynamicData/Cache/Internal/FullJoin.cs b/src/DynamicData/Cache/Internal/FullJoin.cs index ebb220128..f2d91f91b 100644 --- a/src/DynamicData/Cache/Internal/FullJoin.cs +++ b/src/DynamicData/Cache/Internal/FullJoin.cs @@ -118,6 +118,9 @@ public IObservable> Run() => Observable.Creat return joinedCache.CaptureChanges(); }); - return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache); + lock (locker) + { + return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache); + } }); } diff --git a/src/DynamicData/Cache/Internal/InnerJoin.cs b/src/DynamicData/Cache/Internal/InnerJoin.cs index 155645a6c..6332aa9c7 100644 --- a/src/DynamicData/Cache/Internal/InnerJoin.cs +++ b/src/DynamicData/Cache/Internal/InnerJoin.cs @@ -41,7 +41,7 @@ internal class InnerJoin(IObse { foreach (var change in changes.ToConcreteType()) { - var leftCurent = change.Current; + var leftCurrent = change.Current; var rightLookup = rightGrouped.Lookup(change.Key); if (rightLookup.HasValue) @@ -52,7 +52,7 @@ internal class InnerJoin(IObse case ChangeReason.Update: foreach (var keyvalue in rightLookup.Value.KeyValues) { - joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurent, keyvalue.Value), (change.Key, keyvalue.Key)); + joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key)); } break; @@ -117,6 +117,9 @@ internal class InnerJoin(IObse return joinedCache.CaptureChanges(); }); - return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect()); + lock (locker) + { + return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect()); + } }); } diff --git a/src/DynamicData/Cache/Internal/LeftJoin.cs b/src/DynamicData/Cache/Internal/LeftJoin.cs index c6ffa9a1a..ef69cb835 100644 --- a/src/DynamicData/Cache/Internal/LeftJoin.cs +++ b/src/DynamicData/Cache/Internal/LeftJoin.cs @@ -119,6 +119,9 @@ public IObservable> Run() => Observable.Creat return joined.CaptureChanges(); }); - return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache); + lock (locker) + { + return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache); + } }); }