C# 实现顺序执行的自定义TaskScheduler

勇哥要做到的目标是:

task工厂按添加的顺序依次执行。

下面程序我规定的顺序为:  

A...
A...

B...

B...

C...

C...



从源码一来看,每次执行次序都不同。

image.png


如果我们把lock那段代码启用,效果如下:

可以看到仅保证了两次输出是连续,而不能保证执行顺序跟task工厂add的顺序一样。

image.png


代码还有一个问题是:

myScheduler调度器并没有发挥作用,twork根本执行不到。

以上问题的解决版本见后面。



问题源码1:


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace WindowsFormsApplication1
{
    public partial class Form1 : Form
    {
        myScheduler sd = new myScheduler();
        work worker = new work();
        CancellationTokenSource cts = new CancellationTokenSource();
        public Form1()
        {
            InitializeComponent();
        }

        private async void button1_Click(object sender, EventArgs e)
        {
            var res = await appendTask();
            richTextBox1.AppendText("ok "+DateTime.Now.ToString());
        }

        List<int> list1 = new List<int>() { 0, 1, 2 };
      
        private Task<int> appendTask()
        {
            return Task.Run<int>(() =>
            {
                //这里不能用for循环传i,否则会...
                //for (var i = 0; i < 3; i++)
                //{
                foreach (var m in list1)
                {
                    Task.Factory.StartNew(() =>
                         worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token);
                }
                //}
                while (true)
                {
                     if(cts.IsCancellationRequested)
                    {
                        break;
                    }
                    if (sd.taskCount() == 0)
                    {
                        break;
                    }
                    Thread.Sleep(2);
                }
                return 0;
            });

        }

        private void Form1_Load(object sender, EventArgs e)
        {
            worker.worked += Worker_worked;
        }

        private void Worker_worked(object sender, work.workEventArgs e)
        {
            AppendTextString(richTextBox1, e.outmsg + Environment.NewLine);
        }

        public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true)
        {
            if (ctrl == null) return;
            if (ctrl.InvokeRequired)
            {
                Action<Control, string, bool> method = AppendTextString;
                ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF });
            }
            else
            {
                if (ctrl.Text == "")
                {
                    ctrl.Text = text;
                }
                else
                {

                    if (AndCRLF)
                    {
                        ctrl.Text +=  text;
                    }
                    else
                    {
                        ctrl.Text += text;

                    }
                    if (ctrl is RichTextBox)
                    {
                        var obj = ((RichTextBox)ctrl);
                        obj.Select(obj.TextLength, 0);
                        obj.ScrollToCaret();
                    }
                }
            }
        }
    }

    public class work
    {
       
        public delegate void workEvnetHadle(object sender, workEventArgs e);
        public event workEvnetHadle worked;
        public class workEventArgs : EventArgs
        {
            public readonly string outmsg;
            public workEventArgs(string msg1)
            {
                this.outmsg = msg1;
            }
        }

        public virtual void OnWork(workEventArgs e)
        {
            if(worked!=null)
            {
                worked(this, e);
            }
        }
        static readonly object obj1 = new object();
        string[] strary = new string[3] { "A...","B...","C..."};
       
        public struct workstruct
        {
            public CancellationToken workcts;
            public int workjs;
        }

        public void workfun(workstruct data)
        {
            //lock (obj1)
            //{
                for (int i = 0; i < 2; i++)
                {
                    if(data.workcts.IsCancellationRequested)
                    {
                        break;
                    }
                    OnWork(new workEventArgs(strary[data.workjs]));
                    Thread.Sleep(600);
                }
            //}
        }


    }


    public class myScheduler:TaskScheduler,IDisposable
    {

        private ManualResetEvent[] mc = new ManualResetEvent[2];
        private List<Task> taskList = new List<Task>();
        private Thread taskThread = null;

        public myScheduler()
        {
            mc[0] = new ManualResetEvent(false);
            mc[1] = new ManualResetEvent(false);
            taskThread = new Thread(new ThreadStart(twork));
            taskThread.IsBackground = true;
            taskThread.Start();
        }



        public int taskCount()
        {
            return taskList.Count;
        }

        private void twork()
        {
            while(true)
            {
                if(waitTask())
                {
                    Thread.Sleep(2);
                    continue;
                }

                Task task=null;
                if(TryDequeue(task))
                {
                    if(TryExecuteTask(task))
                    {
                        int k1 = 0;       
                    }
                }



            }
        }

        private bool waitTask()
        {
            if(taskList.Count<1)
            {

            }
            //var s1=WaitHandle.WaitAny(mc);
            return false;
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            throw new NotImplementedException();
        }

        protected override void QueueTask(Task task)
        {
            if(null!=task)
            {
                taskList.Add(task);
            }
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return taskList.ToArray();
        }

        protected override bool TryDequeue(Task task)
        {
            return base.TryDequeue(task);
        }

        private void Dispose(bool f1)
        {
            if (!f1) return;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public override int MaximumConcurrencyLevel
        {
            get
            {
                return base.MaximumConcurrencyLevel;
            }
        }


    }
}



