/*
 * Decompiled with CFR 0.152.
 */
package quickfix.mina;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import java.util.Map;
import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.mina.EventHandlingStrategy;

public class ThreadPerSessionEventHandlingStrategy
implements EventHandlingStrategy {
    private Map dispatchers = new ConcurrentHashMap();

    public void onMessage(Session quickfixSession, Message message) {
        MessageDispatchingThread dispatcher = (MessageDispatchingThread)this.dispatchers.get(quickfixSession.getSessionID());
        if (dispatcher == null) {
            dispatcher = new MessageDispatchingThread(quickfixSession);
            this.dispatchers.put(quickfixSession.getSessionID(), dispatcher);
            this.startDispatcherThread(dispatcher);
        }
        dispatcher.enqueue(message);
    }

    protected void startDispatcherThread(MessageDispatchingThread dispatcher) {
        dispatcher.start();
    }

    BlockingQueue getMessages(SessionID sessionID) {
        MessageDispatchingThread dispatcher = this.getDispatcher(sessionID);
        return dispatcher.messages;
    }

    MessageDispatchingThread getDispatcher(SessionID sessionID) {
        return (MessageDispatchingThread)this.dispatchers.get(sessionID);
    }

    Message getNextMessage(BlockingQueue messages) throws InterruptedException {
        return (Message)messages.take();
    }

    class MessageDispatchingThread
    extends Thread {
        private final Session quickfixSession;
        final BlockingQueue messages = new LinkedBlockingQueue();

        public MessageDispatchingThread(Session session) {
            super("QF/J Session dispatcher: " + session.getSessionID());
            this.quickfixSession = session;
        }

        public void enqueue(Message message) {
            try {
                this.messages.put((Object)message);
            }
            catch (InterruptedException e) {
                this.quickfixSession.getLog().onEvent(e.getMessage());
            }
        }

        public void run() {
            while (true) {
                try {
                    while (true) {
                        Message message = ThreadPerSessionEventHandlingStrategy.this.getNextMessage(this.messages);
                        if (this.quickfixSession.getResponder() == null) continue;
                        this.quickfixSession.next(message);
                    }
                }
                catch (InterruptedException e) {
                    return;
                }
                catch (Throwable e) {
                    LogUtil.logThrowable(this.quickfixSession.getSessionID(), "Error during message processing", e);
                    continue;
                }
                break;
            }
        }
    }
}

