C#4.0的并行库TPL,即Task(三)

C#4.0的并行库TPL,即Task(一)http://www.skcircle.com/?id=1793

C#4.0的并行库TPL,即Task(二) http://www.skcircle.com/?id=1798

C#4.0的并行库TPL,即Task(三) http://www.skcircle.com/?id=1808

C#4.0的并行库TPL,即Task(四)  http://www.skcircle.com/?id=1815

C#4.0的并行库TPL,即Task(五) http://www.skcircle.com/?id=1816



上篇是:C#4.0的并行库TPL,即Task(二)


接下来的都是些零碎知识,大知识前面都提过,这里只是记录一些使用经验性质的例子。



示例七:Task的构造枚举选项TaskCreationOptions



代码:

class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("主线程启动");
            //ThreadPool.QueueUserWorkItem(StartCode,5);
            new Task(StartCode,TaskCreationOptions.LongRunning).Start();
            Console.WriteLine("主线程运行到此!");
            Thread.Sleep(1000);
            Console.ReadKey();
        }

        private static void StartCode(object i)
        {
            Console.WriteLine("开始执行子线程...{0}", i);
            Thread.Sleep(1000);//模拟代码操作
        }

    }


枚举如下:

  //
    // 摘要:
    //     指定可控制任务的创建和执行的可选行为的标志。
    [Flags]
    public enum TaskCreationOptions
    {
        //
        // 摘要:
        //     指定应使用默认行为。
        None,
        //
        // 摘要:
        //     提示 System.Threading.Tasks.TaskScheduler 以一种尽可能公平的方式安排任务,
        // 这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。
        PreferFairness,
        //
        // 摘要:
        //     指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。
        //它会向 System.Threading.Tasks.TaskScheduler
        //     提示,过度订阅可能是合理的。您可以通过过度订阅创建比可用硬件线程数更多的线程。
        LongRunning,
        //
        // 摘要:
        //     指定将任务附加到任务层次结构中的某个父级。有关详细信息,请参阅 已附加和已分离的子任务。
        AttachedToParent = 4,
        //
        // 摘要:
        //     如果尝试附有子任务到创建的任务,指定 System.InvalidOperationException 将被引发。
        DenyChildAttach = 8,
        //
        // 摘要:
        //     防止环境计划程序被视为已创建任务的当前计划程序。这意味着像 StartNew 或 ContinueWith 
        //创建任务的执行操作将被视为 System.Threading.Tasks.TaskScheduler.Default
        //     当前计划程序。
        HideScheduler = 16
    }


请注意,这些标识都只是一些提议而已,在调度一个Task时,可能会、也可能不会采纳这些提议,不过有一条要注意:AttachedToParent标志,它总会得到Task采纳,因为它和TaskScheduler本身无关。


示例八: 异常与恢复


异常在前面的例子中谈到过,这次来继续讨论这个话题。

在下面的代码中,算法溢出异常在哪里捕获呢?

答案是在t.Result这里。


代码:

class Program
    {
        static void Main(string[] args)
        {
            //1000000000这个数字会抛出System.AggregateException
            Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);

            //可以现在开始,也可以以后开始
            t.Start();
            //Wait显式的等待一个线程完成
            t.Wait();
            //其实上面的wait不需要,t.Result就包含了。
            Console.WriteLine("The Sum is:" + t.Result);

        }

        private static Int32 Sum(Int32 i)
        {
            Int32 sum = 0;
            for (; i > 0; i--)
                //checked关键字是打开运算溢出检查,unchecked相反。
                checked { sum += i; }
            return sum;
        }
    }

在一个线程调用Wait方法时,系统会检查线程要等待的Task是否已经开始执行,如果任务正在执行,那么这个Wait方法会使线程阻塞,知道Task运行结束为止。

  就说上面的程序执行,因为累加数字太大,它抛出算术运算溢出错误,在一个计算限制任务抛出一个未处理的异常时,这个异常会被“包含”不并存储到一个集合中,而线程池线程是允许返回到线程池中的,在调用Wait方法或者Result属性时,这个成员会抛出一个System.AggregateException对象。

  现在你会问,为什么要调用Wait或者Result?或者一直不查询Task的Exception属性?你的代码就永远注意不到这个异常的发生,如果不能捕捉到这个异常,垃圾回收时,抛出AggregateException,进程就会立即终止,这就是“牵一发动全身”,莫名其妙程序就自己关掉了,谁也不知道这是什么情况。所以,必须调用前面提到的某个成员,确保代码注意到异常,并从异常中恢复。悄悄告诉你,其实在用Result的时候,内部会调用Wait。

  怎么恢复?

  为了帮助你检测没有注意到的异常,可以向TaskScheduler的静态UnobservedTaskException时间等级一个回调方法,当Task被垃圾回收时,如果出现一个没有被注意到的异常,CLR终结器会引发这个事件。一旦引发,就会向你的时间处理器方法传递一个UnobservedTaskExceptionEvenArgs对象,其中包含了你没有注意的AggregateException。然后再调用UnobservedTasExceptionEvenArgs的SetObserved方法来指出你的异常已经处理好了,从而阻止CLR终止进程。


