Bug Catharsis このページをアンテナに追加 RSSフィード Twitter

ようこそ

C#やんごとねぇ、F#のFは「ふつくしい」のF日記
基本.NETが好きだけど、いろいろと手広くやっている睡眠不足なプログラマのチラ裏です。
この日記のはてなブックマーク数
2007 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2008 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2009 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2010 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 11 | 12 |
2011 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2012 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 12 |
2013 | 01 | 03 | 04 | 05 |

2012-04-16

Retry Monad for Transient Fault Handling (Topaz + FSharpx)

f:id:zecl:20120416202627p:image


4月14日札幌で行われた第69回CLR/H勉強会にて、「Retry Monad for Transient Fault Handling - F#Windows Azure と私 -」と題して、ライトニングトークで発表しました。


Microsoft Enterprise Library 5.0 Integration Pack for Windows Azure(EL5 for Azure)のTopaz および FSharpx を利用してモナド作りました。Topazを利用する理由は、再利用可能な再試行戦略およびWindows Azure向けの検出戦略組み込み済みであり、それをそのまま利用したいからです。EL5 for AzureOSSなので、どのような実装がなされているか実際に確認することができるので、すべてをF#で書き直すこともできますが、それでは車輪の再発明になってしまいます。Retry Monad for Transient Fault Handling は、一時的障害が発生するかもしれない計算について、それぞれ異なるRetryPolicyを適用しながら再試行処理を行います。一時的な障害に対するリトライ処理をひとつ計算として包括的に扱うことができるモナド実装です。このRetryモナド計算結果は、Choice<’T1,’T2>型で得ることができ、これによりFSharpxで定義済みの Eitherモナドで扱うこともできます



Retry Monad for Transient Fault Handling


namespace Monad.Retry 
open System

