Hatena::ブログ(Diary)

CLOVER

2017-06-17

jBatch(JBeret SE)で、Job Repositoryの保存先をデータベースにしてみる

jBatchの実装であるJBeretでは、Job Repositoryにいくつかのデータストアを選択することができます。

実装としては、これだけが用意されています。

  • In Memory(デフォルト)
  • JDBC
  • MongoDB
  • Infinispan(Embedded Mode)

書いているとおり、デフォルトのJob RepositoryはIn Memoryです。

Job Repositoryの切り替え方法については、ドキュメントのSet upのところにさらっと書いてあります。

Set up JBeret

「Configure JBeret Batch Runtime」のところを見るとよいでしょう。

WildFlyやJBoss EAP上で動かす時と、Java SE(JBeret SE)上で動かす時で設定方法が違うみたいなのですが、今回は
Java SE上で動かしてみたいと思います。

お題は、次のように設定します。

  • ItemReader/ItemWriter、およびBatchletをそれぞれ使ってステップを2つ使ったジョブを構成する
  • 各ステップは、パラメーターで任意に失敗可能とする(あとで再開させる)
  • ItemReaderには、Checkpointを使用する
  • Job RepositoryはJDBCMySQL)とする

では、試してみましょう。

準備

このお題でのプロジェクトの依存関係としては、このあたりがあればOKです。

libraryDependencies ++= Seq(
  // JBeret SE
  "org.jberet" % "jberet-se" % "1.2.3.Final" % Compile,
  "org.jboss.spec.javax.batch" % "jboss-batch-api_1.0_spec" % "1.0.0.Final" % Compile,
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.2_spec" % "1.0.0.Final" % Runtime,
  "javax.enterprise" % "cdi-api" % "1.1" % Compile,
  "javax.inject" % "javax.inject" % "1" % Compile,
  "org.jboss.weld.se" % "weld-se" % "2.4.4.Final" % Runtime,
  "org.wildfly.security" % "wildfly-security-manager" % "1.1.2.Final" % Runtime,
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.11.Final" % Runtime,
  "org.jboss.logging" % "jboss-logging" % "3.3.1.Final" % Compile,
  "org.jboss" % "jandex" % "2.0.3.Final" % Runtime,
  "com.google.guava" % "guava" % "18.0" % Runtime,

  // JDBC Driver for Job Repository
  "mysql" % "mysql-connector-java" % "5.1.42" % Runtime,

  // for Test
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)

JBeretへの依存関係は、Set upのドキュメントに書いてあるので、そちらを参考にしましょう。

Set up JBeret

MySQLが対象となるのでJDBCドライバと、あとテストコード用にScalaTestを入れています。

MySQLの準備は、とりあえずアクセス可能で、テーブル作成、参照などの権限を持ったユーザーがあれば大丈夫です。

サンプルコード

まずは、サンプルコードを用意します。

最初はItemReader。
src/main/scala/org/littlewings/javaee7/batch/MyItemReader.scala

package org.littlewings.javaee7.batch

import javax.batch.api.BatchProperty
import javax.batch.api.chunk.AbstractItemReader
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Named
@Dependent
class MyItemReader extends AbstractItemReader {
  private[this] val logger: Logger = Logger.getLogger(getClass)

  private[this] var items: Iterator[Int] = _

  private[this] var checkpoint: MyCheckPoint = _

  @Inject
  @BatchProperty
  private[this] var readerFail: Boolean = _

  @Inject
  @BatchProperty
  private[this] var readerFailCount: Int = _

  override def open(checkpoint: java.io.Serializable): Unit = {
    checkpoint match {
      case null =>
        this.checkpoint = new MyCheckPoint
      case cp: MyCheckPoint =>
        this.checkpoint = cp
    }
    logger.infof("item reader start, from %d", this.checkpoint.counter)

    items = (this.checkpoint.counter until 100).iterator
  }

