Spring Batch のサンプルコードを説明します。

アーキテクチャ

開発者は、図1の「Job」部分を作成します。Job は一つのバッチ処理の単位で、内部は複数個の「Step」から構成されます。各Stepの実行制御ルールをXMLファイルで記述します。リスタート可能か、といったさまざまな属性も指定できます。

図1 アーキテクチャ

図1の「JobLauncher」は Spring Batch が提供します。開発者は(提供されている)JobLauncher から実行したい Job を取得し、ジョブパラメータを引数として Job を実行します。

ジョブの実行状況(ステータス)は、「Job Repository」に適時、格納されます。内部では RDB を利用しており、その管理用テーブルは Spring Batch が提供します。Wagby は、この管理用テーブルを閲覧するための仕組みを備えています。

Reader-Processor-Writerパターン

開発者が作成する「Step」は、さらに Reader, Processor, Writer という枠組みが提供されています。Reader はデータベースまたはファイルから一定量のデータを読み込みます。読み込んだデータを「Item」という単位で扱い、これを Processor に渡します。Processor は何らかの処理を行うものですが、事前に指定されたコミット数に達したところで、Item を Writer に渡します。Reader が読み込む数と、Writer に書き込ませる数は必ずしも一致させることはありません。Writer は受け取った Item をデータベースまたはファイルに書き込みます。この Reader, Processor, Writer という処理動作をまとめて「Chunk」と呼びます。

図2 Reader-Processor-Writer パターン
開発者は、Wagby が自動生成したモデルクラスやサービスクラスを(バッチプログラムの中で)利用することができます。SQL を使わずに、高品質のバッチプログラムを短期間で開発できます。

ここから、図2の Reader-Processor-Writer パターンに準拠したコードのサンプルを紹介します。

図3 サンプルコードの構成

MyItemReader

Spring Batch が提供する ItemReader インタフェースを実装したクラスを作成します。データの読み込みを行います。

対象を customer モデルとします。Wagby が自動生成したサービスクラスを利用してデータを1件ずつ取得します。

package jp.jasminesoft.wagby.batch.chunk;

import java.util.List;
import jp.jasminesoft.jfc.dao.CriteriaConverter;
import jp.jasminesoft.jfc.dao.FinderContext;
import jp.jasminesoft.jfc.service.JFCEntityService;
import jp.jasminesoft.wagby.model.customer.Customer;
import jp.jasminesoft.wagby.model.customer_c.CustomerC;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component("MyItemReader")
public class MyItemReader implements ItemReader<Customer> {
    @Autowired
    @Qualifier("CustomerEntityService")
    protected JFCEntityService<Customer, Integer> customerEntityService;

    @Autowired
    @Qualifier("CustomerCriteriaConverter")
    protected CriteriaConverter<CustomerC> converter;
  
    private FinderContext<CustomerC> finderContext;
    private List<Customer> results;
    private int index;

    public void init() {
        finderContext = new FinderContext<CustomerC>();
        finderContext.setCondition(new CustomerC());
        finderContext.setPageSize(10);
        finderContext.setCriteriaConverter(converter);
        results = customerEntityService.find(finderContext);
    }

    public Customer read() throws Exception, UnexpectedInputException,
    ParseException, NonTransientResourceException {
        if (finderContext == null) {
       	    init();
        }
        if (index >= results.size()) {
       	    if (finderContext.isNextPage()) {
       	        finderContext.next();
       	        results = customerEntityService.find(finderContext);
       	        index = 0;
       	    } else {
       	        return null;
       	    }
        }
        Customer customer = results.get(index++);
        System.out.println("index="+index+",customer="+customer);
        return customer;
    }
}

MyItemProcessor

Spring Batch が提供する ItemProcessor インタフェースを実装したクラスを作成します。データの処理を行います。

ここでは入力値をそのまま返すようにしています。一般的には、ここで計算処理や、加工処理を行います。

package jp.jasminesoft.wagby.batch.chunk;

import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemProcessor;

public class MyItemProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer customer) throws Exception {
        return customer; // Do nothing
    }
}

MyItemWriter

Spring Batch が提供する ItemWriter インタフェースを実装したクラスを作成します。加工後のデータの書き込み処理を行います。

ここでは値をコンソールに出力しています。一般的にはデータベースやファイルに書き込みを行う処理を記述します。

package jp.jasminesoft.wagby.batch.chunk;

import java.util.List;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemWriter;

