《C#并发编程经典实例》学习笔记—2.6 任务完成时的处理

释放双眼,带上耳机,听听看~!

问题

正在 await 一批任务,希望在每个任务完成时对它做一些处理。另外,希望在任务一完成就立即进行处理,而不需要等待其他任务。

问题的重点在于希望任务完成之后立即进行处理,而不去等待其他任务。
这里还沿用文中的例子。
等待几秒钟之后返回等待的秒数,之后立即打印任务等待的秒数。
等待的函数如下

static async Task<int> DelayAndReturnAsync(int val)
{
      await Task.Delay(TimeSpan.FromSeconds(val));
      return val;
}

以下方法执行之后的打印结果是“2”, “3”, “1”。想得到结果“1”, “2”, “3”应该如何实现。

static async Task ProcessTasksAsync()
{
    // 创建任务队列。
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new[] { taskA, taskB, taskC };
    // 按顺序 await 每个任务。
    foreach (var task in tasks)
    {
        var result = await task;
        Trace.WriteLine(result);
    }
}

文中给了两种解决方案。一种是抽出更高级的async方法,一种是借助作者的nuget拓展。作者还推荐了另外两个博客文章。
Processing tasks as they complete
ORDERING BY COMPLETION, AHEAD OF TIME
这两篇文章介绍了更多处理方法。

抽象方法,并发执行

static async Task AwaitAndProcessAsync(Task<int> task)
{
    var result = await task;
    Trace.WriteLine(result);
}

将执行和处理抽象出来,借助Task.WhenAll和LINQ并发执行。

var processingTasks = (from t in tasks
select AwaitAndProcessAsync(t)).ToArray();
// 等待全部处理过程的完成。
await Task.WhenAll(processingTasks);

或者

var processingTasks = tasks.Select(async t =>
{
var result = await t;
Trace.WriteLine(result);
}).ToArray();
// 等待全部处理过程的完成。
await Task.WhenAll(processingTasks);

借助nuget拓展:Nito.AsyncEx

推荐预发布版本:https://www.nuget.org/packages/Nito.AsyncEx/5.0.0-pre-06
需要添加引用using Nito.AsyncEx;

static async Task UseOrderByCompletionAsync()
{
      // 创建任务队列。
      Task<int> taskA = DelayAndReturnAsync(2);
      Task<int> taskB = DelayAndReturnAsync(3);
      Task<int> taskC = DelayAndReturnAsync(1);
      var tasks = new[] { taskA, taskB, taskC };
      // 等待每一个任务完成。
      foreach (var task in tasks.OrderByCompletion())
      {
           var result = await task;
           Trace.WriteLine(result);
      }
}

串行执行

使用ConcurrentExclusiveSchedulerPair,使任务串行执行,结果是“2”, “3”, “1”。

var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach (var t in tasks)
{
    await t.ContinueWith(completed =>
    {
             switch (completed.Status)
             {
                   case TaskStatus.RanToCompletion:
                   Trace.WriteLine(completed.Result);
                   //Process(completed.Result);
                   break;
                   case TaskStatus.Faulted:
                   //Handle(completed.Exception.InnerException);
                   break;
               }
     }, scheduler);
}

上篇文章中提到了使用Task.WhenAny处理已完成的任务:https://www.cnblogs.com/AlienXu/p/10609253.html#idx_2
文中的例子从算法层面是不推荐使用的,作者推荐了他自己的拓展Nito.AsyncEx,源码地址:https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Tasks/TaskExtensions.cs
另外两种实现的实现方法差不多,都是借助TaskCompletionSource<T>和Interlocked.Incrementa处理Task。
这里只列出ORDERING BY COMPLETION, AHEAD OF TIME的解决方案。

/// <summary> 
/// 返回一系列任务,这些任务的输入类型相同和返回结果类型一致
/// 返回的任务将以完成顺序返回
/// </summary> 
private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks) 
{ 
    // 复制输入,以下的处理将不需要考虑是否会对输入有影响
    var inputTaskList = inputTasks.ToList();
    var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count); 
    for (int i = 0; i < inputTaskList.Count; i++) 
    { 
        completionSourceList.Add(new TaskCompletionSource<T>()); 
    }

    // 索引
    // 索引最好是从0开始,但是 Interlocked.Increment 返回的是递增之后的值,所以这里应该赋值-1
    int prevIndex = -1;

    // 可以不用再循环之外处理Action,这样会让代码更清晰。现在有C#7.0的新特性本地方法可以使用
     /* //本地方法
     void continuation(Task<T> completedTask)
     {
          int index = Interlocked.Increment(ref prevIndex);
          var source = completionSourceList[index];
          PropagateResult(completedTask, source);
     }*/ 
   
    Action<Task<T>> continuation = completedTask => 
    { 
        int index = Interlocked.Increment(ref prevIndex); 
        var source = completionSourceList[index]; 
        PropagateResult(completedTask, source); 
    };

    foreach (var inputTask in inputTaskList) 
    {  
        inputTask.ContinueWith(continuation, 
                               CancellationToken.None, 
                               TaskContinuationOptions.ExecuteSynchronously, 
                               TaskScheduler.Default); 
    }

    return completionSourceList.Select(source => source.Task); 
}

/// <summary> 
/// 对 TaskCompletionSource 进行处理
/// </summary> 
private static void PropagateResult<T>(Task<T> completedTask, 
    TaskCompletionSource<T> completionSource) 
{ 
    switch (completedTask.Status) 
    { 
        case TaskStatus.Canceled: 
            completionSource.TrySetCanceled(); 
            break; 
        case TaskStatus.Faulted: 
            completionSource.TrySetException(completedTask.Exception.InnerExceptions); 
            break; 
        case TaskStatus.RanToCompletion: 
            completionSource.TrySetResult(completedTask.Result); 
            break; 
        default: 
            throw new ArgumentException(\"Task was not completed\"); 
    } 
}

给TA打赏
共{{data.count}}人
人已打赏
随笔日记

ThreadLocal介绍以及源码分析

2020-11-9 4:09:08

随笔日记

微服务架构 - 搭建docker本地镜像仓库并提供权限校验及UI界面

2020-11-9 4:09:10

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索