[CentOS6][SOS JobScheduler] 異種ファイルは並列に処理しながら、同種ファイルは順番に処理する


Create: 2014/09/20
LastUpdate: 2014/09/20
[ メニューに戻る ]

Jobscheduler FAQの「Sequential execution of jobs in parallel」 を参考にして、異種ファイルは並列処理しながら、同種ファイルは順番に処理するジョブを作ってみました。
動作確認は、JobScheduler1.7 を使用しています。

シナリオ


あるディレクトリに、以下のような2種類(a系、b系)のファイルが着信したら、このデータをDBにロードするジョブを考えます。
a系のデータは、「テーブルa」にロードし、b系のデータは、「テーブルb」にロードします。
  • [a系のデータ] ・・・ a001.csv、a002.csv、a003.csv
  • [b系のデータ] ・・・ b001.csv、b002.csv、b003.csv

一番シンプルな方法(シーケンシャル処理)でジョブを作成


一番シンプルな方法は、「 [CentOS6][SOS JobScheduler] JobChain をファイルトリガで実行」で紹介した手順で、ファイルを1個づつ処理する方法です。
例えば、JobChain は以下のようにします。
/tmp/sandbox/data/in2 ディレクトリを監視して、<file_order_source> の正規表現で指定したファイルが存在したらロード処理をするジョブ(load_file_process) を実行します。
ロードに成功したら、ファイルを削除し、失敗したら、/tmp/sandbox/data/failed に移動します。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job_chain  orders_recoverable="yes" visible="yes">
    <file_order_source  directory="/tmp/sandbox/data/in2" regex="^a[A-Za-z0-9_]*\.csv$"/>
    <file_order_source  directory="/tmp/sandbox/data/in2" regex="^b[A-Za-z0-9_]*\.csv$"/>
    <job_chain_node  state="load_file_process" job="load_file_process" next_state="ok" error_state="ng"/>
    <file_order_sink state="ok" remove="yes"/>
    <file_order_sink state="ng" move_to="/tmp/sandbox/data/failed"/>
</job_chain>
ジョブは、以下のようにします。
テスト用なのでDBへのロードまではしませんが、a系のファイルの場合は 1秒 sleep し、 b系のファイルの場合は、10秒 sleep します。
また、ファイル内容に "ng" を含む場合は、ジョブを異常終了するようにしています。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job order="yes" stop_on_error="no" title="Load Files into database" tasks="1">
    <script  language="shell">
        <![CDATA[
# 本来は、ここにロード処理を記述する
# 以下は、テスト用のダミー
env | sort
if [ "`basename ${SCHEDULER_TASK_TRIGGER_FILES} | egrep -c '^a'`" -eq 0 ]; then
  sleep 1
else
  sleep 10
fi

ret=`grep -ic "ng" ${SCHEDULER_TASK_TRIGGER_FILES}`
exit $ret
        ]]>
    </script>
    <monitor name="process0" ordering="0">
        <script language="perl">
          <include file="./config/live/sample1/debug.pl"/>
        </script>
    </monitor>
    <run_time />
</job>

ジョブの動きを観察しやすいように <monitor> でデバッグログを出力します。<monitor>の debug.pl については、「 [CentOS6][SOS JobScheduler] 複数のJobChain 並列に実行し、複数の同期ポイントを設定するを参照してください。

一番シンプルな方法(シーケンシャル処理)でジョブを実行


上記でJobChain (load_files_simple) と Job (load_file_process) を作成した時点で、ディレクトリの監視が始まっています。
JOC で見ると下図のように表示されます。


/tmp/sandbox/data/in2 ディレクトリにファイルを作成すると、ジョブが実行されます。
今回は、以下のようなシェル・スクリプト(put_test_data.sh)を作成して、テスト用のファイルをつくります。
DATA_BASE=/tmp/sandbox/data

if [ "$1" = "" -o ! -d "${DATA_BASE}/$1" ]; then
  echo "[usage] sh $0 [in | in2]"
  exit 1
fi

echo "ok" > ${DATA_BASE}/$1/a001.csv
echo "ok" > ${DATA_BASE}/$1/b001.csv
echo "ok" > ${DATA_BASE}/$1/a002.csv
echo "ok" > ${DATA_BASE}/$1/b002.csv
echo "ok" > ${DATA_BASE}/$1/a003.csv
echo "ok" > ${DATA_BASE}/$1/b003.csv

