F#かわいいよF#。F#の並列処理や非同期計算の理解を深めるために、FSharpAsyncとFSharpAsyncBuilderのラッパーをC#で書いてみた。
ご存知、F#は並列処理や非同期処理について考慮されて設計された強力な言語です。
いくつかサンプルプログラムを書いて、わたしもその雰囲気については過去に簡単に確認していました。
しかし、FSharpAsyncやFSharpAsyncBuilder(computation expression)にどのような機能が提供されているのか、その多くは把握していませんでした。
C#でFSharpAsyncやFSharpAsyncBuilderのラッパーを書くことで、その理解を深められそうな気がしたので、
試しにちょっと書いてみました。F#のコードとのちょっとした比較もあります。
先に結論から言うと、F#かわいいよF#ということになります。もちろんC#もかわいいです。
FSharpAsyncWrapperを利用した非同期計算サンプル
まずは、今回作ったFSharpAsyncWrapperとFSharpAsyncBuilderWrapperを利用した、簡単な非同期計算のサンプルプログラムを示します。非同期計算で素数を出力するだけのプログラムです。
従来の方法でC#でガチ書きするよりも、シンプルで分かりやすいような気がします。下記のコードでなんとなく雰囲気は伝わると思います。
using System; using System.Collections.Generic; using System.Linq; using Microsoft.FSharp.Core; using System.Runtime.Remoting.Messaging; using ClassLibrary1; namespace ConsoleApplication1 { class Program { static void Main(string[] args) { var nums = Enumerable.Range(1,1000); var async1 = Async<IEnumerable<int>, Unit>.Create().Build(nums, list => { var result = list.Where(x => IsPrime(x)).AsParallel(); foreach (var item in result) Console.Write("{0},", item); return (Unit)null; }); var async2 = Async<IEnumerable<int>, Unit>.Create().Build(nums, list => { var result = list.Where(x => IsPrime(x)).AsParallel(); foreach (var item in result) Console.Write("{0},",item); return (Unit)null; }, iar => { var ar = (AsyncResult)iar; var dlgt = (Func<IEnumerable<int>, Unit>)ar.AsyncDelegate; dlgt.EndInvoke(iar); Console.WriteLine("ヒャッハー"); }); var computations = Async<IEnumerable<int>, IEnumerable<int>>.Create() .Build(nums, list => list.Where(x => IsPrime(x)).AsParallel()); var primeInfo = computations.RunSynchronously(); foreach (var x in primeInfo) Console.Write("{0},", x); Console.WriteLine(); Console.WriteLine("--------------------------------------------------------------------"); var asyncCombine = async1.Bind(x => { Console.WriteLine(); return x; }).Combine(async2); asyncCombine.Start(); Console.WriteLine("続けるには何かキーを押してください..."); Console.ReadLine(); } private static bool IsPrime(int x) { for (int i = 2; i < x; ++i) if (x % i == 0) return false; return true; } } }
実行結果
1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101,103 ,107,109,113,127,131,137,139,149,151,157,163,167,173,179,181,191,193,197,199,211 ,223,227,229,233,239,241,251,257,263,269,271,277,281,283,293,307,311,313,317,331 ,337,347,349,353,359,367,373,379,383,389,397,401,409,419,421,431,433,439,443,449 ,457,461,463,467,479,487,491,499,503,509,521,523,541,547,557,563,569,571,577,587 ,593,599,601,607,613,617,619,631,641,643,647,653,659,661,673,677,683,691,701,709 ,719,727,733,739,743,751,757,761,769,773,787,797,809,811,821,823,827,829,839,853 ,857,859,863,877,881,883,887,907,911,919,929,937,941,947,953,967,971,977,983,991 ,997, -------------------------------------------------------------------- 続けるには何かキーを押してください... 1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101,103 ,107,109,113,127,131,137,139,149,151,157,163,167,173,179,181,191,193,197,199,211 ,223,227,229,233,239,241,251,257,263,269,271,277,281,283,293,307,311,313,317,331 ,337,347,349,353,359,367,373,379,383,389,397,401,409,419,421,431,433,439,443,449 ,457,461,463,467,479,487,491,499,503,509,521,523,541,547,557,563,569,571,577,587 ,593,599,601,607,613,617,619,631,641,643,647,653,659,661,673,677,683,691,701,709 ,719,727,733,739,743,751,757,761,769,773,787,797,809,811,821,823,827,829,839,853 ,857,859,863,877,881,883,887,907,911,919,929,937,941,947,953,967,971,977,983,991 ,997, 1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101,103 ,107,109,113,127,131,137,139,149,151,157,163,167,173,179,181,191,193,197,199,211 ,223,227,229,233,239,241,251,257,263,269,271,277,281,283,293,307,311,313,317,331 ,337,347,349,353,359,367,373,379,383,389,397,401,409,419,421,431,433,439,443,449 ,457,461,463,467,479,487,491,499,503,509,521,523,541,547,557,563,569,571,577,587 ,593,599,601,607,613,617,619,631,641,643,647,653,659,661,673,677,683,691,701,709 ,719,727,733,739,743,751,757,761,769,773,787,797,809,811,821,823,827,829,839,853 ,857,859,863,877,881,883,887,907,911,919,929,937,941,947,953,967,971,977,983,991 ,997,ヒャッハー
FSharpAsyncWrapper
FSharpAsyncが行なうF#の非同期計算の作成と操作のメンバーのラッパーを保持します。
ごちゃごちゃしている上、XMLコメントも記述してあるので、非常に長く読みにくいです。コピペしてVisual Studioなどでご覧ください。
FSharpAsyncWrapper.cs
using System; using System.Threading; using System.Collections.Generic; using Microsoft.FSharp.Core; using Microsoft.FSharp.Control; namespace ClassLibrary1 { /// <summary> /// FSharpAsyncが行なうF#の非同期計算の作成と操作のメンバーのラッパーを保持します。 /// </summary> public static class FSharpAsyncWrapper { public static FSharpFunc<TArg1, Result> ToFSharpFunc<TArg1, Result>(this Func<TArg1, Result> func) { return FuncConvert.ToFSharpFunc(new Converter<TArg1, Result>(func)); } public static FSharpFunc<TArg,Unit> ToFSharpFunc<TArg>(this Action<TArg> action) { return FuncConvert.ToFSharpFunc(action); } public static FSharpFunc<Tuple<Targ1, Targ2>, Result> ToTupledFSharpFunc<Targ1, Targ2, Result>(this Func<Targ1, Targ2, Result> f) { return FuncConvert.ToFSharpFunc(new Converter<Tuple<Targ1, Targ2>, Result>(t => f(t.Item1, t.Item2))); } public static FSharpFunc<Tuple<TArg1, TArg2, TArg3>, Result> ToTupledFSharpFunc<TArg1, TArg2, TArg3, Result>(this Func<TArg1, TArg2, TArg3, Result> f) { return FuncConvert.ToFSharpFunc(new Converter<Tuple<TArg1, TArg2, TArg3>, Result>(t => f(t.Item1, t.Item2, t.Item3))); } public static FSharpFunc<Tuple<TArg1, TArg2, TArg3, TArg4>, Result> ToTupledFSharpFunc<TArg1, TArg2, TArg3, TArg4, Result>(this Func<TArg1, TArg2, TArg3, TArg4, Result> f) { return FuncConvert.ToFSharpFunc(new Converter<Tuple<TArg1, TArg2, TArg3, TArg4>, Result>(t => f(t.Item1, t.Item2, t.Item3, t.Item4))); } public static FSharpFunc<Tuple<TArg1, TArg2, TArg3, TArg4, TArg5>, Result> ToTupledFSharpFunc<TArg1, TArg2, TArg3, TArg4, TArg5, Result>(this Func<TArg1, TArg2, TArg3, TArg4, TArg5, Result> f) { return FuncConvert.ToFSharpFunc(new Converter<Tuple<TArg1, TArg2, TArg3, TArg4, TArg5>, Result>(t => f(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5))); } public static FSharpOption<T> ToFSharpOption<T>(this T self) { if (self == null) return FSharpOption<T>.None; return FSharpOption<T>.Some(self); } public static FSharpOption<FSharpFunc<T,TResult>> ToFSharpFuncOption<T,TResult>(this Func<T,TResult> self) { if (self == null) return FSharpOption<FSharpFunc<T,TResult>>.None; return FSharpOption<FSharpFunc<T,TResult>>.Some(self.ToFSharpFunc()); } /// <summary> /// 指定した非同期計算の .NET 非同期プログラミング モデル (APM: Asynchronous Programming Model) を実装するために使用できる 3 つの関数を作成します。 /// </summary> /// <typeparam name="TArg"></typeparam> /// <typeparam name="R"></typeparam> /// <param name="computation">従来の .NET 非同期プログラミング モデルに分割する非同期計算を生成する関数。</param> /// <returns>begin、end、および cancel をメンバーとする組。</returns> public static Tuple<FSharpFunc<Tuple<TArg,AsyncCallback,object>,IAsyncResult>,FSharpFunc<IAsyncResult,R>,FSharpFunc<IAsyncResult,Unit>> AsBeginEnd<TArg,R>(this Func<TArg,FSharpAsync<R>> computation) { return FSharpAsync.AsBeginEnd<TArg,R>(computation.ToFSharpFunc()); } /// <summary> /// イベントにハンドラーを追加することによって、CLI イベントの 1 回の呼び出しを待機する非同期計算を作成します。 /// 計算が完了するか取り消されると、イベントからハンドラーが削除されます。 /// </summary> /// <typeparam name="TDelegate"></typeparam> /// <typeparam name="R"></typeparam> /// <param name="event">1 回処理するイベント。</param> /// <param name="cancelAction">キャンセルが発行された場合に、キャンセルの代わりに実行する省略可能な関数。</param> /// <returns></returns> public static FSharpAsync<R> AwaitEvent<R>(this Func<Unit, Unit> cancelAction) { return FSharpAsync.AwaitEvent(new FSharpEvent<Func<R>, R>().Publish , cancelAction.ToFSharpFunc().ToFSharpOption()); } /// <summary> /// IAsyncResult で待機する非同期計算を作成します。 /// </summary> /// <param name="iar">待機対象の IAsyncResult</param> /// <param name="millisecondsTimeout">タイムアウト値 (ミリ秒)。指定しない場合は、System.Threading.Timeout.Infinite に対応する既定値の -1</param> /// <returns>指定されたタイムアウト内でハンドルが結果を示した場合、この計算は true を返します。</returns> public static FSharpAsync<bool> AwaitIAsyncResult(this IAsyncResult iar, int? millisecondsTimeout = null) { return FSharpAsync.AwaitIAsyncResult(iar, millisecondsTimeout.HasValue ? millisecondsTimeout.Value.ToFSharpOption() : null); } /// <summary> /// 指定された WaitHandle で待機する非同期計算を作成します。 /// </summary> /// <param name="waitHandle">通知可能な WaitHandle</param> /// <param name="millisecondsTimeout">タイムアウト値 (ミリ秒)。指定しない場合は、System.Threading.Timeout.Infinite に対応する既定値の -1</param> /// <returns>指定された WaitHandle で待機する非同期計算</returns> public static FSharpAsync<bool> AwaitWaitHandle(this WaitHandle waitHandle, int? millisecondsTimeout = null) { return FSharpAsync.AwaitWaitHandle(waitHandle, millisecondsTimeout.HasValue ? millisecondsTimeout.Value.ToFSharpOption() : null); } /// <summary> /// 特定の CancellationToken なしで開始した最新の非同期計算のセットに対してキャンセル状態を発生させます。 /// この時点以降に CancellationToken を指定せずに作成されるすべての非同期計算について、グローバル CancellationTokenSource を新しいグローバル トークン ソースに置き換えます。 /// </summary> public static void CancelDefaultToken() { FSharpAsync.CancelDefaultToken(); } /// <summary> /// 計算の実行を制御する CancellationToken を返す非同期計算を作成します。 /// </summary> public static FSharpAsync<CancellationToken> CancellationToken = FSharpAsync.CancellationToken; /// <summary> /// computation を実行する非同期計算を作成します。 /// この計算が正常に終了した場合は、戻り値を示す Choice1Of2 を返します。 /// この計算が完了する前に例外が発生した場合は、発生した例外を示す Choice2Of2 を返します。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="computation">型 T を返す入力計算</param> /// <returns>型 T または例外の選択肢を返す計算</returns> public static FSharpAsync<FSharpChoice<T,Exception>> Catch<T>(this FSharpAsync<T> computation) { return FSharpAsync.Catch(computation); } /// <summary> /// 非同期計算を実行するための既定のキャンセル トークンを取得します。 /// </summary> /// <returns></returns> public static CancellationToken DefaultCancellationToken = FSharpAsync.DefaultCancellationToken; /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。</returns> public static FSharpAsync<R> FromBeginEnd<R>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(ToTupledFSharpFunc(begin) , ToFSharpFunc(end) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。これはUnit(F#でいう"()")を返す非同期計算です。</returns> public static FSharpAsync<Unit> FromBeginEndUnit(Func<AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(ToTupledFSharpFunc(begin) , FuncConvert.ToFSharpFunc<IAsyncResult, Unit>(iar => { end(iar); return (Unit)null; }) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 1 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg"></typeparam> /// <typeparam name="R"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。</returns> public static FSharpAsync<R> FromBeginEnd<TArg, R>(TArg arg, Func<TArg, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg , ToTupledFSharpFunc(begin) , ToFSharpFunc(end) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 1 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。これはUnit(F#でいう"()")を返す非同期計算です。</returns> public static FSharpAsync<Unit> FromBeginEndUnit<TArg>(TArg arg, Func<TArg, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg , ToTupledFSharpFunc(begin) , FuncConvert.ToFSharpFunc<IAsyncResult, Unit>(iar => { end(iar); return (Unit)null; }) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 2 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg1"></typeparam> /// <typeparam name="TArg2"></typeparam> /// <typeparam name="R"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="arg2">非同期計算に必要な第2引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。</returns> public static FSharpAsync<R> FromBeginEnd<TArg1, TArg2, R>(TArg1 arg1, TArg2 arg2, Func<TArg1, TArg2, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg1 , arg2 , ToTupledFSharpFunc(begin) , ToFSharpFunc(end) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 2 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg1"></typeparam> /// <typeparam name="TArg2"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="arg2">非同期計算に必要な第2引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。これはUnit(F#でいう"()")を返す非同期計算です。</returns> public static FSharpAsync<Unit> FromBeginEndUnit<TArg1, TArg2>(TArg1 arg1, TArg2 arg2, Func<TArg1, TArg2, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg1 , arg2 , ToTupledFSharpFunc(begin) , FuncConvert.ToFSharpFunc<IAsyncResult, Unit>(iar => { end(iar); return (Unit)null; }) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 3 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg1"></typeparam> /// <typeparam name="TArg2"></typeparam> /// <typeparam name="TArg3"></typeparam> /// <typeparam name="R"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="arg2">非同期計算に必要な第2引数</param> /// <param name="arg3">非同期計算に必要な第3引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。</returns> public static FSharpAsync<R> FromBeginEnd<TArg1, TArg2, TArg3, R>(TArg1 arg1, TArg2 arg2, TArg3 arg3, Func<TArg1, TArg2, TArg3, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, R> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg1 , arg2 , arg3 , ToTupledFSharpFunc(begin) , ToFSharpFunc(end) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// Begin/End のペアのアクションについて、CLI API で使用されるスタイルで非同期計算を作成します。 /// 操作を 3 つの引数で修飾する場合は、このオーバーロードを使用する必要があります。 /// たとえば、Async.FromBeginEnd(ws.BeginGetWeather,ws.EndGetWeather) のように使用します。 /// 計算が実行されると、beginFunc が実行され、計算の継続を表すコールバックが渡されます。 /// コールバックが呼び出されると、全体的な結果は endFunc を使用してフェッチされます。 /// </summary> /// <typeparam name="TArg1"></typeparam> /// <typeparam name="TArg2"></typeparam> /// <typeparam name="TArg3"></typeparam> /// <param name="arg1">非同期計算に必要な第1引数</param> /// <param name="arg2">非同期計算に必要な第2引数</param> /// <param name="arg3">非同期計算に必要な第3引数</param> /// <param name="begin"> CLI の非同期操作を開始する関数</param> /// <param name="end">CLI の非同期操作を完了する関数。</param> /// <param name="cancelaction">キャンセルが要求された場合に実行されるオプションの関数。</param> /// <returns>指定された Begin/End 関数をラップする非同期計算。これはUnit(F#でいう"()")を返す非同期計算です。</returns> public static FSharpAsync<Unit> FromBeginEndUnit<TArg1, TArg2, TArg3>(TArg1 arg1, TArg2 arg2, TArg3 arg3, Func<TArg1, TArg2, TArg3, AsyncCallback, object, IAsyncResult> begin, Action<IAsyncResult> end, Func<Unit, Unit> cancelaction = null) { return FSharpAsync.FromBeginEnd(arg1 , arg2 , arg3 , ToTupledFSharpFunc(begin) , FuncConvert.ToFSharpFunc<IAsyncResult, Unit>(iar => { end(iar); return (Unit)null; }) , cancelaction.ToFSharpFuncOption()); } /// <summary> /// 現在の成功、例外、およびキャンセルの継続をキャプチャする非同期計算を作成します。 /// コールバックは、最終的に、指定された継続のうちの 1 つだけを呼び出す必要があります。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="continuation">現在の成功、の継続を受け入れる関数。</param> /// <param name="exceptionContinuation">現在の例外の継続を受け入れる関数。</param> /// <param name="cancellationContinuation">現在のキャンセルの継続を受け入れる関数。</param> /// <returns> /// コールバックに現在の継続を与える非同期計算。 /// </returns> public static FSharpAsync<R> FromContinuations<R>(this Func<R, Unit> continuation, Func<Exception, Unit> exceptionContinuation, Func<OperationCanceledException, Unit> cancellationContinuation) { var callback = CreateContinuationCallback(continuation, exceptionContinuation, cancellationContinuation); return FSharpAsync.FromContinuations<R>(callback.ToTupledFSharpFunc()); } /// <summary> /// 現在の成功、例外、およびキャンセルの継続を受け入れる関数を生成します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="continuation">現在の成功、の継続を受け入れる関数。</param> /// <param name="exceptionContinuation">現在の例外の継続を受け入れる関数。</param> /// <param name="cancellationContinuation">現在のキャンセルの継続を受け入れる関数。</param> /// <returns>現在の成功、例外、およびキャンセルの継続を受け入れる関数。</returns> private static Func<FSharpFunc<R, Unit>, FSharpFunc<Exception, Unit>, FSharpFunc<OperationCanceledException, Unit>, Unit> CreateContinuationCallback<R>(this Func<R, Unit> continuation, Func<Exception, Unit> exceptionContinuation, Func<OperationCanceledException, Unit> cancellationContinuation) { Func<FSharpFunc<R, Unit>, FSharpFunc<Exception, Unit>, FSharpFunc<OperationCanceledException, Unit>, Unit> callback = (x, y, z) => (Unit)null; return (arg1, arg2, arg3) => { return callback(continuation.ToFSharpFunc(), exceptionContinuation.ToFSharpFunc(),cancellationContinuation.ToFSharpFunc()); }; } /// <summary> /// 非同期計算を実行し、現在のオペレーティング システムのスレッドですぐに開始します。 /// 操作の完了時に、3 つの継続のうちの 1 つを呼び出します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">実行する非同期計算</param> /// <param name="continuation">成功時に呼び出す継続</param> /// <param name="exceptionContinuation">例外時に呼び出す継続</param> /// <param name="cancellationContinuation">取り消し時に呼び出す継続</param> /// <param name="cancellationToken"></param> public static void StartWithContinuations<R>(this FSharpAsync<R> computation, Func<R,Unit> continuation, Func<Exception,Unit> exceptionContinuation , Func<OperationCanceledException,Unit> cancellationContinuation, CancellationToken? cancellationToken = null) { FSharpAsync.StartWithContinuations(computation , continuation.ToFSharpFunc() , exceptionContinuation.ToFSharpFunc() , cancellationContinuation.ToFSharpFunc() , cancellationToken.HasValue ? cancellationToken.Value.ToFSharpOption() : null); } /// <summary> /// 指定された計算を実行し、その結果を無視する非同期計算を作成します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">入力計算</param> /// <returns>入力計算と同じ計算。ただし、結果は無視されます</returns> public static FSharpAsync<Unit> Ignore<R>(this FSharpAsync<R> computation) { return FSharpAsync.Ignore(computation); } /// <summary> /// 非同期ワークフロー内で使用するための、スコープが設定された連携可能なキャンセル ハンドラーを生成します。 /// </summary> /// <param name="interruption"></param> /// <returns></returns> public static FSharpAsync<IDisposable> OnCancel(this Func<Unit, Unit> interruption) { return FSharpAsync.OnCancel(interruption.ToFSharpFunc()); } /// <summary> /// 指定されたすべての並列計算を実行する非同期計算を作成します。 /// 最初にそれぞれを作業項目としてキューに入れ、fork/join パターンを使用します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computations"></param> /// <returns></returns> public static FSharpAsync<R[]> AsFSharpAsyncParallel<R>(this IEnumerable<FSharpAsync<R>> computations) { return FSharpAsync.Parallel(computations); } /// <summary> /// 非同期計算を実行し、その結果を待機します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">実行する計算</param> /// <timeout> /// System.TimeoutException が発生する前に、計算の結果を待機するミリ秒単位の時間。 /// タイムアウトを指定しない場合は、System.Threading.Timeout.Infinite に対応する既定値の -1 が使用されます。 /// </timeout> /// <cancellationToken> /// 計算に関連付けるキャンセル トークン。指定しない場合は、既定のキャンセル トークンが使用されます。 /// </cancellationToken> /// <returns>計算結果</returns> public static R RunSynchronously<R>(this FSharpAsync<R> computation,int? timeout = null, CancellationToken? cancellationToken = null) { return FSharpAsync.RunSynchronously(computation , timeout.HasValue ? timeout.Value.ToFSharpOption() : null , cancellationToken.HasValue ? cancellationToken.Value.ToFSharpOption() : null); } /// <summary> /// 指定した時間スリープする非同期計算を作成します。 /// これは、System.Threading.Timer オブジェクトを使用してスケジュールされます。 /// この操作では、待機中にオペレーティング システム スレッドがブロックされることはありません。 /// </summary> /// <param name="millisecondsDueTime">ミリ秒数単位のスリープ時間。</param> public static FSharpAsync<Unit> Sleep(int millisecondsDueTime) { return FSharpAsync.Sleep(millisecondsDueTime); } /// <summary> /// スレッドプールの非同期計算を開始します。結果を待機しません。 /// </summary> /// <cancellationToken> /// 計算に関連付けるキャンセル トークン。指定しない場合は、既定のキャンセル トークンが使用されます。 /// </cancellationToken> /// <param name="computation">実行する計算</param> public static void Start(this FSharpAsync<Unit> computation, CancellationToken? cancellationToken = null) { FSharpAsync.Start(computation , cancellationToken.HasValue ? cancellationToken.Value.ToFSharpOption() : null); } /// <summary> /// 非同期ワークフロー内で子計算を開始します。これにより、複数の非同期計算を同時に実行できます。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">子計算</param> /// <param name="millisecondsTimeout"> /// タイムアウト値 (ミリ秒)。指定しない場合は、System.Threading.Timeout.Infinite に対応する既定値の -1 が使用されます。 /// </param> /// <returns>入力計算が完了するまで待機する新しい計算。</returns> public static FSharpAsync<FSharpAsync<R>> StartChild<R>(this FSharpAsync<R> computation, int? millisecondsTimeout = null) { return FSharpAsync.StartChild(computation, millisecondsTimeout.HasValue ? millisecondsTimeout.Value.ToFSharpOption() : null); } /// <summary> /// 非同期計算を実行し、現在のオペレーティング システムのスレッドですぐに開始します。 /// </summary> /// <param name="computation">実行する非同期計算</param> /// <param name="cancellationToken">計算に関連付ける CancellationToken。このパラメーターを指定しない場合は、既定値が使用されます。</param> public static void StartImmediate(this FSharpAsync<Unit> computation, CancellationToken? cancellationToken = null) { FSharpAsync.StartImmediate(computation, cancellationToken.HasValue ? cancellationToken.Value.ToFSharpOption() : null); } /// <summary> /// 非同期計算を実行し、現在のオペレーティング システムのスレッドですぐに開始します。 /// 操作の完了時に、3 つの継続のうちの 1 つを呼び出します。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">実行する非同期計算</param> /// <param name="continuation">成功時に呼び出す継続</param> /// <param name="exceptionContinuation">例外時に呼び出す継続</param> /// <param name="cancellationContinuation">取り消し時に呼び出す継続</param> /// <param name="cancellationToken"></param> public static void StartWithContinuations<R>(this FSharpAsync<R> computation, Func<R,Unit> continuation, Func<Exception,Unit> exceptionContinuation , Func<OperationCanceledException,Unit> cancellationContinuation, CancellationToken? cancellationToken = null) { FSharpAsync.StartWithContinuations(computation , continuation.ToFSharpFunc() , exceptionContinuation.ToFSharpFunc() , cancellationContinuation.ToFSharpFunc() , cancellationToken.HasValue ? cancellationToken.Value.ToFSharpOption() : null); } /// <summary> /// syncContext.Post を使用して継続を実行する非同期計算を作成します。syncContext が null の場合、この非同期計算は SwitchToThreadPool() と等しくなります。 /// </summary> /// <param name="syncContext">ポストされた計算を受け入れる同期コンテキスト。</param> /// <returns>syncContext コンテキストの実行に使用する非同期計算。</returns> public static FSharpAsync<Unit> SwitchToContext(this SynchronizationContext syncContext) { return FSharpAsync.SwitchToContext(syncContext); } /// <summary> /// 新しいスレッドを作成し、そのスレッドで継続を実行する非同期計算を作成します。 /// </summary> /// <returns>新しいスレッドで実行する計算。</returns> public static FSharpAsync<Unit> SwitchToNewThread() { return FSharpAsync.SwitchToNewThread(); } /// <summary> /// 継続を実行する作業項目をキューに配置する非同期計算を作成します。 /// </summary> /// <return>スレッド プール内に新しい作業項目を生成する計算。</return> public static FSharpAsync<Unit> SwitchToThreadPool() { return FSharpAsync.SwitchToThreadPool(); } /// <summary> /// computation を実行する非同期計算を作成します。 /// この計算が完了前に取り消された場合は、compensation の実行によって生成された計算が実行されます。 /// </summary> /// <typeparam name="R"></typeparam> /// <param name="computation">入力する非同期計算。</param> /// <param name="compensation">計算が取り消された場合に実行する関数。</param> /// <returns>入力計算が取り消された場合に補正を実行する非同期計算。</returns> public static FSharpAsync<R> TryCancelled<R>(this FSharpAsync<R> computation, Func<OperationCanceledException, Unit> compensation) { return FSharpAsync.TryCancelled(computation, compensation.ToFSharpFunc()); } } }
FSharpAsyncBuilderWrapper
F#で非同期計算のワークフロー(computation expression)を作成する async 演算子の型のラッパーです。
FSharpAsyncBuilderWrapper.cs
using System; using System.Collections.Generic; using Microsoft.FSharp.Core; using Microsoft.FSharp.Control; namespace ClassLibrary1 { /// <summary> /// F#で非同期計算ワークフロー(computation expression)を作成するasync 演算子の型のラッパーです。 /// </summary> public static class FSharpAsyncBuilderWrapper { public static FSharpAsyncBuilder async = Microsoft.FSharp.Core.ExtraTopLevelOperators.DefaultAsyncBuilder; /// <summary> /// computation を実行する非同期計算を作成します。 /// この computation が結果 T を生成すると、binder res が実行されます。 /// </summary> /// <typeparam name="TArg1"></typeparam> /// <typeparam name="T"></typeparam> /// <param name="computation">バインドされていない結果を生成する計算。</param> /// <param name="binder">computation の結果をバインドする関数。</param> /// <returns>computation の結果に対してモナディック バインドを実行する非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で let! を使用する部分です。 /// </remarks> public static FSharpAsync<T> Bind<TArg1, T>(this FSharpAsync<TArg1> computation, Func<TArg1, T> binder) { return async.Bind(computation, FSharpAsyncWrapper.ToFSharpFunc<TArg1, FSharpAsync<T>>(r => async.Return(binder(r)))); } public static FSharpAsync<V> SelectMany<T, U, V>(this FSharpAsync<T> p, Func<T, FSharpAsync<U>> selector, Func<T, U, V> projector) { return async.Bind(p, FuncConvert.ToFSharpFunc(new System.Converter<T, FSharpAsync<V>>(r1 => async.Bind(selector(r1), FuncConvert.ToFSharpFunc<U, FSharpAsync<V>>(r2 => async.Return(projector(r1, r2))))))); } /// <summary> /// 最初に computation1 を実行し、次に computation2 を実行して computation2 の結果を返す、非同期計算を作成します。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="computation1">連続する計算の最初の部分。</param> /// <param name="computation2">連続する計算の 2 番目の部分。</param> /// <returns>両方の計算を順次実行する非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#ではasync { ... } 計算式構文で式を連続して使用される部分です。 /// </remarks> public static FSharpAsync<T> Combine<T>(this FSharpAsync<Unit> computation1, FSharpAsync<T> computation2) { return async.Combine(computation1, computation2); } /// <summary> /// generator を実行する非同期計算を作成します。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="generator">実行する関数。</param> /// <returns>generator を実行する非同期計算。</returns> /// <remarks>計算の実行時には、キャンセルのチェックが実行されます。</remarks> public static FSharpAsync<T> Delay<T>(this Func<Unit, FSharpAsync<T>> generator) { return async.Delay(generator.ToFSharpFunc()); } /// <summary> /// シーケンス IEnumerable{T} を列挙し、各要素に対して body を実行する非同期計算を作成します。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="sequence">列挙するシーケンス。</param> /// <param name="body">シーケンスから項目を取得し、非同期計算を作成する関数。for 式の本体と見なすことができます。</param> /// <returns>シーケンスを列挙し、各要素に対して body を実行する非同期計算。</returns> /// <remarks> /// ループが反復処理されるたびに、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で for を使用する部分です。 /// </remarks> public static FSharpAsync<Unit> For<T>(this IEnumerable<T> sequence, Func<T, FSharpAsync<Unit>> body) { return async.For(sequence, body.ToFSharpFunc()); } /// <summary> /// 結果 value を返す非同期計算を作成します。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="value">計算から返される値。</param> /// <returns> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で return を使用する部分です。 /// </returns> public static FSharpAsync<T> Return<T>(this T value) { return async.Return(value); } /// <summary> /// 入力計算に処理を代行させます。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="computation">入力計算。</param> /// <returns>入力計算。</returns> /// <remarks>F#では、async { ... } 計算式構文で return! を使用する部分です。</remarks> public static FSharpAsync<T> ReturnFrom<T>(this FSharpAsync<T> computation) { return async.ReturnFrom(computation); } /// <summary> /// computation を実行する非同期計算を作成します。 /// computation が完了した後、computation が正常に終了したのか例外で終了したのかにより、 /// 処理の compensation が実行されます。 /// compensation 自体で例外が発生した場合、元の例外は破棄されて、新しい例外が計算全体の結果になります。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="computation">入力計算。</param> /// <param name="compensation">computation の完了またはキャンセルを含む例外の発生後に実行するアクション。</param> /// <returns>入力計算の後で、または例外が発生したときに計算と補正を実行する非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で try/finally を使用する部分です。 /// </remarks> public static FSharpAsync<T> TryFinally<T>(this FSharpAsync<T> computation, Func<Unit, Unit> compensation) { return async.TryFinally(computation, compensation.ToFSharpFunc()); } /// <summary> /// computation を実行してその結果を返す非同期計算を作成します。 /// 例外が発生した場合は、catchHandler(exn) が呼び出され、その結果の計算が代わりに実行されます。 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="computation">入力計算。</param> /// <param name="catchHandler">computation が例外をスローしたときに実行される関数。</param> /// <returns>例外がスローされた場合に、computation を実行し、catchHandler を呼び出す非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で try/with を使用する部分です。 /// </remarks> public static FSharpAsync<T> TryWith<T>(this FSharpAsync<T> computation, Func<Exception, FSharpAsync<T>> catchHandler) { return async.TryWith(computation, catchHandler.ToFSharpFunc()); } /// <summary> /// binder(resource) を実行する非同期計算を作成します。 /// この計算で結果が生成されるか、非同期計算が例外またはキャンセルによって終了すると、resource.Dispose() の処理が実行されます。 /// </summary> /// <typeparam name="T"></typeparam> /// <typeparam name="TResult"></typeparam> /// <param name="resource">使用および破棄するリソース。</param> /// <param name="binder">リソースを使用し、非同期計算を返す関数。</param> /// <returns>resource をバインドし、最終的に破棄する非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で use および use! を使用する部分です。 /// </remarks> public static FSharpAsync<TResult> Using<T, TResult>(T resource, Func<T, FSharpAsync<TResult>> binder) where T : IDisposable { return async.Using(resource, binder.ToFSharpFunc()); } /// <summary> /// guard() が false になるまで繰り返し computation を実行する非同期計算を作成します。 /// </summary> /// <param name="guard">computation の実行を停止するタイミングを判断する関数。</param> /// <param name="computation">実行する関数。while 式の本体と同じです。</param> /// <returns>実行すると while ループと同じように動作する非同期計算。</returns> /// <remarks> /// 計算の実行時には、常にキャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で while を使用する部分です。</remarks> public static FSharpAsync<Unit> While(this Func<Unit, bool> guard, FSharpAsync<Unit> computation) { return async.While(guard.ToFSharpFunc(), computation); } /// <summary> /// Unit (F#で言うところの"()") だけを返す非同期計算を作成します。 /// </summary> /// <returns>() を返す非同期計算。</returns> /// <remarks> /// 計算の実行時には、キャンセルのチェックが実行されます。 /// F#では、async { ... } 計算式構文で空の else 分岐を使用する部分です。 /// </remarks> public static FSharpAsync<Unit> Zero() { return async.Zero(); } } }
Async
とりあえず的に作ってみたAsyncなヘルパークラスです
いろいろ拡張するとより使いやすいものができると思います。
Async{T, TResult}.cs
using System; using Microsoft.FSharp.Control; using System.Diagnostics.Contracts; namespace ClassLibrary1 { public class Async<T, TResult> { private Async() { } public static Async<T, TResult> Create() { return new Async<T, TResult>(); } private Func<T, TResult> _dlgt = x => default(TResult); private Action<IAsyncResult> _cbdlgt = x => { }; public FSharpAsync<TResult> Build(T x, Func<T, TResult> dlgt) { if (dlgt == null) new ArgumentNullException("dlgt"); _dlgt = dlgt; return FSharpAsyncWrapper.FromBeginEnd(x, Begin, End); } public FSharpAsync<TResult> Build(T x, Func<T, TResult> dlgt, Action<IAsyncResult> cbdlgt) { Contract.Requires(dlgt != null); _dlgt = dlgt; _cbdlgt = cbdlgt; return FSharpAsyncWrapper.FromBeginEnd(x, BeginCallBack, End); } private IAsyncResult Begin(T x, AsyncCallback cb, object obj) { return _dlgt.BeginInvoke(x, cb, obj); } private IAsyncResult BeginCallBack(T x, AsyncCallback cb, object obj) { return _dlgt.BeginInvoke(x, new AsyncCallback(_cbdlgt), obj); } private TResult End(IAsyncResult iar) { return (TResult)_dlgt.EndInvoke(iar); } } }
並列処理、非同期計算のサンプルプログラム
まずはF#で、ParallelMapとParallelMapAsyncなサンプル。
シンプルで且つふつくしく並列・非同期計算を表現できています。
Program.fs
module ConsoleApplication1 = open System open System.Linq open System.Net open System.Text.RegularExpressions let urls = [@"http://blogs.msdn.com/b/dsyme/"; @"http://groups.google.co.jp/group/fsug-jp"; @"http://www.msn.com/"; @"http://news.google.com/"; @"http://www.amazon.co.jp/";] let downloadAndExtractLinks url = let download url = let webclient = new System.Net.WebClient() webclient.DownloadString(url : string) let extractLinks html = Regex.Matches(html, @"http://\S+") let links = (url |> download |> extractLinks) url, links.Count let pmap (f:'a ->'b) (pe:seq<'a>) = ParallelEnumerable.Select(pe.AsParallel (), Func<_,_>(f)).ToArray () let pmapAsync f list = seq { for a in list -> async { return f a} } |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously let testSync = fun _ -> let r = List.map downloadAndExtractLinks urls printfn "%A" r let testParallel = fun _ -> let r = pmap downloadAndExtractLinks urls printfn "%A" r let testParallelAsync = fun _ -> let f s = let a = downloadAndExtractLinks s printf "%A; " a pmapAsync f urls let time msg f = let stopwatch = System.Diagnostics.Stopwatch.StartNew() f() stopwatch.Stop() printfn "%s: (%f ms) " msg stopwatch.Elapsed.TotalMilliseconds let main = (fun _ -> printfn "Start..." time "Sync" testSync |> Console.WriteLine |> ignore time "Parallel" testParallel |> Console.WriteLine |> ignore time "ParallelAsync" (testParallelAsync >> Console.WriteLine) printfn "Done." |> Console.ReadLine |> ignore) main ()
実行結果
Start... [("http://blogs.msdn.com/b/dsyme/", 41); ("http://groups.google.co.jp/group/fsug-jp", 34); ("http://www.msn.com/", 266); ("http://news.google.com/", 295); ("http://www.amazon.co.jp/", 102)] Sync: (5779.072800 ms) [|("http://groups.google.co.jp/group/fsug-jp", 34); ("http://www.msn.com/", 266); ("http://news.google.com/", 295); ("http://www.amazon.co.jp/", 97); ("http://blogs.msdn.com/b/dsyme/", 41)|] Parallel: (2942.730000 ms) ("http://groups.google.co.jp/group/fsug-jp", 34); ("http://blogs.msdn.com/b/dsym e/", 41); ("http://news.google.com/", 295); ("http://www.msn.com/", 266); ("http ://www.amazon.co.jp/", 103); ParallelAsync: (2185.525900 ms) Done.
続きまして、C#でもParallelMapとParallelMapAsyncなサンプル。
using System; using System.Net; using System.Text.RegularExpressions; using System.Collections.Generic; using System.Linq; using Microsoft.FSharp.Core; using System.Diagnostics.Contracts; using ClassLibrary1; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { [STAThread] public static void Main() { var urls = new[] {@"http://blogs.msdn.com/b/dsyme/", @"http://groups.google.co.jp/group/fsug-jp", @"http://www.msn.com/", @"http://news.google.com/", @"http://www.amazon.co.jp/"}; Console.WriteLine("Start..."); WriteTime("Sync", () => Console.WriteLine(TestSyncronous(urls).GetString())); Console.WriteLine(); WriteTime("Parallel", () => Console.WriteLine(TestParallel(urls).GetString())); Console.WriteLine(); WriteTime("ParallelAsync", () => { TestParallelAsync(urls); Console.WriteLine(); }); Console.WriteLine("Done."); Console.ReadLine(); } private static string Download(string url) { var webclient = new WebClient(); return webclient.DownloadString(url); } private static MatchCollection ExtractLinks(string html) { return Regex.Matches(html, @"http://\S+"); } private static Tuple<string, int> DownloadAndExtractLink(string url) { var links = ExtractLinks(Download(url)); return new Tuple<string, int>(url, links.Count); } private static IEnumerable<Tuple<string, int>> TestSyncronous(IEnumerable<string> urls) { Contract.Requires(urls != null); return urls.Select(x => DownloadAndExtractLink(x)).ToList(); } private static IEnumerable<Tuple<string, int>> TestParallel(IEnumerable<string> urls) { var option = new ParallelOptions() { MaxDegreeOfParallelism = 4 }; return urls.ParallelMap(x => DownloadAndExtractLink(x)); } private static void TestParallelAsync(IEnumerable<string> urls) { Contract.Requires(urls != null); var count = urls.Count(); var option = new ParallelOptions() { MaxDegreeOfParallelism = 4 }; urls.ParallelMapAsync<string, Unit>((x, s, i) => { var t = DownloadAndExtractLink(x); Console.Write("(\"{0}\", {1})", t.Item1, t.Item2); if (i != count - 1) Console.Write("; "); return (Unit)null; }); } private static void WriteTime(string msg, Action action) { var stopwatch = action.Benchmark(); Console.Write(" {0}: ({1} ms)", msg, stopwatch.Elapsed.TotalMilliseconds); Console.WriteLine(); } } }
FSharpAsync.Parallelのラッパーを利用して並列・非同期計算をすることもできますが、
これをすると、FSharpAsync用にコンバートする際のオーバーヘッドが大きすぎるようで、
処理速度的な意味で並列・非同期計算の恩恵をほとんど受けられないっぽいので、C#で最適に使えるParallelMapを用意してみました。
ParallelExtentions.cs
using System; using System.Linq; using System.Collections.Generic; using Microsoft.FSharp.Core; using Microsoft.FSharp.Control; using System.Threading.Tasks; using System.Diagnostics.Contracts; namespace ClassLibrary1 { public static class ParallelExtentions { public static IEnumerable<TResult> ParallelMap<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, TResult> func, ParallelOptions option = null) { var result = new List<TResult>(); ParallelForEach<TArg>(list, x => { var val = func(x); lock (result) { result.Add(val); } }, option); return result; } public static IEnumerable<TResult> ParallelMap<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, ParallelLoopState, TResult> func, ParallelOptions option = null) { var result = new List<TResult>(); ParallelForEach<TArg>(list, (x, s) => { var val = func(x, s); lock (result) { result.Add(val); } }, option); return result; } public static IEnumerable<TResult> ParallelMap<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, ParallelLoopState, long, TResult> func, ParallelOptions option = null) { Contract.Requires(list != null); TResult[] result = new TResult[list.Count()]; ParallelForEach<TArg>(list, (x, s, i) => { var val = func(x, s, i); result[i] = val; }, option); return result; } public static void ParallelMapAsync<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, Unit> func, ParallelOptions option = null) { var result = new List<FSharpAsync<Unit>>(); ParallelForEach<TArg>(list, x => { var val = func(x).Return(); lock (result) { result.Add(val); } }, option); result.ForEach(x => x.Start()); } public static void ParallelMapAsync<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, ParallelLoopState, Unit> func, ParallelOptions option = null) { var result = new List<FSharpAsync<Unit>>(); ParallelForEach<TArg>(list, (x, s) => { var val = func(x, s).Return(); lock (result) { result.Add(val); } }, option); result.ForEach(x => x.Start()); } public static void ParallelMapAsync<TArg, TResult>(this IEnumerable<TArg> list, Func<TArg, ParallelLoopState, long, Unit> func, ParallelOptions option = null) { Contract.Requires(list != null); var result = new FSharpAsync<Unit>[list.Count()]; ParallelForEach<TArg>(list, (x, s, i) => { var val = func(x, s, i).Return(); result[i] = val; }, option); result.ToList().ForEach(x => x.Start()); } private static ParallelLoopResult ParallelForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body, ParallelOptions option = null) { if (option == null) return Parallel.ForEach(source, body); return Parallel.ForEach(source, option, body); } private static ParallelLoopResult ParallelForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body, ParallelOptions option = null) { if (option == null) return Parallel.ForEach(source, body); return Parallel.ForEach(source, option, body); } private static ParallelLoopResult ParallelForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body, ParallelOptions option = null) { if (option == null) return Parallel.ForEach(source, body); return Parallel.ForEach(source, option, body); } } }
おまけ
using System; using System.Collections.Generic; using System.Linq; using System.Diagnostics; using System.Diagnostics.Contracts; namespace ClassLibrary1 { public static class With { public static Stopwatch Benchmark(Action action) { var stopwatch = Stopwatch.StartNew(); action(); stopwatch.Stop(); return stopwatch; } public static Stopwatch Benchmark<TArg>(Action<TArg> action, TArg arg) { var stopwatch = Stopwatch.StartNew(); action(arg); stopwatch.Stop(); return stopwatch; } public static void Iterations(int n, Action action) { for (int count = 0; count < n; count++) action(); } public static void Iterations<TArg>(int n, Action<TArg> action,TArg arg) { for (int count = 0; count < n; count++) action(arg); } } public static class Extensions { public static Stopwatch Benchmark(this Action action) { return With.Benchmark(action); } public static Stopwatch Benchmark<TArg>(this Action<TArg> action, TArg arg) { return With.Benchmark(action, arg); } public static Action Iterations(this Action action, int n) { return () => With.Iterations(n, action); } public static Action Iterations<TArg>(this Action<TArg> action, int n, TArg arg) { return () => With.Iterations(n, action, arg); } public static string GetString<T1, T2>(this IEnumerable<Tuple<T1, T2>> tuplelist) { Contract.Requires(tuplelist != null); string str = "[|"; foreach (var t in tuplelist.Select((x, i) => new { value = x, index = i })) { str += t.value.ToString(); if (t.index != tuplelist.Count() - 1) str += "; "; } return str += "|]"; } } }
実行結果
Start... [|(http://blogs.msdn.com/b/dsyme/, 41); (http://groups.google.co.jp/group/fsug-j p, 34); (http://www.msn.com/, 266); (http://news.google.com/, 295); (http://www. amazon.co.jp/, 92)|] Sync: (4905.4956 ms) [|(http://www.msn.com/, 266); (http://news.google.com/, 295); (http://blogs.msdn .com/b/dsyme/, 41); (http://groups.google.co.jp/group/fsug-jp, 34); (http://www. amazon.co.jp/, 99)|] Parallel: (1857.4171 ms) ("http://www.msn.com/", 266); ("http://news.google.com/", 295); ("http://groups. google.co.jp/group/fsug-jp", 34); ("http://blogs.msdn.com/b/dsyme/", 41); ("http ://www.amazon.co.jp/", 97) ParallelAsync: (1587.4767 ms) Done.
わたしが書いたF#でのサンプルプログラムがよろしくないせいなのか、C#版の方が高速に動作しているようでした。
もしかしてF#コンパラ的に残念なことになっていたり? まぁ、この例では通信状況にもよりますし、気になるほどではありませんね。
まとめ
いろいろいじったおかげで、なんだか結構わかったような気がします。FSharpAsyncのラッパーをC#で準備しておくと、いろいろな局面で便利であり、
ある程度の意味があることもわかりました。ただ、並列や非同期処理についてFSharpAsyncラッパーに全て頼ることは、
当然のことながら結構大きなオーバーヘッドが生じるので「うれしくない」という点については理解しておく必要がある。
また、「並列や非同期についてC#で本気出したい!」という場合は、今回作ったParallelExtentionsのような感じで
ラッパーとC#特有の実装をそれぞれうまく組み合わせて併用すると、ウマーな感じです。
まぁ、並列や非同期について.NETで本気を出したいのであれば、C#やVB.NETよりもF#を使ったほうが様々な面で有利なのは間違いないでしょう。
というわけで、総評するとF#かわいいよF#ということになります。
C#同様、みなさんかわいがってあげましょう(・ω・)