最终源码解决了上面的全部问题。

几个知识点:

1. IfEmptyWait中,如果任务为空,则程序停留在var s1=WaitHandle.WaitAny(mc);

    其中WaitHandle.WaitAny(mc)返回值是满足等待的对象的数组索引

    在QueueTask中,如果任务不为空,则mc[0].Set(),让上面的信号继续。


2. protected override void QueueTask(Task task),它的task由下面语句传入。

   Task.Factory.StartNew(() =>

                         worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }),     cts.Token,TaskCreationOptions.None,sd);


3. BlockingCollection<Task> 自带阻塞功能的线程安全集合类

   本例子不能使用List<Task>,原因是它不能take一个元素。

   Add 方法用于向集合添加元素。

  Take 方法用于从集合中获取元素。当集合为空时,Take 方法将阻塞,直到获取到新元素。

  CompleteAdding 方法标记集合为完成状态,此后不能再向集合中添加元素,调用 Add 将抛出 System.InvalidOperationException 异常。

  调用 CompleteAdding 方法将使阻塞状态的 Take 方法抛出 System.InvalidOperationException 异常。

  实例化 BlockingCollection<T> 时,可以传入 boundedCapacity 参数,设置集合的上限,集合中元素到达上限后,Add 方法将阻塞。

  TryAdd 方法在集合满时,不会阻塞,而是直接返回 false,并且丢弃要插入的元素。

  TryTake 方法在集合为空时不会阻塞,而是会返回 false。

  当有多个线程 Take 时,将形成一个 Take 队列,依次获取到元素。


4  if(!IfEmptyWait())

   注意这里是条件是!

     private bool IfEmptyWait()

        {

            if(taskList.Count<1)

            {

                mc[0].Reset();

            }

            var s1=WaitHandle.WaitAny(mc);

            return s1==0;

        }

   当程序启动后,mc[0], mc[1]都没有信号,会卡在 var s1=WaitHandle.WaitAny(mc);

   当点击“启动”按钮后,mc[1].set, 这时 var s1=1,于是程序继续。直到全部任务完成。

   当全部任务完成后,这时候执行mc[0].Reset(),而mc[1]还是set(有信号),因此s1=1

   当点击“停止”后,mc[1].reset(无信号),程序又会卡在会卡在 var s1=WaitHandle.WaitAny(mc);

   当s1=0的时候,会执行后面的运行task。


5. 自定义任务调度器myScheduler,如何跟Task.Factory发生关联。

    注意下面调用的最后一个参数sd


   Task.Factory.StartNew(() =>

                         worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);


