Spring Batch (2)

最終更新日: 2021年3月11日
R8 | R9

Spring Batch の用語

アーキテクチャ

開発者は、図1の「Job」部分を作成します。Job は一つのバッチ処理の単位で、内部は複数個の「Step」から構成されます。各Stepの実行制御ルールをXMLファイルで記述します。リスタート可能か、といったさまざまな属性も指定できます。

図1 アーキテクチャ

図1の「JobLauncher」は Spring Batch が提供します。開発者は(提供されている)JobLauncher から実行したい Job を取得し、ジョブパラメータを引数として Job を実行します。

ジョブの実行状況(ステータス)は、「Job Repository」に適時、格納されます。内部では RDB を利用しており、その管理用テーブルは Spring Batch が提供します。Wagby は、この管理用テーブルを閲覧するための仕組みを備えています。

Reader-Processor-Writerパターン

開発者が作成する「Step」は、さらに Reader, Processor, Writer という枠組みが提供されています。Reader はデータベースまたはファイルから一定量のデータを読み込みます。読み込んだデータを「Item」という単位で扱い、これを Processor に渡します。Processor は何らかの処理を行うものですが、事前に指定されたコミット数に達したところで、Item を Writer に渡します。Reader が読み込む数と、Writer に書き込ませる数は必ずしも一致させることはありません。Writer は受け取った Item をデータベースまたはファイルに書き込みます。この Reader, Processor, Writer という処理動作をまとめて「Chunk」と呼びます。

図2 Reader-Processor-Writerパターン

ワンポイント

開発者は、Wagby が自動生成したモデルクラスやサービスクラスを(バッチプログラムの中で)利用することができます。SQL を使わずに、高品質のバッチプログラムを短期間で開発できます。

ジョブ実行コードサンプル

ここから、図2の Reader-Processor-Writer パターンに準拠したコードのサンプルを紹介します。

図3 サンプルコードの構成

MyItemReader

Spring Batch が提供する ItemReader インタフェースを実装したクラスを作成します。データの読み込みを行います。

対象を customer モデルとします。Wagby が自動生成したサービスクラスを利用してデータを1件ずつ取得します。

package jp.jasminesoft.wagby.batch.chunk;
import java.util.List;
import jp.jasminesoft.jfc.dao.CriteriaConverter;
import jp.jasminesoft.jfc.dao.FinderContext;
import jp.jasminesoft.jfc.service.JFCEntityService;
import jp.jasminesoft.wagby.model.customer.Customer;
import jp.jasminesoft.wagby.model.customer_c.CustomerC;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Component("MyItemReader")
public class MyItemReader implements ItemReader<Customer> {
    @Autowired
    @Qualifier("CustomerEntityService")
    protected JFCEntityService<Customer, Integer> customerEntityService;
    @Autowired
    @Qualifier("CustomerCriteriaConverter")
    protected CriteriaConverter<CustomerC> converter;
    private FinderContext<CustomerC> finderContext;
    private List<Customer> results;
    private int index;
    public void init() {
        finderContext = new FinderContext<CustomerC>();
        finderContext.setCondition(new CustomerC());
        finderContext.setPageSize(10);
        finderContext.setCriteriaConverter(converter);
        results = customerEntityService.find(finderContext);
    }
    public Customer read() throws Exception, UnexpectedInputException,
    ParseException, NonTransientResourceException {
        if (finderContext == null) {
       	    init();
        }
        if (index >= results.size()) {
       	    if (finderContext.isNextPage()) {
       	        finderContext.next();
       	        results = customerEntityService.find(finderContext);
       	        index = 0;
       	    } else {
       	        return null;
       	    }
        }
        Customer customer = results.get(index++);
        System.out.println("index="+index+",customer="+customer);
        return customer;
    }
}

MyItemProcessor

Spring Batch が提供する ItemProcessor インタフェースを実装したクラスを作成します。データの処理を行います。

ここでは入力値をそのまま返すようにしています。一般的には、ここで計算処理や、加工処理を行います。

package jp.jasminesoft.wagby.batch.chunk;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemProcessor;
public class MyItemProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer customer) throws Exception {
        return customer; // Do nothing
    }
}

MyItemWriter

Spring Batch が提供する ItemWriter インタフェースを実装したクラスを作成します。加工後のデータの書き込み処理を行います。

ここでは値をコンソールに出力しています。一般的にはデータベースやファイルに書き込みを行う処理を記述します。

package jp.jasminesoft.wagby.batch.chunk;
import java.util.List;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemWriter;
public class MyItemWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> data) throws Exception {
        System.out.println("MyItemWriter,data="+data);
    }
}

MyItemListener

このクラスは必須ではありません。

開発者は ItemListenerSupport を継承したクラスを作成することができます。次の例は、読み込み処理の直前に呼び出されるメソッド beforeRead の動作を確認するものです。

