C#4.0的并行库TPL,即Task(一)

C#4.0的并行库TPL,即Task(一)http://www.skcircle.com/?id=1793

C#4.0的并行库TPL,即Task(二) http://www.skcircle.com/?id=1798

C#4.0的并行库TPL,即Task(三) http://www.skcircle.com/?id=1808

C#4.0的并行库TPL,即Task(四)  http://www.skcircle.com/?id=1815

C#4.0的并行库TPL,即Task(五) http://www.skcircle.com/?id=1816






.net 4.0引入的TPL,即任务并行库。

它是在线程池之上的又一个抽象层。

TPL的核心概念是任务。一个任务代表了一个异步操作,该操作可以通过多种方式运行,可以使用或者不使用独立线程运行。

一个任务可以通过多种方式和其它任务组合起来。例如,可以同时启动多个任务,等待所有任务完成,然后运行一个任务对之前的所有任务的结果进行一些计算。
TPL与之前的模式相比,其中一个关键的优势是其具有用于组合任务的使得的API。

TPL处理异常有多种方式。由于一个任务可能会由多个其它任务组成,这些任务也可能依次拥有各自的子任务,所以有一个AggregateException的概念。这种异常可以捕获底层任务内部的所有的异常,并允许单独处理这些异常。

另外,在.net 5.0里面,新的异步编程交键字await和async里面已经包含有TPL的支持了。


例子一:创建任务


new Task()后必须要Start。

如果使用.Run()或者是.StartNew就不用显示的Start。

标记了TaskCreationOptions.LongRunning,则表示任务不使用线程池,而是使用Thread来运行。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Chapter4.Recipe1
{
	class Program
	{
		static void Main(string[] args)
		{
			var t1 = new Task(() => TaskMethod("Task 1"));
			var t2 = new Task(() => TaskMethod("Task 2"));
			t2.Start();
			t1.Start();
			Task.Run(() => TaskMethod("Task 3"));
			Task.Factory.StartNew(() => TaskMethod("Task 4"));
			Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning);
			Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.ReadKey();

		}

		static void TaskMethod(string name)
		{
			Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
				name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
		}
	}
}

image.png




示例二:任务的基本操作


首先直接运行了TaskMethod,它于是被同步运行,并且不属于线程池。

Task 1,使用Start并等待结果,该任务被放置于线程池,并且由于那句 int result = task.Result

所以主线程会等待,直到任务返回前都一直是阻塞状态。

Task 2,跟直接运行TaskMethod是一样,这个用法可以让该任务运行在主线程中,避免使用线程池来运行非常短暂的操作。

Task 3跟Task 1的区别是,它没有阻塞主线程,它循环打印出任务的状态。状态分别为:

Created、Running和RanToCompletion。



using System;
using System.Threading;
using System.Threading.Tasks;

namespace Chapter4.Recipe2
{
	class Program
	{
		static void Main(string[] args)
		{
			TaskMethod("Main Thread Task");
			Task<int> task = CreateTask("Task 1");
			task.Start();
			int result = task.Result;
			Console.WriteLine("Result is: {0}", result);

			task = CreateTask("Task 2");
			task.RunSynchronously();
			result = task.Result;
			Console.WriteLine("Result is: {0}", result);

			task = CreateTask("Task 3");
			Console.WriteLine(task.Status);
			task.Start();

			while (!task.IsCompleted)
			{
				Console.WriteLine(task.Status);
				Thread.Sleep(TimeSpan.FromSeconds(0.5));
			} 
			
			Console.WriteLine(task.Status);
			result = task.Result;
			Console.WriteLine("Result is: {0}", result);
            Console.ReadKey();
		}

		static Task<int> CreateTask(string name)
		{
			return new Task<int>(() => TaskMethod(name));
		}

		static int TaskMethod(string name)
		{
			Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
				name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
			Thread.Sleep(TimeSpan.FromSeconds(2));
			return 42;
		}
	}
}


image.png


示例三:组合任务(相互依赖的任务)


程序启动后,创建两个任务 firstTask,secondTask

在(1)处,为任务firstTask创建了一个后续任务,这个后续任务会在当前任务完成后运行。

然后启动两个任务后,等4秒,这个时间足够两个任务完成。

然后在(2)处,secondTask任务执行了一个后续操作,并通过指定两个选项:TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously来尝试同步执行该后续操作。即如果后续操作比较短暂,这种方式就很有用,