最终源码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace WindowsFormsApplication1
{
    public partial class Form1 : Form
    {
        myScheduler sd = new myScheduler();
        work worker = new work();
        CancellationTokenSource cts = new CancellationTokenSource();
        public Form1()
        {
            InitializeComponent();
            TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
        }

        private void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            throw new NotImplementedException();
        }

        private async void button1_Click(object sender, EventArgs e)
        {
            cts = new CancellationTokenSource();
            var res = await appendTask();
            richTextBox1.AppendText("ok "+DateTime.Now.ToString()+Environment.NewLine);
        }

        List<int> list1 = new List<int>() { 0, 1, 2 };
      
        private Task<int> appendTask()
        {
            return Task.Run<int>(() =>
            {
                //这里不能用for循环传i,否则会...
                //for (var i = 0; i < 3; i++)
                //{
                sd.TaskIsOver = false;
                sd.Continue();
                foreach (var m in list1)
                {
                    Task.Factory.StartNew(() =>
                         worker.workfun(new work.workstruct() { workcts = cts.Token, workjs = m }), cts.Token,TaskCreationOptions.None,sd);
                }
                //}
                while (true)
                {
                     if(cts.IsCancellationRequested)
                    {
                        break;
                    }
                    if (sd.TaskIsOver)
                    {
                        worker.OnWork(new work.workEventArgs("TaskIsOver=true"));
                        break;
                    }
                    Thread.Sleep(2);
                }
                return 0;
            });

        }

        private void Form1_Load(object sender, EventArgs e)
        {
            worker.worked += Worker_worked;
        }

        private void Worker_worked(object sender, work.workEventArgs e)
        {
            AppendTextString(richTextBox1, e.outmsg + Environment.NewLine);
        }

        public static void AppendTextString(Control ctrl, string text, bool AndCRLF = true)
        {
            if (ctrl == null) return;
            if (ctrl.InvokeRequired)
            {
                Action<Control, string, bool> method = AppendTextString;
                ctrl.Invoke(method, new object[] { ctrl, text, AndCRLF });
            }
            else
            {
                if (ctrl.Text == "")
                {
                    ctrl.Text = text;
                }
                else
                {

                    if (AndCRLF)
                    {
                        ctrl.Text +=  text;
                    }
                    else
                    {
                        ctrl.Text += text;

                    }
                    if (ctrl is RichTextBox)
                    {
                        var obj = ((RichTextBox)ctrl);
                        obj.Select(obj.TextLength, 0);
                        obj.ScrollToCaret();
                    }
                }
            }
        }

        private void button2_Click(object sender, EventArgs e)
        {
            //停止
            sd.Pause();
        }

        private void button3_Click(object sender, EventArgs e)
        {
            //急停
            cts.Cancel();
        }
    }

    public class work
    {
       
        public delegate void workEvnetHadle(object sender, workEventArgs e);
        public event workEvnetHadle worked;
        public class workEventArgs : EventArgs
        {
            public readonly string outmsg;
            public workEventArgs(string msg1)
            {
                this.outmsg = msg1;
            }
        }

        public virtual void OnWork(workEventArgs e)
        {
            if(worked!=null)
            {
                worked(this, e);
            }
        }
        static readonly object obj1 = new object();
        string[] strary = new string[3] { "A...","B...","C..."};
       
        public struct workstruct
        {
            public CancellationToken workcts;
            public int workjs;
        }

        public void workfun(workstruct data)
        {
            //lock (obj1)
            //{
                for (int i = 0; i < 2; i++)
                {
                    if(data.workcts.IsCancellationRequested)
                    {
                        return;
                    }
                    OnWork(new workEventArgs(strary[data.workjs]));
                    Thread.Sleep(600);
                }
            //}
        }


    }


    public class myScheduler:TaskScheduler,IDisposable
    {

        private ManualResetEvent[] mc = new ManualResetEvent[2];
        private BlockingCollection<Task> taskList = new BlockingCollection<Task>();
        private Thread taskThread = null;
        public bool TaskIsOver { get; set; } = false;
 
        public myScheduler()
        {
            mc[0] = new ManualResetEvent(false);
            mc[1] = new ManualResetEvent(false);
            taskThread = new Thread(new ThreadStart(twork));
            taskThread.IsBackground = true;
            taskThread.Start();
        }



        public int taskCount()
        {
            return taskList.Count;
        }

        public void Pause()
        {
            mc[1].Reset();
        }

        public void Continue()
        {
            mc[1].Set();
        }

        private void twork()
        {
            while(true)
            {
                if(!IfEmptyWait())
                {
                    Thread.Sleep(2);
                    continue;
                }

                Task task=null;
                try
                {
                    if (taskList.TryTake(out task))
                    {
                        if (null != task)
                        {
                            mc[1].WaitOne();
                            if (TryExecuteTask(task))
                            {
                                task.ContinueWith(t => { var exp = t.Exception; });
                                task.Wait();
                                if (taskList.Count < 1)
                                {
                                    TaskIsOver = true;
                                }
                            }
                        }
                    }
                }
                catch(AggregateException ex)
                {

                    ex.Handle(predicate: e =>
                    {
                        Console.WriteLine(e.Message);
                        if (task.IsCanceled)
                        {

                        }
                        if (task.IsFaulted)
                        {

                        }
                        return true;
                    });
                
            }


            }
        }

        private bool IfEmptyWait()
        {
            if(taskList.Count<1)
            {
                mc[0].Reset();
            }
            var s1=WaitHandle.WaitAny(mc);
            return s1==0;
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }

        protected override void QueueTask(Task task)
        {
            if(null!=task)
            {
                taskList.Add(task);
                if(taskList.Count>0)
                {
                    mc[0].Set();
                }
            }
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return taskList.ToArray();
        }

        protected override bool TryDequeue(Task task)
        {
            //var res= base.TryDequeue(task);
            //return res;
            return taskList.TryTake(out task);
        }

        private void Dispose(bool f1)
        {
            if (!f1) return;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public override int MaximumConcurrencyLevel
        {
            get
            {
                return base.MaximumConcurrencyLevel;
            }
        }


    }
}