echo ${DATA_BASE}/$1
ls -l ${DATA_BASE}/$1
exit 0

以下のようにしてテストファイルを作成すると、ジョブが実行されます。
$ sh ./put_test_data.sh in2
/tmp/sandbox/data/in2
合計 24
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 a001.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 a002.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 a003.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 b001.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 b002.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 14:13 2014 b003.csv
debug.log で Job (load_file_process) の開始、終了時刻を見ると、1ファイルづつ順番に処理されたことがわかります。
ORDER_ID は処理したファイルが設定されますが、ORDER_ID が異なるのでファイルごとに複数の JobChain が実行されたことわかります。
2014-09-20 14:13:59 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv
2014-09-20 14:14:09 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv
2014-09-20 14:14:10 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv
2014-09-20 14:14:20 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv
2014-09-20 14:14:21 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv
2014-09-20 14:14:31 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv
2014-09-20 14:14:32 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv
2014-09-20 14:14:33 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv
2014-09-20 14:14:34 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv
2014-09-20 14:14:35 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv
2014-09-20 14:14:35 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv
2014-09-20 14:14:36 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv
Job (load_file_process) の多重度(tasks)を1にしているので複数のJobChainが並列実行されたのに、ジョブはシーケンシャルに処理されましたが、例えば、多重度を 10 にして同じジョブが 10 個まで並列に実行することを許可すると、ジョブが並列に実行されます。
すべて並列に処理できるのであれば、この方法は全体の処理時間が一番短くなります。
以下は多重度を 10 にして実行したときの debug.log です。
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv
2014-09-20 14:27:36 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv
2014-09-20 14:27:37 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b001.csv
2014-09-20 14:27:37 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b002.csv
2014-09-20 14:27:38 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/b003.csv
2014-09-20 14:27:46 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a001.csv
2014-09-20 14:27:46 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a002.csv
2014-09-20 14:27:46 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in2/a003.csv


異種ファイルは並列に処理するシナリオ


上記シナリオを少し変えます。
「a系のデータ」と「b系のデータ」の処理は、テーブルが異なるので並列に処理したいが、同じテーブルへの処理は、1ファイルづつ実施したいケースを考えます。
この場合、「a系のデータ」と「b系のデータ」用に2種類の排他ロックを使用します。
例えば、a001.csv を「テーブルa」にロード中は、「排他ロックa」を生成し、a002.csv と a003.csv は「排他ロックa」解除まで処理待ち状態(setback)にします。この間、b001.csv は、「テーブルb」にロードを実施して「排他ロックb」を生成し、b002.csv と b003.csv は「排他ロックb」解除まで処理待ち状態とします。そして、1ファイルのロード処理終了で、排他ロックが解除されしだい、順次、次のファイルが処理されるようにします。
下図は、JobScheduler での処理フロー(イメージ)です。
 図: JobCheduler FAQ より引用

 ・File_Tracsfer は、上記の put_test_data.sh に相当します。
 ・FileOrderSource は、ディレクトリを監視します。
 ・Identify Category は、処理対象のファイルを特定します。
 ・排他ロックを確認します。
 ・[Yes] 排他ロックが存在すれば、SetbackWait でロック解除まで待機します。
 ・[No] 排他ロックを設定します。ロードを実行して、排他ロックを解除します。


異種ファイルは並列に処理するジョブを作成する


下図のように流れる JobChain を実装します。


作成するファイルは以下のとおり。
$ tree
.
├── BERLIN_PROC.lock.xml ・・・・・・・「a系のデータ」の排他ロック
├── MUNICH_PROC.lock.xml ・・・・・・・「b系のデータ」の排他ロック
├── aquire_lock.job.xml ・・・・・・・ 排他ロック確認&作成
├── debug.pl ・・・・・・・・・・・・・ 上記参照
├── load_file_process.job.xml ・・・・ 上記参照。ただし、多重度=10
├── load_files.job_chain.xml ・・・・・ JobChain
├── move_file_error.job.xml ・・・・・  ファイル移動(エラー用)
├── move_file_suc.job.xml ・・・・・・  ファイル移動(成功用)
├── put_test_data.sh ・・・・・・・・・ 上記参照
└── release_lock.job.xml ・・・・・・・ 排他ロック解放