package jp.jasminesoft.wagby.batch.chunk;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.core.listener.ItemListenerSupport;
public class MyItemListener extends ItemListenerSupport<Customer,Customer> {
    @Override
    public void beforeRead() {
        System.out.println("beforeRead !");
    }
}

ジョブランチャー

図1の「Job Launcher」を利用して、ジョブを実行するコードを紹介します。

MyShowCustomerController

独自ボタンを押下したタイミングで、開発者が用意した SpringBatch プログラムを呼び出す処理を記述しています。

package jp.jasminesoft.wagby.controller.customer;
import static jp.jasminesoft.util.ExcelFunction.*;
import java.security.Permission;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.sql.*;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import jp.jasminesoft.util.*;
import jp.jasminesoft.jfc.*;
import jp.jasminesoft.jfc.controller.*;
import jp.jasminesoft.jfc.service.*;
import jp.jasminesoft.jfc.menu.*;
import jp.jasminesoft.wagby.app.*;
import org.apache.logging.log4j.*;
import org.apache.commons.lang3.*;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
/**
 * SpringBatch Sample
 *
 * @author JasmineSoft
 */
@Controller
public class MyShowCustomerController
    extends ShowCustomerController
{
    @Override
    public String do_original(ActionParameter p)
    throws IOException, ServletException, SecurityException
    {
        SimpleJobLauncher launcher = (SimpleJobLauncher)p.appctx.getBean("jobLauncher");
        Job job = (Job)p.appctx.getBean("Batch1");
        try {
            System.out.println("job="+job);
            java.util.Map<String,JobParameter> params = new HashMap<String,JobParameter>();
            params.put("date1", new JobParameter(new java.util.Date()));
            JobExecution jobExecution = launcher.run(job, new JobParameters(params));
            System.out.println("jobExecution="+jobExecution);
        } catch (JobExecutionAlreadyRunningException e) {
        } catch (JobRestartException e) {
        } catch (JobInstanceAlreadyCompleteException e) {
        } catch (JobParametersInvalidException e) {
        }
        return do_show(p);
    }
}

ジョブフロー

最後に、ここまで開発者が用意したクラスをまとめ、名前を付与します。 Wagby と Spring Batch 連携では、ファイル名は batch-context.xml 固定としています。このファイルにジョブフローを記述してください。

ジョブフローには Job, Step, Chunk, Listener を指定することができます。Chunk とは、コミット単位にデータをひとまとめに扱うものです。

図4 ジョブフロー
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd">
  <bean id="MyItemReader" class="jp.jasminesoft.wagby.batch.chunk.MyItemReader" scope="step"/>
  <bean id="MyItemProcessor" class="jp.jasminesoft.wagby.batch.chunk.MyItemProcessor" scope="step"/>
  <bean id="MyItemWriter" class="jp.jasminesoft.wagby.batch.chunk.MyItemWriter" scope="step"/>
  <bean id="MyItemListener" class="jp.jasminesoft.wagby.batch.chunk.MyItemListener" scope="step"/>
  <!-- Batch1 start -->
  <batch:job id="Batch1">
    <batch:step id="step1">
      <batch:tasklet>
<batch:chunk reader="MyItemReader" processor="MyItemProcessor"
     writer="MyItemWriter" commit-interval="1">
  <batch:listeners>
    <batch:listener ref="MyItemListener" />
  </batch:listeners>
</batch:chunk>
      </batch:tasklet>
    </batch:step>
  </batch:job>
  <!-- Batch1 end -->
</beans>
  • ジョブには識別子(ID)が必要です。ここでは "Batch1" としました。
  • 単純にするため、ジョブステップは一つとしています。複数のステップを用意することができます。
  • batch.chunk パッケージ内のクラスのスコープは "step" を指定してください。

コミットインターバル

chunk には commit-interval という属性を指定できます。これにより、例えば N回の ItemReader を実行したあと、1回の ItemWriter を実行する、という運用が行えます。

複数の読み込みに対して、一つの書き込みというスタイルがよいでしょう。

リスタートの位置

内部の管理テーブルで、ItemReaderがコミットした数を記憶しています。 リスタート時には、その位置から読み込みを再開します。

例外発生時のスキップ処理

ItemProcessor 内での処理で異常を検知したとき、例外を投げることができます。

skippable-execution-classes 要素で、例外検出時にスキップして処理を継続させる例外を指定します。この処理はロールバックされます。

また、skip-limit(スキップ回数の上限値)を指定できます。これを超えた場合はジョブは失敗という扱いになります。

<batch:tasklet>
  <batch:chunk reader="MyItemReader" processor="MyItemProcessor"
                writer="MyItemWriter" skip-limit="10">
    <batch:skippable-exception-classes>
      <batch:include class="jp.jasminesoft.wagby.job.MySkipException" />
    </batch:skippable-exception-classes>
  </batch:chunk>
</batch:tasklet>

条件分岐

next要素のon属性で条件を指定できます。

