Skip to content

Commit

Permalink
To observable change set (#603)
Browse files Browse the repository at this point in the history
Complete re-write of ToObservableChangeSet. Fixes #604

* Fix memory leaks in ToObservableChangeSet no longer has a memory leak.  Fixes #307
* Restore expiry not working after source observable completes. Fixes #358
* Revert issue #326  / #327
  • Loading branch information
RolandPheasant authored Jun 9, 2022
1 parent a6b4936 commit 4375ab1
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 208 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using System.Threading.Tasks;
using DynamicData.Tests.Domain;

using FluentAssertions;
Expand Down Expand Up @@ -60,7 +61,13 @@ public void LimitSizeTo()
public void OnNextProducesAnAddAndRemoveChangeForEnumerableSource()
{
var subject = new Subject<IEnumerable<Person>>();
var results = subject.ToObservableChangeSet(p => p.Name).AsAggregator();

var results = ObservableChangeSet.Create<Person, string>(cache =>
{
return subject.Subscribe(items => cache.EditDiff(items, Person.NameAgeGenderComparer));
}, p => p.Name)
.AsAggregator();


var people = new[]
{
Expand All @@ -84,7 +91,7 @@ public void OnNextProducesAnAddAndRemoveChangeForEnumerableSource()

results.Messages.Last().Adds.Should().Be(0, "Should have added no items");
results.Messages.Last().Updates.Should().Be(2, "Should have updated 2 items");
results.Messages.Last().Removes.Should().Be(1, "Should have removed 1 items");
results.Messages.Last().Removes.Should().Be(1, "Should have removed 1 items");
results.Data.Count.Should().Be(2, "Should be 3 items in the cache");

results.Messages.Count.Should().Be(2, "Should be 2 updates");
Expand All @@ -110,4 +117,25 @@ public void OnNextProducesAnAddChangeForEnumerableSource()
results.Data.Count.Should().Be(3, "Should be 1 item in the cache");
results.Data.Items.Should().BeEquivalentTo(results.Data.Items, "Lists should be equivalent");
}



[Fact]
public void ExpireAfterObservableCompleted()
{
//See https://github.com/reactivemarbles/DynamicData/issues/358

var scheduler = new TestScheduler();

var expiry = Observable.Return(Enumerable.Range(0, 10).Select(i => new { A = i, B = 2 * i }))
.ToObservableChangeSet(x => x.A, _ => TimeSpan.FromSeconds(5), scheduler: scheduler)
.AsAggregator();

expiry.Data.Count.Should().Be(10);

scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);

expiry.Data.Count.Should().Be(0);

}
}
6 changes: 3 additions & 3 deletions src/DynamicData.Tests/Cache/ObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Task<IEnumerable<Person>> Loader()
},
p => p.Name);

using (var dervived = observable.AsObservableCache())
using var dervived = observable.AsObservableCache();
using (dervived.Connect().Subscribe(_ => { }, ex => error = ex))
{
error.Should().NotBeNull();
Expand All @@ -121,8 +121,8 @@ IEnumerable<Person> Loader()
},
p => p.Name);