来看个捕获异常的例子,把上面的例子改造了一下:

 class Program
    {
         static void Main(string[] args)
          {
              CancellationTokenSource cts = new CancellationTokenSource();
              Task<Int32> t = new Task<Int32>(() => Sum(cts.Token, 10000), cts.Token);
  
              //可以现在开始,也可以以后开始 
             t.Start();
 
             //在之后的某个时间,取消CancellationTokenSource 以取消Task
             cts.Cancel();//这是个异步请求,如果你的机器够快,到这里则Task可能已经完成了。
 
             //注释这个为了测试抛出的异常
             //Console.WriteLine("This sum is:" + t.Result);
             try
             {
                 //如果任务已经取消了,Result会抛出AggregateException
                 Console.WriteLine("This sum is:" + t.Result);
             }
             catch (AggregateException x)
             {
                 //将任何OperationCanceledException对象都视为已处理。
                 //其他任何异常都造成抛出一个AggregateException,其中
                 //只包含未处理的异常
                 x.Handle(e => e is OperationCanceledException);
                 Console.WriteLine("Sum was Canceled");
             }
          
         }
 
         private static Int32 Sum(CancellationToken ct, Int32 i)
         {
             Int32 sum = 0;
             for (; i > 0; i--)
             {
                 //在取消标志引用的CancellationTokenSource上如果调用
                 //Cancel,下面这一行就会抛出OperationCanceledException
                 ct.ThrowIfCancellationRequested();
                 checked { sum += i; }
             }
             return sum;
         }
     }

这个例子展示了一个任务在进行的时候中途取消的操作,我觉得它很有趣,你试试也会发现。

  Lamada表达式写这个,是个亮点,将CancellationToken闭包变量“传递”。


  如果不用Lamada表达式,这问题还真不好解决:

  Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);

  Sum(cts.Token,10000) 内的Token需要和cts.Token关联起来,你还能想出怎么关联起来么?




示例九: 再谈ContinueWith



ContinueWith便是一个更好的方式,一个任务完成时它可以启动另一个任务。上面的例子不会阻塞任何线程。

  当Sum的任务完成时,这个任务会启动另一个任务以显示结果。ContinueWith会返回对新的Task对象的一个引用,所以为了看到结果,我需要调用一下Wait方法,当然你也可以查询下Result,或者继续ContinueWith,返回的这个对象可以忽略,它仅仅是一个变量。

  还要指出的是,Task对象内部包含了ContinueWith任务的一个集合。所以,实际上可以用一个Task对象来多次调用ContinueWith。任务完成时,所有ContinueWith任务都会进入线程池队列中,在构造ContinueWith的时候我们可以看到一个TaskContinuationOptions枚举值,不能忽视,看看它的定义:

image.png


PrefereFairness是尽量公平的意思,就是较早调度的任务可能较早的运行,先来后到,将线程放到全局队列,便可以实现这个效果。

ExecuteSynchronously指同步执行,强制两个任务用同一个线程一前一后运行,然后就同步运行了。



代码:

这段代码做为简单的ContinueWith演示,还没用到TaskContinuationOptions枚举值。

class Program
    {
        static void Main(string[] args)
        {

            Task<Int32> t = new Task<Int32>(i => Sum((Int32)i), 10000);

            //可以现在开始,也可以以后开始

            t.Start();

            Task cwt = t.ContinueWith(task => Console.WriteLine("The sum is:{0}", task.Result));
            cwt.Wait();
            Console.ReadKey();
        }

        private static Int32 Sum(Int32 i)
        {
            Int32 sum = 0;
            for (; i > 0; i--)
            {
                checked { sum += i; }
            }

            return sum;


        }
    }


下面来个使用TaskContinuationOptions枚举值的例子:

其用ContinueWith处理了正常计算完成、抛出异常、用户取消三种情况下的后续动作。

class Program
    {
        static void Main(string[] args)
        {
            Task<Int32> t = new Task<Int32>(i => Sum((Int32)i), 10000);

            t.Start();

            t.ContinueWith(task => Console.WriteLine("The sum is:{0}", task.Result),
                TaskContinuationOptions.OnlyOnRanToCompletion);

            t.ContinueWith(task => Console.WriteLine("Sum throw:" + task.Exception),
                TaskContinuationOptions.OnlyOnFaulted);

            t.ContinueWith(task => Console.WriteLine("Sum was cancel:" + task.IsCanceled),
                TaskContinuationOptions.OnlyOnCanceled);
            try
            {
                t.Wait();  // 测试用
                Console.ReadKey();
            }
            catch (AggregateException)
            {
                Console.WriteLine("出错");
            }


        }

        private static Int32 Sum(Int32 i)
        {
            Int32 sum = 0;
            for (; i > 0; i--)
            {
                checked { sum += i; }
            }

            return sum;
        }
    }


还有一个是TaskCreationOptions.AttachedToParent选项。

askCreationOptions.AttachedToParent不会阻止其他子任务启动,而是阻止父任务本身关闭。

见下面的例子:

 class Program
    {
        static void Main(string[] args)
        {
            Task<Int32[]> parent = new Task<Int32[]>(() => {
                var results = new Int32[3];
                //
                new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
                new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
                new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();
                return results;
            });

            var cwt = parent.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine));

            parent.Start();
            cwt.Wait();
            Console.ReadKey();            
        }

        private static Int32 Sum(Int32 i)
        {
            Int32 sum = 0;
            for (; i > 0; i--)
            {
                checked { sum += i; }
            }
            return sum;
        }
    }


示例十:Task的内容构造



每个Task对象都有一组构成任务状态的字段。


  一个Int32 ID(只读属性)

代表Task执行状态的一个Int32

对父任务的一个引用

对Task创建时置顶TaskSchedule的一个引用

对回调方法的一个引用

对要传给回调方法的对象的一个引用(通过Task只读AsyncState属性查询)

对一个ExceptionContext的引用

对一个ManualResetEventSlim对象的引用

还有没个Task对象都有对根据需要创建的一些补充状态的一个引用,补充状态包含这些:


一个CancellationToken

一个ContinueWithTask对象集合

为抛出未处理异常的子任务,所准备的一个Task对象集合

说了这么多,只想要大家知道:


虽然任务提供了大量功能,但并不是没有代价的。因为必须为所有的这些状态分配内存。

如果不需要任务提供的附加功能,使用ThreadPool.QueueUserWorkItem,资源的使用效率会更高一些。


Task类还实现了IDispose接口,允许你在用完Task对象后调用Dispose,不过大多数不管,让垃圾回收器回收就好。

创建一个Task对象时,代表Task唯一的一个Int32字段初始化为零,TaskID从1开始,每分配一个ID都递增1。顺带说一下,在你调试中查看一个Task对象的时候,会造成调试器显示Task的ID,从而造成为Task分配一个ID。

这个ID的意义在于,每个Task都可以用一个唯一的值来标识。Visual Studio会在它的“并行任务”和并行堆栈“窗口中显示这些任务ID。要知道的是,这是Visual Studio自己分配的ID,不是在自己代码中分配的ID,几乎不可能将Visual Studio分配的ID和代码正在做的事情联系起来。要查看自己正在运行的任务,可以在调试的时候查看Task的静态CurrentId属性,如果没有任务在执行,CurrentId返回null。

再看看TaskStatus的值,这个可以查询Task对象的生存期:


image.png

这些在任务运行的时候都是可以一一查到的,还有~判断要像这样:

if(task.Status==TaskStatus.RantoCompletion)...

为了简化编码,Task只提供几个只读Boolean属性:IsCanceled,IsFaulted,IsCompleted,它们能返回最终状态true/false。

如果Task是通过调用某个函数来创建的,这个Task对象就会出于WaitingForActivation状态,它会自动运行。



示例十一:再谈TaskFactory(任务工厂)


任务工厂的需求源自于下面:

1.需要创建一组Task对象来共享相同的状态

2.为了避免机械的将相同的参数传给每一个Task的构造器。


满足这些条件就可以创建一个任务工厂来封装通用的状态。TaskFactory类型和TaskFactory<TResult>类型,它们都派生System.Object。


