C#线程篇---Task(任务)和线程池不得不说的秘密(5)

C#线程篇---Task(任务)和线程池不得不说的秘密(5)
在上篇最后⼀个例⼦之后,我们发现了怎么去使⽤线程池,调⽤ThreadPool的QueueUserWorkItem⽅法来发起⼀次异步的、计算限制的操作,例⼦很简单,不是吗?
  然⽽,在今天这篇博客中,我们要知道的是,QueueUserWorkItem这个技术存在许多限制。其中最⼤的问题是没有⼀个内建的机制让你知道操作在什么时候完成,也没有⼀个机制在操作完成是获得⼀个返回值,这些问题使得我们都不敢启⽤这个技术。
  Microsoft为了克服这些限制(同时解决其他⼀些问题),引⼊了任务(tasks)的概念。顺带说⼀下我们得通过System.Threading.Tasks命名空间来使⽤它们。
  现在我要说的是,⽤线程池不是调⽤ThreadPool的QueueUserWorkItem⽅法,⽽是⽤任务来做相同的事:
1        static void Main(string[] args)
2        {
3            Console.WriteLine("主线程启动");
4            //ThreadPool.QueueUserWorkItem(StartCode,5);
5            new Task(StartCode, 5).Start();
6            Console.WriteLine("主线程运⾏到此!");
7            Thread.Sleep(1000);
8        }
9
10        private static void StartCode(object i)
11        {
12            Console.WriteLine("开始执⾏⼦线程...{0}",i);
13            Thread.Sleep(1000);//模拟代码操作
14        }
15    }
嘿,你会发现结果是⼀样的。
再来看看这个是什么:
TaskCreationOptions这个类型是⼀个枚举类型,传递⼀些标志来控制Task的执⾏⽅式。TaskCreationOptions定义如下:
慢点,注释很详细,看看这些有好处,TaskScheduler(任务调度器)不懂没关系,请继续往下看,我会介绍的,但请注意,这些标识都只是⼀些提议⽽已,在调度⼀个Task时,可能会、也可能不会采纳这些提议,不过有⼀条要注意:AttachedToParent标志,它总会得到Task采纳,因为它和TaskScheduler本⾝⽆关。
  来看下这段代码:
氢氧化钙生产
1        static void Main(string[] args)
2        {
3
4            //1000000000这个数字会抛出System.AggregateException
5
6            Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);
7
8            //可以现在开始,也可以以后开始
9
10            t.Start();
11
12            //Wait显式的等待⼀个线程完成
13
14            t.Wait();
15
按摩坐垫16            Console.WriteLine("The Sum is:"+t.Result);
17        }
18
19        private static Int32 Sum(Int32 i)
20        {
21            Int32 sum = 0;
22            for (; i > 0; i--)
23                checked { sum += i; }
24            return sum;
25        }
26    }
  这段代码⼤家应该猜得出是什么意思吧,⼈⼈都会写。
  但是,我的结果为什么是t.Result⽽不直接是返回的Sum呢?有没有多此⼀举的感觉?
下⾯我来说说这段代码我想表达的意思:
  在⼀个线程调⽤Wait⽅法时,系统会检查线程要等待的Task是否已经开始执⾏,如果任务正在执⾏,那么这个Wait⽅法会使线程阻塞,知道Task运⾏结束为⽌。
  就说上⾯的程序执⾏,因为累加数字太⼤,它抛出算术运算溢出错误,在⼀个计算限制任务抛出⼀个未处理的异常时,这个异常会被“包含”不并存储到⼀个集合中,⽽线程池线程是允许返回到线程池中的,在调⽤Wait⽅法或者Result属性时,这个成员会抛出⼀个
System.AggregateException对象
  现在你会问,为什么要调⽤Wait或者Result?或者⼀直不查询Task的Exception属性?你的代码就永远注意不到这个异常的发⽣,如果不能捕捉到这个异常,垃圾回收时,抛出AggregateException,进程就会⽴即终⽌,这就是“牵⼀发动全⾝”,莫名其妙程序就⾃⼰关掉了,谁也不知道这是什么情况。