public class MyItemWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> data) throws Exception {
        System.out.println("MyItemWriter,data="+data);
    }
}

MyItemListener

このクラスは必須ではありません。

開発者は ItemListenerSupport を継承したクラスを作成することができます。次の例は、読み込み処理の直前に呼び出されるメソッド beforeRead の動作を確認するものです。

package jp.jasminesoft.wagby.batch.chunk;

import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.core.listener.ItemListenerSupport;

public class MyItemListener extends ItemListenerSupport<Customer,Customer> {

    @Override
    public void beforeRead() {
        System.out.println("beforeRead !");
    }
}

図1の「Job Launcher」を利用して、ジョブを実行するコードを紹介します。

MyShowCustomerController

独自ボタンを押下したタイミングで、開発者が用意した SpringBatch プログラムを呼び出す処理を記述しています。

package jp.jasminesoft.wagby.controller.customer;

import static jp.jasminesoft.util.ExcelFunction.*;

import java.security.Permission;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.sql.*;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import jp.jasminesoft.util.*;
import jp.jasminesoft.jfc.*;
import jp.jasminesoft.jfc.controller.*;
import jp.jasminesoft.jfc.service.*;
import jp.jasminesoft.jfc.menu.*;

import jp.jasminesoft.wagby.app.*;

import org.apache.log4j.Logger;
import org.apache.commons.lang3.*;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;

/**
 * SpringBatch Sample
 *
 * @author JasmineSoft
 */
@Controller
public class MyShowCustomerController
    extends ShowCustomerController
{
    @Override
    public String do_original(DbActionParameter p)
    throws IOException, ServletException, SecurityException
    {
        SimpleJobLauncher launcher = (SimpleJobLauncher)p.appctx.getBean("jobLauncher");
        Job job = (Job)p.appctx.getBean("Batch1");
        try {
            System.out.println("job="+job);
	    java.util.Map<String,JobParameter> params = new HashMap<String,JobParameter>();
	    params.put("date1", new JobParameter(new java.util.Date()));
            JobExecution jobExecution = launcher.run(job, new JobParameters(params));
            System.out.println("jobExecution="+jobExecution);
        } catch (JobExecutionAlreadyRunningException e) {
        } catch (JobRestartException e) {
        } catch (JobInstanceAlreadyCompleteException e) {
        } catch (JobParametersInvalidException e) {
        }
        return do_show(p);
    }
}
  • SimpleJobLauncher は SpringBatch が提供するクラスです。開発者が用意したジョブを呼び出すための基本機能を提供しています。
  • ジョブ名 "Batch1" は、次章で説明します。
  • ジョブパラメータ date1 は、ダミー情報です。実際には利用していません。ここではパラメータの渡し方の例として記述しています。
  • コントローラクラスの拡張方法(接頭語に My をつけ、必要なメソッドだけをオーバーライドする)の詳細は「Javaを用いたカスタマイズ」をお読みください。

最後に、ここまで開発者が用意したクラスをまとめ、名前を付与します。 Wagby と Spring Batch 連携では、ファイル名は batch-context.xml 固定としています。このファイルにジョブフローを記述してください。

ジョブフローには Job, Step, Chunk, Listener を指定することができます。Chunk とは、コミット単位にデータをひとまとめに扱うものです。

図4 ジョブフロー
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="MyItemReader" class="jp.jasminesoft.wagby.batch.chunk.MyItemReader"/>
  <bean id="MyItemProcessor" class="jp.jasminesoft.wagby.batch.chunk.MyItemProcessor"/>
  <bean id="MyItemWriter" class="jp.jasminesoft.wagby.batch.chunk.MyItemWriter"/>
  <bean id="MyItemListener" class="jp.jasminesoft.wagby.batch.chunk.MyItemListener"/>

  <!-- Batch1 start -->
  <batch:job id="Batch1">
    <batch:step id="step1">
      <batch:tasklet>
	<batch:chunk reader="MyItemReader" processor="MyItemProcessor"
		     writer="MyItemWriter" commit-interval="1">
	  <batch:listeners>
	    <batch:listener ref="MyItemListener" />
	  </batch:listeners>
	</batch:chunk>
      </batch:tasklet>
    </batch:step>
  </batch:job>
  <!-- Batch1 end -->
