001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016package org.opengion.fukurou.queue;
017
018import javax.jms.Connection;
019import javax.jms.JMSException;
020import javax.jms.MessageProducer;
021import javax.jms.Queue;
022import javax.jms.QueueConnectionFactory;
023import javax.jms.QueueSession;
024import javax.jms.Session;
025import javax.jms.TextMessage;
026import javax.naming.Context;
027import javax.naming.InitialContext;
028import javax.naming.NamingException;
029
030import org.apache.activemq.ActiveMQConnection;
031import org.apache.activemq.ActiveMQConnectionFactory;
032// import org.opengion.hayabusa.common.HybsSystemException;
033
034// import com.sun.star.uno.RuntimeException;
035
036/**
037 * MQサーバへのメッセージキュー送信用クラス
038 *
039 * MQサーバへのメッセージキュー送信用のクラスです。
040 * Apache ActiveMQとAmazonMQへの送信が可能です。
041 * tomcatからの送信(JNDI利用)と、
042 * バッチ処理(urlを指定し接続)の2通りが可能です。
043 *
044 * ※Apache ActiveMQとAmazonMQの切り替えは、
045 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。
046 * (proxy環境からAmazonMqへの接続は行えない場合があります)
047 *
048 * @og.group メッセージ連携
049 *
050 * @og.rev 5.10.14.0 (2019/08/01) 新規作成
051 *
052 * @version 5
053 * @author oota
054 * @since JDK7
055 *
056 */
057public class QueueSend_MQ implements QueueSend {
058        private Connection              connection;
059        private Session                 session;
060        private MessageProducer sender;
061        private Context                 ctx;
062        // バッチ用フィールド
063        private boolean batch;
064        private String  mqUserId        = "";
065        private String  mqPassword      = "";
066
067        /**
068         * 接続処理
069         * MQサーバに接続を行います。
070         *
071         * @param jmsServer jmsサーバ接続名(バッチの場合はurl)
072         */
073        public void connect(final String jmsServer) {
074                try {
075                        ctx = new InitialContext();
076                        // 1. Connectionの作成s
077//                      QueueConnectionFactory factory = null;
078                        final QueueConnectionFactory factory;
079                        if (batch) {
080                                // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。
081                                mqUserId        = System.getProperty("mqUserId");
082                                mqPassword      = System.getProperty("mqPassword");
083                                factory         = new ActiveMQConnectionFactory(jmsServer);
084                                connection      = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword);
085                        } else {
086                                // tomcat接続の場合。JNDIを利用して接続。
087                                factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer);
088                                connection = (ActiveMQConnection)factory.createConnection();
089                        }
090
091                        // 2. Connectioの開始
092                        connection.start();
093
094                } catch (final JMSException jmse) {
095                        throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage());
096                } catch (final NamingException ne) {
097                        throwErrMsg("名前解決に失敗しました。" + ne.getMessage());
098                }
099        }
100
101        /**
102         * 接続処理
103         * MQサーバに接続します。
104         * connect(String jmsServer)と同じ処理になります。
105         *
106         * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応
107         *
108         * @param jmsServer jmsサーバ情報
109         * @param sqsAccessKey アクセスキー(MQサーバでは未使用)
110         * @param sqsSecretKey シークレットキー(MQサーバでは未使用)
111         */
112        @Override
113        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
114                // MQではsqsAccessKeyとsqsSecretKeyは利用しません。
115                connect(jmsServer);
116        }
117
118        /**
119         * エラーメッセージ送信。
120         *
121         * @og.rev 5.10.15.0 (2019/08/30) Hybs除外
122         *
123         * @param errMsg エラーメッセージ
124         */
125        public void throwErrMsg(final String errMsg) {
126                throw new RuntimeException( errMsg );
127//              if (batch) {
128//                      // バッチ用エラー
129//                      throw new RuntimeException(errMsg);
130//              } else {
131//                      // 画面用エラー
132//                      throw new HybsSystemException(errMsg);
133//              }
134        }
135
136        /**
137         * メッセージ送信
138         * MQサーバにメッセージを送信します。
139         *
140         * @param queueInfo 送信キュー情報
141         */
142        @Override
143        public void sendMessage(final QueueInfo queueInfo) {
144                try {
145                        // 初期チェック
146                        if (connection == null) {
147                                throwErrMsg("MQサーバに接続されていません。");
148                        }
149
150                        // 1. QueueSessionの作成
151                        session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode());
152                        if (session == null) {
153                                throwErrMsg("キューセッションの生成に失敗しました。");
154                        }
155
156                        // 2. Queueの作成
157//                      Queue queue = null;
158//                      queue = session.createQueue(queueInfo.getMqQueueName());
159                        final Queue queue = session.createQueue(queueInfo.getMqQueueName());
160                        sender = session.createProducer(queue);
161
162                        // 3. テキストメッセージの作成
163                        final TextMessage msg = session.createTextMessage(queueInfo.getMessage());
164
165                        // 4. 送信処理
166                        sender.send(msg);
167
168                } catch (JMSException e) {
169                        throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage());
170                }
171        }
172
173        /**
174         * クローズ処理
175         * MQサーバとの接続をクローズします。
176         *
177         * @og.rev 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
178         */
179        @Override
180        public void close() {
181                if (ctx != null) {
182                        try {
183                                ctx.close();
184//                      } catch (Exception e) {                                         // 8.0.0.0 (2021/07/31)
185                        } catch( final Throwable th ) {
186                                System.out.println("ctxのクローズに失敗しました。");
187                        }
188                }
189                // 1. sender,session,connectionのクローズ処理
190                if (sender != null) {
191                        try {
192                                sender.close();
193//                      } catch (Exception e) {                                         // 8.0.0.0 (2021/07/31)
194                        } catch( final Throwable th ) {
195                                System.out.println("senderのクローズに失敗しました。");
196                        }
197                }
198                if (session != null) {
199                        try {
200                                session.close();
201//                      } catch (Exception e) {                                         // 8.0.0.0 (2021/07/31)
202                        } catch( final Throwable th ) {
203                                System.out.println("sessionのクローズに失敗しました。");
204                        }
205                }
206                if (connection != null) {
207                        try {
208                                connection.close();
209//                      } catch (Exception e) {                                         // 8.0.0.0 (2021/07/31)
210                        } catch( final Throwable th ) {
211                                System.out.println("connectionのクローズに失敗しました。");
212                        }
213                }
214        }
215
216        /**
217         * バッチ処理判定フラグを設定します。
218         * バッチ処理の場合は引数で接続先情報を与えます。
219         * それ以外の場合(Tomcat)ではJNDIより情報を取得します。
220         *
221         * @param batchFlg バッチ処理判定フラグ
222         */
223        @Override
224        public void setBatchFlg(final Boolean batchFlg) {
225                batch = batchFlg;
226        }
227
228        /**
229         * テスト用メソッド
230         * テスト実行用です。
231         *
232         * @param args 引数
233         */
234        public static void main(final String[] args) {
235                System.out.println("main start");
236                // 送信情報の設定
237                final String url = "tcp://localhost:61616";
238                final String queueName = "test01";
239                final String msg = "送信メッセージ";
240
241                final QueueInfo queueInfo = new QueueInfo();
242                queueInfo.setMqQueueName(queueName);
243                queueInfo.setMqTransacted(false);
244                queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
245                queueInfo.setMessage(msg);
246
247                final QueueSend queueSend = new QueueSend_MQ();
248                queueSend.setBatchFlg(true);
249
250                try {
251                        queueSend.connect(url,null,null);
252//                      queueSend.connect(url);
253                        queueSend.sendMessage(queueInfo);
254//              } catch (final Exception e) {                                           // 8.0.0.0 (2021/07/31) Avoid catching generic exceptions such as Exception in try-catch block
255//                      System.out.println(e.getMessage());
256                } catch( final Throwable th ) {
257                        System.out.println(th.getMessage());
258                } finally {
259                        queueSend.close();
260                }
261
262                System.out.println("main end");
263        }
264}