2013年9月21日 星期六

TPL(Task Parallel Language) – 平行化處理

.Net Framework的平行化程式
   目前的個人電腦以及伺服器工作站已具備二到四核心(i.e.實體CPU),這使得多執行緒可以在同一時間點上一起被執行。在不久的未來,個人電腦將會擁有更多的核心。為了能夠妥善的利用硬體所帶來的資源,可以考慮使用平行化程式將工作分散在多個核心中。在過去,平行化需要較處理執行緒和資源鎖這種細節上的處理,自從.Net Framework 4.0開始,提供了一套新的類別庫以及探測工具來支援平行化程式撰寫。這套新的類別庫可以讓開發能夠寫出更有效率且細顆粒度和可延展平行的程式,並且不需要再對執行緒(Thread)或是執行緒池(ThreadPool)進行操作。下圖描繪了.Net Framework 4.0的平行程式架構:

IC292903
圖1. .Net Framework 4.0的平行程式架構

Task類別
   Task與Task<TResult>類別是TPL的基礎類別,而Task<TResult>為Task的泛型版本。而Task<TResult>中的TResult為Task執行完畢後回傳值的資料型別。
啟動Task
Task task = new Task (() => Console .Write("Hi" ));
task.Start();

   透過實體一個Task物件,接著再呼叫該物件的Start()方法。而Task類別的建構子除了上述例子輸入型別是Action委派型別之外,還有另一種建構子: Task(Action, TaskCreationOptions);而TaskCreationOptions是一個列舉型別,其中較常被使用到的為: LongRunning,該列舉值是在當反覆測試過後判斷這個Task較適合採用長時間執行的方式來運行才選用。TaskCreationOptions是用於告知ThreadPool這個Task不要放在ThreadPool中執行,否則一般的Task都是在ThreadPool中以非同步方式執行。

   上述啟動Task的方式較傳統與一般,但是在實務中會採用更簡便的手法:
Task.Run(() => Console .WriteLine("Hi" ));

   以上述方式即可直接啟動Task而不需實體化Task物件。Task.Run是Task類別的靜態方法,輸入參數是Action委派型別,而Run()的回傳值回一個Task物件。除了Task.Run(Action)之外,Task的Run還有另一種泛型版本:
Task.Run<TResult>( Func<Task<TResult>>);

這個版本的輸入參數由Action委派型別換成是Func<Task<TResult>> ,其範例如下:
Task<int > task=Task .Run<int >(() => { return 1 + 1; });
int result = task.Result;

Task的等待

   一般狀況下,Task是ThreadPool以非同步方式執行,系統想要知道這個Task是否已經執行完畢可以透過查看Task物件的屬性IsCompleted來判斷,亦可以呼叫Task物件的方法:Wait()。
   但是若是採用Wait()方法的方式會令主執行緒暫停。
Task< int> task= Task.Run< int>(() => { return 1 + 1; });
int result = default( int);

if(task.IsCompleted)
     result=task.Result;

Task< int> task= Task.Run< int>(() => { return 1 + 1; });
task.Wait();
int result = task.Result;

   Wait()這個方法還有其它多載: bool Wait(int millisecondsTimeout),該重載方法傳入一個毫秒為單位的整數,Task的Wait為依此傳入的秒數作為等待時間的依據,若在時間內Task執行完畢則回傳True,反之,則是False。

Task< int> task = Task.Run< int>(() => { Thread.Sleep(5000); return 1 + 1; });
bool blOver=task.Wait(4900);
int result = task.Result;

Task的串聯

   Task串聯指的是當一個Task執行完畢之後接著執行另一個Task,通常有兩種方法來達成。
   1. Task物件.GetAwaiter()
   2. Task物件.ContinueWith(Action)

   採用第1種方法,當呼叫GetAwaiter()方法後,會回傳TaskAwaiter結構體,並且該TaskAwaiter結構體具有一個OnCompleted方法,而Task執行完畢後的下一個Task所要執行的動作就是藉由輸入這個物件的OnCompleted事件來達成:
Task< int> task = Task.Run< int>(() => { Thread.Sleep(5000); return 1 + 1; });
TaskAwaiter< int> awaiter = task.GetAwaiter();
awaiter.OnCompleted(() => {
   Console.WriteLine( "hi");
});

第2種方式較常使用,而這種方式和第1種不同的地方在於當發生異常時,第1種方式可以直接截取,而第二種僅能在呼叫Result屬性才會拋出異常,並且會將異常包裹成AggregateException,但第1種的異常不會包裹。
Task< int> task = Task.Run< int>(() => { Thread.Sleep(5000); return 1 + 1; });
Task task2 = task.ContinueWith(o => Console.WriteLine( "hi"));
bool blOver=task.Wait(4900);
int result = task.Result;
Console.WriteLine(blOver);