因为放置在主线程中运行比放置在线程池中运行要快。

可以实现这一点是因为第二个任务恰好在那刻完成。如果你把4秒的sleep方法注释掉,将会看到

该代码被放置线程池中去,这是因为还未从之前的任务中得到结果。


在(3)处我们为之前的后续任务又定义了一个后续任务,但这里使用了一个GetAwaiter和OnCompleted方法,这些方法是C#5.0的异步机制。


在(4)处演示了父子线程,我们创建了一个新任务,当运行该任务时,通过提供一个TaskCreationOptions.AttachedToParent

选项来运行一个子任务。

注意:子任务必须在父任务运动的时候创建,并正确的附加给父任务!

只得分所得分子任务结束工作,父任务才会完成。通过一个TaskContinuationOptions选项还可以给子任务加上后续操作,这时候,只得分后续操作结束后,父任务才能完成。


using System;
using System.Threading;
using System.Threading.Tasks;

namespace Chapter4.Recipe3
{
	class Program
	{
		static void Main(string[] args)
		{
			var firstTask = new Task<int>(() => TaskMethod("First Task", 3));
			var secondTask = new Task<int>(() => TaskMethod("Second Task", 2));
                        //(1)
			firstTask.ContinueWith(
				t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}",
					t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread),
				TaskContinuationOptions.OnlyOnRanToCompletion);

			firstTask.Start();
			secondTask.Start();

			Thread.Sleep(TimeSpan.FromSeconds(4));
                        //(2)
			Task continuation = secondTask.ContinueWith(
				t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}",
					t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread),
				TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
                        //(3)
			continuation.GetAwaiter().OnCompleted(
				() => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}",
					Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread));

			Thread.Sleep(TimeSpan.FromSeconds(2));
			Console.WriteLine();
                        
                        //(4)
			firstTask = new Task<int>(() =>
			{
				var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent);
				innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent);
				return TaskMethod("First Task", 2);
			});

			firstTask.Start();

			while (!firstTask.IsCompleted)
			{
				Console.WriteLine(firstTask.Status);
				Thread.Sleep(TimeSpan.FromSeconds(0.5));
			}
			Console.WriteLine(firstTask.Status);

			Thread.Sleep(TimeSpan.FromSeconds(10));
            Console.ReadKey();
		}

		static int TaskMethod(string name, int seconds)
		{
			Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
				name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
			Thread.Sleep(TimeSpan.FromSeconds(seconds));
			return 42 * seconds;
		}
	}
}


image.png


示例四:取消选项


仔细看longTask的创建代码,我们将给底层传递一次取消标志,然后给任务的构造函数再传递一次。

为什么需要提供取消标志两次呢?

答案是如果在任务实际启动前取消它,该任务的TPL基础设施有责任处理该取消操作,

因为这些代码根本不会执行。通过得到的第一个任务状态可以知道它被取消。如果尝试对该

任务调用Start方法,将会得到InvalidOperationException异常。

然后你需要自己来写代码处理取消过程。注意在TaskMethod的代码中你可以看到我们的处理代码。

在你取消任务后,任务的状态仍然是RanToCompletion,因为从TPL的角度来看,该任务正常完成了它的

工作。辨别这两种情况是非常重要的,并且需要理解每种情况下职责的不同。


using System;
using System.Threading;
using System.Threading.Tasks;

namespace Chapter4.Recipe6
{
	class Program
	{
		private static void Main(string[] args)
		{
			var cts = new CancellationTokenSource();
			var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token);
			Console.WriteLine(longTask.Status);
			cts.Cancel();
			Console.WriteLine(longTask.Status);
			Console.WriteLine("First task has been cancelled before execution");
			cts = new CancellationTokenSource();
			longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token);
			longTask.Start();
			for (int i = 0; i < 5; i++ )
			{
				Thread.Sleep(TimeSpan.FromSeconds(0.5));
				Console.WriteLine(longTask.Status);
			}
			cts.Cancel();
			for (int i = 0; i < 5; i++)
			{
				Thread.Sleep(TimeSpan.FromSeconds(0.5));
				Console.WriteLine(longTask.Status);
			}

