夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その4:LocalExecutor概要

こんにちは。

Embulkが前回の投稿から今回の投稿までの間にJava用のプラグインもサポートしていますね。
これでようやくプラグインを書けるようになった・・・のですが、
とりあえずプラグインを書くのは裏で行っておくとして、ここではソースコードリーディングを続けます。
Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン - Blog by Sadayuki Furuhashi

今回はLocalExecutor、つまりローカルでデータの取得→書込処理を行うためのクラスです。

1. LocalExecutorに関連するクラス群

LocalExecutorに関連するクラス群をまとめてみますと以下のような図になります。
正確にはExecクラスによってThreadLocalな変数を生成してExecSessionを保持できるようにして実行・・
といったことも関わってくるのですが、動作を流してみる分にはそれほど影響が無いため、
図中からは省略しています。

実際に動かすにあたっては非常に重要な機構ですが、まずはわかりやすく処理を一本通してみようということで^^;

各クラスの役目は前回書いた内容とほぼ同じですので省略します。

2. LocalExecutorの動作

では、実際のLocalExecutorの動作を見ていきます。

初期化処理(コンストラクタ
  1. 最大スレッド数、システム設定などを初期化する。
  2. ExecutorServiceのスレッド数はデフォルトコア数×2、設定可能
    • ExecutorServiceが実際に使用されるのは本記事的にはしばらく後。

その後、前回の実行結果がErrorで終わっており、再実行状態が存在するかを確認しています。
再実行状態が存在しない場合は通常実行(run)、存在する場合は再実行処理(resume)、となります。
差分としては下記の内容が存在しますが、大まかな流れはrunで大体わかるはずですので、
今回はrunのみ読んでいきます。
#resumeはまた機会があれば。

  1. 通常実行(run)では入力スキーマを設定から読み込む、再実行処理(resume)では入力スキーマを再実行状態から読み込む。
  2. 通常実行(run)ではInputPlugin/OutputPluginのtransactionメソッドを実行、再実行処理(resume)ではresumeメソッドを実行。
  3. 再実行処理(resume)ではresumeState→processStateの結果の移し替えが行われる。
実行処理(run):実行前ブロック
  1. doWith(ExecSession session, ExecAction action)メソッドにてスレッドローカルにExecSessionを保存し、処理を起動。
    • Exec#doWithメソッド内以外ではスレッドローカルにExecSessionが存在しないため、防護機構となります。
  2. スレッド名を一時的に"transaction"に変更する。
    • try-with-resourcesをこういう形で使うのは面白いですね。
  3. LocalExecutor#doRun(ConfigSource config)メソッドで実処理開始
実行処理(run):処理実行部

では、実処理部に入るのですが、文章だけで書いてもわかりにくいため、
ソースをインデントを浅くして1行の長さを長くしたものを下記にはっておきます。
一度ソースを見てから以後の文章を見てください。

private ExecutionResult doRun(ConfigSource config) {
  final ExecutorTask task = config.loadConfig(ExecutorTask.class);

  final InputPlugin in = newInputPlugin(task);
  final List<FilterPlugin> filterPlugins = newFilterPlugins(task);
  final OutputPlugin out = newOutputPlugin(task);

  final ProcessState state = new ProcessState(Exec.getLogger(LocalExecutor.class));
  try {
    ConfigDiff inputConfigDiff = in.transaction(task.getInputConfig(), new InputPlugin.Control() {
      public List<CommitReport> run(final TaskSource inputTask, final Schema inputSchema, final int taskCount) {
        state.initialize(taskCount);
        state.setInputSchema(inputSchema);
        Filters.transaction(filterPlugins, task.getFilterConfigs(), inputSchema, new Filters.Control() {
          public void run(final List<TaskSource> filterTasks, final List<Schema> filterSchemas) {
            Schema outputSchema = last(filterSchemas);
            state.setOutputSchema(outputSchema);
            ConfigDiff outputConfigDiff = out.transaction(task.getOutputConfig(), outputSchema, taskCount,
                new OutputPlugin.Control() {
                  public List<CommitReport> run(final TaskSource outputTask) {
                    task.setInputTask(inputTask);
                    task.setFilterTasks(filterTasks);
                    task.setOutputTask(outputTask);

                    if (taskCount > 0) {
                      process(task.dump(), filterSchemas, taskCount, state);
                      if (!state.isAllCommitted()) {
                        throw state.getRepresentativeException();
                      }
                    } else {
                      // TODO warning?
                    }
                    return state.getOutputCommitReports();
                  }
                });
            state.setOutputConfigDiff(outputConfigDiff);
          }
        });
        return state.getInputCommitReports();
      }
    });
    state.setInputConfigDiff(inputConfigDiff);

    try {
      doCleanup(config, state.buildResumeState(task, Exec.session()));
    } catch (Exception ex) {
      state.logger.warn("Commit succeeded but cleanup failed. Ignoring this exception.", ex); // TODO
    }

    return state.buildExecuteResult();

  } catch (Throwable ex) {
    if (state.isAllCommitted()) {
      // ignore the exception
      return state.buildExecuteResultWithWarningException(ex);
    }
    if (!state.isAnyStarted()) {
      throw ex;
    }
    throw state.buildPartialExecuteException(ex, task, Exec.session());
  }
}

実処理はまずPluginの設定を連結して実際に実行するパイプライン(Taskと呼ぶのが正しい?)を生成し、
それをスレッドに割り振ってExecutorServiceで実行する形を取っています。

あと、InputPlugin、FilterPlugin、OutputPluginは実際はインタフェースのため
実処理は記述されていないのですが、わかりやすくするため下記のクラス群を参考に記述しています。
下記のクラス群に移行している間はその旨を書いています。
あと、FilterPluginは処理として必須の流れではないようなので、今回は省略です。
・InputPlugin>FileInputRunner>FileInputPlugin(LocalFileInputPlugin)
・OutputPlugin>FileOutputRunner>FileOutputPlugin(LocalFileOutputPlugin)

では、これから実処理部の流れを見ていきます。

  1. ExecutorTask、InputPlugin、FilterPluginのリスト、OutputPluginを設定/PluginManagerを用いてロード
  2. InputPluginのtransaction開始
  3. (FileInputRunner)RunnerTaskを設定からロード
  4. (FileInputRunner)FileInputPluginのtransaction開始
  5. (LocalFileInputPlugin)PluginTaskを設定からロード
  6. (LocalFileInputPlugin)PluginTaskの情報を基に、ファイル一覧を取得する。
  7. (LocalFileInputPlugin)処理スレッド数をファイルと同一にして読込み処理開始
  8. (FileInputRunner)Decoders、ParserPluginのtransactionを通し、結果をLocalExecutorから与えられたInputPlugin.Control()に渡す。
    • この段階でようやくLocalExecutor(内部無名クラス)に処理が返ってくるわけですね。
  9. InputPlugin.Control()に渡された結果を受けて、Listに対するtransactionを開始
  10. List中のFilterPluginのtransactionを順々に実行、最後のFilterPluginは結果をLocalExecutorから与えられたFilterPlugin.Control()に渡す。
  11. FilterPlugin.Control()に与えられた結果を受けてOutputPluginのtransaction開始
  12. (FileOutputRunner)RunnerTaskを設定からロード
  13. (FileOutputRunner)FileOutputPluginのtransaction開始
  14. (LocalFileOutputPlugin)PluginTaskを設定からロード
  15. (LocalFileOutputPlugin)シーケンスのチェックを実施し、失敗した場合はエラーとする
  16. (FileOutputRunner)Encoders、FormatterPluginのtransactionを通し、結果をLocalExecutorから与えられたOutputPlugin.Control()に渡す。
    • 再度LocalExecutor(内部無名クラス)に処理が返ります。
  17. それまでのInputTask、FilterTask、OutputTaskをTask情報に設定してLocalExecutorで実際に処理を行うためのTaskを生成する。
  18. TaskをstartProcessorメソッドで非同期/並列に実行。
    • 並列度は初期化処理で書かれた「ExecutorServiceのスレッド数」で決まる。
  19. 実行結果を取得し、各InputPlugin、OutputPluginのCleanup処理を呼び出す
    • cleanup処理も並列実行可能な基本構造となっています。標準Pluginではそこまで本格的なCleanup処理を呼び出しているものはありませんでしたが。
  20. 実行結果を生成する。

その後、Runnerクラス側の方で失敗していた場合は再実行状態を生成し、成功した場合は次回実行用の状態を生成してファイル出力・・
という流れになっています。

とりあえず、LocalExecutorを読んでみましたが、1回では流れを通すのがやっとという感じですね。
ですので、次回以降も内部のEntity構成やInputPluginといったPlugin群の構造等、読んでいこうと思います。