異常的處理

   和Thread類別不同的地方是 Task拋出的異常是可以被截取,但也不是直接就能截取到,而是藉由呼叫Wait()時或是Result屬性時Task才會拋出異常,並且這個異常會被包裹成AggregateException異常型別。

   實務上,Task的啟動多半都是採用Task.Run(Action)或是
Task.Run<TResult( Func<Task<TResult>>)這兩種方式,在某些情境下不需要關心它們的執行是否已完成,但是對於它們在執行中是否有發生異常還是需要加以關注,而這些異常亦即未觀察到的異常(Unobserved Exception),可以透過註冊一個全域的靜態事件: TaskScheduler.UnobservedTaskException來處理這些異常;當GC要回收Task物件時,若在此之前有某個Task執行發生了異常,則在GC回收之前會執行註冊UnobservedTaskException事件的方法。
TaskScheduler.UnobservedTaskException += (obj, e) => Console.WriteLine(e.Exception.Message);
Task< int> task = Task.Run< int>(() => { Thread.Sleep(5000); return 1 + 1; });

TaskCompletionSource

   TaskCompletionSource是一種承載需要長時間執行工作的Task管理類別,該類別較常使用的有三個方法:
   1. SetResult: 承載最後處理的結果
   2. SetException: 承載異常
   3. SetCanceled: 承載該工作已被取消
   而其較簡易的使用方式如下:
TaskCompletionSource<int > tcs = new TaskCompletionSource <int >();
Task.Run(
   () => {
      Thread.Sleep(5000);
      int i = Enumerable.Range(0, 100).Sum();
      tcs.SetResult(i);
});

Task< int> task = tcs.Task;

TaskCompletionSource的Task屬性即為包裹後所產出的Task物件,接下來即可操作Task物件。依上述程式碼來看TaskCompletionSource類別的執行結果需待Task.Run方法中的工作執行完畢才能取得,這是一種變相的執行緒同步手法,並且最終取得執行的成果。
   TaskCompletionSource在實務應用上可用來作為延遲執行;通常可配合Timer來處理:
TaskCompletionSource<int > tcs = new TaskCompletionSource <int >();
System.Timers. Timer timer = new System.Timers. Timer(5000) { AutoReset = false };

timer.Elapsed += (sender, e) => {
      timer.Dispose();
      //執行某段工作
      tcs.SetResult(result);
};

timer.Start();
Task< int> task = tcs.Task;
int  finalResult= task.Result;

   除了延遲執行外,TaskCompletionSource在處理事件驅動程式(EAP)的重構方面亦大有助益;實務中,EAP很容易讓UI的控制項與商業邏輯混合在一起造成維護上的困擾,故可以藉TaskCompletionSource包裹一些工作,並且再藉由TaskCompletionSource最後產出的Task物件來達成UI與商業邏輯切割。
(重構前)
WebClient wc = new WebClient();

wc.DownloadStringCompleted += (sender, e) => {
   if (e.Cancelled)
      Console.WriteLine( "Cancel");
   else if (e.Error != null)
      Console.WriteLine( "Error");
   else
      Console.WriteLine(e.Result);
};

(重構後)
TaskCompletionSource<string > tcs = new TaskCompletionSource<string >();
WebClient wc = new WebClient();

wc.DownloadStringCompleted += (sender, e) => {
   if (e.Cancelled)
       tcs.SetCanceled();
   else if (e.Error != null)
       tcs.SetException(e.Error);
   else
       tcs.SetResult(e.Result);
};

Task< string> task = tcs.Task;
string strResult= task.Result;
Console.WriteLine(strResult);

   TaskCompletionSource除了在EAP上的應用之外,亦可應用在IAsyncResult模式下:
非同步方法簽名如下:
public static IAsyncResult BeginGetHostAddresses(string hostNameOrAddress, AsyncCallback requestCallback, Object state);

public static IPAddress[] EndGetHostAddresses( IAsyncResult asyncResult);

(重構前)
static void Main()
{
   Dns.BeginGetHostAddresses( "www.yahoo.com", result =>
   {
      IPAddress[] address = Dns.EndGetHostAddresses(result);
      Console.WriteLine(addresses[0]);
   }, null);

   Console.ReadKey();
}

