首页 > 学院 > 开发设计 > 正文

Parallel线程使用

2019-11-17 02:58:43
字体:
来源:转载
供稿:网友

Parallel线程使用

Parallel的静态For,ForEach和Invoke方法

在一些常见的编程情形中,使用任务也许会提升性能。为了简化编程,静态类System.Threading.Tasks.Paraller封装了这些常见的情形,它内部使用Task对象。例如,不要像下面一样处理一个集合中的所有项:
// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)for (Int32 i = 0; i< 1000; i++ ) DoWork(i);

相反,可以使用Parallel类型的For方法,让多个线程池治线程帮助执行这个工作:

// 线程池的线程并行处理工作Parallel.For(0,1000,i=>DoWork(i));

类似的,如果有一个集合,那么不要像下面这样写:

// 一个线程顺序执行这个工作(每次迭代调用一次DoWork)foreach ( var item in conllection) DoWork(item);

而是这样做:

// 线程池的线程并行处理工作Parallel.ForEach(conllection,item=>DoWork(item));

如果代码中既可以用For,也可以用ForEach,那么建议使用For,因为它执行的快一点。最后,如果要执行几个方法,那么可以顺序执行它们,如下所示:

//  一个线程顺序执行所有方法    Method1();Method2();Method3();

也可以并行执行它们:

复制代码
// 线程池的线程并行执行parallel.Invoke(            () => Method1(),            () => Method2(),            () => Method3()); 
复制代码

Parallel的所有方法都让调用线程参与处理。从资源利用的角度说,这是一件好事,因为我们不希望调用线程停下来,等待线程池做完所有工作后才继续。然而,如果调用线程在线程池完成自己的那一部分工作之前完成工作,调用程序就会将自己挂起,知道所有工作完成。这也是一件好事,因为这个提供了和普通for和foreach循环时相同的语义:线程要在所有工作后才继续运行。还要注意,如果任何操作抛出一个未处理的异常,你调用的paraller方法最后会抛出一个AggregateException。

当然,这并不是说需要检查自己的所有源代码,将for循环替换成Parallel.For的调用。调用Parallel的方法时,有一个前提条件务必记住:工作项要能并行执行。因此,如果工作项必须顺序执行,就不要调用Parallel的方法。另外,要避免会修改任何共享数据的工作项,因为多个线程同时处理的数据可能损坏。为了解决这个问题,一般的方法就是围绕数据访问添加线程同步锁。但是这样一来,一次就只能有一个线程访问数据,无法享受并行处理多个想带来的好处。 除此之外,Parallel的方法本身也有开销:委托对象必须分配,而针对每一个工作项,都要调用一次这些委托。如果有大量可由多个线程处理的工作项,那么也许会获得性能的提升。但是,如果只为区区几个工作项使用Parallel的方法,或者为处理得非常快的工作项使用Parallel就会得不偿失了。 Parallel的For,ForEach和Invoke方法都能接受一个ParallelOptions对象的重载版本。这个对象的定义如下: 复制代码
 // 存储用于配置 Parallel 类的方法的操作的选项。     public class ParallelOptions    {        // 初始化 ParallelOptions 类的新实例        public ParallelOptions();        // 获取或设置与此 ParallelOptions 实例关联的 CancellationToken,运行取消操作        public CancellationToken CancellationToken { get; set; }        // 获取或设置此 ParallelOptions 实例所允许的最大并行度,默认为-1(可用CPU数)        public int MaxDegreeOfParallelism { get; set; }        // 获取或设置与此 ParallelOptions 实例关联的 TaskScheduler。默认为TaskScheduler.Default        public TaskScheduler TaskScheduler { get; set; }    }
复制代码

除此之外,For和ForEach方法有一些重载版本允许传递3个委托:

