001/* 002 * Copyright (c) 2009 The openGion Project. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 013 * either express or implied. See the License for the specific language 014 * governing permissions and limitations under the License. 015 */ 016package org.opengion.fukurou.queue; 017 018import java.util.ArrayList; 019import java.util.List; 020 021import javax.jms.JMSException; 022// import javax.jms.Message; 023import javax.jms.MessageListener; 024import javax.jms.Queue; 025import javax.jms.QueueConnection; 026import javax.jms.QueueConnectionFactory; 027import javax.jms.QueueReceiver; 028import javax.jms.QueueSession; 029import javax.jms.TextMessage; 030import javax.naming.Context; 031import javax.naming.InitialContext; 032 033import org.apache.activemq.ActiveMQConnectionFactory; 034 035/** 036 * MQメッセージ受信用クラス。 037 * 038 * @og.group メッセージ連携 039 * 040 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 041 * 042 * @version 5 043 * @author oota 044 * @since JDK7 045 */ 046public class QueueReceive_MQ implements QueueReceive{ 047 048 private QueueConnection connection ; 049 private QueueSession session ; 050 private QueueReceiver receiver ; 051// List<QueueReceiver> listReceiver ; 052 private List<QueueReceiver> listReceiver ; // 7.2.9.4 (2020/11/20) 053 private boolean batch ; 054 055 /** 056 * 接続処理 057 * メッセージキューサーバに接続します。 058 * 059 * @param jmsServer jsmサーバ 060 * @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません) 061 * @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません) 062 */ 063 public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) { 064 connect(jmsServer); 065 } 066 067 /** 068 * 接続処理 069 * jmsServerに接続します。 070 * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。 071 * SQSの場合は最大受信件数の10件の処理を行います。 072 * 073 * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL 074 */ 075 private void connect(final String jmsServer) { 076 try { 077 if(batch) { 078 // バッチ用 079 final String mqUserId = System.getProperty("mqUserId"); 080 final String mqPassword = System.getProperty("mqPassword"); 081 final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer); 082 connection = factory.createQueueConnection(mqUserId, mqPassword); 083 }else { 084 // jndi接続用 085 final Context ctx = new InitialContext(); 086 final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer); 087 connection = factory.createQueueConnection(); 088 } 089 090 connection.start(); 091 092 // Receiveの作成 093 session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); 094 095 // 初期化 096 listReceiver = new ArrayList<QueueReceiver>(); 097// }catch(Exception e) { // 8.0.0.0 (2021/07/31) 098// throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage()); 099 } catch( final Throwable th ) { 100 throw new RuntimeException("MQサーバの接続に失敗しました。:",th ); 101 } 102 } 103 104 /** 105 * 受信処理 106 * メッセージキューの受信の処理を行います。 107 * 108 * @param queueName キューの名前 109 * @return キュー情報格納クラス 110 */ 111 @Override 112 public QueueInfo receive(final String queueName) { 113 QueueInfo queueInfo = null; 114 115 try { 116 final Queue queue = session.createQueue(queueName); 117 receiver = session.createReceiver(queue); 118 119 final TextMessage msg = (TextMessage)receiver.receive(1000); 120 121 if(msg != null) { 122 // メッセージ受信の確認応答 123 msg.acknowledge(); 124 125 // メッセージの設定 126 queueInfo = new QueueInfo(); 127 queueInfo.setMessage(msg.getText()); 128 } 129// }catch(Exception e) { // 8.0.0.0 (2021/07/31) 130// throw new RuntimeException(e.getMessage()); 131 } catch( final Throwable th ) { 132 throw new RuntimeException( th ); 133 }finally { 134 try { 135 receiver.close(); 136// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 137 } catch( final Throwable th ) { 138 System.out.println("receiverのクローズに失敗しました。"); 139 } 140 } 141 142 return queueInfo; 143 } 144 145 /** 146 * リスナーの起動 147 * 指定したキュー名に対して、 148 * MessageListenerのリスナーを設定します。 149 * 150 * @param queueName キュー名 151 * @param listener MessageListerを実装したクラス 152 */ 153 @Override 154 public void setListener(final String queueName, final MessageListener listener) { 155 QueueReceiver receiver = null; 156 try { 157 final Queue queue = session.createQueue(queueName); 158 receiver = session.createReceiver(queue); 159 receiver.setMessageListener(listener); 160 161 // リスナーの起動 162 listReceiver.add(receiver); 163 }catch(JMSException ex) { 164// throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage()); 165 throw new RuntimeException("リスナーの起動に失敗しました。" , ex); // 8.0.0.0 (2021/07/31) original stack trace may be lost 166 } 167 } 168 169 /** 170 * クローズリスナー 171 * レシーバーをクローズすることで、 172 * リスナーの処理を終了します。 173 */ 174 public void closeListener() { 175 for(final QueueReceiver receiver: listReceiver) { 176 try { 177 receiver.close(); 178// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 179 } catch( final Throwable th ) { 180 System.out.println("receiverのクローズに失敗しました。"); 181 } 182 } 183 184 // 初期化 185 listReceiver = null; 186 listReceiver = new ArrayList<QueueReceiver>(); 187 } 188 189 /** 190 * クローズ処理 191 * クローズ処理を行います。 192 * 193 * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block 194 */ 195 @Override 196 public void close() { 197 if(receiver != null) { 198 try { 199 receiver.close(); 200// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 201 } catch( final Throwable th ) { 202 System.out.println("receiverのクローズに失敗しました。"); 203 } 204 } 205 if(session != null) { 206 try { 207 session.close(); 208// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 209 } catch( final Throwable th ) { 210 System.out.println("sessionのクローズに失敗しました。"); 211 } 212 } 213 if(connection != null) { 214 try { 215 connection.close(); 216// }catch(Exception e) { ; } // 8.0.0.0 (2021/07/31) 217 } catch( final Throwable th ) { 218 System.out.println("connectionのクローズに失敗しました。"); 219 } 220 } 221 } 222 223 /** 224 * バッチ処理判定フラグを設定します。 225 * 226 * @param batchFlg バッチ処理判定フラグ 227 */ 228 public void setBatchFlg(final Boolean batchFlg) { 229 batch = batchFlg; 230 } 231 232 /** 233 * 検証用メソッド 234 * テスト用のメソッドです。 235 * 236 * @param args 引数 237 */ 238 public static void main(final String[] args) { 239 final QueueReceive receive = new QueueReceive_MQ(); 240 final String jmsServer = "tcp://localhost:61616"; 241 242 // バッチフラグにtrueを設定 243 // 未設定の場合は、tomcatのjndi接続処理が実行されます。 244 receive.setBatchFlg(true); 245 246 // 認証情報の設定 247 System.setProperty("mqUserId", "admin"); 248 System.setProperty("mqPassword", "admin"); 249 250 // 接続 251 receive.connect(jmsServer, null, null); 252 253 // 処理対象のキュー名 254 final String queueName = "queue01"; 255 256 // ** 1件受信する場合 257 final QueueInfo queueInfo = receive.receive(queueName); 258 if(queueInfo != null) { 259 System.out.println("message:" + queueInfo.getMessage()); 260 }else { 261 System.out.println("キューが登録されていません。"); 262 } 263 264// // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ) 265// // MessageListerを実装した、QueueReceiveListenerクラスを作成します。 266// MessageListener listener = new QueueReceiveListener(); 267// receive.setListener(queueName, listener); 268// // 複数のキューにリスナーを設定することも可能です。 269// receive.setListener("queue02", listener); 270// 271// try { 272// // 1分間リスナーを起動しておく場合の、プロセス待機処理 273// Thread.sleep(60 * 1000); 274// }catch(InterruptedException e) { 275// throw new RuntimeException(e.getMessage()); 276// } 277 278 // リスナー利用時は、closeListenerを実行して、解放してください。 279 receive.closeListener(); 280 281 // 終了処理 282 receive.close(); 283 } 284 285// /** 286// * QueueReceiveリスナークラス 287// * リスナー用のクラスです。 288// * MQに設定することで、メッセージが受信されると、 289// * 自動的にonMessageメソッドが実行されます。 290// * 291// */ 292// static class QueueReceiveListener implements MessageListener { 293// /** 294// * メッセージ受信処理 295// * MQサーバにメッセージが受信されると、 296// * メソッドの処理が行われます。 297// * 298// * @param message 受信メッセージ 299// */ 300// @Override 301// public void onMessage(final Message message) { 302// 303// // メッセージ受信 304// TextMessage msg = (TextMessage) message; 305// String msgText = ""; 306// 307// try { 308// // キューサーバのメッセージを取得 309// msgText = msg.getText(); 310// // メーッセージの受信応答を返します。 311// msg.acknowledge(); 312// 313// System.out.println("message:" + msgText); 314// 315// } catch (JMSException e) { 316// throw new RuntimeException(e.getMessage()); 317// } 318// } 319// } 320 321}