  override def readItem(): AnyRef = {
    if (readerFail && checkpoint.counter == readerFailCount) {
      throw new RuntimeException(s"Oops!! ItemReader, count = ${checkpoint.counter}")
    }

    checkpoint.counter += 1

    if (checkpoint.counter % 25 == 0) {
      logger.infof("read count = %d", checkpoint.counter)
    }

    if (items.hasNext) {
      Integer.valueOf(items.next())
    } else {
      null
    }
  }

  override def close(): Unit =
    logger.info("item reader end")

  override def checkpointInfo(): java.io.Serializable =
    checkpoint
}

ItemReaderが扱うレコードとしては、単にRange/Iteratorで生成したIntegerを使用します。

    items = (this.checkpoint.counter until 100).iterator

また、@BatchPropertyを設けて任意のポイントでエラーとできるようにしました。

  @Inject
  @BatchProperty
  private[this] var readerFail: Boolean = _

  @Inject
  @BatchProperty
  private[this] var readerFailCount: Int = _

    if (readerFail && checkpoint.counter == readerFailCount){
      throw new RuntimeException(s"Oops!! ItemReader, count = ${checkpoint.counter}")
    }

CheckPointは今回はItemReaderでしか使用していませんが、現在の位置を保持しておくだけにします。

src/main/scala/org/littlewings/javaee7/batch/MyCheckPoint.scala 
package org.littlewings.javaee7.batch

@SerialVersionUID(1L)
class MyCheckPoint extends Serializable {
  var counter: Int = _
}

ItemWriterは受け取ったレコード件数を、一定件数でログ出力するだけにします。
src/main/scala/org/littlewings/javaee7/batch/MyItemWriter.scala

package org.littlewings.javaee7.batch

import javax.batch.api.chunk.AbstractItemWriter
import javax.enterprise.context.Dependent
import javax.inject.Named

import org.jboss.logging.Logger

@Named
@Dependent
class MyItemWriter extends AbstractItemWriter {
  private[this] val logger: Logger = Logger.getLogger(getClass)
  private[this] var counter: Int = _

  override def open(checkpoint: java.io.Serializable): Unit =
    logger.info("item writer start")

  override def writeItems(items: java.util.List[AnyRef]): Unit = {
    logger.infof("write items, size = %d", items.size)

    counter += items.size()
  }

  override def close(): Unit = {
    logger.infof("total count = %d", counter)
    logger.info("item writer end")
  }
}

Batchletは、起動したらログ出力するかどうかだけですが、設定でエラーにできるようにしておきました。
src/main/scala/org/littlewings/javaee7/batch/MyBatchlet.scala

package org.littlewings.javaee7.batch

import javax.batch.api.{AbstractBatchlet, BatchProperty}
import javax.enterprise.context.Dependent
import javax.inject.{Inject, Named}

import org.jboss.logging.Logger

@Named
@Dependent
class MyBatchlet extends AbstractBatchlet {
  private[this] val logger: Logger = Logger.getLogger(getClass)

  @Inject
  @BatchProperty
  private[this] var batchletFail: Boolean = _

  override def process(): String = {
    logger.info("process batchlet")

    if (batchletFail) {
      throw new RuntimeException("Oops!! Batchlet")
    }

    "Batchlet Success"
  }
}

続いて、設定ファイル系の準備です。

ジョブ定義ファイル。ItemReader/ItemWriterのステップと、Batchletのステップを分けて登録します。ItemReaderとBatchletには、
プロパティも設定可能にしておきます。チャンクサイズは、25としました。
src/main/resources/META-INF/batch-jobs/my-job.xml

<?xml version="1.0" encoding="UTF-8"?>
<job id="my-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="chunk-step" next="batchlet-step">
        <chunk item-count="25">
            <reader ref="myItemReader">
                <properties>
                    <property name="readerFail" value="#{jobParameters['readerFail']}"/>
                    <property name="readerFailCount" value="#{jobParameters['readerFailCount']}"/>
                </properties>
            </reader>
            <writer ref="myItemWriter"/>
        </chunk>
    </step>

    <step id="batchlet-step">
        <batchlet ref="myBatchlet">
            <properties>
                <property name="batchletFail" value="#{jobParameters['batchletFail']}"/>
            </properties>
        </batchlet>
    </step>
