Spring Batch (2)
最終更新日: 2021年3月11日
R8 | R9
開発者は、図1の「Job」部分を作成します。Job は一つのバッチ処理の単位で、内部は複数個の「Step」から構成されます。各Stepの実行制御ルールをXMLファイルで記述します。リスタート可能か、といったさまざまな属性も指定できます。
図1の「JobLauncher」は Spring Batch が提供します。開発者は(提供されている)JobLauncher から実行したい Job を取得し、ジョブパラメータを引数として Job を実行します。
ジョブの実行状況(ステータス)は、「Job Repository」に適時、格納されます。内部では RDB を利用しており、その管理用テーブルは Spring Batch が提供します。Wagby は、この管理用テーブルを閲覧するための仕組みを備えています。
開発者が作成する「Step」は、さらに Reader, Processor, Writer という枠組みが提供されています。Reader はデータベースまたはファイルから一定量のデータを読み込みます。読み込んだデータを「Item」という単位で扱い、これを Processor に渡します。Processor は何らかの処理を行うものですが、事前に指定されたコミット数に達したところで、Item を Writer に渡します。Reader が読み込む数と、Writer に書き込ませる数は必ずしも一致させることはありません。Writer は受け取った Item をデータベースまたはファイルに書き込みます。この Reader, Processor, Writer という処理動作をまとめて「Chunk」と呼びます。
開発者は、Wagby が自動生成したモデルクラスやサービスクラスを(バッチプログラムの中で)利用することができます。SQL を使わずに、高品質のバッチプログラムを短期間で開発できます。
ここから、図2の Reader-Processor-Writer パターンに準拠したコードのサンプルを紹介します。
Spring Batch が提供する ItemReader インタフェースを実装したクラスを作成します。データの読み込みを行います。
対象を customer モデルとします。Wagby が自動生成したサービスクラスを利用してデータを1件ずつ取得します。
Spring Batch が提供する ItemProcessor インタフェースを実装したクラスを作成します。データの処理を行います。
ここでは入力値をそのまま返すようにしています。一般的には、ここで計算処理や、加工処理を行います。
Spring Batch が提供する ItemWriter インタフェースを実装したクラスを作成します。加工後のデータの書き込み処理を行います。
ここでは値をコンソールに出力しています。一般的にはデータベースやファイルに書き込みを行う処理を記述します。
このクラスは必須ではありません。
開発者は ItemListenerSupport を継承したクラスを作成することができます。次の例は、読み込み処理の直前に呼び出されるメソッド beforeRead の動作を確認するものです。
図1の「Job Launcher」を利用して、ジョブを実行するコードを紹介します。
独自ボタンを押下したタイミングで、開発者が用意した SpringBatch プログラムを呼び出す処理を記述しています。
最後に、ここまで開発者が用意したクラスをまとめ、名前を付与します。
Wagby と Spring Batch 連携では、ファイル名は batch-context.xml 固定としています。このファイルにジョブフローを記述してください。
ジョブフローには Job, Step, Chunk, Listener を指定することができます。Chunk とは、コミット単位にデータをひとまとめに扱うものです。
chunk には commit-interval という属性を指定できます。これにより、例えば N回の ItemReader を実行したあと、1回の ItemWriter を実行する、という運用が行えます。
内部の管理テーブルで、ItemReaderがコミットした数を記憶しています。
リスタート時には、その位置から読み込みを再開します。
ItemProcessor 内での処理で異常を検知したとき、例外を投げることができます。
skippable-execution-classes 要素で、例外検出時にスキップして処理を継続させる例外を指定します。この処理はロールバックされます。
また、skip-limit(スキップ回数の上限値)を指定できます。これを超えた場合はジョブは失敗という扱いになります。
next要素のon属性で条件を指定できます。
ジョブインスタンスは起動したジョブを区別するための概念です。
ジョブIDとジョブパラメータの組み合わせが同じなら、同じジョブインスタンスとみなします。
正常終了したジョブは、再実行することができません。(二重起動の防止)
ジョブ実行結果は正常終了時は1つです。リラン(再実行)があれば、N個のジョブ実行結果をもちます。
SpringBatch では、実行対象のジョブは、ジョブパラメータによって処理する対象データが決まることを前提としています。
そのため、同じジョブパラメータであれば、再実行は行えません。
同じジョブを何度も起動したい場合はジョブパラメータを意図的に変えるようにしてください。
ジョブ設計のポイントは次の通りです。
Wagby では、オンライン処理(画面操作によるデータベース処理)とは別に、Spring Batch 専用のデータベースセッションを用意しています。
そのため、トランザクションマネージャもオンライン系とは別になります。オンラインとバッチ(Spring Batch)をまたぐトランザクションとすることはできません。
追加したファイルは次のとおりです。
このファイルをダウンロードできます。customize フォルダ直下に展開してください。
MyShowCustomerController でジョブパラメータを追加し、これを MyItemProcessor で利用するサンプルを説明します。
ジョブパラメータに p.username を渡すコードを追加しました。
JobParameter に p を直接、渡すことはできません。渡せる型は String, Date, Long, Double のみとなっています。
username を管理するフィールドと、setter メソッドを用意しました。
MyItemProcessorにジョブパラメータ "username" を渡す設定を行います。
この修正を含めたファイルをダウンロードできます。
Spring Batch の用語
アーキテクチャ
Reader-Processor-Writerパターン
ワンポイント
ジョブ実行コードサンプル
MyItemReader
package jp.jasminesoft.wagby.batch.chunk;
import java.util.List;
import jp.jasminesoft.jfc.dao.CriteriaConverter;
import jp.jasminesoft.jfc.dao.FinderContext;
import jp.jasminesoft.jfc.service.JFCEntityService;
import jp.jasminesoft.wagby.model.customer.Customer;
import jp.jasminesoft.wagby.model.customer_c.CustomerC;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Component("MyItemReader")
public class MyItemReader implements ItemReader<Customer> {
@Autowired
@Qualifier("CustomerEntityService")
protected JFCEntityService<Customer, Integer> customerEntityService;
@Autowired
@Qualifier("CustomerCriteriaConverter")
protected CriteriaConverter<CustomerC> converter;
private FinderContext<CustomerC> finderContext;
private List<Customer> results;
private int index;
public void init() {
finderContext = new FinderContext<CustomerC>();
finderContext.setCondition(new CustomerC());
finderContext.setPageSize(10);
finderContext.setCriteriaConverter(converter);
results = customerEntityService.find(finderContext);
}
public Customer read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (finderContext == null) {
init();
}
if (index >= results.size()) {
if (finderContext.isNextPage()) {
finderContext.next();
results = customerEntityService.find(finderContext);
index = 0;
} else {
return null;
}
}
Customer customer = results.get(index++);
System.out.println("index="+index+",customer="+customer);
return customer;
}
}
MyItemProcessor
package jp.jasminesoft.wagby.batch.chunk;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemProcessor;
public class MyItemProcessor implements ItemProcessor<Customer,Customer> {
@Override
public Customer process(Customer customer) throws Exception {
return customer; // Do nothing
}
}
MyItemWriter
package jp.jasminesoft.wagby.batch.chunk;
import java.util.List;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.item.ItemWriter;
public class MyItemWriter implements ItemWriter<Customer> {
@Override
public void write(List<? extends Customer> data) throws Exception {
System.out.println("MyItemWriter,data="+data);
}
}
MyItemListener
package jp.jasminesoft.wagby.batch.chunk;
import jp.jasminesoft.wagby.model.customer.Customer;
import org.springframework.batch.core.listener.ItemListenerSupport;
public class MyItemListener extends ItemListenerSupport<Customer,Customer> {
@Override
public void beforeRead() {
System.out.println("beforeRead !");
}
}
ジョブランチャー
MyShowCustomerController
package jp.jasminesoft.wagby.controller.customer;
import static jp.jasminesoft.util.ExcelFunction.*;
import java.security.Permission;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.sql.*;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import jp.jasminesoft.util.*;
import jp.jasminesoft.jfc.*;
import jp.jasminesoft.jfc.controller.*;
import jp.jasminesoft.jfc.service.*;
import jp.jasminesoft.jfc.menu.*;
import jp.jasminesoft.wagby.app.*;
import org.apache.logging.log4j.*;
import org.apache.commons.lang3.*;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
/**
* SpringBatch Sample
*
* @author JasmineSoft
*/
@Controller
public class MyShowCustomerController
extends ShowCustomerController
{
@Override
public String do_original(ActionParameter p)
throws IOException, ServletException, SecurityException
{
SimpleJobLauncher launcher = (SimpleJobLauncher)p.appctx.getBean("jobLauncher");
Job job = (Job)p.appctx.getBean("Batch1");
try {
System.out.println("job="+job);
java.util.Map<String,JobParameter> params = new HashMap<String,JobParameter>();
params.put("date1", new JobParameter(new java.util.Date()));
JobExecution jobExecution = launcher.run(job, new JobParameters(params));
System.out.println("jobExecution="+jobExecution);
} catch (JobExecutionAlreadyRunningException e) {
} catch (JobRestartException e) {
} catch (JobInstanceAlreadyCompleteException e) {
} catch (JobParametersInvalidException e) {
}
return do_show(p);
}
}
ジョブフロー
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="MyItemReader" class="jp.jasminesoft.wagby.batch.chunk.MyItemReader" scope="step"/>
<bean id="MyItemProcessor" class="jp.jasminesoft.wagby.batch.chunk.MyItemProcessor" scope="step"/>
<bean id="MyItemWriter" class="jp.jasminesoft.wagby.batch.chunk.MyItemWriter" scope="step"/>
<bean id="MyItemListener" class="jp.jasminesoft.wagby.batch.chunk.MyItemListener" scope="step"/>
<!-- Batch1 start -->
<batch:job id="Batch1">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="MyItemReader" processor="MyItemProcessor"
writer="MyItemWriter" commit-interval="1">
<batch:listeners>
<batch:listener ref="MyItemListener" />
</batch:listeners>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!-- Batch1 end -->
</beans>
コミットインターバル
リスタートの位置
例外発生時のスキップ処理
<batch:tasklet>
<batch:chunk reader="MyItemReader" processor="MyItemProcessor"
writer="MyItemWriter" skip-limit="10">
<batch:skippable-exception-classes>
<batch:include class="jp.jasminesoft.wagby.job.MySkipException" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
条件分岐
<batch:job id="job1">
<batch:step id="step1">
<next on="*" to="next3"/>
<next on="FAILED" to="next2"/>
</batch:step>
<batch:step id="step2" next="step3"/>
<batch:step id="step3"/>
</batch:job>
ジョブインスタンス
一つのジョブを何回も起動する場合
ジョブ設計のポイント
データベースセッション
ダウンロード
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemListener.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemProcessor.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemReader.java
customize/java/jp/jasminesoft/wagby/batch/chunk/MyItemWriter.java
customize/java/jp/jasminesoft/wagby/controller/customer/MyShowCustomerController.java
customize/webapp/WEB-INF/applicationContext/batch-context.xml
「Javaソースコードの設定 > カスタマイズ用のクラスを出力する」を有効にします。
詳細画面に独自ボタンを用意してください。ボタン名は任意です。ここではイベント名を "Original1" としています。アクション名、追加パラメータは空白としてください。
ジョブパラメータの扱い
MyShowCustomerController
@Override
public String do_original(ActionParameter p)
throws IOException, ServletException, SecurityException
{
SimpleJobLauncher launcher = (SimpleJobLauncher)p.appctx.getBean("jobLauncher");
Job job = (Job)p.appctx.getBean("Batch1");
try {
System.out.println("job="+job);
java.util.Map<String,JobParameter> params = new HashMap<String,JobParameter>();
params.put("date1", new JobParameter(new java.util.Date()));
params.put("username", new JobParameter(p.user.getUsername()));//追加したコード
JobExecution jobExecution = launcher.run(job, new JobParameters(params));
System.out.println("jobExecution="+jobExecution);
} catch (JobExecutionAlreadyRunningException e) {
} catch (JobRestartException e) {
} catch (JobInstanceAlreadyCompleteException e) {
} catch (JobParametersInvalidException e) {
}
return do_show(p);
}
MyItemProcessor
public class MyItemProcessor implements ItemProcessor<Customer,Customer> {
private String username;
public void setUsername(String username) {
this.username = username;
}
@Override
public Customer process(Customer customer) throws Exception {
System.out.println("username="+username);
return customer; // Do nothing
}
}
batch-context.xml
...
<bean id="MyItemProcessor" class="jp.jasminesoft.wagby.batch.chunk.MyItemProcessor" scope="step">
<property name="username" value="#{jobParameters[username]}" />
</bean>
...
ダウンロード