Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: OfType Operator #865

Merged
merged 10 commits into from
Feb 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,10 @@ namespace DynamicData
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> NotEmpty<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source)
where TObject : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> OfType<TObject, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, bool suppressEmptyChangeSets = true)
where TObject : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TObject, TKey>> OnItemAdded<TObject, TKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Action<TObject> addAction)
where TObject : notnull
where TKey : notnull { }
Expand Down
157 changes: 157 additions & 0 deletions src/DynamicData.Tests/Cache/OfTypeFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Bogus;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace DynamicData.Tests.Cache;

public class OfTypeFixture : IDisposable
{
#if DEBUG
const int AddCount = 7;
const int UpdateCount = 5;
const int RemoveCount = 3;
#else
const int AddCount = 101;
const int UpdateCount = 57;
const int RemoveCount = 53;
#endif

private readonly Randomizer _randomizer;

private readonly Faker<Person> _personFaker;

private readonly Faker<CatPerson> _catPersonFaker;

private readonly SourceCache<Person, string> _sourceCache = new(p => p.Id);

private readonly ChangeSetAggregator<Person, string> _personResults;

private readonly ChangeSetAggregator<CatPerson, string> _catPersonResults;

public OfTypeFixture(ITestOutputHelper testOutputHelper)
{
_randomizer = new(0x3737_ddcc);
_personFaker = new Faker<Person>().CustomInstantiator(faker => new Person(faker.Person.FullName)).WithSeed(_randomizer);
_catPersonFaker = new Faker<CatPerson>().CustomInstantiator(faker => new CatPerson(faker.Person.FullName, $"{faker.Hacker.Adjective()} the {faker.Hacker.Noun()}")).WithSeed(_randomizer);
_personResults = _sourceCache.Connect().TestSpy(testOutputHelper, "Cache").AsAggregator();
_catPersonResults = _sourceCache.Connect().OfType<Person, string, CatPerson>().TestSpy(testOutputHelper, "OfType").AsAggregator();
}

[Fact]
public void AddedItemsAreInResults()
{
// Arrange
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);

_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Messages.Count.Should().Be(2);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount);
_catPersonResults.Messages.Count.Should().Be(1);
CheckResults();
}

[Fact]
public void RemovedItemsAreNotResults()
{
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);
_sourceCache.Remove(_randomizer.ListItems(people, RemoveCount));
_sourceCache.Remove(_randomizer.ListItems(catPeople, RemoveCount));

_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Summary.Overall.Removes.Should().Be(RemoveCount * 2);
_personResults.Messages.Count.Should().Be(4);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount);
_catPersonResults.Summary.Overall.Removes.Should().Be(RemoveCount);
_catPersonResults.Messages.Count.Should().Be(2);
CheckResults();
}

[Fact]
public void UpdateResultsAreCorrect()
{
// Arrange
var people = _personFaker.Generate(AddCount);
var catPeople = _catPersonFaker.Generate(AddCount);

_sourceCache.AddOrUpdate(people);
_sourceCache.AddOrUpdate(catPeople);

var updates = _randomizer.ListItems(people.Concat(catPeople).ToList(), UpdateCount);
var preUpdateCatPeople = updates.Where(p => p is CatPerson).ToList();
var updated = updates.Select(p => GenerateUpdateRandom(p.Id)).ToList();
var postUpdateCatPeople = updated.Where(p => p is CatPerson).ToList();
var catToCatCount = preUpdateCatPeople.Count(p => postUpdateCatPeople.Any(pu => pu.Id == p.Id));
var catToNonCount = preUpdateCatPeople.Count - catToCatCount;
var nonToCatCount = postUpdateCatPeople.Count(p => !preUpdateCatPeople.Any(pu => pu.Id == p.Id));

// Act
_sourceCache.AddOrUpdate(updated);

// Assert
_personResults.Summary.Overall.Adds.Should().Be(AddCount * 2);
_personResults.Summary.Overall.Updates.Should().Be(UpdateCount);
_personResults.Messages.Count.Should().Be(3);
_catPersonResults.Summary.Overall.Adds.Should().Be(AddCount + nonToCatCount);
_catPersonResults.Summary.Overall.Removes.Should().Be(catToNonCount);
_catPersonResults.Summary.Overall.Updates.Should().Be(catToCatCount);
_catPersonResults.Messages.Count.Should().Be(2);
CheckResults();
}

public void Dispose()
{
_sourceCache.Dispose();
_personResults.Dispose();
_catPersonResults.Dispose();
}