</job>

では、JBeretのJob Repositoryの設定をしましょう。

JBeret SEの場合は、「jberet.properties」というファイルを用意します。WildFlyまたはJBoss EAPの場合は、サブシステムの
設定として行うようです。
src/main/resources/jberet.properties

job-repository-type = jdbc

db-url = jdbc:mysql://172.17.0.2:3306/practice
db-user = kazuhira
db-password = password
db-properties = useUnicode=true:characterEncoding=utf-8:characterSetResults=utf-8:useServerPrepStmts=true:useLocalSessionState=true:elideSetAutoCommits=true:alwaysSendSetIsolation=false:useSSL=false

#ddl-file = sql/jberet-mysql.ddl

Job RepositoryをJDBCにするには、「job-repository-type」を「jdbc」に設定します。

job-repository-type = jdbc

この「jberet.properties」を見ているのは、こちらのクラスです。
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-se/src/main/java/org/jberet/se/BatchSEEnvironment.java

Set upにはちょこっとしか内容が書かれていないので、

Set up JBeret

詳しく見たかったらソースコードを見ることになります…。「db-properties」の書き方とか、ソースコード見ないと
わかりませんでした。
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java

その他のJob Repositoryの実装も置かれているので、気になる場合は見てみるとよいでしょう。
https://github.com/jberet/jsr352/tree/1.2.3.Final/jberet-core/src/main/java/org/jberet/repository

また、ドキュメントに記載のとおり、テーブルを4つ作る必要があります。自分で書いてもいいのですが(その場合は、
ddl-file」にDDLが書かれたファイルを指定します)、今回はJBeret自身に作ってもらいます。

以下のRDBMSに対するDDLは、JBeretが用意しています。

実行時の情報から、DDLを選んで実行してくれます、と。

MySQLの場合は、こちらが使用されます。
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/resources/sql/jberet-mysql.ddl

その他のRDBMSへのDDLも置かれているので、興味のある方は見てみるとよいでしょう。
https://github.com/jberet/jsr352/tree/1.2.3.Final/jberet-core/src/main/resources/sql

JBeretが使用するSQLは、こちらに設定されています。
https://github.com/jberet/jsr352/blob/1.2.3.Final/jberet-core/src/main/resources/sql/jberet-sql.properties

最後に、CDIを使うのでbeans.xmlを用意します。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

テストコードと動作確認

確認のためのテストコードを用意して、バッチを実行してみましょう。

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/javaee7/batch/BatchJdbcJobRepositorySpec.scala

package org.littlewings.javaee7.batch

import java.util.Properties
import java.util.concurrent.TimeUnit
import javax.batch.runtime.{BatchRuntime, BatchStatus}

import org.jberet.runtime.JobExecutionImpl
import org.jboss.logging.Logger
import org.scalatest.{FunSuite, Matchers}

class BatchJdbcJobRepositorySpec extends FunSuite with Matchers {
  val logger: Logger = Logger.getLogger(getClass)

  // ここに、テストを書く!
}

ここに、正常終了させたり、特定のステップでエラーにさせるテストを書いていきます。

正常終了するケースは、こちら。

  test("success case") {
    logger.info("[success case] start")

    val jobOperator = BatchRuntime.getJobOperator

    val properties = new Properties
    properties.put("readerFail", "false")
    properties.put("readerFailCount", "0")
    properties.put("batchletFail", "false")

    val executionId = jobOperator.start("my-job", properties)

    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.infof("[success case] end, execution-id = %d", executionId)
  }

では、試す…前にMySQLの状態を見てみます。該当のデータベースには、まだテーブルはありません。

mysql> show tables;
Empty set (0.00 sec)


