/*
 * Decompiled with CFR 0.152.
 */
package jp.ossc.nimbus.service.publish;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import jp.ossc.nimbus.core.ServiceBase;
import jp.ossc.nimbus.core.ServiceManagerFactory;
import jp.ossc.nimbus.core.ServiceName;
import jp.ossc.nimbus.daemon.Daemon;
import jp.ossc.nimbus.daemon.DaemonControl;
import jp.ossc.nimbus.daemon.DaemonRunnable;
import jp.ossc.nimbus.service.jms.JMSMessageConsumerFactory;
import jp.ossc.nimbus.service.publish.AnalyzeProcessException;
import jp.ossc.nimbus.service.publish.DefaultPublisherServiceMBean;
import jp.ossc.nimbus.service.publish.MessageSendException;
import jp.ossc.nimbus.service.publish.Protocol;
import jp.ossc.nimbus.service.publish.ProtocolMismatchException;
import jp.ossc.nimbus.service.publish.PublishContainer;
import jp.ossc.nimbus.service.publish.PublishContainerFactory;
import jp.ossc.nimbus.service.publish.Publisher;
import jp.ossc.nimbus.service.publish.Servant;
import jp.ossc.nimbus.service.queue.Queue;

public class DefaultPublisherService
extends ServiceBase
implements DefaultPublisherServiceMBean,
Publisher {
    protected static final String MSG_ID_00001 = "DP___00001";
    protected static final String MSG_ID_00002 = "DP___00002";
    protected static final String MSG_ID_00003 = "DP___00003";
    protected static final String MSG_ID_00004 = "DP___00004";
    protected static final String MSG_ID_00005 = "DP___00005";
    protected static final String MSG_ID_00006 = "DP___00006";
    protected static final String MSG_ID_00007 = "DP___00007";
    protected static final String MSG_ID_00008 = "DP___00008";
    protected String serverBindAddress;
    protected int port = 0;
    protected int containerNum = 0;
    protected Selector selector;
    protected Daemon socketReader;
    protected Map servants;
    protected List containerList;
    protected ServiceName publishContainerFactoryServiceName;
    protected ServiceName protocolServiceName;
    protected Protocol protocol;
    protected ServerSocketChannel serverSocketChannel;
    protected boolean isServerSocketChannelBlocking;
    protected boolean isKeepAlive = true;
    protected long servantGarbageInterval = -1L;
    protected Daemon servantGarbager;
    protected ServiceName[] jmsMessageConsumerFactoryServiceNames;
    protected ServiceName[] queueServiceNames;
    protected JMSMessageListener[] listeners;
    protected Daemon[] messageHandlers;
    protected Set consumers;

    public void setProtocolServiceName(ServiceName name) {
        this.protocolServiceName = name;
    }

    public ServiceName getProtocolServiceName() {
        return this.protocolServiceName;
    }

    public ServiceName getPublishContainerFactoryServiceName() {
        return this.publishContainerFactoryServiceName;
    }

    public void setPublishContainerFactoryServiceName(ServiceName name) {
        this.publishContainerFactoryServiceName = name;
    }

    public void setJMSMessageConsumerFactoryServiceNames(ServiceName[] names) {
        this.jmsMessageConsumerFactoryServiceNames = names;
    }

    public ServiceName[] getJMSMessageConsumerFactoryServiceNames() {
        return this.jmsMessageConsumerFactoryServiceNames;
    }

    public void setServerBindAddress(String address) {
        this.serverBindAddress = address;
    }

    public String getServerBindAddress() {
        return this.serverBindAddress;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getPort() {
        return this.port;
    }

    public void setContainerNum(int num) {
        this.containerNum = num;
    }

    public int getContainerNum() {
        return this.containerNum;
    }

    public boolean isServerSocketChannelBlocking() {
        return this.isServerSocketChannelBlocking;
    }

    public void setServerSocketChannelBlocking(boolean isBlocking) {
        this.isServerSocketChannelBlocking = isBlocking;
    }

    public boolean isKeepAlive() {
        return this.isKeepAlive;
    }

    public void setKeepAlive(boolean isKeepAlive) {
        this.isKeepAlive = isKeepAlive;
    }

    public void setServantGarbageInterval(long millis) {
        this.servantGarbageInterval = millis;
    }

    public long getServantGarbageInterval() {
        return this.servantGarbageInterval;
    }

    public void setQueueServiceNames(ServiceName[] names) {
        this.queueServiceNames = names;
    }

    public ServiceName[] getQueueServiceNames() {
        return this.queueServiceNames;
    }

    public void createService() throws Exception {
        this.servants = Collections.synchronizedMap(new HashMap());
        this.consumers = new HashSet();
    }

    public void startService() throws Exception {
        if (this.protocolServiceName == null) {
            throw new IllegalArgumentException("protocolServiceName must be specified.");
        }
        this.protocol = (Protocol)ServiceManagerFactory.getServiceObject(this.protocolServiceName);
        if (this.publishContainerFactoryServiceName == null) {
            throw new IllegalArgumentException("publishContainerFactoryServiceName must be specified.");
        }
        PublishContainerFactory containerFactory = (PublishContainerFactory)ServiceManagerFactory.getServiceObject(this.publishContainerFactoryServiceName);
        this.containerList = new ArrayList(this.containerNum);
        for (int i = 0; i < this.containerNum; ++i) {
            PublishContainer publishContainer = containerFactory.createContainer();
            this.containerList.add(publishContainer);
        }
        if (this.servantGarbageInterval > 0L) {
            this.servantGarbager = new Daemon(new ServantGarbager());
            this.servantGarbager.start();
        }
        this.selector = SelectorProvider.provider().openSelector();
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(this.isServerSocketChannelBlocking);
        InetAddress address = null;
        if (this.serverBindAddress == null) {
            address = InetAddress.getLocalHost();
        } else {
            byte[] ip = new byte[4];
            String tmp = this.serverBindAddress;
            for (int i = 0; i < ip.length; ++i) {
                if (i != ip.length - 1) {
                    int index = tmp.indexOf(46);
                    if (index == -1 || index == tmp.length() - 1) {
                        throw new IllegalArgumentException("Bad serverBindAddress : " + this.serverBindAddress);
                    }
                    ip[i] = Byte.parseByte(tmp.substring(0, index));
                    tmp = tmp.substring(index + 1);
                    continue;
                }
                ip[i] = Byte.parseByte(tmp);
            }
            address = InetAddress.getByAddress(ip);
        }
        InetSocketAddress socketAddress = new InetSocketAddress(address, this.port);
        this.serverSocketChannel.socket().bind(socketAddress);
        this.serverSocketChannel.register(this.selector, 16);
        this.socketReader = new Daemon(new SocketReader());
        this.socketReader.start();
        if (this.jmsMessageConsumerFactoryServiceNames == null || this.jmsMessageConsumerFactoryServiceNames.length == 0) {
            throw new IllegalArgumentException("jmsMessageConsumerFactoryServiceNames must be specified.");
        }
        if (this.queueServiceNames != null) {
            if (this.queueServiceNames.length != this.jmsMessageConsumerFactoryServiceNames.length) {
                throw new IllegalArgumentException("Length of queueServiceNames and jmsMessageConsumerFactoryServiceNames must equal.");
            }
            this.messageHandlers = new Daemon[this.queueServiceNames.length];
        }
        this.listeners = new JMSMessageListener[this.jmsMessageConsumerFactoryServiceNames.length];
        for (int i = 0; i < this.jmsMessageConsumerFactoryServiceNames.length; ++i) {
            JMSMessageConsumerFactory messageConsumerFactory = (JMSMessageConsumerFactory)ServiceManagerFactory.getServiceObject(this.jmsMessageConsumerFactoryServiceNames[i]);
            MessageConsumer consumer = messageConsumerFactory.createConsumer();
            Queue queue = null;
            if (this.queueServiceNames != null) {
                queue = (Queue)ServiceManagerFactory.getServiceObject(this.queueServiceNames[i]);
                this.messageHandlers[i] = new Daemon(new MessageHandler(queue));
                this.messageHandlers[i].start();
            }
            this.listeners[i] = new JMSMessageListener(queue);
            consumer.setMessageListener((MessageListener)this.listeners[i]);
            this.consumers.add(consumer);
            Session session = messageConsumerFactory.getSession();
            Connection con = messageConsumerFactory.getSessionFactory().getConnection();
            con.start();
        }
    }

    public void stopService() throws Exception {
        Iterator cons = this.consumers.iterator();
        while (cons.hasNext()) {
            MessageConsumer consumer = (MessageConsumer)cons.next();
            try {
                consumer.close();
            }
            catch (JMSException e) {}
        }
        this.consumers.clear();
        this.listeners = null;
        this.serverSocketChannel.close();
        this.socketReader.stop();
        this.socketReader = null;
        if (this.servantGarbager != null) {
            this.servantGarbager.stop();
            this.servantGarbager = null;
        }
        this.selector.close();
        if (this.messageHandlers != null && this.messageHandlers.length != 0) {
            for (int i = 0; i < this.messageHandlers.length; ++i) {
                this.messageHandlers[i].stop();
            }
            this.messageHandlers = null;
        }
        Iterator containers = this.containerList.iterator();
        while (containers.hasNext()) {
            PublishContainer container = (PublishContainer)containers.next();
            container.stop();
        }
        this.containerList.clear();
        this.servants.clear();
    }

    public void destroyService() {
        this.protocol = null;
        this.selector = null;
    }

    protected void handleMessage(Message msg) {
        Serializable obj = null;
        if (msg instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage)msg;
            try {
                obj = objMsg.getObject();
            }
            catch (JMSException e) {
                this.getLogger().write(MSG_ID_00007, e);
            }
        } else {
            this.getLogger().write(MSG_ID_00008, msg);
        }
        if (obj == null) {
            return;
        }
        Iterator containers = this.containerList.iterator();
        while (containers.hasNext()) {
            PublishContainer container = (PublishContainer)containers.next();
            container.handleMessage(obj);
        }
    }

    public synchronized boolean entryServant(Servant svt) {
        String key = svt.getID();
        int maxVacantNum = 0;
        PublishContainer maxVacantContainer = null;
        int max = this.containerList.size();
        for (int i = 0; i < max; ++i) {
            PublishContainer container = (PublishContainer)this.containerList.get(i);
            int vacantNum = container.getVacantServantNum();
            if (vacantNum <= maxVacantNum) continue;
            maxVacantNum = vacantNum;
            maxVacantContainer = container;
        }
        if (maxVacantContainer != null) {
            svt.setProtocol(this.protocol);
            if (maxVacantContainer.entryServant(svt)) {
                this.servants.put(key, svt);
                return true;
            }
            return this.entryServant(svt);
        }
        return false;
    }

    public synchronized boolean ejectServant(String key) {
        return this.ejectServant(key, false);
    }

    public synchronized boolean ejectServant(String key, boolean isForced) {
        Servant servant = (Servant)this.servants.get(key);
        if (servant != null) {
            PublishContainer container = servant.getContainer();
            if (container == null) {
                return true;
            }
            if (container.ejectServant(servant, isForced)) {
                this.servants.remove(key);
            } else {
                return false;
            }
        }
        return true;
    }

    public Servant findServant(String key) {
        return (Servant)this.servants.get(key);
    }

    public int getServantNum() {
        return this.servants != null ? this.servants.size() : 0;
    }

    public long getPublishCount() {
        if (this.containerList == null) {
            return 0L;
        }
        long count = 0L;
        Iterator containers = this.containerList.iterator();
        while (containers.hasNext()) {
            PublishContainer container = (PublishContainer)containers.next();
            count += container.getPublishCount();
        }
        return count;
    }

    public long getReceiveCount() {
        long receiveCount = 0L;
        if (this.listeners != null && this.listeners.length != 0) {
            for (int i = 0; i < this.listeners.length; ++i) {
                receiveCount += this.listeners[i].receiveCount;
            }
        }
        return receiveCount;
    }

    protected class ServantGarbager
    implements DaemonRunnable {
        protected ServantGarbager() {
        }

        public boolean onStart() {
            return true;
        }

        public boolean onStop() {
            return true;
        }

        public boolean onSuspend() {
            return true;
        }

        public boolean onResume() {
            return true;
        }

        public Object provide(DaemonControl ctrl) throws Throwable {
            Thread.sleep(DefaultPublisherService.this.servantGarbageInterval);
            return null;
        }

        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            Iterator containers = DefaultPublisherService.this.containerList.iterator();
            while (containers.hasNext()) {
                PublishContainer container = (PublishContainer)containers.next();
                Set garbage = container.garbage();
                if (garbage == null) continue;
                Iterator itr = garbage.iterator();
                while (itr.hasNext()) {
                    DefaultPublisherService.this.servants.remove(((Servant)itr.next()).getID());
                }
            }
        }

        public void garbage() {
        }
    }

    protected class SocketReader
    implements Serializable,
    DaemonRunnable {
        protected SocketReader() {
        }

        public boolean onStart() {
            return true;
        }

        public boolean onStop() {
            return true;
        }

        public boolean onSuspend() {
            return true;
        }

        public boolean onResume() {
            return true;
        }

        public Object provide(DaemonControl ctrl) throws Throwable {
            int count = DefaultPublisherService.this.selector.select();
            if (count > 0) {
                return DefaultPublisherService.this.selector.selectedKeys();
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void consume(Object paramObj, DaemonControl ctrl) throws Throwable {
            if (paramObj == null) {
                return;
            }
            Set selected = (Set)paramObj;
            Iterator keyIterator = selected.iterator();
            while (keyIterator.hasNext()) {
                try {
                    SelectionKey key = (SelectionKey)keyIterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        try {
                            this.accept(serverSocketChannel);
                        }
                        catch (IOException e) {
                            DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00001, e);
                        }
                        continue;
                    }
                    if (!key.isReadable()) continue;
                    SocketChannel socketChannel = (SocketChannel)key.channel();
                    try {
                        this.read(key, socketChannel);
                    }
                    catch (AnalyzeProcessException e) {
                        DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00005, e);
                    }
                    catch (IOException e) {
                        DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00003, e);
                    }
                    catch (MessageSendException e) {
                        DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00004, e);
                    }
                    catch (ProtocolMismatchException e) {
                        DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00002, e);
                    }
                    catch (Throwable e) {
                        DefaultPublisherService.this.getLogger().write(DefaultPublisherService.MSG_ID_00006, e);
                    }
                }
                finally {
                    keyIterator.remove();
                }
            }
        }

        public void garbage() {
        }

        private void accept(ServerSocketChannel serverSocketChannel) throws IOException {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.socket().setKeepAlive(DefaultPublisherService.this.isKeepAlive);
            socketChannel.configureBlocking(DefaultPublisherService.this.isServerSocketChannelBlocking);
            socketChannel.register(DefaultPublisherService.this.selector, 1);
        }

        private void read(SelectionKey key, SocketChannel socketChanel) throws IOException, ProtocolMismatchException, MessageSendException, AnalyzeProcessException {
            try {
                DefaultPublisherService.this.protocol.analyze(socketChanel, DefaultPublisherService.this);
            }
            catch (MessageSendException e) {
                try {
                    socketChanel.finishConnect();
                    socketChanel.close();
                }
                catch (IOException e2) {
                    // empty catch block
                }
                key.cancel();
                throw e;
            }
            catch (AnalyzeProcessException e) {
                try {
                    socketChanel.finishConnect();
                    socketChanel.close();
                }
                catch (IOException e2) {
                    // empty catch block
                }
                key.cancel();
                throw e;
            }
            catch (IOException e) {
                try {
                    socketChanel.finishConnect();
                    socketChanel.close();
                }
                catch (IOException e2) {
                    // empty catch block
                }
                key.cancel();
                throw e;
            }
            catch (RuntimeException e) {
                try {
                    socketChanel.finishConnect();
                    socketChanel.close();
                }
                catch (IOException e2) {
                    // empty catch block
                }
                key.cancel();
                throw e;
            }
            catch (Error e) {
                try {
                    socketChanel.finishConnect();
                    socketChanel.close();
                }
                catch (IOException e2) {
                    // empty catch block
                }
                key.cancel();
                throw e;
            }
        }
    }

    protected class MessageHandler
    implements Serializable,
    DaemonRunnable {
        protected Queue queue;

        public MessageHandler(Queue queue) {
            this.queue = queue;
        }

        public boolean onStart() {
            return true;
        }

        public boolean onStop() {
            return true;
        }

        public boolean onSuspend() {
            return true;
        }

        public boolean onResume() {
            return true;
        }

        public Object provide(DaemonControl ctrl) throws Throwable {
            if (this.queue == null) {
                return null;
            }
            return this.queue.get(500L);
        }

        public void consume(Object paramObj, DaemonControl ctrl) {
            if (paramObj == null) {
                return;
            }
            DefaultPublisherService.this.handleMessage((Message)paramObj);
        }

        public void garbage() {
            if (this.queue != null) {
                while (this.queue.size() > 0) {
                    this.consume(this.queue.get(0L), null);
                }
            }
        }
    }

    protected class JMSMessageListener
    implements MessageListener {
        public long receiveCount;
        protected Queue queue;

        public JMSMessageListener(Queue queue) {
            this.queue = queue;
        }

        public void onMessage(Message msg) {
            if (this.queue == null) {
                ++this.receiveCount;
                DefaultPublisherService.this.handleMessage(msg);
            } else {
                ++this.receiveCount;
                this.queue.push(msg);
            }
        }
    }
}