[<AutoOpen>]
module Retry =
  // #r "Microsoft.Practices.TransientFaultHandling.Core"
  // #r "FSharpx.Core"
  open Microsoft.Practices.TransientFaultHandling
  open FSharpx
 
  [<Sealed>]
  type TransientErrorCatchAllStrategy () =
    interface ITransientErrorDetectionStrategy with
      member this.IsTransient (ex : exn)  = true

  [<Sealed>]
  type TransientErrorIgnoreStrategy () =
    interface ITransientErrorDetectionStrategy with
      member this.IsTransient (ex : exn)  = false

  let defaultRetryStrategyName = "DefaultRetry"
  let defaultRetryCount = 3
  let defaultMinBackoff = TimeSpan.FromSeconds(3.0)
  let defaultMaxBackoff = TimeSpan.FromSeconds(90.0)
  let defaultDeltaBackoff = TimeSpan.FromMilliseconds(30.0)

  let (<+) (rp:RetryPolicy<'TResultStrategy>) retrying = rp.Retrying |> Event.add(retrying)

  type RetryPolicies =
    static member NoRetry() = new RetryPolicy<TransientErrorIgnoreStrategy>(0, TimeSpan.Zero)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , retryInterval : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryCount, retryInterval)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , initialInterval : TimeSpan, increment : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryCount, initialInterval, increment)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryStrategy : RetryStrategy) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryExponential<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , deltaBackoff : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      let retryStrategy = new ExponentialBackoff(defaultRetryStrategyName, retryCount, defaultMinBackoff, defaultMaxBackoff , deltaBackoff)
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryExponential<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , minBackoff : TimeSpan, maxBackoff : TimeSpan, deltaBackoff : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      let retryStrategy = new ExponentialBackoff(defaultRetryStrategyName, retryCount, minBackoff, maxBackoff, deltaBackoff)
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryDefault(?retryCount : int) : RetryPolicy<TransientErrorCatchAllStrategy>=
      let retryCount = defaultArg retryCount defaultRetryCount
      RetryPolicies.RetryExponential<TransientErrorCatchAllStrategy>(retryCount, defaultMinBackoff, defaultMaxBackoff, defaultDeltaBackoff)

  type Retry<'TResult> = Retry of (Lazy<unit -> 'TResult * LastException option>)
  and RetryResult<'TResult> = Choice<'TResult, LastException>
  and LastException = exn

  let exnHandler e = Retry(lazy(fun () -> Unchecked.defaultof<'TResult>, e |> Some))    
  type RetryBuilder (policy : RetryPolicy) = 
    new(?retryCount : int, ?retrying) = 
      let policy = 
        let retryCount = defaultArg retryCount defaultRetryCount
        RetryPolicies.RetryDefault(retryCount)

      retrying |> function
      | None   -> policy <+ (fun e -> printfn "%s" (sprintf "RetryPolicyName:%s, CurrentRetryCount:%d, LastException.Message:%s, Delay:%A" 
                                                            policy.RetryStrategy.Name e.CurrentRetryCount e.LastException.Message e.Delay))
      | Some retrying ->policy <+ retrying
      RetryBuilder(policy)
    
    member this.Bind (m : Retry<'TResult>, bind : ('TResult) -> Retry<'UResult>) : Retry<'UResult> = 
      Retry(lazy(fun () -> 
        m |> function
        | Retry f -> f.Force() |> fun cont -> 
          cont() ||> fun r _ -> r |> bind
        |> function
          | Retry f -> f.Force() 
          |> fun cont -> policy.ExecuteAction(Func<_>(fun () -> cont() ||> fun r _ -> r,None))))
    member this.Return (value : 'TResult) : Retry<'TResult> = 
      Retry(lazy (fun () -> policy.ExecuteAction(L.F<_>(fun () ->  value, None))))
    member this.ReturnFrom (m : Retry<'TResult>) : Retry<'TResult> = 
      m
    member this.Delay (f: unit -> Retry<unit -> 'TResult>)  : Retry<unit -> 'TResult> = 
      Retry(lazy (fun () -> policy.ExecuteAction(L.F<_>(fun () -> f() |> function | Retry f -> f.Force() |> fun cont -> cont() ||> fun f _ -> f(), None)) ||> fun r _ ->  (fun () -> r), None))
    member this.Zero () : Retry<'TResult> = 
      this.Return(Unchecked.defaultof<'TResult>)
    member this.Combine(comp1:Retry<'TResult>, comp2:Retry<'TResult>) = 
      this.Bind(comp1,(fun r -> comp2))

  let retry = new RetryBuilder()

  open Operators
  let inline returnM x = returnM retry x 
  let inline (>>=) m f = bindM retry m f
  let inline (=<<) f m = bindM retry m f
  let inline (<*>) f m = applyM retry retry f m
  let inline ap m f = f <*> m
  let inline map f m = liftM retry f m
  let inline (<!>) f m = map f m
  let inline lift2 f a b = returnM f <*> a <*> b
  let inline ( *>) x y = lift2 (fun _ z -> z) x y
  let inline ( <*) x y = lift2 (fun z _ -> z) x y
  let inline (>>.) m f = bindM retry m (fun _ -> f)
  let inline (>=>) f g = fun x -> f x >>= g
  let inline (<=<) x = flip (>=>) x

  let (|RetryResult|) = 
    let rec result (r:RetryResult<'TResult>) =
      match r with
      | Choice1Of2 v -> v, None
      | Choice2Of2 e -> Unchecked.defaultof<'TResult>, Some(e)
    result

  let run (retryCont : Retry<unit -> 'TResult>) : RetryResult<'TResult> =
    try
      retryCont |> function
      |(Retry f) -> f.Force()() ||> fun r e -> 
        e |> function
        |Some e -> e |> Choice2Of2
        |None   -> r() |> Choice1Of2
    with e -> e |> Choice2Of2





一時的な障害:Windows Azure(クラウド)アプリケーションを開発するにあたって対処しなければならない課題ひとつ

他のクラウドサービス依存するようなクラウドアプリケーションを開発する場合開発者対処しなければならない課題の一つに、“一時的な障害” がありますインフラストラクチャレベルの障害であったり、ネットワークの問題など一時的な条件のために発生する恐れのある障害のことです。この一時的に発生しうる障害は、ほとんどの場合は短い間隔で(ほんの数ミリ秒後に)リトライ処理を行うことで回避することができます


たとえば、Windows AzureSQL Azureプラットフォームを利用する場合SQL Azureサービスは、共有リソース上で大規模なマルチテナントデータベースとしてサービス提供されるので、データベースを利用するすべての利用者に対して良好なエクスペリエンス提供しなければなりません。そのため、SQL Azureは過剰なリソースの使用や、実行時間の長いトランザクションの発行された場合など、さまざまな理由でサービスへの接続数を抑制して、利用者が意図しないタイミングで接続を切断することがあります。これが、SQL Azureを利用した場合に生じる一時的な障害ということになります。このような障害が発生した場合であってもシームレスユーザーエクスペリエンス提供するために、Windows Azureアプリケーション(クラウドアプリケーション)では、一時的な障害によって処理が中断された場合にはリトライを試みるようにアプリケーションを実装する必要があります


Microsoft Enterprise Library 5.0 Integration Pack for Windows Azureを利用する

一時的な障害に対応するアプリケーションを実装する場合Microsoft Enterprise Library 5.0 Integration Pack for Windows Azure(以降 EL5 for Azure)を利用するのが有効です。EL5 for Azureは、マイクロソフトの pattern & practice チームによる、マイクロソフト製品テクノロジを基として、アプリケーションを構築する上でのパターンベストプラクティスを集めたライブラリWindows Azure向けの拡張パックです。この拡張ライブラリ提供されるまでは、一時的障害を検知してリトライ処理を行う実装を開発者自身がおのおので組み込まなければなりませんでした。EL5 for Azureには、Transient Fault Handling Application Block (Topaz)という、Windows Azureプラットフォームに含まれるサービス利用時に発生するさまざまな一時的な障害からWindows Azureアプリケーションを回復させるためのアプリケーションブロック提供されています。これは、Windows Azure固有の一時的な障害のみならず、オンプレミスアプリケーションで発生するさまざまな一時的な障害に対するリトライ処理についても利用可能なように設計されており、リトライ処理について高いレベルで抽象化されたアプリケーションブロックです(Microsoft.Practices.TransientFaultHandling.Core.dllにまとめらえている)。特にWindows Azureに特化した組み込みの実装については、SQL AzureWindows Azure ストレージサービスWindows Azure サービスバスWindows Azure キャッシングサービス向けの検出戦略がそれぞれ提供されていて、Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling.dllに含まれています



検出戦略と再試行戦略

検出戦略は、ITransientErrorDetectionStrategyインターフェイスを実装して作成することができます


public interface ITransientErrorDetectionStrategy
{
    bool IsTransient(Exception ex);
}

例外を引数で受け取り、その例外の種類や内部的なメッセージなどを判断して、リトライ処理を行うときは true、 リトライをせずに無視するときは falseを返すように実装するだけの非常にシンプルインターフェイスです。Windows Azureの一時的な障害に対する4つの組み込み検出戦略として、SqlAzureTransientErrorDetectionStrategy、StorageTransientErrorDetectionStrategy、ServiceBusTransientErrorDetectionStrategy、CacheTransientErrorDetectionStrategyが提供されています




試行戦略は、RetryStrategy抽象クラス継承して作成することができます

    public abstract class RetryStrategy
    {
        public static readonly int DefaultClientRetryCount = 10;
        public static readonly TimeSpan DefaultClientBackoff = TimeSpan.FromSeconds(10.0);
        public static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30.0);
        public static readonly TimeSpan DefaultMinBackoff = TimeSpan.FromSeconds(1.0);
        public static readonly TimeSpan DefaultRetryInterval = TimeSpan.FromSeconds(1.0);
        public static readonly TimeSpan DefaultRetryIncrement = TimeSpan.FromSeconds(1.0);
        public static readonly bool DefaultFirstFastRetry = true;

        public static readonly RetryStrategy NoRetry = new FixedInterval(0, DefaultRetryInterval);
        public static readonly RetryStrategy DefaultFixed = new FixedInterval(DefaultClientRetryCount, DefaultRetryInterval);
        public static readonly RetryStrategy DefaultProgressive = new Incremental(DefaultClientRetryCount, DefaultRetryInterval, DefaultRetryIncrement);
        public static readonly RetryStrategy DefaultExponential = new ExponentialBackoff(DefaultClientRetryCount, DefaultMinBackoff, DefaultMaxBackoff, DefaultClientBackoff);

        protected RetryStrategy(string name, bool firstFastRetry)
        {
            this.Name = name;
            this.FastFirstRetry = firstFastRetry;
        }

        public bool FastFirstRetry { get; set; }
        public string Name { get; private set; }
        public abstract ShouldRetry GetShouldRetry();
    }



基本的な実装は、GetShouldRetryメソッドをオーバーライドし、リトライすべきタイミングか否かを表すShouldRetry デリゲートを返すように実装します


public delegate bool ShouldRetry(int retryCount, Exception lastException, out TimeSpan delay);

ShouldRetry デリゲートは、リトライする回数と最後に発生した例外およびリトライを行うタイミングの遅延間隔を受け取り、リトライ処理を行うべきタイミングか否かを返します組み込みで、Incremental(再試行と再試行間の増分の時間間隔数を制御する戦略)、FixedInterval(再試行一定間隔の再試行間を制御する戦略)、ExponentialBackoff(指数関数的な遅延を計算するためのバックオフ戦略)が提供されています



Transient Fault Handling Application Block (Topaz)によるリトライ処理の基本的な利用方法


Transient Fault Handling Application Block (Topaz)による基本的な利用方法(C#)は、検出戦略と再試行戦略を組み合わせて、RetryPolicyオブジェクト作成し、そのRetryPolicyオブジェクトリトライ中の処理を適宜設定し、RetryPolicyオブジェクトのExecuteActionメソッドを呼び出します。ExecuteActionメソッドへは、リトライを行いたい対象の処理を継続渡しスタイルで渡します



var strategy = new Incremental("Incr1",10, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
var policy = new RetryPolicy<SqlAzureTransientErrorDetectionStrategy>(strategy);

policy.Retrying += (_, e) =>
{
	Console.WriteLine("{0:HH:mm:ss.fff} RetryCount: {1}, ErrorMessage: {2}, StackTrace: {3}",
	    DateTime.Now,
	    e.CurrentRetryCount,
	    e.LastException.Message,
	    e.LastException.StackTrace);
};

var result = policy.ExecuteAction(() =>
{
	// SQL Azureへごにょごにょ

	return "クエリの結果などを返す";
});



EL5 for Azureオブジェクト指向プログラミングで書かれているライブラリ、FSharpxは関数プログラミングで書かれているライブラリです。これら異なるパラダイム部品を組み合わせてモナドを作る。とっても面白いですね。



モナドとは


モナドは単なる自己関手の圏におけるモノイド対象だよ。何か問題でも? - フィリップ・ワドラー


圏論を少しかじったことがある人にとっては問題ない説明なのですが、そうではない場合「日本語でおk」と言わざるを得ません。

この説明だけでは少々乱暴すぎるので、MSDN - コンピューテーション式(F#)へのリンクと、F#モナドの関係について参考になりそうな表を置いておきます


コンピュテーション式 (F#)

http://msdn.microsoft.com/ja-jp/library/dd233182(v=vs.110).aspx



HaskellF#数学(圏論)
returnreturnη(単位元:unit)
>>=bind(*)operator
クラスMonadインスタンスであるように実装するコンピューテーション式で少なくとも Return と Bind の2つのmemberを実装するNA
MonadComputation Expression, WorkflowモナドはKleisliトリプルと等価定義F#Haskell の中で定義されるモナド構造は実際にKleisliトリプル。
functor through a type class definitionusually not mentioned関手(functor)
functionfunction (fun)射(morphism)
Haskellデータ型のHask圏.Netデータ型の圏グループ、位相グラフ微分幾何学
composable functionscomposable functions 2項演算とモノイド


MSDN - Code Recipe - F#によるモナドの実装方法モナド則を確認するユニットテスト。 Retry Monad for Transient Fault Handling

モナド則を確認するためのユニットテスト等を含む、このプログラムコードソリューションファイル一式を、MSDN - Code Recipe よりダウンロードすることができます

http://code.msdn.microsoft.com/F-Retry-Monad-for-35ee1e72


関連記事

快刀乱麻を断つモナド - F#とIOモナドコンピューテーション式の奥義と

http://d.hatena.ne.jp/zecl/20110703/p1

nobsunnobsun 2012/04/17 17:19 Haskellのbindは>>= で,'='は1つです

zeclzecl 2012/04/17 18:21 nobsunさん、ご指摘ありがとうございます。
typoです;修正しました。

はてなユーザーのみコメントできます。はてなへログインもしくは新規登録をおこなってください。

トラックバック - http://d.hatena.ne.jp/zecl/20120416/p1
リンク元
-->