using (var dervived = observable.AsObservableCache())
using (dervived.Connect().Subscribe(_ => { }, ex => error = ex))
using var derived = observable.AsObservableCache();
using (derived.Connect().Subscribe(_ => { }, ex => error = ex))
{
error.Should().NotBeNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using DynamicData.Kernel;
using DynamicData.Tests.Domain;

using FluentAssertions;

using Microsoft.Reactive.Testing;

using Xunit;

namespace DynamicData.Tests.Cache;

public class ObservableToObservableChangeSetFixture
{

[Fact]
public void ExpireAfterTime()
{
Expand Down Expand Up @@ -53,6 +50,7 @@ public void ExpireAfterTimeDynamic()
results.Data.Count.Should().Be(10, "Should be 10 items in the cache");
}


[Fact]
public void ExpireAfterTimeDynamicWithKey()
{
Expand Down Expand Up @@ -82,9 +80,11 @@ public void ExpireAfterTimeWithKey()
subject.OnNext(person);
}

results.Data.Count.Should().Be(200, "Should 200 items in the cache");

scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks);

results.Messages.Count.Should().Be(400, "Should be 400 messages");
results.Messages.Count.Should().Be(201, "Should be 201 messages");
results.Messages.Sum(x => x.Adds).Should().Be(200, "Should be 200 adds");
results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 200 removes");
results.Data.Count.Should().Be(0, "Should be no data in the cache");
Expand Down
18 changes: 11 additions & 7 deletions src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
using System;
using System.Collections.Generic;

using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using DynamicData.Tests.Domain;

using FluentAssertions;

using Microsoft.Reactive.Testing;

using Xunit;
using Xunit.Abstractions;

namespace DynamicData.Tests.Cache;

public class ToObservableChangeSetFixture : ReactiveTest, IDisposable
{
private readonly ITestOutputHelper _outputHelper;

private readonly IDisposable _disposable;

private readonly IObservable<Person> _observable;
Expand All @@ -27,8 +32,9 @@ public class ToObservableChangeSetFixture : ReactiveTest, IDisposable

private readonly List<Person> _target;

public ToObservableChangeSetFixture()
public ToObservableChangeSetFixture(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
_scheduler = new TestScheduler();
_observable = _scheduler.CreateColdObservable(OnNext(1, _person1), OnNext(2, _person2), OnNext(3, _person3));

Expand All @@ -37,11 +43,6 @@ public ToObservableChangeSetFixture()
_disposable = _observable.ToObservableChangeSet(p => p.Key, limitSizeTo: 2, scheduler: _scheduler).Clone(_target).Subscribe();
}

public void Dispose()
{
_disposable.Dispose();
}

[Fact]
public void ShouldLimitSizeOfBoundCollection()
{
Expand All @@ -53,4 +54,7 @@ public void ShouldLimitSizeOfBoundCollection()

_target.Count.Should().Be(2, "Should be 2 item in target collection because of size limit");
}


public void Dispose() => _disposable.Dispose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ public ToObservableChangeSetFixtureWithCompletion()
_disposable = _observable.ToObservableChangeSet(p => p.Key).Clone(_target).Subscribe(x => { }, () => _hasCompleted = true);
}

public void Dispose()
{
_disposable.Dispose();
}

[Fact]
// [Fact] - disabled as it's questionable whether the completion should be invoked
public void ShouldReceiveUpdatesThenComplete()
{
_observable.OnNext(new Person("One", 1));
Expand All @@ -48,4 +43,6 @@ public void ShouldReceiveUpdatesThenComplete()
_observable.OnNext(new Person("Three", 3));
_target.Count.Should().Be(2);
}

public void Dispose() => _disposable.Dispose();
}
6 changes: 3 additions & 3 deletions src/DynamicData/Cache/Internal/TimeExpirer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public IObservable<IEnumerable<KeyValuePair<TKey, TObject>>> ForExpiry()
{
var dateTime = DateTime.Now;
var autoRemover = _source.Do(_ => dateTime = _scheduler.Now.DateTime).Transform(
var autoRemover = _source.Do(_ => dateTime = _scheduler.Now.UtcDateTime).Transform(
(t, v) =>
{
var removeAt = _timeSelector(t);
Expand All @@ -87,7 +87,7 @@ void RemovalAction()
{
try
{
var toRemove = autoRemover.KeyValues.Where(kv => kv.Value.ExpireAt <= _scheduler.Now.DateTime).ToList();
var toRemove = autoRemover.KeyValues.Where(kv => kv.Value.ExpireAt <= _scheduler.Now.UtcDateTime).ToList();
observer.OnNext(toRemove.Select(kv => new KeyValuePair<TKey, TObject>(kv.Key, kv.Value.Value)).ToList());
}
Expand All @@ -109,7 +109,7 @@ void RemovalAction()
removalSubscription.Disposable = autoRemover.Connect().DistinctValues(ei => ei.ExpireAt).SubscribeMany(
datetime =>
{
var expireAt = datetime.Subtract(_scheduler.Now.DateTime);
var expireAt = datetime.Subtract(_scheduler.Now.UtcDateTime);
return Observable.Timer(expireAt, _scheduler).Take(1).Subscribe(_ => RemovalAction());
}).Subscribe();
}
Expand Down
Loading

0 comments on commit 4375ab1

Please sign in to comment.