任务局部初始化委托(localInit),为参与工作的每一个任务都调用一次委托。这个委托是在任务被要求处理一个工作项之前调用。 主体委托(body),为参与工作的各个线程所处理的每一项都调用一次委托。 任务局部终结委托(localFinally),为参与工作的每一个任务都调用一次委托。这个委托是在任务处理好派遣给它的所有工作之后调用。即使主体委托引发一个未处理的异常,也会调用它。 以下示例代码演示了如何利用3个委托,计算一个目录中的所有文件的字节长度总计值:复制代码
PRivate static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption)        {            var files = Directory.EnumerateFiles(path, searchPattern, searchOption);            Int64 masterTotal = 0;             ParallelLoopResult result = Parallel.ForEach<String, Int64>(files,               () =>               {                   // localInit: 每个任务开始之前调用一次                   // 每个任务开始之前,总计值都初始化为0                   return 0;               },                (file, parallelLoopState, index, taskLocalTotal) =>               {                   // body: 每个任务调用一次                   // 获得这个文件的大小,把它添加到这个任务的累加值上                   Int64 fileLength = 0;                   FileStream fs = null;                   try                   {                       fs = File.OpenRead(file);                       fileLength = fs.Length;                   }                   catch (IOException) { /* 忽略拒绝访问的文件 */ }                   finally { if (fs != null) fs.Dispose(); }                   return taskLocalTotal + fileLength;               },                taskLocalTotal =>               {                   // localFinally: 每个任务完成后调用一次                   // 将这个任务的总计值(taskLocalTotal)加到中的总计值(masterTotal)上去                   Interlocked.Add(ref masterTotal, taskLocalTotal);               });            return masterTotal;        }
复制代码

每个任务都通过taskLocalTotal变量为分配给它的文件维护自己的总计值。每个任务完成工作之后,都调用Interlocked.Add方法[对两个 32 位整数进行求和并用和替换第一个整数],以一种线程安全的方式更新总的总计值。由于每个任务都有自己的总计值,可以在一个工作项处理期间,无需进行线程同步。由于线程同步会造成性能的损失,所以不需要线程同步是一件好事。只有在每个任务返回之后,masterTotal才需要以一种线程安全的方式更新materTotal变量。所以,因为调用Interlocked.Add方法而造成的性能损失每个任务只发生一次,而不会每个工作项都发生。

注意,我们向主题委托传递一个ParallelLoopState对象,它的定义如下: 复制代码
// 可用来使 Parallel 循环的迭代与其他迭代交互    public class ParallelLoopState    {        // 获取循环的任何迭代是否已引发相应迭代未处理的异常        public bool IsExceptional { get; }        // 获取循环的任何迭代是否已调用 Stop        public bool IsStopped { get; }        // 获取从中调用 Break 的最低循环迭代。        public long? LowestBreakIteration { get; }        // 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。        public bool ShouldExitCurrentIteration { get; }        // 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代。        public void Break();        // 告知 Parallel 循环应在系统方便的时候尽早停止执行。        public void Stop();}
复制代码

参与工作的每一个任务都会获得它自己的ParallelState对象,并可通过这个对象和参与工作的其他任务进行交互。Stop方法告诉循环停止处理任何更多的工作,未来对IsStopped属性的查询会返回true。Break方法告诉循环不再继续处理当前项之后的项。例如,假如ForEach被告知要处理100项,并在第5项时调用了Break,那么循环会确保前5项处理好之后,ForEach才返回。但注意,这并不是说在这100项中,只有前5项被处理,也许第5项之后可能在以前已经处理过了。LowestBreakIteration属性返回在处理过程中调用过Break方法的最低的项。从来没有调用过Break,LowestBreakIteration会返回null。

处理任何一项时,如果造成一个未处理的异常,IsExceptional属性会返回true。如果处理一项时会花费大量的时间,代码可查询ShouldExitCurrentIteration属性看它是否应该提前退出。如果调用过Stop,调用过Break,取消过CancellationTokenSource,或者处理一项时造成了未处理的异常,这个属性就会返回true。 Parallel的For和ForEach方法都返回一个ParallelLoopResult实例,他看起来像下面这样: 复制代码
// 提供执行 System.Threading.Tasks.Parallel 循环的完成状态。    public struct ParallelLoopResult    {        // 获取该循环是否已运行完成(即该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求)。        public bool IsCompleted { get; }         // 获取从中调用 System.Threading.Tasks.ParallelLoopState.Break() 的最低迭代的索引。        public long? LowestBreakIteration { get; }    }
复制代码

可通过检查属性来了解循环的结果,如果IsCompleted返回true。表明循环运行完成,所有项都得到了处理。如果IsCompleted为false,而且LowestBreakIteration为null,表明参与工作的某个线程调用了Stop方法。如果LowestBreakIteration返回false,而且LowestBreakIteration不为null,表名参与工作的某个线程调用的Break方法,LowestBreakIteration返回的Int64值指明了保证已得到处理的最低一项的索引。

场景

public void run(Action<List<string>> onload) { List<string> directoryLists = new List<string>(); directoryLists = Directory.GetDirectories(m_importPath).ToList(); Parallel.ForEach(directoryLists,new ParallelOptions { MaxDegreeOfParallelism=10}, i => { List<string> files = importer(i);

onload(files);

}

}

Action<List<string>> onload = dir => { BeginInvoke(new EventHandler((obj, even) => { var root = new TreeNode(dir[0]); for (int i = 1; i < dir.Count; i++) { string fileName = dir[i].Substring(dir[i].LastIndexOf('//') + 1); root.Nodes.Add(fileName.Remove(fileName.IndexOf('.'))); } this.treeView1.Nodes.Add(root); }), null);

};

import.run(onload);


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表