実行してみます。

> test

こんな感じのログが出力されます。

6 17, 2017 1:12:24 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$1
INFO: [success case] start
6 17, 2017 1:12:24 午後 org.jberet.repository.JdbcRepository getDDLLocation
INFO: JBERET000021: About to initialize batch job repository with ddl-file: sql/jberet-mysql.ddl for database MySQL
6 17, 2017 1:12:25 午後 org.jboss.weld.bootstrap.WeldStartup <clinit>
INFO: WELD-000900: 2.4.4 (Final)
6 17, 2017 1:12:25 午後 org.jboss.weld.environment.deployment.discovery.DiscoveryStrategyFactory create
INFO: WELD-ENV-000020: Using jandex for bean discovery
6 17, 2017 1:12:25 午後 org.jboss.weld.bootstrap.WeldStartup startContainer
INFO: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
6 17, 2017 1:12:25 午後 org.jboss.weld.environment.se.WeldContainer fireContainerInitializedEvent
INFO: WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$1
INFO: [success case] end, execution-id = 1

よく見ると、DDLを選んだみたいな感じのログが出力されています。

6 17, 2017 1:12:24 午後 org.jberet.repository.JdbcRepository getDDLLocation
INFO: JBERET000021: About to initialize batch job repository with ddl-file: sql/jberet-mysql.ddl for database MySQL

MySQLの方を見てみると、テーブルができていたりします。

mysql> show tables;
+---------------------+
| Tables_in_practice  |
+---------------------+
| JOB_EXECUTION       |
| JOB_INSTANCE        |
| PARTITION_EXECUTION |
| STEP_EXECUTION      |
+---------------------+
4 rows in set (0.01 sec)

内容を見ると、それっぽい情報が入っています。

mysql> select * from JOB_EXECUTION where JOBEXECUTIONID = 1;
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME          | STARTTIME           | ENDTIME             | LASTUPDATEDTIME     | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS                                                | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
|              1 |             1 |    NULL | 2017-06-17 13:12:25 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | COMPLETED  | readerFail = false
batchletFail = false
readerFailCount = 0
 | NULL            |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
1 row in set (0.00 sec)


mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
|             1 |    NULL | my-job  | NULL            |
+---------------+---------+---------+-----------------+
1 rows in set (0.00 sec)



mysql> select * from PARTITION_EXECUTION;
Empty set (0.00 sec)



mysql> select * from STEP_EXECUTION where JOBEXECUTIONID = 1;
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME      | STARTTIME           | ENDTIME             | BATCHSTATUS | EXITSTATUS       | EXECUTIONEXCEPTION | PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO                                                          | WRITERCHECKPOINTINFO |
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
|               1 |              1 |    NULL | chunk-step    | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | COMPLETED        | NULL               | NULL               |       100 |        100 |           5 |             0 |             0 |                0 |           0 |              0 | &#65533;&#65533; sr *org.littlewings.javaee7.batch.MyCheckPoint       ^A^B ^AI counterxp   e   | NULL                 |
|               2 |              1 |    NULL | batchlet-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | Batchlet Success | NULL               | NULL               |         0 |          0 |           0 |             0 |             0 |                0 |           0 |              0 | NULL                                                                          | NULL                 |
+-----------------+----------------+---------+---------------+---------------------+---------------------+-------------+------------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+-------------------------------------------------------------------------------+----------------------+
2 rows in set (0.00 sec)

STEP_EXECUTIONテーブルには、ステップの情報やシリアライズされたデータっぽいものも入っていますね。

PARTITION_EXECUTIONテーブルについては、今回パーティションを使っていないからか、なにも入っていません。