</beans>
この XML ファイルは Wagby R7.10 以降の版で動作します。それ以前の Wagby を利用する場合は、"spring-batch.xsd" の部分を "spring-batch-2.2.xsd" に書き換えてください。
  • ジョブには識別子(ID)が必要です。ここでは "Batch1" としました。
  • 単純にするため、ジョブステップは一つとしています。複数のステップを用意することができます。

コミットインターバル

chunk には commit-interval という属性を指定できます。これにより、例えば N回の ItemReader を実行したあと、1回の ItemWriter を実行する、という運用が行えます。

複数の読み込みに対して、一つの書き込みというスタイルがよいでしょう。

リスタートの位置

内部の管理テーブルで、ItemReaderがコミットした数を記憶しています。 リスタート時には、その位置から読み込みを再開します。

例外発生時のスキップ処理

ItemProcessor 内での処理で異常を検知したとき、例外を投げることができます。

skippable-execution-classes 要素で、例外検出時にスキップして処理を継続させる例外を指定します。この処理はロールバックされます。

また、skip-limit(スキップ回数の上限値)を指定できます。これを超えた場合はジョブは失敗という扱いになります。

<batch:tasklet>
  <batch:chunk reader="MyItemReader" processor="MyItemProcessor"
                writer="MyItemWriter" skip-limit="10">
    <batch:skippable-exception-classes>
      <batch:include class="jp.jasminesoft.wagby.job.MySkipException" />
    </batch:skippable-exception-classes>
  </batch:chunk>
</batch:tasklet>

条件分岐

next要素のon属性で条件を指定できます。

<batch:job id="job1">
  
   <batch:step id="step1">
      <next on="*" to="next3"/>
      <next on="FAILED" to="next2"/>
   </batch:step>
 
   <batch:step id="step2" next="step3"/>
  
   <batch:step id="step3"/>
</batch:job>
詳細は公式マニュアル http://docs.spring.io/spring-batch/trunk/reference/html/index.html をお読みください。

ジョブインスタンスは起動したジョブを区別するための概念です。

図5 ジョブインスタンス

ジョブIDとジョブパラメータの組み合わせが同じなら、同じジョブインスタンスとみなします。 正常終了したジョブは、再実行することができません。(二重起動の防止)

ジョブ実行結果は正常終了時は1つです。リラン(再実行)があれば、N個のジョブ実行結果をもちます。

一つのジョブを何回も起動する場合

SpringBatch では、実行対象のジョブは、ジョブパラメータによって処理する対象データが決まることを前提としています。

そのため、同じジョブパラメータであれば、再実行は行えません。 同じジョブを何度も起動したい場合はジョブパラメータを意図的に変えるようにしてください。

一般に、ジョブパラメータには日付を指定します。(例:target=2016-12-01 と指定した場合、その日のデータだけが対象となり、他の日のデータは使わない、など。)

ジョブ設計のポイントは次の通りです。

  • 1つの「ステップ」に入れる処理をどこまで細かくするか。
    • 細かくすると失敗、再実行時のリカバリ時間を短縮しやすい。
    • 細かくしすぎると制御が難しくなる。
  • オンライン処理を受け付けたまま、更新系のバッチ処理を並行して実行することがあるか。
    • オンライン側でロックをかけていたときの、バッチ処理の動作はどうあるべきか。
    • バッチ処理でロックをかけた場合、オンライン側は失敗させていいか。

データベースセッション

Wagby では、オンライン処理(画面操作によるデータベース処理)とは別に、Spring Batch 専用のデータベースセッションを用意しています。

接続先データベース情報は、オンライン処理と同じものを使います。これはDesignerの「環境 > データベース」に記載された情報です。

そのため、トランザクションマネージャもオンライン系とは別になります。オンラインとバッチ(Spring Batch)をまたぐトランザクションとすることはできません。

追加したファイルは次のとおりです。

customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemListener.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemProcessor.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemReader.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemWriter.java
customize/java/jp/jasminesoft/wagby/controller/customer/MyShowCustomerController.java
customize/webapp/WEB-INF/applicationContext/batch-context.xml

このファイルをダウンロードできます。customize フォルダ直下に展開してください。

別途、"customer" モデルを用意してからお試しください。customer モデルの定義内容は問いません。
「Javaソースコードの設定 > カスタマイズ用のクラスを出力する」を有効にします。
詳細画面に独自ボタンを用意してください。(ボタン名は任意です。イベント名、アクション名、追加パラメータはすべて空白としてください。)
図6 カスタマイズ用のクラスを出力する&独自ボタンの設定