/*
 * Decompiled with CFR 0.152.
 */
package com.sun.messaging.bridge.service.stomp;

import com.sun.messaging.bridge.service.stomp.StompConnection;
import com.sun.messaging.bridge.service.stomp.StompFrameMessage;
import com.sun.messaging.bridge.service.stomp.StompOutputHandler;
import com.sun.messaging.bridge.service.stomp.StompProtocolException;
import com.sun.messaging.bridge.service.stomp.StompProtocolHandler;
import com.sun.messaging.bridge.service.stomp.StompSenderSession;
import com.sun.messaging.bridge.service.stomp.StompServer;
import com.sun.messaging.bridge.service.stomp.TransactedSubscriber;
import com.sun.messaging.bridge.service.stomp.resources.StompBridgeResources;
import com.sun.messaging.jmq.jmsclient.MessageImpl;
import com.sun.messaging.jmq.jmsclient.SessionImpl;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

public class StompTransactedSession
implements StompSenderSession,
Runnable {
    protected Logger _logger = null;
    private static final int MAX_QUEUE_SIZE = 100;
    private String _lastRolledbackTID = null;
    private Connection _connection = null;
    private Session _session = null;
    private MessageProducer _producer = null;
    private StompOutputHandler _out = null;
    private Map<String, TransactedSubscriber> _subscribers = Collections.synchronizedMap(new HashMap());
    private List<SubscribedMessage> _msgqueue = Collections.synchronizedList(new ArrayList());
    private List<SubscribedMessage> _unackqueue = Collections.synchronizedList(new ArrayList());
    private List<TransactedAck> _ackedqueue = Collections.synchronizedList(new ArrayList());
    private Object _lock = new Object();
    private String _tid = null;
    private boolean _closed = false;
    private boolean _locked = false;
    private boolean _stopped = false;
    private Thread _subthread = null;
    private StompConnection _stompc = null;
    private StompBridgeResources _sbr = null;

    public StompTransactedSession(StompConnection stompConnection) throws Exception {
        this._logger = StompServer.logger();
        this._sbr = StompServer.getStompBridgeResources();
        this._stompc = stompConnection;
        this._connection = stompConnection.getConnection();
        this._session = this._connection.createSession(true, 0);
        this._logger.log(Level.INFO, this._sbr.getString("BSS1020", this.toString()));
        this._producer = this._session.createProducer(null);
    }

    public String toString() {
        return "[" + this._connection + ", " + this._session + ", " + this._tid + "]";
    }

    public MessageProducer getJMSProducer() throws Exception {
        this.checkSession();
        return this._producer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void createSubscriber(String string, Destination destination, String string2, String string3, boolean bl, StompOutputHandler stompOutputHandler) throws Exception {
        this._out = stompOutputHandler;
        this.checkSession();
        MessageConsumer messageConsumer = null;
        if (this._subscribers.get(string) != null) {
            throw new JMSException(this._sbr.getKString("BSS4029", string, this.toString()));
        }
        String string4 = null;
        if (destination instanceof Queue) {
            messageConsumer = this._session.createConsumer(destination, string2);
            string4 = ((Queue)destination).getQueueName();
        } else if (string3 != null) {
            messageConsumer = this._session.createDurableSubscriber((Topic)destination, string3, string2, bl);
            string4 = ((Topic)destination).getTopicName();
        } else {
            messageConsumer = this._session.createConsumer(destination, string2, bl);
            string4 = ((Topic)destination).getTopicName();
        }
        Object object = this._lock;
        synchronized (object) {
            if (this._subthread == null) {
                this._subthread = new Thread(this);
                this._subthread.setName("TransactedSession[" + this + "]");
                this._subthread.setDaemon(true);
                this._subthread.start();
            }
        }
        object = new TransactedSubscriber(string, messageConsumer, destination instanceof Queue ? null : string3, this);
        this._subscribers.put(string, (TransactedSubscriber)object);
        Object[] objectArray = new String[]{string, string4, this.toString()};
        this._logger.log(Level.INFO, this._sbr.getString("BSS1021", objectArray));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized String closeSubscriber(String string, String string2) throws Exception {
        if (string2 == null) {
            TransactedSubscriber transactedSubscriber = this._subscribers.get(string);
            if (transactedSubscriber == null) {
                return null;
            }
            this.preCloseSubscriber(string);
            transactedSubscriber.close();
            this._subscribers.remove(string);
            return string;
        }
        TransactedSubscriber transactedSubscriber = null;
        String string3 = null;
        Map<String, TransactedSubscriber> map = this._subscribers;
        synchronized (map) {
            for (String string4 : this._subscribers.keySet()) {
                transactedSubscriber = this._subscribers.get(string4);
                string3 = transactedSubscriber.getDuraName();
                if (string3 == null || !string3.equals(string2)) continue;
                this.preCloseSubscriber(string4);
                transactedSubscriber.close();
                this._subscribers.remove(string4);
                this._session.unsubscribe(string2);
                return string4;
            }
        }
        this._session.unsubscribe(string2);
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void preCloseSubscriber(String string) throws Exception {
        this._connection.stop();
        try {
            TransactedAck transactedAck = null;
            List<TransactedAck> list = this._ackedqueue;
            synchronized (list) {
                Iterator<TransactedAck> iterator = this._ackedqueue.iterator();
                while (iterator.hasNext()) {
                    transactedAck = iterator.next();
                    if (!transactedAck.subid.equals(string)) continue;
                    try {
                        this.acknowledge(transactedAck.msg);
                    }
                    catch (Exception exception) {
                        Object[] objectArray = new String[]{transactedAck.msg.getJMSMessageID(), this._tid, string, exception.getMessage()};
                        this._logger.log(Level.WARNING, this._sbr.getKString("BSS2010", objectArray), exception);
                    }
                    iterator.remove();
                }
            }
        }
        finally {
            this._connection.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String getTransactionID() {
        Object object = this._lock;
        synchronized (object) {
            return this._tid;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void setTransactionID(String string) {
        if (this._ackedqueue.size() != 0) {
            this._logger.log(Level.WARNING, "acked-queue is not empty on setting transaction ID " + string + (this._lastRolledbackTID == null ? "" : ", last rolledback transaction ID was " + this._lastRolledbackTID));
        }
        this._ackedqueue.clear();
        if (string != null) {
            this._lastRolledbackTID = null;
        }
        Object object = this._lock;
        synchronized (object) {
            this._tid = string;
            this._lock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void commit() throws Exception {
        this._logger.log(Level.FINE, "Committing transaction " + this._tid + " on JMS session " + this._session);
        boolean bl = false;
        try {
            if (this._ackedqueue.size() > 0) {
                bl = true;
                this._connection.stop();
                List<TransactedAck> list = this._ackedqueue;
                synchronized (list) {
                    TransactedAck transactedAck = null;
                    Iterator<TransactedAck> iterator = this._ackedqueue.iterator();
                    while (iterator.hasNext()) {
                        transactedAck = iterator.next();
                        if (!transactedAck.tid.equals(this._tid)) {
                            throw new JMSException("Transaction ack [" + transactedAck + "] tid not match current transaction id " + this._tid);
                        }
                        this._logger.log(Level.FINE, "Ack message " + transactedAck.msgid + " for committing transaction " + this._tid);
                        this.acknowledge(transactedAck.msg);
                        iterator.remove();
                    }
                }
            }
            this._session.commit();
        }
        catch (Exception exception) {
            String string = this._sbr.getKString("BSS3007", this._tid, exception.getMessage());
            this._logger.log(Level.SEVERE, string);
            try {
                this.rollback();
            }
            finally {
                JMSException jMSException = new JMSException(string);
                jMSException.initCause((Throwable)exception);
                throw jMSException;
            }
        }
        finally {
            this.setTransactionID(null);
            this._ackedqueue.clear();
            if (bl) {
                this._connection.start();
            }
        }
    }

    public synchronized String getLastRolledbackTID() {
        return this._lastRolledbackTID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void rollback() throws Exception {
        try {
            Object[] objectArray;
            Iterator<Object> iterator;
            Object object;
            this._connection.stop();
            this.stop(true);
            List<Object> list = this._ackedqueue;
            synchronized (list) {
                object = null;
                iterator = this._ackedqueue.iterator();
                while (iterator.hasNext()) {
                    object = iterator.next();
                    if (!((TransactedAck)object).tid.equals(this._tid)) {
                        throw new JMSException("Transaction ack [" + object + "] tid not match current transaction id " + this._tid);
                    }
                    if (this._subscribers.get(((TransactedAck)object).subid) != null) {
                        try {
                            this.acknowledge(((TransactedAck)object).msg);
                        }
                        catch (Exception exception) {
                            objectArray = new String[]{((TransactedAck)object).msg.getJMSMessageID(), this.getTransactionID(), exception.getMessage()};
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS2011", objectArray), exception);
                        }
                    }
                    iterator.remove();
                }
            }
            list = this._unackqueue;
            synchronized (list) {
                object = null;
                iterator = this._unackqueue.iterator();
                while (iterator.hasNext()) {
                    object = (SubscribedMessage)iterator.next();
                    if (this._subscribers.get(((SubscribedMessage)object).subid) != null) {
                        try {
                            this.acknowledge(((SubscribedMessage)object).msg);
                        }
                        catch (Exception exception) {
                            objectArray = new String[]{((SubscribedMessage)object).msg.getJMSMessageID(), this.getTransactionID(), exception.getMessage()};
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS2012", objectArray), exception);
                        }
                    }
                    iterator.remove();
                }
            }
            list = this._msgqueue;
            synchronized (list) {
                object = null;
                iterator = this._msgqueue.iterator();
                while (iterator.hasNext()) {
                    object = (SubscribedMessage)iterator.next();
                    if (this._subscribers.get(((SubscribedMessage)object).subid) != null) {
                        try {
                            this.acknowledge(((SubscribedMessage)object).msg);
                        }
                        catch (Exception exception) {
                            objectArray = new String[]{((SubscribedMessage)object).msg.getJMSMessageID(), this.getTransactionID(), exception.getMessage()};
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS2013", objectArray), exception);
                        }
                    }
                    iterator.remove();
                }
            }
            this._session.rollback();
        }
        finally {
            this._lastRolledbackTID = this._tid;
            this.setTransactionID(null);
            this._ackedqueue.clear();
            this.stop(false);
            this._connection.start();
        }
    }

    public void ack(String string, String string2) throws Exception {
        this.ack(string, string2, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void ack(String string, String string2, boolean bl) throws Exception {
        Object[] objectArray;
        this.checkSession();
        if (this.getTransactionID() == null) {
            throw new StompProtocolException(this._sbr.getKString("BSS4030", string2, string));
        }
        TransactedSubscriber transactedSubscriber = this._subscribers.get(string);
        if (transactedSubscriber == null) {
            if (bl) {
                objectArray = this._subscribers;
                synchronized (objectArray) {
                    for (String string3 : this._subscribers.keySet()) {
                        if (!string3.startsWith(string)) continue;
                        string = string3;
                        transactedSubscriber = this._subscribers.get(string3);
                        break;
                    }
                }
            }
            if (transactedSubscriber == null) {
                if (!bl) {
                    objectArray = new String[]{string, string2, this._tid};
                    throw new JMSException(this._sbr.getKString("BSS4031", objectArray));
                }
                objectArray = new String[]{string2, this._tid, "subscription".toString()};
                throw new JMSException(this._sbr.getKString("BSS4032", objectArray));
            }
        }
        objectArray = this._unackqueue;
        synchronized (objectArray) {
            Object object = new SubscribedMessage(string, string2);
            int n = this._unackqueue.indexOf(object);
            if (n == -1) {
                if (!this._ackedqueue.contains(new TransactedAck(this._tid, string, ((SubscribedMessage)object).msgid))) {
                    Object[] objectArray2 = new String[]{string2, string, this._tid};
                    throw new StompProtocolException(this._sbr.getKString("BSS4033", objectArray2));
                }
                if (this._logger.isLoggable(Level.INFO)) {
                    this._logger.log(Level.INFO, "Message " + string2 + " for subcriber " + string + " has already acked in transaction " + this._tid);
                }
                return;
            }
            ArrayList<Object> arrayList = new ArrayList<Object>();
            for (int i = 0; i <= n; ++i) {
                object = this._unackqueue.get(i);
                if (!((SubscribedMessage)object).subid.equals(string)) continue;
                this._ackedqueue.add(new TransactedAck(this._tid, string, ((SubscribedMessage)object).msg));
                arrayList.add(object);
            }
            Iterator iterator = arrayList.iterator();
            while (iterator.hasNext()) {
                this._unackqueue.remove(iterator.next());
            }
        }
    }

    private void acknowledge(Message message) throws Exception {
        MessageImpl messageImpl = (MessageImpl)message;
        SessionImpl sessionImpl = (SessionImpl)this._session;
        sessionImpl._appTransactedAck(messageImpl);
    }

    public Session getJMSSession() throws Exception {
        this.checkSession();
        return this._session;
    }

    private void checkSession() throws Exception {
        if (this._closed) {
            throw new JMSException("Session closed !");
        }
    }

    protected SubscribedMessage dequeue() {
        SubscribedMessage subscribedMessage = this._msgqueue.remove(0);
        return subscribedMessage;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void enqueue(String string, Message message) throws Exception {
        this._msgqueue.add(new SubscribedMessage(string, message));
        Object object = this._lock;
        synchronized (object) {
            this._lock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stop(boolean bl) throws Exception {
        Object object = this._lock;
        synchronized (object) {
            this._locked = bl;
            this._lock.notifyAll();
            if (bl) {
                try {
                    while (this._subthread != null && !this._closed && !this._stopped) {
                        this._logger.log(Level.INFO, this._sbr.getString("BSS1022", "[" + Thread.currentThread() + "]", this._subthread.toString()));
                        this._lock.wait(60000L);
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (this._closed) {
                    throw new JMSException(this._sbr.getKString("BSS4034", this.toString()));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void close() throws Exception {
        String string = null;
        TransactedSubscriber transactedSubscriber = null;
        Iterator<String> iterator = this._subscribers.keySet().iterator();
        while (iterator.hasNext()) {
            string = iterator.next();
            transactedSubscriber = this._subscribers.get(string);
            this.preCloseSubscriber(string);
            transactedSubscriber.close();
            iterator.remove();
        }
        try {
            this.rollback();
        }
        catch (Exception exception) {
            this._logger.log(Level.WARNING, this._sbr.getKString("BSS2014", this.toString(), exception.getMessage()), exception);
        }
        Object object = this._lock;
        synchronized (object) {
            this._closed = true;
            this._lock.notifyAll();
        }
        this._session.close();
        this._msgqueue.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        Object object;
        while (true) {
            object = this._lock;
            synchronized (object) {
                while (this._locked || this._msgqueue.isEmpty() || this.getTransactionID() == null) {
                    if (this._closed) {
                        this._logger.log(Level.INFO, this._sbr.getString("BSS1023", this.toString()));
                        return;
                    }
                    this._stopped = true;
                    this._lock.notifyAll();
                    try {
                        this._lock.wait();
                    }
                    catch (Exception exception) {}
                }
                this._stopped = false;
                SubscribedMessage subscribedMessage = null;
                try {
                    subscribedMessage = this.dequeue();
                    if (subscribedMessage == null) {
                        continue;
                    }
                    if (this._subscribers.get(subscribedMessage.subid) == null) {
                        this._logger.log(Level.FINE, "Skip delivering message " + subscribedMessage.msg.getJMSMessageID() + " for transaction " + this._tid + " for its subscriber " + subscribedMessage.subid + " has been closed");
                        continue;
                    }
                    this._logger.log(Level.FINE, "Delivering message " + subscribedMessage.msg.getJMSMessageID() + " to STOMP client for subscriber " + subscribedMessage.subid);
                    this._unackqueue.add(subscribedMessage);
                    this._out.sendToClient(this._stompc.toStompFrameMessage(subscribedMessage.msg, subscribedMessage.subid, this._session));
                }
                catch (Throwable throwable) {
                    block23: {
                        Object[] objectArray = new String[]{subscribedMessage.msgid, subscribedMessage.subid, throwable.getMessage()};
                        if (throwable instanceof ClosedChannelException) {
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS2015", objectArray));
                            break;
                        }
                        this._logger.log(Level.WARNING, this._sbr.getKString("BSS2015", objectArray), throwable);
                        StompFrameMessage stompFrameMessage = null;
                        try {
                            stompFrameMessage = StompProtocolHandler.toStompErrorMessage("getTransactionID().run", throwable, true);
                        }
                        catch (Throwable throwable2) {
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS3004", throwable.getMessage()), throwable2);
                            break;
                        }
                        try {
                            this._out.sendToClient(stompFrameMessage);
                        }
                        catch (Throwable throwable3) {
                            if (throwable3 instanceof ClosedChannelException) {
                                this._logger.log(Level.WARNING, this._sbr.getKString("BSS3001", throwable.getMessage(), throwable3.getMessage()));
                                break block23;
                            }
                            this._logger.log(Level.WARNING, this._sbr.getKString("BSS3001", throwable.getMessage(), throwable3.getMessage()), throwable3);
                        }
                    }
                    break;
                }
            }
        }
        object = this._lock;
        synchronized (object) {
            this._stopped = true;
            this._lock.notifyAll();
        }
        try {
            this.close();
        }
        catch (Exception exception) {
            this._logger.log(Level.FINE, "Close transacted session " + this + " failed: " + exception.getMessage(), exception);
        }
        this._logger.log(Level.INFO, this._sbr.getString("BSS1023", this.toString()));
    }

    class SubscribedMessage {
        String subid = null;
        String msgid = null;
        Message msg = null;

        public SubscribedMessage(String string, String string2) {
            this.subid = string;
            this.msgid = string2;
        }

        public SubscribedMessage(String string, Message message) throws Exception {
            this.subid = string;
            this.msg = message;
            this.msgid = message.getJMSMessageID();
        }

        public boolean equals(Object object) {
            if (object == null) {
                return false;
            }
            if (!(object instanceof SubscribedMessage)) {
                return false;
            }
            SubscribedMessage subscribedMessage = (SubscribedMessage)object;
            return subscribedMessage.subid.equals(this.subid) && subscribedMessage.msgid.equals(this.msgid);
        }

        public int hashCode() {
            return this.subid.hashCode() + this.msgid.hashCode();
        }
    }

    class TransactedAck {
        String tid = null;
        String subid = null;
        String msgid = null;
        Message msg = null;

        public TransactedAck(String string, String string2, String string3) {
            this.tid = string;
            this.subid = string2;
            this.msgid = string3;
        }

        public TransactedAck(String string, String string2, Message message) throws JMSException {
            this.tid = string;
            this.subid = string2;
            this.msgid = message.getJMSMessageID();
            this.msg = message;
        }

        public boolean equals(Object object) {
            if (object == null) {
                return false;
            }
            if (!(object instanceof TransactedAck)) {
                return false;
            }
            TransactedAck transactedAck = (TransactedAck)object;
            return transactedAck.subid.equals(this.subid) && transactedAck.msgid.equals(this.msgid) && transactedAck.tid.equals(this.tid);
        }

        public int hashCode() {
            return this.tid.hashCode() + this.subid.hashCode() + this.msgid.hashCode();
        }

        public String toString() {
            return "tid=" + this.tid + ", subid=" + this.subid + ", msgid=" + this.msgid;
        }
    }
}