private IEnumerable<Person> GeneratePeople(int count = AddCount) => Enumerable.Range(0, count).Select(_ => _randomizer.Bool() ? _personFaker.Generate() : _catPersonFaker.Generate());

private Person GenerateUpdateRandom(string id) => _randomizer.Bool() ? GenerateUpdatePerson(id) : GenerateUpdateCatPerson(id);

private Person GenerateUpdatePerson(string id) => new(_personFaker.Generate().Name, id);

private CatPerson GenerateUpdateCatPerson(string id)
{
var newCp = _catPersonFaker.Generate();
return new CatPerson(newCp.Name, newCp.CatName, id);
}

private void CheckResults()
{
var expectedPeople = _sourceCache.Items;
var expectedCatPeople = expectedPeople.OfType<CatPerson>();

_personResults.Data.Items.Should().BeEquivalentTo(expectedPeople);
_catPersonResults.Data.Items.Should().BeEquivalentTo(expectedCatPeople);
}

private interface ICatPerson
{
string CatName { get; }
}

private record Person(string Name, string Id)
{
public Person(string Name) : this(Name, Guid.NewGuid().ToString("N")) { }
}

private record CatPerson(string Name, string CatName, string Id) : Person(Name, Id), ICatPerson
{
public CatPerson(string Name, string CatName) : this(Name, CatName, Guid.NewGuid().ToString("N")) { }
}
}
87 changes: 48 additions & 39 deletions src/DynamicData.Tests/Utilities/ObservableSpy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using Xunit.Abstractions;

namespace DynamicData.Tests.Utilities;

Expand Down Expand Up @@ -31,30 +32,27 @@ private static class NativeMethods
/// <param name="showTimestamps">Indicates whether or not timestamps should be prepended to messages.</param>
/// <returns>An IObservable{T} with the Spy events included.</returns>
/// <remarks>Adapted from https://stackoverflow.com/q/20220755/.</remarks>
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<T> Spy<T>(this IObservable<T> source, string? infoText = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
{
static string NoTimestamp() => string.Empty;
static string HighResTimestamp() => DateTimeOffset.UtcNow.ToString("HH:mm:ss.fffffff") + " ";
static void NullLogger(string _) { }

var activeSubscriptionCounter = 0;
var subscriptionCounter = 0;

formatter ??= (t => t?.ToString() ?? "{Null}");
logger = CreateLogger(logger ?? Console.WriteLine, showTimestamps ? HighResTimestamp : NoTimestamp, infoText ?? $"IObservable<{typeof(T).Name}>");

var subLogger = showSubs ? logger : NullLogger;
var subLogger = showSubs ? logger : null;

logger("Creating Observable");
return Observable.Create<T>(obs =>
return Observable.Create<T>(observer =>
{
var subId = Interlocked.Increment(ref subscriptionCounter);
var valueCounter = 0;
bool? completedSuccessfully = null;

subLogger($"Creating Subscription #{subId}");
subLogger?.Invoke($"Creating Subscription #{subId}");
try
{
var subscription = source
Expand All @@ -65,79 +63,83 @@ static void NullLogger(string _) { }
{
try
{
obs.OnNext(t);
observer.OnNext(t);
}
catch (Exception ex)
{
logger($"Downstream Exception [SubId:{subId}] ({ex})");
throw;
}
}, obs.OnError, obs.OnCompleted);
}, observer.OnError, observer.OnCompleted);

return Disposable.Create(() =>
{
if (showSubs)
if (subLogger != null)
{
switch (completedSuccessfully)
{
case true: subLogger($"Disposing SubId #{subId} due to OnComplete"); break;
case false: subLogger($"Disposing SubId #{subId} due to OnError"); break;
case null: subLogger($"Disposing SubId #{subId} due to Unsubscribe"); break;
case true: subLogger.Invoke($"Disposing SubId #{subId} due to OnComplete"); break;
case false: subLogger.Invoke($"Disposing SubId #{subId} due to OnError"); break;
case null: subLogger.Invoke($"Disposing SubId #{subId} due to Unsubscribe"); break;
}
}
subscription?.Dispose();
var count = Interlocked.Decrement(ref activeSubscriptionCounter);
subLogger($"Dispose Completed! ({count} Active Subscriptions)");
subLogger?.Invoke($"Dispose Completed! ({count} Active Subscriptions)");
});
}
finally
{
var count = Interlocked.Increment(ref activeSubscriptionCounter);
subLogger($"Subscription Id #{subId} Created! ({count} Active Subscriptions)");
subLogger?.Invoke($"Subscription Id #{subId} Created! ({count} Active Subscriptions)");
}
});
}

