
Race condition in ParallelForEachAsync when exceptions are thrown #57

Open
danylofitel opened this issue Apr 20, 2020 · 3 comments


@danylofitel
Contributor

danylofitel commented Apr 20, 2020

ParallelForEachAsync sometimes does not throw an exception even though individual iterations threw exceptions.

It reproduces with the following code (derived from the scenario where I noticed it):

while (true)
{
    Console.WriteLine();
    IReadOnlyList<int> input = Enumerable.Range(0, 10).ToList();
    ConcurrentQueue<int> output = new ConcurrentQueue<int>();

    try
    {
        await input.ParallelForEachAsync(
            async item =>
            {
                if (item == 0)
                {
                    throw new AggregateException(new Exception("Individual task failed."));
                }

                await Task.Delay(1);
                output.Enqueue(item);
            },
            maxDegreeOfParallelism: 10,
            cancellationToken: default);
    }
    catch (Exception)
    {
        continue;
    }

    Console.WriteLine($"No exception. {input.Count} - {output.Count}.");
}

The cancellation token is never canceled, and the default values breakLoopOnException: false and gracefulBreak: true are used. The iteration for item 0 always throws an exception, so the expectation is that the remaining iterations finish and ParallelForEachAsync then throws an exception.
However, if the loop runs long enough, it eventually hits a case where no exception is thrown and only 9 items have been added to the queue (reported by the Console.WriteLine() call above).

In the ParallelForEachAsync implementation, both the main task that schedules the individual iterations and the continuations of the individual tasks call ParallelForEachContext.OnOperationComplete(). OnOperationComplete() adds the exception to the list of tracked exceptions if one was supplied, releases the semaphore, and at the very end calls CompleteLoopNow() if all tasks have completed or a non-graceful break of the loop was requested (in this specific case I was not canceling anything, so it was only called once all tasks had finished).

public void OnOperationComplete(Exception exceptionIfFailed = null)
{
    // Add exception to the list
    // Release the semaphore

    if ((_semaphore.CurrentCount == _maxDegreeOfParallelism + 1) || (IsLoopBreakRequested && !_gracefulBreak))
        CompleteLoopNow();
}

The problem occurs when the last few tasks release the semaphore at about the same time: the _semaphore.CurrentCount == _maxDegreeOfParallelism + 1 condition can then evaluate to true for more than one of them, so CompleteLoopNow() can be called more than once.
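A minimal single-threaded replay of that check-then-act interleaving, as a sketch. It assumes the context's semaphore starts with `_maxDegreeOfParallelism + 1` slots and that each running operation holds one slot, which is what the completion check implies but is not confirmed here. Two operations release their slots before either evaluates the check, so both see the "everything finished" count and both would call CompleteLoopNow().

```csharp
using System;
using System.Threading;

// Assumption: one slot per concurrent operation plus one for the scheduling task,
// so CurrentCount is back at maxDegreeOfParallelism + 1 only when nothing is running.
const int maxDegreeOfParallelism = 2;
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism + 1);

// Operations A and B each hold one slot (the scheduling task has already released its own).
semaphore.Wait(); // operation A acquires a slot
semaphore.Wait(); // operation B acquires a slot

// Interleaving: both operations release before either one runs the completion check.
semaphore.Release(); // A: OnOperationComplete releases
semaphore.Release(); // B: OnOperationComplete releases

// Each operation now evaluates the same condition and acts on it independently.
bool aCallsCompleteLoopNow = semaphore.CurrentCount == maxDegreeOfParallelism + 1;
bool bCallsCompleteLoopNow = semaphore.CurrentCount == maxDegreeOfParallelism + 1;

Console.WriteLine($"A: {aCallsCompleteLoopNow}, B: {bCallsCompleteLoopNow}");
```

Both flags come out true: the release and the check are two separate steps, so nothing stops a second releaser from observing the same final count.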

public void CompleteLoopNow()
{
    Console.WriteLine("CompleteLoopNow - Start");
    _cancellationTokenRegistration.Dispose();

    try
    {
        if (_semaphore != null)
            _semaphore.Dispose();
    }
    catch
    {
    }

    var exceptions = ReadExceptions();
    var aggregatedException = exceptions?.Count > 0 ? new ParallelForEachException(exceptions) : null;

    if (_cancellationToken.IsCancellationRequested)
    {
        Console.WriteLine("CompleteLoopNow - OperationCanceledException");
        _ = _completionTcs.TrySetException(
            new OperationCanceledException(
                new OperationCanceledException().Message,
                aggregatedException,
                _cancellationToken));
    }
    else if (exceptions?.Count > 0)
    {
        Console.WriteLine("CompleteLoopNow - TrySetException");
        _ = _completionTcs.TrySetException(aggregatedException);
    }
    else
    {
        Console.WriteLine("CompleteLoopNow - TrySetResult");
        _ = _completionTcs.TrySetResult(null);
    }
}

This means that multiple tasks can also call ReadExceptions() concurrently.

public List<Exception> ReadExceptions()
{
    Console.WriteLine("ReadExceptions - Start");
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        Console.WriteLine("ReadExceptions - Returning");
        return _exceptionList;
    }
    finally
    {
        _exceptionList = null;
        _exceptionListLock.Exit(useMemoryBarrier: false);
        Console.WriteLine("ReadExceptions - End");
    }
}

However, the finally block sets the exception list to null, so the first task to call ReadExceptions() gets the full list of exceptions back and any subsequent task gets null. Then, in CompleteLoopNow(), a task that received null can call TrySetResult() before the task with the correct exception list calls TrySetException().

I debugged with the Console.WriteLine statements shown above, and in the cases where ParallelForEachAsync() did not throw I saw the following output:

CompleteLoopNow - Start              // Task A entering CompleteLoopNow()
ReadExceptions - Start               // Task A entering ReadExceptions()
ReadExceptions - Returning           // Task A returning a full list of exceptions
ReadExceptions - End                 // Task A setting the list of exceptions to null
CompleteLoopNow - Start              // Task B entering CompleteLoopNow()
ReadExceptions - Start               // Task B entering ReadExceptions()
ReadExceptions - Returning           // Task B returning null as the list of exceptions
ReadExceptions - End                 // Task B setting the list of exceptions to null
CompleteLoopNow - TrySetResult       // Task B setting result on the task completion source since it got null from ReadExceptions()
CompleteLoopNow - TrySetException    // Task A setting exception on the task completion source since it got a non-empty list of exceptions from ReadExceptions()
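The sequence in that log can be replayed deterministically. This is a sketch with hypothetical stand-ins for the context's fields: a plain `lock` replaces the library's SpinLock and `AggregateException` replaces ParallelForEachException. It shows that the read-and-reset in ReadExceptions() hands null to the second caller, and that a TaskCompletionSource only honours the first `Try*` call, so Task A's later TrySetException() is silently ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the ParallelForEachContext fields (not the library's code).
var exceptionListLock = new object();
List<Exception> exceptionList = new List<Exception> { new Exception("Individual task failed.") };
var completionTcs = new TaskCompletionSource<object>();

// Same semantics as the current ReadExceptions(): hand out the list, then reset it to null.
List<Exception> ReadExceptions()
{
    lock (exceptionListLock)
    {
        List<Exception> result = exceptionList;
        exceptionList = null;
        return result;
    }
}

// Replay the logged interleaving: A reads first and gets the list, B reads second and gets null.
List<Exception> seenByA = ReadExceptions();
List<Exception> seenByB = ReadExceptions();

// B reaches the TaskCompletionSource first; with no exceptions it reports success.
bool bCompleted = seenByB?.Count > 0
    ? completionTcs.TrySetException(new AggregateException(seenByB))
    : completionTcs.TrySetResult(null);

// A arrives second with the real exceptions, but the task has already completed.
bool aCompleted = completionTcs.TrySetException(new AggregateException(seenByA));

Console.WriteLine($"B completed: {bCompleted}, A completed: {aCompleted}, status: {completionTcs.Task.Status}");
```

The task ends in `RanToCompletion`, which matches the observed behavior of ParallelForEachAsync() returning without an exception.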

I'm not sure whether ReadExceptions() needs to reset the exception list to null. One possible reason is to avoid a race when the loop is canceled: continuations of tasks that are still running can keep adding exceptions to the list while the same list instance has already been returned from ReadExceptions() to CompleteLoopNow(). However, in that case ReadExceptions() could return a copy of the exception list instead, e.g.:

public List<Exception> ReadExceptions()
{
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        // Return a copy, so the list being returned will not be modified
        // by tasks that are still running if the loop was canceled
        return new List<Exception>(_exceptionList ?? Enumerable.Empty<Exception>());
    }
    finally
    {
        _exceptionListLock.Exit(useMemoryBarrier: false);
    }
}

Another option is to prevent tasks from re-entering CompleteLoopNow().
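A sketch of that second option, assuming a hypothetical `int` field used as a one-shot flag. `Interlocked.CompareExchange` lets exactly one caller flip it from 0 to 1; every other caller returns without touching the semaphore, the exception list, or the TaskCompletionSource. The names `completionClaimed` and `CompleteLoopNowOnce` are illustrative, not the library's.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical one-shot flag: 0 = completion not yet claimed, 1 = already claimed.
int completionClaimed = 0;
// Counts how many callers actually ran the completion body (for demonstration only).
int completeLoopNowCalls = 0;

// Returns true only for the first caller; later callers observe 1 and back off.
bool TryClaimCompletion() => Interlocked.CompareExchange(ref completionClaimed, 1, 0) == 0;

void CompleteLoopNowOnce()
{
    if (!TryClaimCompletion())
        return;

    Interlocked.Increment(ref completeLoopNowCalls);
    // ... dispose the semaphore, read the exceptions, complete the TaskCompletionSource ...
}

// Simulate several continuations racing to complete the loop at the same time.
Parallel.For(0, 8, _ => CompleteLoopNowOnce());

Console.WriteLine($"Completion body ran {completeLoopNowCalls} time(s)");
```

Since OnOperationComplete() adds the exception before releasing the semaphore, the single winner should already see every exception in the non-canceled case, so with this guard the destructive read in ReadExceptions() would no longer matter there.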

@danylofitel
Contributor Author

Here's a PR with the proposed change to ReadExceptions()
#58

@marvel16

@danylofitel hi! Do you know why this issue is still unaddressed? No one has responded to your PR. We have hit a similar issue in production and couldn't understand why the exception was not thrown, although in our case only one task was throwing an exception. Is there an alternative package from Microsoft?

@danylofitel
Contributor Author

@marvel16 it seems the owner hasn't checked the repo for a while; previously he was super responsive and merged my PR within a day.

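While waiting for a fix, I'm using a wrapper that puts a try/catch around the async action, records any exception in a ConcurrentQueue and rethrows it, and then, if the library call returned without throwing, throws the collected exceptions itself. It is equivalent to: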

        public static async Task ParallelForEachAsync<T>( 
            this IAsyncEnumerable<T> collection,
            Func<T, long, Task> asyncItemAction,
            int maxDegreeOfParallelism,
            bool breakLoopOnException,
            bool gracefulBreak,
            CancellationToken cancellationToken = default)
        {
            ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();

            await Dasync.Collections.ParallelForEachExtensions.ParallelForEachAsync(
                collection,
                async (item, index) =>
                {
                    try
                    {
                        await asyncItemAction(item, index).ConfigureAwait(false);
                    }
                    catch (Exception exception)
                    {
                        exceptions.Enqueue(exception);
                        throw;
                    }
                },
                maxDegreeOfParallelism,
                breakLoopOnException,
                gracefulBreak,
                cancellationToken).ConfigureAwait(false);

            if (!exceptions.IsEmpty)
            {
                if (exceptions.All(exception => exception is OperationCanceledException) && cancellationToken.IsCancellationRequested)
                {
                    throw exceptions.First();
                }
                else
                {
                    throw new AggregateException("ParallelForEachAsync exception", exceptions);
                }
            }
        }

The idea is that once this issue is resolved you won't need any code changes, except perhaps cleaning up using directives: remove the one for the namespace that holds this wrapper and add the Dasync one.

Another option is the new Parallel.ForEachAsync. Note that it is only available in .NET 6, which is in preview until November; if you are able to switch to it, you don't really need this package.
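For comparison, here is a sketch of the repro from the top of this issue ported to `Parallel.ForEachAsync` (requires .NET 6 or later). As far as I understand, it has no equivalent of `breakLoopOnException: false`: after the first failure it stops starting new iterations and cancels the token passed to the body, then faults the returned task once in-flight iterations finish. So `output` may end up with fewer than 9 items, but the exception is surfaced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var input = Enumerable.Range(0, 10).ToList();
var output = new ConcurrentQueue<int>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

Exception? caught = null;
try
{
    await Parallel.ForEachAsync(input, options, async (item, cancellationToken) =>
    {
        if (item == 0)
        {
            throw new AggregateException(new Exception("Individual task failed."));
        }

        // Honour the loop's token so remaining iterations stop once one has failed.
        await Task.Delay(1, cancellationToken);
        output.Enqueue(item);
    });
}
catch (Exception exception)
{
    // Awaiting the faulted task rethrows the first recorded exception.
    caught = exception;
}

Console.WriteLine(caught is null
    ? $"No exception. {input.Count} - {output.Count}."
    : $"Caught {caught.GetType().Name}; {output.Count} item(s) processed.");
```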