代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication17
{
    class Program
    {
        static void Main(string[] args)
        {
            Task parent = new Task(() =>
            {
                var cts = new CancellationTokenSource();
                var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

                //创建并启动3个子任务
                var childTasks = new[] {
                tf.StartNew(() => Sum(cts.Token, 10000)),
                tf.StartNew(() => Sum(cts.Token, 20000)),
                tf.StartNew(() => Sum(cts.Token, Int32.MaxValue))  // 这个会抛异常
                };

                // 任何子任务抛出异常就取消其余子任务
                for (Int32 task = 0; task < childTasks.Length; task++)
                    childTasks[task].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);

                // 所有子任务完成后,从未出错/未取消的任务获取返回的最大值
                // 然后将最大值传给另一个任务来显示最大结果
                tf.ContinueWhenAll(childTasks,
                   completedTasks => completedTasks.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result),
                   CancellationToken.None)
                   .ContinueWith(t => Console.WriteLine("The maxinum is: " + t.Result),
                      TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait用于测试
            });

            // 子任务完成后,也显示任何未处理的异常
            parent.ContinueWith(p =>
            {
                // 用StringBuilder输出所有

                StringBuilder sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine);
                foreach (var e in p.Exception.Flatten().InnerExceptions)
                    sb.AppendLine("   " + e.GetType().ToString());
                Console.WriteLine(sb.ToString());
            }, TaskContinuationOptions.OnlyOnFaulted);

            //启动父任务
            parent.Start();

            try
            {
                parent.Wait(); //显示结果
                Console.ReadKey();
            }
            catch (AggregateException ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadKey();
            }
        }

        private static Int32 Sum(CancellationToken ct, Int32 n)
        {
            Int32 sum = 0;
            for (; n > 0; n--)
            {
                ct.ThrowIfCancellationRequested();
                checked { sum += n; }
            }
            return sum;
        }
    }

}



示例十一:再谈任务调度TaskScheduler


任务基础结构是很灵活的,TaskScheduler对象功不可没。

TaskScheduler对象负责执行调度的任务,同时向Visual Studio调试器公开任务信息,就像一座桥梁,让我们能够掌控自己的任务线程。

TaskScheduler有两个派生类:thread pool task scheduler(线程池任务调度),和synchronization context task scheduler(同步上下文任务调度器)。默认情况下,所以应用程序使用的都是线程池任务调度器,这个任务调度器将任务调度给线程池的工作者线程。可以查询TaskScheduler的静态Default属性来获得对默认任务调度器的一个引用。

同步上下文任务调度器通常用于桌面应用程序,Winfrom,WPF及Silverlight。这个任务调度器将多有任务都调度给应用程序的GUI线程,使所有任务代码都能成功更新UI组建,比如按钮、菜单项等。同步上下文任务调度器根本不使用线程池。同样,可以查询TaskScheduler的静态FromCurrentSynchronizationContext方法来获得对一个同步上下文任务调度器的引用。


就像这样创建类型:

//同步上下文任务调度
 TaskScheduler m_syncContextTaskScheduler =
            TaskScheduler.FromCurrentSynchronizationContext();

任务调度有很多的,下面列举一部分,供参考,更多的请参看http://code.msdn.microsoft.com/ParExtSamples  它包括了大量的示例代码。


image.png



--------------------- 

作者:hackpig

来源:www.skcircle.com

版权声明:本文为博主原创文章,转载请附上博文链接!


本文出自勇哥的网站《少有人走的路》wwww.skcircle.com,转载请注明出处!讨论可扫码加群:

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

会员中心
搜索
«    2024年5月    »
12345
6789101112
13141516171819
20212223242526
2728293031
网站分类
标签列表
最新留言
    热门文章 | 热评文章 | 随机文章
文章归档
友情链接
  • 订阅本站的 RSS 2.0 新闻聚合
  • 扫描加本站机器视觉QQ群,验证答案为:halcon勇哥的机器视觉
  • 点击查阅微信群二维码
  • 扫描加勇哥的非标自动化群,验证答案:C#/C++/VB勇哥的非标自动化群
  • 扫描加站长微信:站长微信:abc496103864
  • 扫描加站长QQ:
  • 扫描赞赏本站:
  • 留言板:

Powered By Z-BlogPHP 1.7.2

Copyright Your skcircle.com Rights Reserved.

鄂ICP备18008319号


站长QQ:496103864 微信:abc496103864