JobChain の内容は、以下のとおり。
上記では、ロード処理成功後はファイルを削除していましたが、ここでは移動することにします。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job_chain  orders_recoverable="yes" visible="yes">
    <file_order_source  directory="/tmp/sandbox/data/in" regex="^a[A-Za-z0-9_]*\.csv$"/>
    <file_order_source  directory="/tmp/sandbox/data/in" regex="^b[A-Za-z0-9_]*\.csv$"/>
    <job_chain_node  state="start" job="/sos/jitl/JobChainStart" next_state="aquire_lock" error_state="move_file_err"/>
    <job_chain_node  state="aquire_lock" job="aquire_lock" next_state="load_file_process" error_state="move_file_err"/>
    <job_chain_node  state="load_file_process" job="load_file_process" next_state="move_file_suc" error_state="move_file_err"/>
    <job_chain_node  state="move_file_suc" job="move_file_suc" next_state="suc_relese_lock" error_state="err_release_lock"/>
    <job_chain_node  state="move_file_err" job="move_file_error" next_state="err_release_lock" error_state="error"/>
    <job_chain_node  state="suc_relese_lock" job="release_lock" next_state="end_suc" error_state="err_release_lock"/>
    <job_chain_node  state="err_release_lock" job="release_lock" next_state="end_err" error_state="error"/>
    <job_chain_node  state="end_err" job="/sos/jitl/JobChainEnd" next_state="error" error_state="error"/>
    <job_chain_node  state="end_suc" job="/sos/jitl/JobChainEnd" next_state="success" error_state="error"/>
    <job_chain_node  state="success"/>
    <job_chain_node  state="error"/>
</job_chain>
排他ロック(BERLIN_PROC.lock.xml、MUNICH_PROC.lock.xml)の内容は、以下のとおり。
ファイル名が違うだけで内容は同じです。
<?xml version="1.0" encoding="ISO-8859-1"?>
<lock  max_non_exclusive="1"/>

排他ロックを確認・作成(aquire_lock.job.xml)するジョブは、以下のとおり。
内部APIを使用して、処理対象データにあわせて排他ロックを設定します。
また、既に排他ロックが設定されている場合は、setback を設定して定期的にジョブを再実行します。
なお、グローバルなパラメータとしてロック名(lock_name)とファイル名(file_spec)を設定し、同一Orderのロック解放のジョブ(release_lock.job.xml)で参照できるようにします。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job  order="yes" title="Hold lock for an individual order throughout the job chain">
    <script  language="javax.script:rhino">
        <![CDATA[
function spooler_process()
{
 try {
     var parameters = spooler_task.order().params();
     var filePath = "" + String(parameters.value("scheduler_file_path"));
     spooler_log.info( " scheduler_file_path : " + filePath );
     var fileParts = filePath.split("/");
     var fileName  = fileParts[fileParts.length-1];
     spooler_log.info( "fileName : " + fileName );

     if(fileName.match("^a[A-Za-z0-9_]*\.csv$")) {
        var lockName = "BERLIN_PROC";
        var lock_name = "BERLIN_PROC";
        spooler_log.info( "File matched with berlin lock_name : "+ lockName  );
     }

     if(fileName.match("^b[A-Za-z0-9_]*\.csv$")) {
        var lockName = "MUNICH_PROC";
        spooler_log.info( "File matched with berlin lock_name : "+ lockName  );
     }

     spooler_task.order().params().set_value("file_spec",fileName);
     spooler_task.order().params().set_value("lock_name",lockName);

     if (!spooler.locks().lock_or_null( lockName )) {
         var lock = spooler.locks().create_lock();
         lock.set_name( lockName );
         spooler.locks().add_lock( lock );
         if (spooler_task.try_hold_lock( lockName )) {
             return true;
         } else {
           spooler_task.call_me_again_when_locks_available();
         }
    } else {
      spooler_task.order().setback();
      spooler_log.info( " lock is already aquired , lock_name : "+ lockName +" , setback , will retry after sometime" );
    }

    return true;
  }
  catch (e) {
    spooler_log.warn("error occurred    : " + String(e));
    return false;
  }
}
        ]]>
    </script>
    <delay_order_after_setback  setback_count="1" is_maximum="no" delay="15"/>
    <delay_order_after_setback  setback_count="5400" is_maximum="yes" delay="0"/>
    <run_time />