続いて、ItemReader/ItemWriterのステップで異常終了させるケース。

  test("reader fail and restart") {
    logger.info("[reader fail and restart] start")

    val jobOperator = BatchRuntime.getJobOperator

    // first fail
    val p1 = new Properties
    p1.put("readerFail", "true")
    p1.put("readerFailCount", "45")
    p1.put("batchletFail", "false")

    val executionId = jobOperator.start("my-job", p1)

    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.FAILED)

    logger.info("first job end.")

    // retry
    val p2 = new Properties
    p2.put("readerFail", "false")
    p2.put("readerFailCount", "0")
    p2.put("batchletFail", "false")

    val restartExecutionId = jobOperator.restart(executionId, p2)

    executionId should not be (restartExecutionId)

    val restartedJobExecution = jobOperator.getJobExecution(restartExecutionId)
    restartedJobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    restartedJobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.info("restart job end")

    logger.infof("[reader fail and restart] end, execution-id first = %d, restarted = %d", executionId, restartExecutionId)
  }

ジョブの再開には、前回のExecution Idを使ってJobOperator#restartを呼び出します。

    val restartExecutionId = jobOperator.restart(executionId, p2)

つまり、前回の実行情報が必要です、と。

ちなみに、1回目のジョブは45レコード読んだところでコケてもらいます。

    // first fail
    val p1 = new Properties
    p1.put("readerFail", "true")
    p1.put("readerFailCount", "45")
    p1.put("batchletFail", "false")

    val executionId = jobOperator.start("my-job", p1)

    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.FAILED)

では、実行してみます。

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=20, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=24, readPosition=45, failurePoint=null}
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=25, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job my-job, chunk-step, org.jberet.job.model.Chunk@293f8c7c
java.lang.RuntimeException: Oops!! ItemReader, count = 45
	at org.littlewings.javaee7.batch.MyItemReader.readItem(MyItemReader.scala:41)
	at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:359)
	at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:305)
	at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:201)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: first job end.
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.jberet.runtime.context.StepContextImpl <init>
WARN: JBERET000018: Could not find the original step execution to restart.  Current step execution id: 0, step name: batchlet-step
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: restart job end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] end, execution-id first = 2, restarted = 3

45レコード読んだ後に、エラーになります。

6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner readProcessWriteItems
ERROR: ProcessingInfo{count=20, timerExpired=false, itemState=RUNNING, chunkState=RUNNING, checkpointPosition=24, readPosition=45, failurePoint=null}
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: item-count=25, time-limit=0, skip-limit=-1, skipCount=0, retry-limit=-1, retryCount=0
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.ChunkRunner run
ERROR: JBERET000007: Failed to run job my-job, chunk-step, org.jberet.job.model.Chunk@293f8c7c
java.lang.RuntimeException: Oops!! ItemReader, count = 45

このあと、1度そこまでの処理が終了した後で

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: first job end.

25から(2つ目のチャンクから)再開します。

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50

再開後は正常終了して、初回実行時と再実行時では異なるExecution Idが得られることがわかります。

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$2
INFO: [reader fail and restart] end, execution-id first = 2, restarted = 3

実行後の情報。

mysql> select * from JOB_EXECUTION where JOBEXECUTIONID IN (2, 3);
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME          | STARTTIME           | ENDTIME             | LASTUPDATEDTIME     | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS                                                | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
|              2 |             2 |    NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | FAILED      | FAILED     | readerFail = true
batchletFail = false
readerFailCount = 45
 | NULL            |
|              3 |             2 |    NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | COMPLETED  | readerFail = false
batchletFail = false
readerFailCount = 0
 | NULL            |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
2 rows in set (0.00 sec)


mysql> select * from JOB_EXECUTION where JOBEXECUTIONID IN (2, 3);
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME          | STARTTIME           | ENDTIME             | LASTUPDATEDTIME     | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS                                                | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
|              2 |             2 |    NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | FAILED      | FAILED     | readerFail = true
batchletFail = false
readerFailCount = 45
 | NULL            |
|              3 |             2 |    NULL | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | COMPLETED  | readerFail = false
batchletFail = false
readerFailCount = 0
 | NULL            |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
2 rows in set (0.00 sec)


mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
|             1 |    NULL | my-job  | NULL            |
|             2 |    NULL | my-job  | NULL            |
+---------------+---------+---------+-----------------+
2 rows in set (0.00 sec)



mysql> select * from STEP_EXECUTION where JOBEXECUTIONID IN (2, 3);

| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME      | STARTTIME           | ENDTIME             | BATCHSTATUS | EXITSTATUS       || PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO                                                          | WRITERCHECKPOINTINFO |

|               3 |              2 |    NULL | chunk-step    | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | FAILED      | FAILED           | java.lang.RuntimeException: Oops!! ItemReader, count = 45
	at org.littlewings.javaee7.batch.MyItemReader.readItem(MyItemReader.scala:41)
	at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:359)
	at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:305)
	at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:201)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:226)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
 | NULL               |        45 |         25 |           1 |             1 |             0 |                0 |           0 |              0 | &#65533;&#65533; sr *org.littlewings.javaee7.batch.MyCheckPoint       ^A^B ^AI counterxp   ^Y   | NULL                 |
|               4 |              3 |    NULL | chunk-step    | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | COMPLETED        || NULL               |        75 |         75 |           4 |             0 |             0 |                0 |           0 |              0 | &#65533;&#65533; sr *org.littlewings.javaee7.batch.MyCheckPoint       ^A^B ^AI counterxp   e   | NULL                 |
|               5 |              3 |    NULL | batchlet-step | 2017-06-17 13:12:26 | 2017-06-17 13:12:26 | COMPLETED   | Batchlet Success || NULL               |         0 |          0 |           0 |             0 |             0 |                0 |           0 |              0 | NULL                                                                          | NULL                 |

3 rows in set (0.00 sec)


STEP_EXECUTIONには、スタックトレースまで入っています。

ちなみに、再開時にはJOBEXECUTIONIDは変わりますが、JOBINSTANCEIDは同じものになるみたいですね。

最後は、Batchletでエラーになってもらうパターン。

  test("batchlet fail and restart") {
    logger.info("[batchlet fail and restart] start")

    val jobOperator = BatchRuntime.getJobOperator

    // first fail
    val p1 = new Properties
    p1.put("readerFail", "false")
    p1.put("readerFailCount", "0")
    p1.put("batchletFail", "true")

    val executionId = jobOperator.start("my-job", p1)

    val jobExecution = jobOperator.getJobExecution(executionId)
    jobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    jobExecution.getBatchStatus should be(BatchStatus.FAILED)

    logger.info("first job end.")

    // retry
    val p2 = new Properties
    p2.put("readerFail", "false")
    p2.put("readerFailCount", "0")
    p2.put("batchletFail", "false")

    val restartedExecutionId = jobOperator.restart(executionId, p2)

    executionId should not be (restartedExecutionId)

    val restartedJobExecution = jobOperator.getJobExecution(restartedExecutionId)
    restartedJobExecution.asInstanceOf[JobExecutionImpl].awaitTermination(0L, TimeUnit.SECONDS)

    restartedJobExecution.getBatchStatus should be(BatchStatus.COMPLETED)

    logger.info("restart job end")

    logger.infof("[batchlet fail and restart] end, execution-id first = %d, restarted = %d", executionId, restartedExecutionId)
  }

実行時のログは、こんな感じ。

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: [batchlet fail and restart] start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader open
INFO: item reader start, from 0
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter open
INFO: item writer start
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 50
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 75
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader readItem
INFO: read count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter writeItems
INFO: write items, size = 25
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: total count = 100
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemWriter close
INFO: item writer end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyItemReader close
INFO: item reader end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.jberet.runtime.runner.BatchletRunner run
WARN: JBERET000001: Failed to run batchlet org.jberet.job.model.RefArtifact@698cf36c
java.lang.RuntimeException: Oops!! Batchlet
	at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:22)
	at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:229)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: first job end.
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.MyBatchlet process
INFO: process batchlet
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: restart job end
6 17, 2017 1:12:26 午後 org.littlewings.javaee7.batch.BatchJdbcJobRepositorySpec $anonfun$new$3
INFO: [batchlet fail and restart] end, execution-id first = 4, restarted = 5

