Create: 2014/09/20
LastUpdate: 2014/09/20
Jobscheduler FAQの「Sequential execution of jobs in parallel」 を参考にして、異種ファイルは並列処理しながら、同種ファイルは順番に処理するジョブを作ってみました。
動作確認は、JobScheduler1.7 を使用しています。
シナリオ
あるディレクトリに、以下のような2種類(a系、b系)のファイルが着信したら、このデータをDBにロードするジョブを考えます。
a系のデータは、「テーブルa」にロードし、b系のデータは、「テーブルb」にロードします。
- [a系のデータ] ・・・ a001.csv、a002.csv、a003.csv
- [b系のデータ] ・・・ b001.csv、b002.csv、b003.csv
一番シンプルな方法(シーケンシャル処理)でジョブを作成
一番シンプルな方法は、「 [CentOS6][SOS JobScheduler] JobChain をファイルトリガで実行」で紹介した手順で、ファイルを1個づつ処理する方法です。
例えば、JobChain は以下のようにします。
/tmp/sandbox/data/in2 ディレクトリを監視して、<file_order_source> の正規表現で指定したファイルが存在したらロード処理をするジョブ(load_file_process) を実行します。
ロードに成功したら、ファイルを削除し、失敗したら、/tmp/sandbox/data/failed に移動します。
<?xml version="1.0" encoding="ISO-8859-1"?> <job_chain orders_recoverable="yes" visible="yes"> <file_order_source directory="/tmp/sandbox/data/in2" regex="^a[A-Za-z0-9_]*\.csv$"/> <file_order_source directory="/tmp/sandbox/data/in2" regex="^b[A-Za-z0-9_]*\.csv$"/> <job_chain_node state="load_file_process" job="load_file_process" next_state="ok" error_state="ng"/> <file_order_sink state="ok" remove="yes"/> <file_order_sink state="ng" move_to="/tmp/sandbox/data/failed"/> </job_chain>ジョブは、以下のようにします。
テスト用なのでDBへのロードまではしませんが、a系のファイルの場合は 1秒 sleep し、 b系のファイルの場合は、10秒 sleep します。
また、ファイル内容に "ng" を含む場合は、ジョブを異常終了するようにしています。
<?xml version="1.0" encoding="ISO-8859-1"?> <job order="yes" stop_on_error="no" title="Load Files into database" tasks="1"> <script language="shell"> <![CDATA[ # 本来は、ここにロード処理を記述する # 以下は、テスト用のダミー env | sort if [ "`basename ${SCHEDULER_TASK_TRIGGER_FILES} | egrep -c '^a'`" -eq 0 ]; then sleep 1 else sleep 10 fi ret=`grep -ic "ng" ${SCHEDULER_TASK_TRIGGER_FILES}` exit $ret ]]> </script> <monitor name="process0" ordering="0"> <script language="perl"> <include file="./config/live/sample1/debug.pl"/> </script> </monitor> <run_time /> </job>ジョブの動きを観察しやすいように <monitor> でデバッグログを出力します。<monitor>の debug.pl については、「 [CentOS6][SOS JobScheduler] 複数のJobChain 並列に実行し、複数の同期ポイントを設定するを参照してください。
一番シンプルな方法(シーケンシャル処理)でジョブを実行
上記でJobChain (load_files_simple) と Job (load_file_process) を作成した時点で、ディレクトリの監視が始まっています。
JOC で見ると下図のように表示されます。
/tmp/sandbox/data/in2 ディレクトリにファイルを作成すると、ジョブが実行されます。
今回は、以下のようなシェル・スクリプト(put_test_data.sh)を作成して、テスト用のファイルをつくります。
DATA_BASE=/tmp/sandbox/data if [ "$1" = "" -o ! -d "${DATA_BASE}/$1" ]; then echo "[usage] sh $0 [in | in2]" exit 1 fi echo "ok" > ${DATA_BASE}/$1/a001.csv echo "ok" > ${DATA_BASE}/$1/b001.csv echo "ok" > ${DATA_BASE}/$1/a002.csv echo "ok" > ${DATA_BASE}/$1/b002.csv echo "ok" > ${DATA_BASE}/$1/a003.csv echo "ok" > ${DATA_BASE}/$1/b003.csv echo ${DATA_BASE}/$1 ls -l ${DATA_BASE}/$1 exit 0以下のようにしてテストファイルを作成すると、ジョブが実行されます。
$ sh ./put_test_data.sh in2 /tmp/sandbox/data/in2 合計 24 -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 a001.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 a002.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 a003.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 b001.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 b002.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 14:13 2014 b003.csvdebug.log で Job (load_file_process) の開始、終了時刻を見ると、1ファイルづつ順番に処理されたことがわかります。
ORDER_ID は処理したファイルが設定されますが、ORDER_ID が異なるのでファイルごとに複数の JobChain が実行されたことわかります。
2014-09-20 14:13:59 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv 2014-09-20 14:14:09 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv 2014-09-20 14:14:10 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv 2014-09-20 14:14:20 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv 2014-09-20 14:14:21 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv 2014-09-20 14:14:31 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv 2014-09-20 14:14:32 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv 2014-09-20 14:14:33 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv 2014-09-20 14:14:34 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv 2014-09-20 14:14:35 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv 2014-09-20 14:14:35 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv 2014-09-20 14:14:36 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csvJob (load_file_process) の多重度(tasks)を1にしているので複数のJobChainが並列実行されたのに、ジョブはシーケンシャルに処理されましたが、例えば、多重度を 10 にして同じジョブが 10 個まで並列に実行することを許可すると、ジョブが並列に実行されます。
すべて並列に処理できるのであれば、この方法は全体の処理時間が一番短くなります。
以下は多重度を 10 にして実行したときの debug.log です。
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv 2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv 2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv 2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv 2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv 2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv 2014-09-20 14:27:37 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv 2014-09-20 14:27:37 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv 2014-09-20 14:27:38 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv 2014-09-20 14:27:46 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv 2014-09-20 14:27:46 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv 2014-09-20 14:27:46 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv
異種ファイルは並列に処理するシナリオ
上記シナリオを少し変えます。
「a系のデータ」と「b系のデータ」の処理は、テーブルが異なるので並列に処理したいが、同じテーブルへの処理は、1ファイルづつ実施したいケースを考えます。
この場合、「a系のデータ」と「b系のデータ」用に2種類の排他ロックを使用します。
例えば、a001.csv を「テーブルa」にロード中は、「排他ロックa」を生成し、a002.csv と a003.csv は「排他ロックa」解除まで処理待ち状態(setback)にします。この間、b001.csv は、「テーブルb」にロードを実施して「排他ロックb」を生成し、b002.csv と b003.csv は「排他ロックb」解除まで処理待ち状態とします。そして、1ファイルのロード処理終了で、排他ロックが解除されしだい、順次、次のファイルが処理されるようにします。
下図は、JobScheduler での処理フロー(イメージ)です。
図: JobCheduler FAQ より引用
・File_Tracsfer は、上記の put_test_data.sh に相当します。
・FileOrderSource は、ディレクトリを監視します。
・Identify Category は、処理対象のファイルを特定します。
・排他ロックを確認します。
・[Yes] 排他ロックが存在すれば、SetbackWait でロック解除まで待機します。
・[No] 排他ロックを設定します。ロードを実行して、排他ロックを解除します。
異種ファイルは並列に処理するジョブを作成する
下図のように流れる JobChain を実装します。
作成するファイルは以下のとおり。
$ tree . ├── BERLIN_PROC.lock.xml ・・・・・・・「a系のデータ」の排他ロック ├── MUNICH_PROC.lock.xml ・・・・・・・「b系のデータ」の排他ロック ├── aquire_lock.job.xml ・・・・・・・ 排他ロック確認&作成 ├── debug.pl ・・・・・・・・・・・・・ 上記参照 ├── load_file_process.job.xml ・・・・ 上記参照。ただし、多重度=10 ├── load_files.job_chain.xml ・・・・・ JobChain ├── move_file_error.job.xml ・・・・・ ファイル移動(エラー用) ├── move_file_suc.job.xml ・・・・・・ ファイル移動(成功用) ├── put_test_data.sh ・・・・・・・・・ 上記参照 └── release_lock.job.xml ・・・・・・・ 排他ロック解放JobChain の内容は、以下のとおり。
上記では、ロード処理成功後はファイルを削除していましたが、ここでは移動することにします。
<?xml version="1.0" encoding="ISO-8859-1"?> <job_chain orders_recoverable="yes" visible="yes"> <file_order_source directory="/tmp/sandbox/data/in" regex="^a[A-Za-z0-9_]*\.csv$"/> <file_order_source directory="/tmp/sandbox/data/in" regex="^b[A-Za-z0-9_]*\.csv$"/> <job_chain_node state="start" job="/sos/jitl/JobChainStart" next_state="aquire_lock" error_state="move_file_err"/> <job_chain_node state="aquire_lock" job="aquire_lock" next_state="load_file_process" error_state="move_file_err"/> <job_chain_node state="load_file_process" job="load_file_process" next_state="move_file_suc" error_state="move_file_err"/> <job_chain_node state="move_file_suc" job="move_file_suc" next_state="suc_relese_lock" error_state="err_release_lock"/> <job_chain_node state="move_file_err" job="move_file_error" next_state="err_release_lock" error_state="error"/> <job_chain_node state="suc_relese_lock" job="release_lock" next_state="end_suc" error_state="err_release_lock"/> <job_chain_node state="err_release_lock" job="release_lock" next_state="end_err" error_state="error"/> <job_chain_node state="end_err" job="/sos/jitl/JobChainEnd" next_state="error" error_state="error"/> <job_chain_node state="end_suc" job="/sos/jitl/JobChainEnd" next_state="success" error_state="error"/> <job_chain_node state="success"/> <job_chain_node state="error"/> </job_chain>排他ロック(BERLIN_PROC.lock.xml、MUNICH_PROC.lock.xml)の内容は、以下のとおり。
ファイル名が違うだけで内容は同じです。
<?xml version="1.0" encoding="ISO-8859-1"?> <lock max_non_exclusive="1"/>排他ロックを確認・作成(aquire_lock.job.xml)するジョブは、以下のとおり。
内部APIを使用して、処理対象データにあわせて排他ロックを設定します。
また、既に排他ロックが設定されている場合は、setback を設定して定期的にジョブを再実行します。
なお、グローバルなパラメータとしてロック名(lock_name)とファイル名(file_spec)を設定し、同一Orderのロック解放のジョブ(release_lock.job.xml)で参照できるようにします。
<?xml version="1.0" encoding="ISO-8859-1"?> <job order="yes" title="Hold lock for an individual order throughout the job chain"> <script language="javax.script:rhino"> <![CDATA[ function spooler_process() { try { var parameters = spooler_task.order().params(); var filePath = "" + String(parameters.value("scheduler_file_path")); spooler_log.info( " scheduler_file_path : " + filePath ); var fileParts = filePath.split("/"); var fileName = fileParts[fileParts.length-1]; spooler_log.info( "fileName : " + fileName ); if(fileName.match("^a[A-Za-z0-9_]*\.csv$")) { var lockName = "BERLIN_PROC"; var lock_name = "BERLIN_PROC"; spooler_log.info( "File matched with berlin lock_name : "+ lockName ); } if(fileName.match("^b[A-Za-z0-9_]*\.csv$")) { var lockName = "MUNICH_PROC"; spooler_log.info( "File matched with berlin lock_name : "+ lockName ); } spooler_task.order().params().set_value("file_spec",fileName); spooler_task.order().params().set_value("lock_name",lockName); if (!spooler.locks().lock_or_null( lockName )) { var lock = spooler.locks().create_lock(); lock.set_name( lockName ); spooler.locks().add_lock( lock ); if (spooler_task.try_hold_lock( lockName )) { return true; } else { spooler_task.call_me_again_when_locks_available(); } } else { spooler_task.order().setback(); spooler_log.info( " lock is already aquired , lock_name : "+ lockName +" , setback , will retry after sometime" ); } return true; } catch (e) { spooler_log.warn("error occurred : " + String(e)); return false; } } ]]> </script> <delay_order_after_setback setback_count="1" is_maximum="no" delay="15"/> <delay_order_after_setback setback_count="5400" is_maximum="yes" delay="0"/> <run_time /> </job>排他ロックを解除するジョブ(release_lock.job.xml)は以下のとおり。
グローバル・パラメータ(lock_name)からロック名を取得して排他ロックを解放します。
<?xml version="1.0" encoding="ISO-8859-1"?> <job order="yes" title="Release the lock that has been acquired by the job hold_lock"> <params /> <script language="javax.script:rhino"> <![CDATA[ function spooler_process() { try { var parameters = spooler_task.order().params(); var filePath = "" + String(parameters.value("scheduler_file_path")); spooler_log.info( " scheduler_file_path : " + filePath ); var lockName = "" + String(parameters.value( "lock_name" )); spooler_log.info( " lock_name : " + lockName ); if (spooler.locks().lock_or_null( lockName )) { spooler.locks().lock( lockName ).remove(); } return true; } catch (e) { spooler_log.warn("error occurred: " + String(e)); return false; } } ]]> </script> <run_time /> </job>
ロード成功時にファイルを移動するジョブ(move_file_suc.job.xml)は以下のとおり。
パラメータ(source_file)からパラメータ(target_file)にファイルを移動します。
移動先にファイルが存在する場合は上書きします。
JOCでJobChainを見ると、下図のように表示されます。
/tmp/sandbox/data/in ディレクトリにファイルを作成すると、ジョブが実行されます。
以下のようにしてテストファイルを作成すると、ジョブが実行されます。
ジョブ(aquire_lock)は、排他ロックの解放待ちで Setback でリトライしてます。
debug.log には、以下のように記録されています。
「a系のデータ」と「b系のデータ」は並列に処理されていますが、a系のa001.csv、a002.csv、a003.csv は順次処理されています。b系も同様です。
パラメータ(source_file)からパラメータ(target_file)にファイルを移動します。
移動先にファイルが存在する場合は上書きします。
<?xml version="1.0" encoding="ISO-8859-1"?> <job title="Move file to targer directory" order="yes" stop_on_error="no"> <description > <include file="jobs/JobSchedulerRenameFile.xml"/> </description> <params > <param name="overwrite" value="true"/> <param name="target_file" value="/tmp/sandbox/data/done"/> <param name="source_file" value="/tmp/sandbox/data/in"/> </params> <script language="java" java_class="sos.scheduler.file.JobSchedulerRenameFile"/> <run_time /> </job>ロード失敗時にファイルを移動するジョブ(move_file_error.job.xml)は以下のとおり。
<?xml version="1.0" encoding="ISO-8859-1"?> <job title="Move file to targer directory" order="yes" stop_on_error="no"> <description > <include file="jobs/JobSchedulerRenameFile.xml"/> </description> <params > <param name="target_file" value="/tmp/sandbox/data/failed"/> <param name="overwrite" value="true"/> <param name="source_file" value="/tmp/sandbox/data/in"/> </params> <script language="java" java_class="sos.scheduler.file.JobSchedulerRenameFile"/> <run_time /> </job>
異種ファイルは並列に処理するジョブを実行する
JOCでJobChainを見ると、下図のように表示されます。
/tmp/sandbox/data/in ディレクトリにファイルを作成すると、ジョブが実行されます。
以下のようにしてテストファイルを作成すると、ジョブが実行されます。
$ sh put_test_data.sh in /tmp/sandbox/data/in 合計 24 -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 a001.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 a002.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 a003.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 b001.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 b002.csv -rw-rw-r-- 1 jobs jobs 3 9月 20 21:58 2014 b003.csv下図は、実行中の状態をJOCで見たところです。
ジョブ(aquire_lock)は、排他ロックの解放待ちで Setback でリトライしてます。
debug.log には、以下のように記録されています。
「a系のデータ」と「b系のデータ」は並列に処理されていますが、a系のa001.csv、a002.csv、a003.csv は順次処理されています。b系も同様です。
2014-09-20 21:58:25 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b001.csv 2014-09-20 21:58:26 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a001.csv 2014-09-20 21:58:26 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b001.csv 2014-09-20 21:58:32 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b003.csv 2014-09-20 21:58:33 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b003.csv 2014-09-20 21:58:36 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a001.csv 2014-09-20 21:58:43 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a002.csv 2014-09-20 21:58:45 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b002.csv 2014-09-20 21:58:46 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b002.csv 2014-09-20 21:58:53 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a002.csv 2014-09-20 21:58:59 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a003.csv 2014-09-20 21:59:09 - END - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a003.csv