(重構後)
public static Task< IPAddress[]> GetHostAddressesAsTask(string hostNameOrAddress)
{
   var tcs = new TaskCompletionSource<IPAddress []>();
   Dns.BeginGetHostAddresses(hostNameOrAddress, iar =>
   {
      try
      {
         tcs.SetResult( Dns.EndGetHostAddresses(iar));
      }
      catch ( Exception ex)
      {
         tcs.SetException(ex);
      }
   }, null);

   return tcs.Task;
}

大量資料平行化處理

   使用時機: 當集合中所有的元素都有相同的資料操作行為需要同步進行時。集合會被拆分成多個區塊,而不同執行緒可以在同一時間處理各自被分派的區塊。TPL(Task Parallel Library)藉由System.Threading.Tasks.Parallel提供資料平行化處理。這個類別提供For/ForEach以方法導向的實做方式來達到平行化。開發時,撰寫Parallel.For/Parallel.ForEach的迴圈內容就如同在撰寫一般的For/ForEac的迴圈內容,完全不需要處理執行緒或是佇列中的工作項目。
Parallel.ForEach(sourceCollection, item => Process(item));

   當平行化迴圈開始執行時,TPL中的Task scheduler會依硬體資源以及工作負載將sourceCollection切割成數份,Task Scheduler會自動調節工作量,當它認知到工作負載不平衡的時候,可能會使用到更多的執行緒和CPU核心數。

   Parallel.For/Parallel.ForEach提供了多個函式的負載,讓開發人員能夠:監控其它執行緒的狀態、中斷迴圈執行、維護執行緒的本地狀態、釋放執行緒本地狀態、控制平行化的程度...等等。能夠達成這些操作的相關支援類別如下:
   1. ParallelLoopState
   2. ParallelOptions
   3. ParallelLoopResult
   4. CancellationToken
   5. CancellationTokenSource

大量資料平行化處理-For

   在使用Parallel.For時,需注意是否真的需要使用到平行化的For;若For迴圈內的工作其所需資源與複雜度並不高,則可以考慮不使用平行化的Parallel.For而改使用一般的For。
   Parallel.For與一般的For迴圈在撰寫上相當類似,不同的地方在於Parallel.For中反覆執行的是一個"方法",而一般的For迴圈則是一段Script;因此,雖說寫法相當類似但是實際上並不全然相同,譬如說,在一般的For迴圈中若在滿足了一定的條件後中斷(Break),僅需在For迴圈中撰寫if判斷式,並且滿足判斷式後執行break這個指令即可中斷For迴圈的執行,但Parallel.For執行break指令是無任何意義的,因此,要處理一般For迴圈中所能夠達到的邏輯,需要借助ParallelLoopState物件來處理。