</job>

排他ロックを解除するジョブ(release_lock.job.xml)は以下のとおり。
グローバル・パラメータ(lock_name)からロック名を取得して排他ロックを解放します。
 <?xml version="1.0" encoding="ISO-8859-1"?>
<job  order="yes" title="Release the lock that has been acquired by the job hold_lock">
    <params />
    <script  language="javax.script:rhino">
        <![CDATA[
function spooler_process()
{
  try {
    var parameters = spooler_task.order().params();
    var filePath = "" + String(parameters.value("scheduler_file_path"));
    spooler_log.info( " scheduler_file_path : " + filePath );
    var lockName = "" + String(parameters.value( "lock_name" ));
    spooler_log.info( "  lock_name : " +  lockName );
    if (spooler.locks().lock_or_null( lockName )) {
      spooler.locks().lock( lockName ).remove();
    }
    return true;
  }
  catch (e) {
    spooler_log.warn("error occurred: " + String(e));
    return false;
  }
}
        ]]>
    </script>
    <run_time />
</job> 
ロード成功時にファイルを移動するジョブ(move_file_suc.job.xml)は以下のとおり。
パラメータ(source_file)からパラメータ(target_file)にファイルを移動します。
移動先にファイルが存在する場合は上書きします。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job  title="Move file to targer directory" order="yes" stop_on_error="no">
    <description >
        <include  file="jobs/JobSchedulerRenameFile.xml"/>
    </description>
    <params >
        <param  name="overwrite" value="true"/>
        <param  name="target_file" value="/tmp/sandbox/data/done"/>
        <param  name="source_file" value="/tmp/sandbox/data/in"/>
    </params>
    <script  language="java" java_class="sos.scheduler.file.JobSchedulerRenameFile"/>
    <run_time />
</job>
ロード失敗時にファイルを移動するジョブ(move_file_error.job.xml)は以下のとおり。
<?xml version="1.0" encoding="ISO-8859-1"?>
<job  title="Move file to targer directory" order="yes" stop_on_error="no">
    <description >
        <include  file="jobs/JobSchedulerRenameFile.xml"/>
    </description>
    <params >
        <param  name="target_file" value="/tmp/sandbox/data/failed"/>
        <param  name="overwrite" value="true"/>
        <param  name="source_file" value="/tmp/sandbox/data/in"/>
    </params>
    <script  language="java" java_class="sos.scheduler.file.JobSchedulerRenameFile"/>
    <run_time />
</job>


異種ファイルは並列に処理するジョブを実行する


JOCでJobChainを見ると、下図のように表示されます。


/tmp/sandbox/data/in ディレクトリにファイルを作成すると、ジョブが実行されます。
以下のようにしてテストファイルを作成すると、ジョブが実行されます。
$ sh put_test_data.sh in
/tmp/sandbox/data/in
合計 24
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 a001.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 a002.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 a003.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 b001.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 b002.csv
-rw-rw-r-- 1 jobs jobs 3  9月 20 21:58 2014 b003.csv

下図は、実行中の状態をJOCで見たところです。
ジョブ(aquire_lock)は、排他ロックの解放待ちで Setback でリトライしてます。

debug.log には、以下のように記録されています。
「a系のデータ」と「b系のデータ」は並列に処理されていますが、a系のa001.csv、a002.csv、a003.csv は順次処理されています。b系も同様です。
2014-09-20 21:58:25 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b001.csv
2014-09-20 21:58:26 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a001.csv
2014-09-20 21:58:26 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b001.csv
2014-09-20 21:58:32 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b003.csv
2014-09-20 21:58:33 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b003.csv
2014-09-20 21:58:36 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a001.csv
2014-09-20 21:58:43 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a002.csv
2014-09-20 21:58:45 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b002.csv
2014-09-20 21:58:46 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/b002.csv
2014-09-20 21:58:53 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a002.csv
2014-09-20 21:58:59 - START - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a003.csv
2014-09-20 21:59:09 - END   - JOB=sample1/load_file_process - ORDER_ID=/tmp/sandbox/data/in/a003.csv