			Console.WriteLine("A task has been completed with result {0}.", longTask.Result);
            Console.ReadKey();
		}

		private static int TaskMethod(string name, int seconds, CancellationToken token)
		{
			Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
				name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
			for (int i = 0; i < seconds; i ++)
			{
				Thread.Sleep(TimeSpan.FromSeconds(1));
				if (token.IsCancellationRequested) return -1;
			}
			return 42*seconds;
		}
	}
}


image.png


示例五:处理任务中的异常


第一个例子只是使用catch捕获异常,它是被称为AggregateException的异常。可以通过访问InnerException属性来得到底层异常。

第二个例子使用GetAwaiter和GetResult来访问任务结果。这种情况下无需封装异常,因为TPL的基础设施会提取异常。

最后一个例子展示了两个任务抛出异常的情形。

只有之前的任务完成前得分异常时,该后续操作才会操作。通过给后续操作传递TaskContinuationOptions.OnlyOnFaulted选项时可以实现该行为。


由于任务可以各种形式进行连接在一起,因为AggregateException异常可能包含其它普通异常的集合。这么多异常怎么提取呢?

这时候可以使用根异常的Flatten方法。它将返回一个集合。该集合包含以层级结构中每个子聚合异常中的内部异常。




using System;
using System.Threading;
using System.Threading.Tasks;

namespace Chapter4.Recipe7
{
	class Program
	{
		static void Main(string[] args)
		{
			Task<int> task;
			try
			{
				task = Task.Run(() => TaskMethod("Task 1", 2));
				int result = task.Result;
				Console.WriteLine("Result: {0}", result);
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception caught: {0}", ex);
			}
			Console.WriteLine("----------------------------------------------");
			Console.WriteLine();

			try
			{
				task = Task.Run(() => TaskMethod("Task 2", 2));
				int result = task.GetAwaiter().GetResult();
				Console.WriteLine("Result: {0}", result);
			}
			catch (Exception ex)
			{
				Console.WriteLine("Exception caught: {0}", ex);
			}
			Console.WriteLine("----------------------------------------------");
			Console.WriteLine();

			var t1 = new Task<int>(() => TaskMethod("Task 3", 3));
			var t2 = new Task<int>(() => TaskMethod("Task 4", 2));
			var complexTask = Task.WhenAll(t1, t2);
			var exceptionHandler = complexTask.ContinueWith(t => 
					Console.WriteLine("Exception caught: {0}", t.Exception), 
					TaskContinuationOptions.OnlyOnFaulted
				);
			t1.Start();
			t2.Start();

			Thread.Sleep(TimeSpan.FromSeconds(5));
		}

		static int TaskMethod(string name, int seconds)
		{
			Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}",
				name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
			Thread.Sleep(TimeSpan.FromSeconds(seconds));
			throw new Exception("Boom!");
			return 42 * seconds;
		}
	}
}


image.png

image.png

image.png




本文出自勇哥的网站《少有人走的路》wwww.skcircle.com,转载请注明出处!讨论可扫码加群:
  • 评论列表:
  •  qawwss56
     发布于 2021-06-27 16:07:46  回复该评论
  • 请教一下,Task怎么做到工业上的启动,停止,复位
    •  勇哥,很想停止
       发布于 2021-06-28 10:17:50  回复该评论
    • 设备的启动、停止、复位。并不是task自身的功能做到的。
      因为设备有轴运动,你得编写信号机制,让停止信号发出时,能同时让轴停止和让运动调度逻辑也停止,暂停继续等亦是如此。
      不要幻想线程有什么Pause,Continue、stop机制,能直接把设备暂停、继续。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

会员中心
搜索
«    2024年4月    »
1234567
891011121314
15161718192021
22232425262728
2930
网站分类
标签列表
最新留言
    热门文章 | 热评文章 | 随机文章
文章归档
友情链接
  • 订阅本站的 RSS 2.0 新闻聚合
  • 扫描加本站机器视觉QQ群,验证答案为:halcon勇哥的机器视觉
  • 点击查阅微信群二维码
  • 扫描加勇哥的非标自动化群,验证答案:C#/C++/VB勇哥的非标自动化群
  • 扫描加站长微信:站长微信:abc496103864
  • 扫描加站长QQ:
  • 扫描赞赏本站:
  • 留言板:

Powered By Z-BlogPHP 1.7.2

Copyright Your skcircle.com Rights Reserved.

鄂ICP备18008319号


站长QQ:496103864 微信:abc496103864