所以,必须调⽤前⾯提到的某个成员,确保代码注意到异常,并从异常中恢复。悄悄告诉你,其实在⽤Result的时候,内部会调⽤Wait。
  怎么恢复?
  为了帮助你检测没有注意到的异常,可以向TaskScheduler的静态UnobservedTaskException时间等级⼀个回调⽅法,当Task被垃圾回收时,如果出现⼀个没有被注意到的异常,CLR终结器会引发这个事件。⼀旦引发,就会向你的时间处理器⽅法传递⼀个UnobservedTaskExceptionEvenArgs对象,其中包含了你没有注意的AggregateException。然后再调⽤UnobservedTasExceptionEvenArgs的SetObserved⽅法来指出你的异常已经处理好了,从⽽阻⽌CLR终⽌进程。这是个图省事的做法,要少做这些,宁愿终⽌进程,也不要呆着已经损坏的状态⽽继续运⾏。做⼈也⼀样,病了宁肯休息,也不要带病坚持上班,你没那么伟⼤,公司也不需要你的这⼀点伟⼤,命是⾃⼰的。(─.─|||扯远了。
  除了单个等待任务,Task 还提供了两个静态⽅法:WaitAny和WaitAll,他们允许线程等待⼀个Task对象数组。
  WaitAny⽅法会阻塞调⽤线程,知道数组中的任何⼀个Task对象完成,这个⽅法会返回⼀个索引值,指明完成的是哪⼀个Task对象。如果发⽣超时,⽅法将返回-1。它可以通过⼀个CancellationToken取消,会抛出⼀个OperationCanceledException。
  WaitAll⽅法也会阻塞调⽤线程,知道数组中的所有Task对象都完成,如果全部完成就返回true,如果超时就返回false。当然它也能取消,同样会抛出OperationCanceledException。
  说了这么两个取消任务的⽅法,现在来试试这个⽅法,加深下印象,修改先前例⼦代码,完整代码如下:
1        static void Main(string[] args)
2        {
3            CancellationTokenSource cts = new CancellationTokenSource();
4
5
6
烟花机械7            Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
8
9            //可以现在开始,也可以以后开始
10
11            t.Start();
12
13            //在之后的某个时间,取消CancellationTokenSource 以取消Task
14
15            cts.Cancel();//这是个异步请求,Task可能已经完成了。我是双核机器,Task没有完成过
16
17
18            //注释这个为了测试抛出的异常
19            //Console.WriteLine("This sum is:" + t.Result);
20            try
21            {
22                //如果任务已经取消了,Result会抛出AggregateException
23
24                Console.WriteLine("This sum is:" + t.Result);
25            }
26            catch (AggregateException x)
27            {
28                //将任何OperationCanceledException对象都视为已处理。
29                //其他任何异常都造成抛出⼀个AggregateException,其中
30                //只包含未处理的异常
31
32                x.Handle(e => e is OperationCanceledException);
33                Console.WriteLine("Sum was Canceled");
34            }
35
36        }
37
38        private static Int32 Sum(CancellationToken ct ,Int32 i)
39        {
40            Int32 sum = 0;
41            for (; i > 0; i--)
42            {
43                //在取消标志引⽤的CancellationTokenSource上如果调⽤
44                //Cancel,下⾯这⼀⾏就会抛出OperationCanceledException
45
46                ct.ThrowIfCancellationRequested();
47
48                checked { sum += i; }
49            }
50
51            return sum;
52        }
53    }
  这个例⼦展⽰了⼀个任务在进⾏的时候中途取消的操作,我觉得它很有趣,你试试也会发现。
  Lamada表达式写这个,是个亮点,得学学,将CancellationToken闭包变量“传递”。
  如果不⽤Lamada表达式,这问题还真不好解决:
  Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
  Sum(cts.Token,10000) 内的Token需要和cts.Token关联起来,你还能想出怎么关联起来么?
  好,任务取消也讲玩了,来看个更好⽤的技术:
1        static void Main(string[] args)
2        {
3
4            Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000);
5
6            //可以现在开始,也可以以后开始
7
8            t.Start();
9
10            Task cwt =  t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result));
11            cwt.Wait();
12
13        }
14
15        private static Int32 Sum(Int32 i)
16        {
17            Int32 sum = 0;
18            for (; i > 0; i--)
19            {
20                checked { sum += i; }
21            }
22
23            return sum;
24        }
25    }
ContinueWith?啥东西~~??
  要写可伸缩的软件,⼀定不能使你的线程阻塞。这意味着如果调⽤Wait或者在任务未完成时查询Result属性,极有可能造成线程池创建⼀个新线程,这增⼤了资源的消耗,并损害了伸缩性。
  ContinueWith便是⼀个更好的⽅式,⼀个任务完成时它可以启动另⼀个任务。上⾯的例⼦不会阻塞任何线程。
  当Sum的任务完成时,这个任务会启动另⼀个任务以显⽰结果。ContinueWith会返回对新的Task对象的⼀个引⽤,所以为了看到结果,我需要调⽤⼀下Wait⽅法,当然你也可以查询下Result,或者继续ContinueWith,返回的这个对象可以忽略,它仅仅是⼀个变量。
  还要指出的是,Task对象内部包含了ContinueWith任务的⼀个集合。所以,实际上可以⽤⼀个Task对象来多次调⽤ContinueWith。任务完成时,所有ContinueWith任务都会进⼊线程池队列中,在构造ContinueWith的时候我们可以看到⼀个TaskContinuationOptions枚举值,不能忽视,看看它的定义:
PrefereFairness是尽量公平的意思,就是较早调度的任务可能较早的运⾏,先来后到,将线程放到全局队列,便可以实现这个效果。ExecuteSynchronously指同步执⾏,强制两个任务⽤同⼀个线程⼀前⼀后运⾏,然后就同步运⾏了。
看得是不是晕乎乎?有这么多枚举例⼦,怎么掌握啊?多看⼏次,知道任务的使⽤情况,以后⽤起来得⼼应⼿~想学新技术,就要能耐住,才能基础牢固。来看个例⼦,⽤⽤这些枚举。
1        static void Main(string[] args)
2        {
3            Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000);
4
5            t.Start();
6
7            t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result),
8                TaskContinuationOptions.OnlyOnRanToCompletion);
9
10            t.ContinueWith(task=>Console.WriteLine("Sum throw:"+task.Exception),
11                TaskContinuationOptions.OnlyOnFaulted);
12
压缩胶囊
13            t.ContinueWith(task=>Console.WriteLine("Sum was cancel:"+task.IsCanceled),
14                TaskContinuationOptions.OnlyOnCanceled);
15            try
16            {
17                t.Wait();  // 测试⽤
18            }
19            catch (AggregateException)
20            {
21                Console.WriteLine("出错");
船用防爆离心风机22            }
23
24
25        }
26
27        private static Int32 Sum(Int32 i)
28        {
29            Int32 sum = 0;
30            for (; i > 0; i--)
31            {
32                checked { sum += i; }
33            }
34
35            return sum;
36        }
37    }
  ContinueWith讲完了。可是还没有结束哦。
  AttachedToParnt枚举类型(⽗任务)也不能放过!看看怎么⽤,写法有点新奇,看看:
1        static void Main(string[] args)
2        {
3            Task<Int32[]> parent = new Task<Int32[]>(() => {
4                var results = new Int32[3];
5                //
6                new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
7                new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
8                new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();
9                return results;
10            });
11
12            var cwt = parent.ContinueWith( parentTask=>Array.ForEach(parentTask.Result,Console.WriteLine));
13
14
15            parent.Start();
16            cwt.Wait();
17        }
18
19        private static Int32 Sum(Int32 i)
20        {
21            Int32 sum = 0;
22            for (; i > 0; i--)
区域能源管理23            {
24                checked { sum += i; }
25            }
26            return sum;
27        }
28    }
Oh,我都写晕了。。。(+﹏+)~
例⼦中,⽗任务创建兵启动3个Task对象。默认情况下,⼀个任务创建的Task对象是顶级任务,这些任务跟创建它们的那个任务没有关系。TaskCreationOptions.AttachedToParent标志将⼀个Task和创建它的那个Task关联起来,除⾮所有⼦任务(⼦任务的⼦任务)结束运⾏,否则创
建任务(⽗任务)不会认为已经结束。调⽤ContinueWith⽅法创建⼀个Task时,可以指定TaskContinuationOptions.AttachedToParent标志将延续任务置顶为⼀个⼦任务。
  看了这么多任务的⽅法操作⽰例了,现在来挖挖任务内部构造:
  每个Task对象都有⼀组构成任务状态的字段。
  ⼀个Int32 ID(只读属性)
代表Task执⾏状态的⼀个Int32
对⽗任务的⼀个引⽤
对Task创建时置顶TaskSchedule的⼀个引⽤
对回调⽅法的⼀个引⽤
对要传给回调⽅法的对象的⼀个引⽤(通过Task只读AsyncState属性查询)
对⼀个ExceptionContext的引⽤
对⼀个ManualResetEventSlim对象的引⽤
还有没个Task对象都有对根据需要创建的⼀些补充状态的⼀个引⽤,补充状态包含这些:
⼀个CancellationToken
⼀个ContinueWithTask对象集合
为抛出未处理异常的⼦任务,所准备的⼀个Task对象集合
说了这么多,只想要⼤家知道:
  虽然任务提供了⼤量功能,但并不是没有代价的。因为必须为所有的这些状态分配内存。