この場合、ItemReader/ItemWriterのステップは終了しているので、Batchletのステップからの再開となります。

一応、テーブルの情報も載せておきます。

mysql> select * from JOB_EXECUTION where JOBEXECUTIONID IN (4, 5);
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
| JOBEXECUTIONID | JOBINSTANCEID | VERSION | CREATETIME          | STARTTIME           | ENDTIME             | LASTUPDATEDTIME     | BATCHSTATUS | EXITSTATUS | JOBPARAMETERS                                                | RESTARTPOSITION |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
|              4 |             3 |    NULL | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | FAILED      | FAILED     | readerFail = false
batchletFail = true
readerFailCount = 0
  | NULL            |
|              5 |             3 |    NULL | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED   | COMPLETED  | readerFail = false
batchletFail = false
readerFailCount = 0
 | NULL            |
+----------------+---------------+---------+---------------------+---------------------+---------------------+---------------------+-------------+------------+--------------------------------------------------------------+-----------------+
2 rows in set (0.00 sec)


mysql> select * from JOB_INSTANCE;
+---------------+---------+---------+-----------------+
| JOBINSTANCEID | VERSION | JOBNAME | APPLICATIONNAME |
+---------------+---------+---------+-----------------+
|             1 |    NULL | my-job  | NULL            |
|             2 |    NULL | my-job  | NULL            |
|             3 |    NULL | my-job  | NULL            |
+---------------+---------+---------+-----------------+
3 rows in set (0.00 sec)


mysql> select * from STEP_EXECUTION where JOBEXECUTIONID IN (4, 5);

| STEPEXECUTIONID | JOBEXECUTIONID | VERSION | STEPNAME      | STARTTIME           | ENDTIME             | BATCHSTATUS | EXITSTATUS       || PERSISTENTUSERDATA | READCOUNT | WRITECOUNT | COMMITCOUNT | ROLLBACKCOUNT | READSKIPCOUNT | PROCESSSKIPCOUNT | FILTERCOUNT | WRITESKIPCOUNT | READERCHECKPOINTINFO                                                          | WRITERCHECKPOINTINFO |

|               6 |              4 |    NULL | chunk-step    | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED   | COMPLETED        || NULL               |       100 |        100 |           5 |             0 |             0 |                0 |           0 |              0 | &#65533;&#65533; sr *org.littlewings.javaee7.batch.MyCheckPoint       ^A^B ^AI counterxp   e   | NULL                 |
|               7 |              4 |    NULL | batchlet-step | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | FAILED      | FAILED           | java.lang.RuntimeException: Oops!! Batchlet
	at org.littlewings.javaee7.batch.MyBatchlet.process(MyBatchlet.scala:22)
	at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:72)
	at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:229)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:147)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:128)
	at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:203)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:164)
	at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88)
	at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:60)
	at org.jberet.spi.JobExecutor$1.run(JobExecutor.java:99)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
 | NULL               |         0 |          0 |           0 |             0 |             0 |                0 |           0 |              0 | NULL                                                                          | NULL                 |
|               8 |              5 |    NULL | batchlet-step | 2017-06-17 13:12:27 | 2017-06-17 13:12:27 | COMPLETED   | Batchlet Success || NULL               |         0 |          0 |           0 |             0 |             0 |                0 |           0 |              0 | NULL                                                                          | NULL                 |

3 rows in set (0.00 sec)


OKそうですね。

まとめ

jBatch…というか、JBeret SEを使って、Job Repositoryの保存先をデータベース(MySQL)にしてみました。

ちゃんとジョブの情報がMySQLに保存されたことと、再開時の様子も実は今回初めて見たので、理解の助けになりました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/jbatch-jdbc-job-repository

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/Kazuhira/20170617/1497674273