public static IObservable<IChangeSet<T, TKey>> Spy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source,
string? opName = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<IChangeSet<T, TKey>> Spy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, string? opName = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull
{
formatter ??= (t => t?.ToString() ?? "{Null}");
return Spy(source, opName, logger, CreateCacheChangeSetFormatter<T, TKey>(formatter!), showSubs, showTimestamps);
}

public static IObservable<IChangeSet<T>> Spy<T>(this IObservable<IChangeSet<T>> source,
string? opName = null, Action<string>? logger = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
where T : notnull
public static IObservable<IChangeSet<T>> Spy<T>(this IObservable<IChangeSet<T>> source, string? opName = null, Action<string>? logger = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
{
formatter ??= (t => t?.ToString() ?? "{Null}");
return Spy(source, opName, logger, CreateListChangeSetFormatter(formatter!), showSubs, showTimestamps);
}

private static Func<IChangeSet<T, TKey>, string> CreateCacheChangeSetFormatter<T, TKey>(Func<T, string> formatter) where T : notnull where TKey : notnull =>
private static Func<IChangeSet<T, TKey>, string> CreateCacheChangeSetFormatter<T, TKey>(Func<T, string> formatter)
where T : notnull
where TKey : notnull =>
cs => "[Cache Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, cs.Select((change, n) => $"#{n} {FormatChange(formatter, change)}"));

private static Func<IChangeSet<T>, string> CreateListChangeSetFormatter<T>(Func<T, string> formatter) where T : notnull =>
private static Func<IChangeSet<T>, string> CreateListChangeSetFormatter<T>(Func<T, string> formatter)
where T : notnull =>
cs => "[List Change Set]" + ChangeSetEntrySpacing + string.Join(ChangeSetEntrySpacing, cs.Select((change, n) => $"#{n} {FormatChange(formatter, change)}"));

public static IObservable<T> DebugSpy<T>(this IObservable<T> source, string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true) =>
#if DEBUG || DEBUG_SPY_ALWAYS
public static IObservable<T> TestSpy<T>(this IObservable<T> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true) =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<IChangeSet<T, TKey>> TestSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<IChangeSet<T>> TestSpy<T>(this IObservable<IChangeSet<T>> source, ITestOutputHelper testOutputHelper, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull =>
source.Spy(opName, TestLogger(testOutputHelper), formatter, showSubs, showTimestamps);

public static IObservable<T> DebugSpy<T>(this IObservable<T> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true) =>
#if DEBUG || DEBUG_SPY_ALWAYS
source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps);
#else
source;
#endif

public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source,
string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservable<IChangeSet<T, TKey>> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull
where TKey : notnull =>
#if DEBUG || DEBUG_SPY_ALWAYS
Expand All @@ -146,11 +148,8 @@ public static IObservable<IChangeSet<T, TKey>> DebugSpy<T, TKey>(this IObservabl
source;
#endif

public static IObservable<IChangeSet<T>> DebugSpy<T>(this IObservable<IChangeSet<T>> source,
string? opName = null,
Func<T, string>? formatter = null, bool showSubs = true,
bool showTimestamps = true)
where T : notnull =>
public static IObservable<IChangeSet<T>> DebugSpy<T>(this IObservable<IChangeSet<T>> source, string? opName = null, Func<T, string>? formatter = null, bool showSubs = true, bool showTimestamps = true)
where T : notnull =>
#if DEBUG || DEBUG_SPY_ALWAYS
source.Spy(opName, DebugLogger, formatter, showSubs, showTimestamps);
#else
Expand Down Expand Up @@ -199,6 +198,16 @@ private static string FormatItemChange<T>(Func<T, string> formatter, ItemChange<
private static Action<string> CreateLogger(Action<string> baseLogger, Func<string> timeStamper, string opName) =>
msg => baseLogger($"{timeStamper()}[{Environment.CurrentManagedThreadId:X2}] |{opName}| {msg}");

#if DEBUG || DEBUG_SPY_ALWAYS
private static Action<string> TestLogger(ITestOutputHelper testOutputHelper) => str =>
{
testOutputHelper.WriteLine(str);
DebugLogger(str);
};
#else
private static Action<string> TestLogger(ITestOutputHelper testOutputHelper) => testOutputHelper.WriteLine;
#endif

#if DEBUG
private static void DebugLogger(string str) => System.Diagnostics.Debug.WriteLine(str);
#elif DEBUG_SPY_ALWAYS
Expand Down
Loading