Chaining async's #33

Open
JeffN825 opened this issue Mar 3, 2019 · 2 comments

JeffN825 commented Mar 3, 2019

Sorry if this question has already been answered, but I'm trying to figure out whether this library is applicable to a bit of fairly simple code. Today I have the following (converted to a more verbose form for clarity):

IEnumerable<MyEntity> entities = await GetEntitiesAsync();
IEnumerable<Task<MyProcessedEntity>> tasks = entities.Select(ProcessEntityAsync);
IEnumerable<MyProcessedEntity> processedEntities = await Task.WhenAll(tasks); // I realize this could introduce throttling issues
var tasksToGetNames = processedEntities.Select(async e => new { Name = await GetNameAsync(e), Entity = e });
var withNames = await Task.WhenAll(tasksToGetNames);
var byName = withNames.ToDictionary(i => i.Name);
return byName;

public static async Task<MyProcessedEntity> ProcessEntityAsync(MyEntity e) ....

public static async Task<string> GetNameAsync(MyProcessedEntity e) ....

Obviously this leaves a lot to be desired in terms of verbosity, especially since this kind of code is repeated dozens of times throughout my codebase.

Ideally, I'm looking for the means to write something like this:


Dictionary<string, MyProcessedEntity> byName = 
    await GetEntitiesAsync().SelectAsync(ProcessEntityAsync)
        .ToDictionaryAsync(e => GetNameAsync(e))

I can easily write SelectAsync and ToDictionaryAsync myself as extension methods, but I'm trying to understand whether this library already provides that kind of functionality.
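
A minimal sketch of what such hand-written extension methods might look like. This is an illustration only: `TaskEnumerableExtensions` and both methods are hypothetical helpers, not this library's API. They assume the source is exactly a `Task<IEnumerable<T>>` and start every async call up front through `Task.WhenAll`, so the throttling concern mentioned above still applies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helpers (not part of any library) for chaining async projections.
public static class TaskEnumerableExtensions
{
    // Awaits the source, starts the selector for every item, then awaits all results.
    // No throttling: every selector call is started before any result is awaited.
    public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
        this Task<IEnumerable<TSource>> source,
        Func<TSource, Task<TResult>> selector)
    {
        IEnumerable<TSource> items = await source;
        return await Task.WhenAll(items.Select(selector));
    }

    // Awaits the source, resolves all keys concurrently, then builds the dictionary.
    // Like Enumerable.ToDictionary, this throws if two items produce the same key.
    public static async Task<Dictionary<TKey, TSource>> ToDictionaryAsync<TSource, TKey>(
        this Task<IEnumerable<TSource>> source,
        Func<TSource, Task<TKey>> keySelector)
    {
        IEnumerable<TSource> items = await source;
        var pairs = await Task.WhenAll(items.Select(async item =>
            new KeyValuePair<TKey, TSource>(await keySelector(item), item)));
        return pairs.ToDictionary(p => p.Key, p => p.Value);
    }
}
```

With these in scope, the chained form shown above should compile as long as GetEntitiesAsync returns `Task<IEnumerable<MyEntity>>`; a method returning `Task<List<MyEntity>>` would not bind, because `Task<T>` is not covariant.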

kind-serge (Member) commented

@JeffN825, sorry for the late reply; I was on vacation.

This library does have SelectAsync and ToDictionaryAsync; however, those built-in extension methods accept only synchronous delegates, not async ones.

The solution that is available out of the box may look like this:

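// Extension method: must be declared inside a static class.
static IAsyncEnumerable<MyProcessedEntity> ProcessEntities(this IEnumerable<MyEntity> entities) =>
  new AsyncEnumerable<MyProcessedEntity>(async yield =>
  {
    // Process the entities one at a time, handing each result to the consumer as it completes.
    foreach (var entity in entities)
      await yield.ReturnAsync(await ProcessEntityAsync(entity));
  });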

// ........

IEnumerable<MyEntity> entities = await GetEntitiesAsync();
var dictionary = await entities.ProcessEntities().ToDictionaryAsync(e => e.Name);

Let me know if this helps.

CodingOctocat commented May 19, 2019

[Bing Translate]
I've also had problems with chained asynchronous streams and struggled with them for days. I wasn't familiar with Rx, so I gave up on it. I've tried AsyncEnumerable, but it doesn't seem to have a real yield.

Once C# 8 (IAsyncEnumerable) is officially released, my code should be this simple:

try
{
    IEnumerable<LocalEntity> items = ReadItems(/* ... */);
    IAsyncEnumerable<WebEntity> results = await QueryAsync(items);
    IAsyncEnumerable<MyEntity> myEntities = await DoWorkAsync(results);
    await Done(myEntities);
}
catch
{
    // Log
}
finally
{
    // DoSomething
}

The asynchronous flow process I want:
ReadItems:~~~~~~[a-b-c-d-e-f-g-h-i-j-k-l-m-n-o-p-q-r-s-t-u-v-w-x-y-z]
QueryAsync:~~~~~[A-B-C-D-E-F-G-H-I-J-K-(may throw an exception)-...]
DoWorkAsync:~~~~[A-B-C-D-E-F-G-H-(may throw an exception)-...............]
Done:~~~~~~~~~[A-B-C-D-E-F-G-H]
In words:
ReadItems yields "a" => QueryAsync takes "a" and yields "A" => DoWorkAsync takes "A" and yields "A" => Done takes "A" and yields "A"; then ReadItems yields "b", and so on.

However, with Dasync/AsyncEnumerable it appears that when an exception occurs, the asynchronous stream is interrupted and the yielded data stops being processed.

I also need CancellationToken support: when I click the Cancel button in the UI, the asynchronous stream should be interrupted.

How should I implement this asynchronous flow?
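
One possible shape for such a flow, sketched with C# 8 async streams rather than Dasync's API. Everything here is an assumption for illustration: `AsyncPipeline`, `ToAsyncStream`, and `SelectSkippingFailures` are hypothetical names, and the sketch assumes each stage can be expressed as a per-item async step that accepts a CancellationToken. A failing item is reported and skipped so later items keep flowing, while cancellation still stops the stream.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical pipeline helpers built on C# 8 async streams.
public static class AsyncPipeline
{
    // Adapts a synchronous sequence (such as the output of ReadItems) to an async stream.
    public static async IAsyncEnumerable<T> ToAsyncStream<T>(
        this IEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (T item in source)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return item;
            // Give other scheduled work a chance to run between items.
            await Task.Yield();
        }
    }

    // Applies an async step to each item as it arrives and yields the result.
    // If the step throws for an item, the error is reported through onError and
    // that item is skipped; OperationCanceledException is never swallowed.
    public static async IAsyncEnumerable<TOut> SelectSkippingFailures<TIn, TOut>(
        this IAsyncEnumerable<TIn> source,
        Func<TIn, CancellationToken, Task<TOut>> step,
        Action<TIn, Exception> onError,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (TIn item in source.WithCancellation(cancellationToken))
        {
            TOut result;
            try
            {
                result = await step(item, cancellationToken);
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                onError(item, ex);
                continue;
            }
            // 'yield return' is not allowed inside a try block that has a catch clause,
            // so the result is yielded only after the try/catch has completed.
            yield return result;
        }
    }
}
```

A consumer would chain one `SelectSkippingFailures` call per stage and iterate the result with `await foreach (... in stream.WithCancellation(cts.Token))`, calling `cts.Cancel()` from the Cancel button handler. The token passed to the outermost stage is forwarded to each upstream stage through `WithCancellation`, and the resulting OperationCanceledException can be caught around the loop.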