Parallel.For( 0, 100, (i, loopState)=>
{
   if( i>90)
      loopState.Break();
}

   當i大於90時即刻中斷尚未執行完的所有工作,實際執行時有可能第二個執行緒執行時其值就超過90了,因此,不能預設大約會執行近90次才會中斷。Parallel.For本身的回傳值為PrallelLoopResult結構體,此為一個用於判斷是否所有工作都已經執行完畢,通常會和需要中斷條件的現實需求相互搭配使用,讓開發人員能夠在程式中取得Parallel.For中斷之後關於Parallel.For的執行狀況,例如:是否所有工作都執行完畢(IsCompleted)、最小的上下界索引值(LowestBreakIteration)為何。

上例是基本較常使用到的方法重載。除了這個重載外Parallel.For還有多達12個重載。不同的重載其差別大多是起始和終止的上下界參數資料型別為int或long的差別,除此之外,Parallel還允許更多的控制,諸如執行緒的狀態物件設定以及平行化執行的組態設定等等。
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 2;
Parallel.For(0, 9, options, (i)=>{
     // DoSomething
});

   上例中使用Parallel.For另一個重載的方法,並且設定其執行時的平行度;當設定平行度為-1時,代表無執行緒的個數限制,設定為1則代表不使用平行化以循序方式執行。
大量資料平行化處理-ForEach

Parallel.ForEach在程式撰寫上十分類似Parallel.For。來源資料集合被切割成數份並且依照系統的硬體資源

使用Task Parallel的陷阱

   Parallel.For與Parallel.ForEach在大多數狀況下針對循序式的迴圈效能能夠有大幅度的改善。但是導入平行處理後勢必也帶來了一定的複雜性,而這些複雜性很可能會造成一些問題。以下接著討論可能的問題:

   1. 不要假設使用了平行處理就一定能加速
       在某些狀況下,平行迴圈可能會比循序式迴圈處理還要慢;使用平行迴圈的最高指導原則就是僅有較少的迭代,但執行複雜度低的委派內容反而會造成效能不彰。由於,有太多可能會影響到效能的因素,是故,建議使用時一定要效能測試。

   2. 避免在委派中撰寫到記憶體區塊共享的程式碼
       平行處理在存取靜態變數這類的共享記憶體型變數與一般的非平行處理程式有所不同。因此,當有大量的執行緒在存取共享記憶體區塊的變數時,勢必會有競爭現象。雖然解決方式為對需要同步化存取的變數進行鎖定,但這也造成了效能上的損害。因此,建議盡量避免撰寫到需要存取共享記憶體區塊的變數,至少也要有所節制。在平行處理中難免會需要存取執行緒的狀態,而最佳解決方案則是使用System.Threading.threadLocal<T>型別的變數來儲存執行緒內部的變數。例如:求取總合時就需要使用一個變數不斷累加數值:
long total = 0;
Parallel.For< long>(0, array.Length, () => 0, (index, loop, subtotal) =>
{
   subtotal += array[index];
   return subtotal;
}, (x) => Interlocked.Add( ref total, x));

   3. 避免過度平行化
       使用平行化迴圈本質是會附帶一些額外的資料切割以及同步化工作執行緒的成本。而平行化所能產生的效能提速是基於所能使用的伺服器CPU核心數。因此,如果僅有一顆核心的伺服器無法讓平行化加速,是故,要注意不要所有大量資料處理都使用平行化。大抵上,平行化多半是用在巢狀迴圈中的最外層迴圈,僅有在下列狀況下才會在內層迴圈中使用:
       3.1. 內部迴圈的內容其執行動作的複雜度高
       3.2. 內部迴圈的內容其執行動作的執行較長
       3.3. 執行系統的硬體具有足夠的核心數來處理夠多數量的執行緒

   4. 避免呼叫非執行緒保護的方法
       在Parallel.For或Parallel.ForEach的委派中呼叫非執行緒保護的方法可能會造成非預期的執行的動作或是異常(Exception)。

   5. 限制執行緒保護方法的呼叫次數
       大多數在.Net Framework中的靜態方法都是屬於執行緒保護的方法,並且能夠讓多執行緒同時呼叫。然而,這會為了達到同步化而帶來重大的效能低落。

   6. 留心執行緒混搭議題
       某些技術,例如COM與STA或WindowForm,WPF等前端技術混搭時,COM必須執行在指定的執行緒上,而WindowForm或WPF的控制項只能由創建它的執行緒可以對其進行操作,這意味著,你無法在COM所在的執行緒中試圖更新控制項,除非有設定執行緒排程:
Task t1 = new Task(() =>
{
   //DoSomething
});

var UISyncContext = TaskScheduler.FromCurrentSynchronizationContext();

Task t2 = t1.ContinueWith((lastResult) =>
{
   //UI控制項設定
}, UISyncContext);

   7. 謹慎使用Parallel.Invoke的等待委派
       在某些情境下,Task會是執行在某條執行緒中。這種效能優化方式很有可能會造成死結。舉例來說,當兩個Task執行同一段委派程式碼時,當這段委派程式碼中使用EventWaitHandler進行程式碼的執行權管理時,若第一個Task在執行之後沒有通知另一個正在等待中的Task,就會造成死結。解決的方法可以在等待委派中指定等待逾時時間,或是在創建這條執行兩個Task的執行緒建構子中設定執行時內部的Task彼此之間不會阻斷。

   8. 不要假設ForEach/For/ForAll<TSource>的迭代永遠都應該平行處理
       其實ForEach/For/ForAll並非一定非要以平行化來處理資料。因此,應該避免在內部程式碼中以多執行緒的邏輯撰寫程式;不需要加上鎖或是事件通知(EventWaitHandler)等機制。

   9. 避免將平行化迴圈寫在UI執行緒中
       若希望在大量資料處理完畢之後將結果顯示在UI上,考慮將平行化迴圈寫在背景執行緒中,並且在執行完畢後呼叫UI執行緒進行結果的顯示。這麼做是為了避免發生系統異常,因為多執行緒允許操作UI上的控制項時,很可能會因為搶占控制項或是等待更甚是死結的現象發生。
(有問題的程式碼)
public void btnOk_Click( object sender, EventArgs e)
{
   int N=100;
   Parallel.For(0, N, i=>
   {
      btnOk.Invoke(( Action) delegate { DisplayProgress(i) });
   });
}

(改善後)
public void btnOk_Click(object sender, EventArgs e)
{
   Task.Factory.StartNew(() =>
   Parallel.For(0, N, i =>
   {
      btnOk.Invoke(( Action) delegate { DisplayProgress(i);});
   });
}

參考項目
1. http://cnn237111.blog.51cto.com/2359144/1102476

沒有留言:

張貼留言