勇哥要做到的目标是:
task工厂按添加的顺序依次执行。
下面程序我规定的顺序为:
A...
A...
B...
B...
C...
C...
从源码一来看,每次执行次序都不同。

如果我们把lock那段代码启用,效果如下:
可以看到仅保证了两次输出是连续,而不能保证执行顺序跟task工厂add的顺序一样。

代码还有一个问题是:
myScheduler调度器并没有发挥作用,twork根本执行不到。
以上问题的解决版本见后面。
问题源码1:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace WindowsFormsApplication1
{
public partial class Form1 : Form
{
myScheduler sd = new myScheduler();
work worker = new work();
CancellationTokenSource cts = new CancellationTokenSource();
public Form1()
{
InitializeComponent();
}
private async void button1_Click(object sender, EventArgs e)
{
var res = await appendTask();
richTextBox1.AppendText("ok "+DateTime.Now.ToString());
}
List<int> list1 = new List<int>() { 0, 1, 2 };
private Task<int> appendTask()
{
return Task.Run<int>(() =>
{
//这里不能用for循环传i,否则会...
//for (var i = 0; i < 3; i++)
//{
foreach (var m in list1)
{
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token);
}
//}
while (true)
{
if(cts.IsCancellationRequested)
{
break;
}
if (sd.taskCount() == 0)
{
break;
}
Thread.Sleep(2);
}
return 0;
});
}
private void Form1_Load(object sender, EventArgs e)
{
worker.worked += Worker_worked;
}
private void Worker_worked(object sender, work.workEventArgs e)
{
AppendTextString(richTextBox1, e.outmsg + Environment.NewLine);
}
public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true)
{
if (ctrl == null) return;
if (ctrl.InvokeRequired)
{
Action<Control, string, bool> method = AppendTextString;
ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF });
}
else
{
if (ctrl.Text == "")
{
ctrl.Text = text;
}
else
{
if (AndCRLF)
{
ctrl.Text += text;
}
else
{
ctrl.Text += text;
}
if (ctrl is RichTextBox)
{
var obj = ((RichTextBox)ctrl);
obj.Select(obj.TextLength, 0);
obj.ScrollToCaret();
}
}
}
}
}
public class work
{
public delegate void workEvnetHadle(object sender, workEventArgs e);
public event workEvnetHadle worked;
public class workEventArgs : EventArgs
{
public readonly string outmsg;
public workEventArgs(string msg1)
{
this.outmsg = msg1;
}
}
public virtual void OnWork(workEventArgs e)
{
if(worked!=null)
{
worked(this, e);
}
}
static readonly object obj1 = new object();
string[] strary = new string[3] { "A...","B...","C..."};
public struct workstruct
{
public CancellationToken workcts;
public int workjs;
}
public void workfun(workstruct data)
{
//lock (obj1)
//{
for (int i = 0; i < 2; i++)
{
if(data.workcts.IsCancellationRequested)
{
break;
}
OnWork(new workEventArgs(strary[data.workjs]));
Thread.Sleep(600);
}
//}
}
}
public class myScheduler:TaskScheduler,IDisposable
{
private ManualResetEvent[] mc = new ManualResetEvent[2];
private List<Task> taskList = new List<Task>();
private Thread taskThread = null;
public myScheduler()
{
mc[0] = new ManualResetEvent(false);
mc[1] = new ManualResetEvent(false);
taskThread = new Thread(new ThreadStart(twork));
taskThread.IsBackground = true;
taskThread.Start();
}
public int taskCount()
{
return taskList.Count;
}
private void twork()
{
while(true)
{
if(waitTask())
{
Thread.Sleep(2);
continue;
}
Task task=null;
if(TryDequeue(task))
{
if(TryExecuteTask(task))
{
int k1 = 0;
}
}
}
}
private bool waitTask()
{
if(taskList.Count<1)
{
}
//var s1=WaitHandle.WaitAny(mc);
return false;
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
throw new NotImplementedException();
}
protected override void QueueTask(Task task)
{
if(null!=task)
{
taskList.Add(task);
}
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return taskList.ToArray();
}
protected override bool TryDequeue(Task task)
{
return base.TryDequeue(task);
}
private void Dispose(bool f1)
{
if (!f1) return;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public override int MaximumConcurrencyLevel
{
get
{
return base.MaximumConcurrencyLevel;
}
}
}
}最终源码解决了上面的全部问题。
几个知识点:
1. IfEmptyWait中,如果任务为空,则程序停留在var s1=WaitHandle.WaitAny(mc);
其中WaitHandle.WaitAny(mc)返回值是满足等待的对象的数组索引
在QueueTask中,如果任务不为空,则mc[0].Set(),让上面的信号继续。
2. protected override void QueueTask(Task task),它的task由下面语句传入。
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
3. BlockingCollection<Task> 自带阻塞功能的线程安全集合类
本例子不能使用List<Task>,原因是它不能take一个元素。
Add 方法用于向集合添加元素。
Take 方法用于从集合中获取元素。当集合为空时,Take 方法将阻塞,直到获取到新元素。
CompleteAdding 方法标记集合为完成状态,此后不能再向集合中添加元素,调用 Add 将抛出 System.InvalidOperationException 异常。
调用 CompleteAdding 方法将使阻塞状态的 Take 方法抛出 System.InvalidOperationException 异常。
实例化 BlockingCollection<T> 时,可以传入 boundedCapacity 参数,设置集合的上限,集合中元素到达上限后,Add 方法将阻塞。
TryAdd 方法在集合满时,不会阻塞,而是直接返回 false,并且丢弃要插入的元素。
TryTake 方法在集合为空时不会阻塞,而是会返回 false。
当有多个线程 Take 时,将形成一个 Take 队列,依次获取到元素。
4 if(!IfEmptyWait())
注意这里是条件是!
private bool IfEmptyWait()
{
if(taskList.Count<1)
{
mc[0].Reset();
}
var s1=WaitHandle.WaitAny(mc);
return s1==0;
}
当程序启动后,mc[0], mc[1]都没有信号,会卡在 var s1=WaitHandle.WaitAny(mc);
当点击“启动”按钮后,mc[1].set, 这时 var s1=1,于是程序继续。直到全部任务完成。
当全部任务完成后,这时候执行mc[0].Reset(),而mc[1]还是set(有信号),因此s1=1
当点击“停止”后,mc[1].reset(无信号),程序又会卡在会卡在 var s1=WaitHandle.WaitAny(mc);
当s1=0的时候,会执行后面的运行task。
5. 自定义任务调度器myScheduler,如何跟Task.Factory发生关联。
注意下面调用的最后一个参数sd
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
最终源码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace WindowsFormsApplication1
{
public partial class Form1 : Form
{
myScheduler sd = new myScheduler();
work worker = new work();
CancellationTokenSource cts = new CancellationTokenSource();
public Form1()
{
InitializeComponent();
TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
}
private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
throw new NotImplementedException();
}
private async void button1_Click(object sender, EventArgs e)
{
cts = new CancellationTokenSource();
var res = await appendTask();
richTextBox1.AppendText("ok "+DateTime.Now.ToString()+Environment.NewLine);
}
List<int> list1 = new List<int>() { 0, 1, 2 };
private Task<int> appendTask()
{
return Task.Run<int>(() =>
{
//这里不能用for循环传i,否则会...
//for (var i = 0; i < 3; i++)
//{
sd.TaskIsOver = false;
sd.Continue();
foreach (var m in list1)
{
Task.Factory.StartNew(() =>
worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
}
//}
while (true)
{
if(cts.IsCancellationRequested)
{
break;
}
if (sd.TaskIsOver)
{
worker.OnWork(new work.workEventArgs("TaskIsOver=true"));
break;
}
Thread.Sleep(2);
}
return 0;
});
}
private void Form1_Load(object sender, EventArgs e)
{
worker.worked += Worker_worked;
}
private void Worker_worked(object sender, work.workEventArgs e)
{
AppendTextString(richTextBox1, e.outmsg + Environment.NewLine);
}
public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true)
{
if (ctrl == null) return;
if (ctrl.InvokeRequired)
{
Action<Control, string, bool> method = AppendTextString;
ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF });
}
else
{
if (ctrl.Text == "")
{
ctrl.Text = text;
}
else
{
if (AndCRLF)
{
ctrl.Text += text;
}
else
{
ctrl.Text += text;
}
if (ctrl is RichTextBox)
{
var obj = ((RichTextBox)ctrl);
obj.Select(obj.TextLength, 0);
obj.ScrollToCaret();
}
}
}
}
private void button2_Click(object sender, EventArgs e)
{
//停止
sd.Pause();
}
private void button3_Click(object sender, EventArgs e)
{
//急停
cts.Cancel();
}
}
public class work
{
public delegate void workEvnetHadle(object sender, workEventArgs e);
public event workEvnetHadle worked;
public class workEventArgs : EventArgs
{
public readonly string outmsg;
public workEventArgs(string msg1)
{
this.outmsg = msg1;
}
}
public virtual void OnWork(workEventArgs e)
{
if(worked!=null)
{
worked(this, e);
}
}
static readonly object obj1 = new object();
string[] strary = new string[3] { "A...","B...","C..."};
public struct workstruct
{
public CancellationToken workcts;
public int workjs;
}
public void workfun(workstruct data)
{
//lock (obj1)
//{
for (int i = 0; i < 2; i++)
{
if(data.workcts.IsCancellationRequested)
{
return;
}
OnWork(new workEventArgs(strary[data.workjs]));
Thread.Sleep(600);
}
//}
}
}
public class myScheduler:TaskScheduler,IDisposable
{
private ManualResetEvent[] mc = new ManualResetEvent[2];
private BlockingCollection<Task> taskList = new BlockingCollection<Task>();
private Thread taskThread = null;
public bool TaskIsOver { get; set; } = false;
public myScheduler()
{
mc[0] = new ManualResetEvent(false);
mc[1] = new ManualResetEvent(false);
taskThread = new Thread(new ThreadStart(twork));
taskThread.IsBackground = true;
taskThread.Start();
}
public int taskCount()
{
return taskList.Count;
}
public void Pause()
{
mc[1].Reset();
}
public void Continue()
{
mc[1].Set();
}
private void twork()
{
while(true)
{
if(!IfEmptyWait())
{
Thread.Sleep(2);
continue;
}
Task task=null;
try
{
if (taskList.TryTake(out task))
{
if (null != task)
{
mc[1].WaitOne();
if (TryExecuteTask(task))
{
task.ContinueWith(t => { var exp = t.Exception; });
task.Wait();
if (taskList.Count < 1)
{
TaskIsOver = true;
}
}
}
}
}
catch(AggregateException ex)
{
ex.Handle(predicate: e =>
{
Console.WriteLine(e.Message);
if (task.IsCanceled)
{
}
if (task.IsFaulted)
{
}
return true;
});
}
}
}
private bool IfEmptyWait()
{
if(taskList.Count<1)
{
mc[0].Reset();
}
var s1=WaitHandle.WaitAny(mc);
return s1==0;
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override void QueueTask(Task task)
{
if(null!=task)
{
taskList.Add(task);
if(taskList.Count>0)
{
mc[0].Set();
}
}
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return taskList.ToArray();
}
protected override bool TryDequeue(Task task)
{
//var res= base.TryDequeue(task);
//return res;
return taskList.TryTake(out task);
}
private void Dispose(bool f1)
{
if (!f1) return;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public override int MaximumConcurrencyLevel
{
get
{
return base.MaximumConcurrencyLevel;
}
}
}
}源码下载:
链接:https://pan.baidu.com/s/1I9FfSiPB1QGlh9oWW0LOuA
提取码:edes
--来自百度网盘超级会员V4的分享
另外再贴段一位朋友讲解的AutoResetEvent和ManualResetEvent区别:
在.Net多线程编程中,AutoResetEvent和ManualResetEvent这两个类经常用到, 他们的用法很类似,但也有区别。
Set方法将信号置为发送状态,Reset方法将信号置为不发送状态,WaitOne等待信号的发送。
可以通过构造函数的参数值来决定其初始状态,若为true则非阻塞状态,为false为阻塞状态。
如果某个线程调用WaitOne方法,则当信号处于发送状态时,该线程会得到信号, 继续向下执行。
其区别就在调用后,AutoResetEvent.WaitOne()每次只允许一个线程进入,当某个线程得到信号后,AutoResetEvent会自动又将信号置为不发送状态,则其他调用WaitOne的线程只有继续等待.也就是说,AutoResetEvent一次只唤醒一个线程;
而ManualResetEvent则可以唤醒多个线程,因为当某个线程调用了ManualResetEvent.Set()方法后,其他调用WaitOne的线程获得信号得以继续执行,而ManualResetEvent不会自动将信号置为不发送。
也就是说,除非手工调用了ManualResetEvent.Reset()方法,则ManualResetEvent将一直保持有信号状态,ManualResetEvent也就可以同时唤醒多个线程继续执行。