001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.plugin.daemon; 017 018import java.io.File; 019import java.util.Date; 020import java.util.Locale; // 7.2.9.4 (2020/11/20) 021 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageListener; 025import javax.jms.TextMessage; 026 027// import org.opengion.fukurou.util.BizUtil; 028import org.opengion.fukurou.business.BizUtil; 029import org.opengion.fukurou.queue.QueueInfo; 030import org.opengion.fukurou.queue.QueueReceive; 031import org.opengion.fukurou.queue.QueueReceiveFactory; 032import org.opengion.fukurou.util.HybsTimerTask; 033import org.opengion.fukurou.util.StringUtil; 034import org.opengion.hayabusa.common.HybsSystem; 035import org.opengion.hayabusa.common.HybsSystemException; 036import org.opengion.hayabusa.queue.DBAccessQueue; 037 038/** 039 * メッセージキュー受信 メッセージキューの受信処理を行います。 040 * 041 * @og.group メッセージ連携 042 * 043 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 044 * 045 * @version 5.0 046 * @author oota 047 * @since JDK7 048 * 049 */ 050public class Daemon_QueueReceive extends HybsTimerTask { 051 /** このプログラムのVERSION文字列を設定します。 {@value} */ 052 private static final String VERSION = "7.2.9.4 (2020/11/20)" ; 053 054 private int loopCnt ; 055 private QueueReceive queueReceive ; 056 057 private static final int LOOP_COUNTER = 24; 058 private static final char FPSC = File.pathSeparatorChar ; // 7.2.9.4 (2020/11/20) システムに依存するパス区切り文字 059 060 private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY"); 061 private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY"); 062 private final String MQ_QUEUE_TYPE; 063 private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL"); 064 private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 065 066 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 067 private final String USER_ID = "CYYYYY"; 068 private final String PG_ID; 069 private final String DMN_NAME = "QueueReceiveDMN"; 070 private final DBAccessQueue dbAccessQueue; 071 072 private final String REAL_PATH = HybsSystem.sys("REAL_PATH"); // 7.2.9.4 (2020/11/20) 073 074 /** 075 * コンストラクター 076 * 初期処理を行います。 077 * 078 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 079 */ 080 public Daemon_QueueReceive() { 081 super(); 082 083 // パラメータの設定 084 // 7.2.9.4 (2020/11/20) PMD:Avoid if (x != y) ..; else ..; 085 if(StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 086 throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 087 }else { 088// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 089 MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 090 PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 091 } 092 093 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 094 095// // パラメータの設定 096// if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 097//// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 098// MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 099// PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 100// }else { 101// throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 102// } 103// 104// dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 105 } 106 107 /** 108 * 初期処理 MQサーバに接続します。 109 * 110 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 111 */ 112 @Override 113 public void initDaemon() { 114 // 開始ログO 115 final StringBuilder errMsg = new StringBuilder(); 116 if (MQ_QUEUE_TYPE == null) { 117 errMsg.append("MQ_QUEUE_TYPE"); 118 } 119 if (MQ_QURUE_SERVER_URL == null) { 120 errMsg.append(" MQ_QUEUE_SERVER_URL"); 121 } 122 123 if (errMsg.length() > 0) { 124 errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。"); 125 throw new HybsSystemException(errMsg.toString()); 126 } 127 128// final String queueType = MQ_QUEUE_TYPE.toUpperCase(); 129 final String queueType = MQ_QUEUE_TYPE.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 130 131 // 開始ログ 132 System.out.println("MQキュータイプ:" + queueType); 133 System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL); 134 135 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 136 137 queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 138 } 139 140 /** 141 * 開始処理 タイマータスクのデーモン処理の開始ポイントです。 142 */ 143 @Override 144 protected void startDaemon() { 145 if (loopCnt % LOOP_COUNTER == 0) { 146 loopCnt = 1; 147 System.out.println(); 148 System.out.print(toString() + " " + new Date() + " "); 149 } else { 150 // 対象 キュー名(グループ名)とbizlogic名の取得処理 151 final String[][] ge67vals = dbAccessQueue.setlectGE67(); 152 // キュー情報登録チェック 153 if (ge67vals.length == 0) { 154 final String errMsg = "GE67にキュー情報が登録されていません。"; 155 throw new RuntimeException(errMsg); 156 } 157 // MQとSQSで処理を分岐 158 // MQ:指定キューIDからキューメッセージを取得 159 // SQS:キューメッセージを取得してからキューID(グループID)を取得 160 switch (MQ_QUEUE_TYPE) { 161 case "MQ": 162 processMq(ge67vals); 163 break; 164 case "SQS": 165 processSqs(ge67vals); 166 break; 167 default: 168 final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE; 169 throw new RuntimeException(errMsg); 170 } 171 172 loopCnt++; 173 } 174 } 175 176 /** 177 * MQ用の処理 178 * GE67に登録されているキューIDの、 179 * メッセージキューを取得して処理を行います。 180 * 181 * @param ge67vals GE67の配列データ 182 */ 183 private void processMq(final String[][] ge67vals) { 184 boolean listenerMode = false; 185 186 if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) { 187 listenerMode = true; 188 } 189 190 if(listenerMode) { 191 // リスナーの初期化 192 queueReceive.closeListener(); 193 } 194 195 // ge67のキューリスト分繰り返します 196 for (int row = 0; row < ge67vals.length; row++) { 197 final String queueId = ge67vals[row][0]; 198 final String bizLogicId = ge67vals[row][1]; 199 200 if(listenerMode) { 201 // リスナーを設定して、動的な受信処理(MQ専用) 202 final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId); 203 queueReceive.setListener(queueId, listener); 204 }else { 205 // 1件の受信処理 206 final QueueInfo queueInfo = queueReceive.receive(queueId); 207 if (queueInfo != null) { 208 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 209 // 1件処理を行ったら処理を終了します。 210 break; 211 } 212 } 213 } 214 } 215 216 /** 217 * SQS用の処理 218 * SQSはグループIDを指定して、キューを取得することはできず、 219 * 任意のキューを1つ取得してから、 220 * 判定処理を行います。 221 * GE67に登録されていないグループIDのキューが取得された場合は、 222 * GE68にエラーレコードを登録します。 223 * 224 * @param ge67vals GE67の配列データ 225 */ 226 private void processSqs(final String[][] ge67vals) { 227 // 下記はSQSの場合(キューを1件取得して処理) 228 final QueueInfo queueInfo = queueReceive.receive(null); 229 230 // キューが未取得の場合 231 if(queueInfo == null) { 232 return; 233 } 234 235 // 受信したキューを処理 236 final String groupId = queueInfo.getSqsFifoGroupId(); 237 Boolean existsFlg = false; 238 // valsにグループIDのレコードが存在するか検索 239 for (int row = 0; row < ge67vals.length; row++) { 240 final String queueId = ge67vals[row][0]; 241 242 if (groupId != null && groupId.equals(queueId)) { 243 // 該当レコードあり 244 final String bizLogicId = ge67vals[row][1]; 245 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 246 247 existsFlg = true; 248 break; 249 } 250 } 251 252 if (!existsFlg) { 253 // 該当groupIdの未登録エラー 254 // 処理番号生成 255 final String syoriNo = dbAccessQueue.generateSyoriNo(); 256 dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage()); 257 dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。"); 258 } 259 } 260 261 /** 262 * キャンセル処理 263 * タイマータスクのデーモン処理の終了ポイントです。 264 * 265 * @return キャンセルできれば、true 266 */ 267 @Override 268 public boolean cancel() { 269 if (queueReceive != null) { 270 queueReceive.close(); 271 } 272 273 return super.cancel(); 274 } 275 276 /** 277 * メッセージの処理 278 * 受信したメッセージをbizLogicに渡して、 279 * 処理を実行します。 280 * 281 * @param queueId キューID 282 * @param bizLogicId ビズロジックID 283 * @param msgText 受信メッセージ 284 */ 285 private void processMessage(final String queueId, final String bizLogicId, final String msgText) { 286 String syoriNo = ""; 287 try { 288 // 処理番号生成 289 syoriNo = dbAccessQueue.generateSyoriNo(); 290 291 // 管理テーブル登録 292 dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText); 293 294 // bizLogicの処理を実行 295 callActBizLogic(SYSTEM_ID, bizLogicId, msgText); 296 297 // 管理テーブル更新(完了) 298 dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END); 299 300 } catch (Throwable te) { 301 // bizLogicでのエラーはログの未出力して、処理を継続します。 302 // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。 303 String errMessage = null; 304 if (te.getCause() != null) { 305 // causeが設定されている場合のエラー情報 306 errMessage = te.getCause().getMessage(); 307 } else { 308 // causeが未設定の場合のエラー情報 309 errMessage = te.getMessage(); 310 } 311 System.out.println(errMessage); 312 try { 313 // エラーテーブルに登録 314 dbAccessQueue.updateGE68Error(syoriNo, errMessage); 315 // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 316// } catch (Exception e) { 317// // ここでのエラーはスルーします。 318// System.out.println("管理テーブル登録エラー:" + e.getMessage()); 319 } catch( final Throwable th ) { 320 // ここでのエラーはスルーします。 321 System.out.println("管理テーブル登録エラー:" + th.getMessage()); 322 } 323 } 324 } 325 326 /** 327 * bizLogic処理の呼び出し 328 * 必要なパス情報をリソースから取得して、 329 * BizUtil.actBizLogicにパス情報を渡すことで、 330 * bizLogicの処理を行います。 331 * 332 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 333 * 334 * @param systemId システムID 335 * @param logicName ロジックファイル名 336 * @param msgText メッセージ 337 * @throws Throwable エラー情報 338 */ 339 private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 340 // 対象 クラスパスの生成 341 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 342 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 343 // bizLogicTag.javaのコードを移植 344 final String classDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_CLASS_PATH" ); // bizの下のパス 345 final String webIinf = REAL_PATH + "WEB-INF" + File.separator ; 346 347 final StringBuilder sb = new StringBuilder().append('.').append(FPSC); 348 349 final File lib = new File( webIinf + "lib"); 350 final File[] libFiles = lib.listFiles(); 351 if( libFiles != null ) { 352 // 7.2.9.4 (2020/11/20) PMD:This for loop can be replaced by a foreach loop 353 for( final File file : libFiles ) { 354 sb.append( file.getAbsolutePath() ).append(FPSC); 355 } 356// for (int i = 0; i < libFiles.length; i++) { 357// sb.append( libFiles[i].getAbsolutePath() ).append(FPSC); 358// } 359 } 360 361 // 上記で生成したクラスパスをclassPathに格納 362 final String classPath = 363 sb.append( webIinf ).append( "classes" ).append(FPSC) 364 .append( classDir ).append(FPSC) // bizの下のパス 365 .toString(); 366 367 // ソースパス情報の生成 368 final String srcDir = REAL_PATH + HybsSystem.sys( "BIZLOGIC_SRC_PATH" ); 369 final boolean isAutoCompile = HybsSystem.sysBool( "BIZLOGIC_AUTO_COMPILE" ); 370 final boolean isHotDeploy = HybsSystem.sysBool( "BIZLOGIC_HOT_DEPLOY" ); 371 372 // bizLogicに渡すパラメータ 373 final String[] keys = new String[] { "message" }; 374 final String[] vals = new String[] { msgText }; 375 376 // bizLogic処理の実行 377 BizUtil.actBizLogic( srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals ); 378 } 379 380// 7.2.9.4 (2020/11/20) spotbugs:null になっている可能性があるメソッドの戻り値を利用している 381// private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 382// // 対象 クラスパスの生成 383// // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 384// // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 385// // bizLogicTag.javaのコードを移植 386// final StringBuilder sb = new StringBuilder(); 387// sb.append('.').append(File.pathSeparatorChar); 388// final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib"); 389// final File[] libFiles = lib.listFiles(); 390// for (int i = 0; i < libFiles.length; i++) { 391// sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 392// } 393// sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar); 394// // bizの下のパス 395// sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar); 396// // 上記で生成したクラスパスをclassPathに格納 397// final String classPath = sb.toString(); 398// 399// // ソースパス情報の生成 400// final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH"); 401// final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH"); 402// final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE"); 403// final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY"); 404// 405// // bizLogicに渡すパラメータ 406// final String[] keys = new String[] { "message" }; 407// final String[] vals = new String[] { msgText }; 408// 409// // bizLogic処理の実行 410// BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 411// } 412 413 /** 414 * 受信処理リスナー用のインナークラス 415 * QueueReceiveリスナークラス リスナー用のクラスです。 416 * MQに設定することで、メッセージが受信されると、 417 * onMessageメソッドが実行されます。 418 * 419 * @og.rev 7.2.9.4 (2020/11/20) private final 追加 420 */ 421// class QueueReceiveListener implements MessageListener { 422 private final class QueueReceiveListener implements MessageListener { 423// private String queueId = ""; 424// private String bizLogicId = ""; 425 private final String queueId ; 426 private final String bizLogicId ; 427 428 /** 429 * コンストラクター 初期処理を行います。 430 * 431 * @param quId キューID 432 * @param bizId ビズロジックID 433 */ 434 public QueueReceiveListener(final String quId, final String bizId) { 435 queueId = quId; 436 bizLogicId = bizId; 437 } 438 439 /** 440 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。 441 * 442 * @param message 受信メッセージ 443 */ 444 @Override 445 public void onMessage(final Message message) { 446 // 要求番号 : ここでは使用していません。 447 final String ykno = ""; 448 449 // メッセージ受信 450 final TextMessage msg = (TextMessage) message; 451 String msgText = ""; 452 453 try { 454 // キューサーバのメッセージを取得 455 msgText = msg.getText(); 456 457 // メーッセージの受信応答を返します。 458 msg.acknowledge(); 459 460 processMessage(queueId, bizLogicId, msgText); 461 462 } catch (JMSException jmse) { 463 try { 464 // 管理テーブル更新 465 // 管理テーブル更新(エラー) 466 dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR); 467 // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 468// } catch (Exception e) { 469// // ここでのエラーはスルーします。 470// System.out.println("管理テーブル登録エラー:" + e.getMessage()); 471 } catch( final Throwable th ) { 472 // ここでのエラーはスルーします。 473 System.out.println("管理テーブル登録エラー:" + th.getMessage()); 474 } 475 476 throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage()); 477 } 478 } 479 } 480}