如果不需要任务提供的附加功能,使⽤ThreadPool.QueueUserWorkItem,资源的使⽤效率会更⾼⼀些。
Task类还实现了IDispose接⼝,允许你在⽤完Task对象后调⽤Dispose,不过⼤多数不管,让垃圾回收器回收就好。
创建⼀个Task对象时,代表Task唯⼀的⼀个Int32字段初始化为零,TaskID从1开始,每分配⼀个ID都递增1。顺带说⼀下,在你调试中查看⼀个Task对象的时候,会造成调试器显⽰Task的ID,从⽽造成为Task分配⼀个ID。
  这个ID的意义在于,每个Task都可以⽤⼀个唯⼀的值来标识。Visual Studio会在它的“并⾏任务”和并⾏堆栈“窗⼝中显⽰这些任务ID。要知道的是,这是Visual Studio⾃⼰分配的ID,不是在⾃⼰代码中分配的ID,⼏乎不可能将Visual Studio分配的ID和代码正在做的事情联系起来。要查看⾃⼰正在运⾏的任务,可以在调试的时候查看Task的静态CurrentId属性,如果没有任务在执⾏,CurrentId返回null。
  再看看TaskStatus的值,这个可以查询Task对象的⽣存期:
这些在任务运⾏的时候都是可以⼀⼀查到的,还有~判断要像这样:
1 if(task.Status==TaskStatus.RantoCompletion)...
为了简化编码,Task只提供⼏个只读Boolean属性:IsCanceled,IsFaulted,IsCompleted,它们能返回最终状态true/false。
如果Task是通过调⽤某个函数来创建的,这个Task对象就会出于WaitingForActivation状态,它会⾃动运⾏。
最后我们要来了解⼀下TaskFactory(任务⼯⼚):
  1.需要创建⼀组Task对象来共享相同的状态
  2.为了避免机械的将相同的参数传给每⼀个Task的构造器。
满⾜这些条件就可以创建⼀个任务⼯⼚来封装通⽤的状态。TaskFactory类型和TaskFactory<TResult>类型,它们都派⽣System.Object。
你会学到不⼀样的编码⽅式:
1        static void Main(string[] args)
2        {
3            Task parent = new Task(() =>
4            {
5                var cts = new CancellationTokenSource();
6                var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
7
8                //创建并启动3个⼦任务
9                var childTasks = new[] {
10            tf.StartNew(() => Sum(cts.Token, 10000)),
11            tf.StartNew(() => Sum(cts.Token, 20000)),
12            tf.StartNew(() => Sum(cts.Token, Int32.MaxValue))  // 这个会抛异常
13          };
14
15                // 任何⼦任务抛出异常就取消其余⼦任务
16                for (Int32 task = 0; task < childTasks.Length; task++)
17                    childTasks[task].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
18
19                // 所有⼦任务完成后,从未出错/未取消的任务获取返回的最⼤值
20                // 然后将最⼤值传给另⼀个任务来显⽰最⼤结果
21                tf.ContinueWhenAll(childTasks,
22                    completedTasks => completedTasks.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result),
23                    CancellationToken.None)
24                    .ContinueWith(t => Console.WriteLine("The maxinum is: " + t.Result),
25                      TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait⽤于测试
26            });
27
28            // ⼦任务完成后,也显⽰任何未处理的异常
29            parent.ContinueWith(p =>
30            {
31                // ⽤StringBuilder输出所有
32
33                StringBuilder sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine);
34                foreach (var e in p.Exception.Flatten().InnerExceptions)
35                    sb.AppendLine("  " + e.GetType().ToString());
36                Console.WriteLine(sb.ToString());
37            }, TaskContinuationOptions.OnlyOnFaulted);
38
39            // 启动⽗任务
40            parent.Start();
41
42            try
43            {
44                parent.Wait(); //显⽰结果
45            }
46            catch (AggregateException)
47            {
48            }
49        }
50
51        private static Int32 Sum(CancellationToken ct, Int32 n)
52        {

本文发布于:2024-09-22 14:22:23,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/1/307886.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:任务   线程   对象   完成   抛出   没有
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议