<batch:job id="job1">
   <batch:step id="step1">
      <next on="*" to="next3"/>
      <next on="FAILED" to="next2"/>
   </batch:step>
   <batch:step id="step2" next="step3"/>
   <batch:step id="step3"/>
</batch:job>

ジョブインスタンス

ジョブインスタンスは起動したジョブを区別するための概念です。

図5 ジョブインスタンス

ジョブIDとジョブパラメータの組み合わせが同じなら、同じジョブインスタンスとみなします。 正常終了したジョブは、再実行することができません。(二重起動の防止)

ジョブ実行結果は正常終了時は1つです。リラン(再実行)があれば、N個のジョブ実行結果をもちます。

一つのジョブを何回も起動する場合

SpringBatch では、実行対象のジョブは、ジョブパラメータによって処理する対象データが決まることを前提としています。

そのため、同じジョブパラメータであれば、再実行は行えません。 同じジョブを何度も起動したい場合はジョブパラメータを意図的に変えるようにしてください。

一般に、ジョブパラメータには日付を指定します。(例:target=2016-12-01 と指定した場合、その日のデータだけが対象となり、他の日のデータは使わない、など。)

ジョブ設計のポイント

ジョブ設計のポイントは次の通りです。

  • 1つの「ステップ」に入れる処理をどこまで細かくするか。
    • 細かくすると失敗、再実行時のリカバリ時間を短縮しやすい。
    • 細かくしすぎると制御が難しくなる。
  • オンライン処理を受け付けたまま、更新系のバッチ処理を並行して実行することがあるか。
    • オンライン側でロックをかけていたときの、バッチ処理の動作はどうあるべきか。
    • バッチ処理でロックをかけた場合、オンライン側は失敗させていいか。

データベースセッション

Wagby では、オンライン処理(画面操作によるデータベース処理)とは別に、Spring Batch 専用のデータベースセッションを用意しています。

接続先データベース情報は、オンライン処理と同じものを使います。これはDesignerの「環境 > データベース」に記載された情報です。

そのため、トランザクションマネージャもオンライン系とは別になります。オンラインとバッチ(Spring Batch)をまたぐトランザクションとすることはできません。

ダウンロード

追加したファイルは次のとおりです。

customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemListener.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemProcessor.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemReader.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemWriter.java
customize/java/jp/jasminesoft/wagby/controller/customer/MyShowCustomerController.java
customize/webapp/WEB-INF/applicationContext/batch-context.xml

このファイルをダウンロードできます。customize フォルダ直下に展開してください。

別途、"customer" モデルを用意してからお試しください。customer モデルの定義内容は問いません。
「Javaソースコードの設定 > カスタマイズ用のクラスを出力する」を有効にします。
詳細画面に独自ボタンを用意してください。ボタン名は任意です。ここではイベント名を "Original1" としています。アクション名、追加パラメータは空白としてください。
図6 カスタマイズ用のクラスを出力する&独自ボタンの設定

ジョブパラメータの扱い

MyShowCustomerController でジョブパラメータを追加し、これを MyItemProcessor で利用するサンプルを説明します。

MyShowCustomerController

ジョブパラメータに p.username を渡すコードを追加しました。

    @Override
    public String do_original(ActionParameter p)
    throws IOException, ServletException, SecurityException
    {
        SimpleJobLauncher launcher = (SimpleJobLauncher)p.appctx.getBean("jobLauncher");
        Job job = (Job)p.appctx.getBean("Batch1");
        try {
            System.out.println("job="+job);
    java.util.Map<String,JobParameter> params = new HashMap<String,JobParameter>();
    params.put("date1", new JobParameter(new java.util.Date()));
            params.put("username", new JobParameter(p.user.getUsername()));//追加したコード
            JobExecution jobExecution = launcher.run(job, new JobParameters(params));
            System.out.println("jobExecution="+jobExecution);
        } catch (JobExecutionAlreadyRunningException e) {
        } catch (JobRestartException e) {
        } catch (JobInstanceAlreadyCompleteException e) {
        } catch (JobParametersInvalidException e) {
        }
        return do_show(p);
    }

JobParameter に p を直接、渡すことはできません。渡せる型は String, Date, Long, Double のみとなっています。

MyItemProcessor

username を管理するフィールドと、setter メソッドを用意しました。

public class MyItemProcessor implements ItemProcessor<Customer,Customer> {
    private String username;
    public void setUsername(String username) {
        this.username = username;
    }
    @Override
    public Customer process(Customer customer) throws Exception {
System.out.println("username="+username);
        return customer; // Do nothing
    }
}

batch-context.xml

MyItemProcessorにジョブパラメータ "username" を渡す設定を行います。

...
  <bean id="MyItemProcessor" class="jp.jasminesoft.wagby.batch.chunk.MyItemProcessor" scope="step">
    <property name="username" value="#{jobParameters[username]}" />
  </bean>
...

ダウンロード

この修正を含めたファイルをダウンロードできます。

別途、"customer" モデルを用意してからお試しください。customer モデルの定義内容は問いません。