源码下载:

链接:https://pan.baidu.com/s/1I9FfSiPB1QGlh9oWW0LOuA

提取码:edes 

--来自百度网盘超级会员V4的分享



另外再贴段一位朋友讲解的AutoResetEvent和ManualResetEvent区别:


在.Net多线程编程中,AutoResetEvent和ManualResetEvent这两个类经常用到, 他们的用法很类似,但也有区别。
Set方法将信号置为发送状态,Reset方法将信号置为不发送状态,WaitOne等待信号的发送。
可以通过构造函数的参数值来决定其初始状态,若为true则非阻塞状态,为false为阻塞状态。
如果某个线程调用WaitOne方法,则当信号处于发送状态时,该线程会得到信号, 继续向下执行。

其区别就在调用后,AutoResetEvent.WaitOne()每次只允许一个线程进入,当某个线程得到信号后,AutoResetEvent会自动又将信号置为不发送状态,则其他调用WaitOne的线程只有继续等待.也就是说,AutoResetEvent一次只唤醒一个线程;
而ManualResetEvent则可以唤醒多个线程,因为当某个线程调用了ManualResetEvent.Set()方法后,其他调用WaitOne的线程获得信号得以继续执行,而ManualResetEvent不会自动将信号置为不发送。
也就是说,除非手工调用了ManualResetEvent.Reset()方法,则ManualResetEvent将一直保持有信号状态,ManualResetEvent也就可以同时唤醒多个线程继续执行。




本文出自勇哥的网站《少有人走的路》wwww.skcircle.com,转载请注明出处!讨论可扫码加群:

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

会员中心
搜索
«    2024年4月    »
1234567
891011121314
15161718192021
22232425262728
2930
网站分类
标签列表
最新留言
    热门文章 | 热评文章 | 随机文章
文章归档
友情链接
  • 订阅本站的 RSS 2.0 新闻聚合
  • 扫描加本站机器视觉QQ群,验证答案为:halcon勇哥的机器视觉
  • 点击查阅微信群二维码
  • 扫描加勇哥的非标自动化群,验证答案:C#/C++/VB勇哥的非标自动化群
  • 扫描加站长微信:站长微信:abc496103864
  • 扫描加站长QQ:
  • 扫描赞赏本站:
  • 留言板:

Powered By Z-BlogPHP 1.7.2

Copyright Your skcircle.com Rights Reserved.

鄂ICP备18008319号


站长QQ:496103864 微信:abc496103864