/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.eventd.adaptors.tcp;

import java.io.BufferedInputStream;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.opennms.core.fiber.Fiber;
import org.opennms.core.utils.InetAddressUtils;
import org.opennms.core.xml.JaxbUtils;
import org.opennms.netmgt.eventd.adaptors.EventHandler;
import org.opennms.netmgt.eventd.adaptors.tcp.TcpRecordHandler;
import org.opennms.netmgt.xml.event.Event;
import org.opennms.netmgt.xml.event.EventReceipt;
import org.opennms.netmgt.xml.event.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.InputSource;

final class TcpStreamHandler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(TcpStreamHandler.class);
    private List<EventHandler> m_handlers;
    private volatile boolean m_stop;
    private Fiber m_parent;
    private Socket m_connection;
    private Thread m_context;
    private int m_recsPerConn;

    TcpStreamHandler(Fiber parent, Socket sock, List<EventHandler> handlers, int number) {
        this.m_parent = parent;
        this.m_connection = sock;
        this.m_handlers = handlers;
        this.m_stop = false;
        this.m_context = null;
        this.m_recsPerConn = number;
    }

    boolean isAlive() {
        boolean rc = false;
        if (this.m_context != null) {
            rc = this.m_context.isAlive();
        }
        return rc;
    }

    void stop() throws InterruptedException {
        this.m_stop = true;
        if (this.m_context != null) {
            LOG.debug("Interrupting and joining the thread context {}", (Object)this.m_context.getName());
            this.m_context.interrupt();
            this.m_context.join();
            LOG.debug("Context stopped and joined");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Thread tchunker;
        Thread thread = this.m_context = Thread.currentThread();
        synchronized (thread) {
            this.m_context.notifyAll();
        }
        if (this.m_stop) {
            LOG.debug("The stop flag was set prior to thread entry, closing connection");
            try {
                this.m_connection.close();
            }
            catch (IOException e) {
                LOG.error("An error occured while closing the connection.", (Throwable)e);
            }
            LOG.debug("Thread context exiting");
            return;
        }
        InetAddress sender = this.m_connection.getInetAddress();
        LOG.debug("Event Log Stream Handler Started for {}", (Object)sender);
        LinkedList<Object> pipeXchange = new LinkedList<Object>();
        TcpRecordHandler chunker = new TcpRecordHandler(this.m_connection, pipeXchange);
        Thread thread2 = tchunker = new Thread((Runnable)chunker, "TCPRecord Chunker[" + InetAddressUtils.str((InetAddress)this.m_connection.getInetAddress()) + ":" + this.m_connection.getPort() + "]");
        synchronized (thread2) {
            tchunker.start();
            try {
                tchunker.wait();
            }
            catch (InterruptedException e) {
                LOG.error("The thread was interrupted.", (Throwable)e);
            }
        }
        block45: while (!this.m_stop && this.m_parent.getStatus() != 3 && this.m_parent.getStatus() != 4 && this.m_recsPerConn != 0) {
            PipedInputStream pipeIn = null;
            LinkedList<Object> e = pipeXchange;
            synchronized (e) {
                while (pipeXchange.isEmpty()) {
                    if (chunker.isAlive()) {
                        try {
                            pipeXchange.wait(500L);
                            continue;
                        }
                        catch (InterruptedException e2) {
                            LOG.error("The thread was interrupted.", (Throwable)e2);
                        }
                    }
                    break block45;
                }
                Object o = pipeXchange.removeFirst();
                if (o instanceof Throwable) {
                    break;
                }
                try {
                    pipeIn = new PipedInputStream((PipedOutputStream)o);
                }
                catch (IOException e3) {
                    LOG.error("An I/O exception occured construction a record reader.", (Throwable)e3);
                    break;
                }
                Object e3 = o;
                synchronized (e3) {
                    o.notify();
                }
            }
            this.m_recsPerConn -= this.m_recsPerConn > 0 ? 1 : 0;
            BufferedInputStream stream = new BufferedInputStream(pipeIn);
            Log eLog = null;
            boolean doCleanup = false;
            try {
                eLog = (Log)JaxbUtils.unmarshal(Log.class, (InputSource)new InputSource(stream));
                LOG.debug("Event record converted");
            }
            catch (Exception e4) {
                LOG.error("Could not unmarshall the XML record.", (Throwable)e4);
                doCleanup = true;
            }
            finally {
                if (stream != null) {
                    IOUtils.closeQuietly((InputStream)stream);
                }
            }
            if (doCleanup) {
                try {
                    while (((InputStream)stream).read() != -1) {
                    }
                    continue;
                }
                catch (IOException e4) {
                    continue;
                }
            }
            Event[] events = eLog.getEvents().getEvent();
            Arrays.sort(events, new Comparator<Event>(){

                @Override
                public int compare(Event e1, Event e2) {
                    boolean e2t;
                    boolean e1t = e1.getTime() != null;
                    boolean bl = e2t = e2.getTime() != null;
                    if (e1t && !e2t) {
                        return 1;
                    }
                    if (!e1t && e2t) {
                        return -1;
                    }
                    if (!e1t && !e2t) {
                        return 0;
                    }
                    Date de1 = e1.getTime();
                    Date de2 = e2.getTime();
                    if (de1 != null && de2 != null) {
                        return (int)(de1.getTime() - de2.getTime());
                    }
                    if (de1 == null && de2 != null) {
                        return -1;
                    }
                    if (de1 != null && de2 == null) {
                        return 1;
                    }
                    return 0;
                }
            });
            if (events != null && events.length != 0) {
                ArrayList<Event> okEvents = new ArrayList<Event>(events.length);
                List<EventHandler> list = this.m_handlers;
                synchronized (list) {
                    for (EventHandler eventHandler : this.m_handlers) {
                        for (Event event2 : events) {
                            try {
                                LOG.debug("handling event: {}", (Object)event2);
                                if (!eventHandler.processEvent(event2) || okEvents.contains(event2)) continue;
                                okEvents.add(event2);
                            }
                            catch (Throwable t) {
                                LOG.warn("An exception occured while processing an event.", t);
                            }
                        }
                    }
                }
                boolean hasReceipt = false;
                EventReceipt receipt = new EventReceipt();
                for (Object event : okEvents) {
                    if (event.getUuid() == null) continue;
                    receipt.addUuid(event.getUuid());
                    hasReceipt = true;
                }
                if (!hasReceipt) continue;
                try {
                    Object event;
                    BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(this.m_connection.getOutputStream(), "UTF-8"));
                    JaxbUtils.marshal((Object)receipt, (Writer)bufferedWriter);
                    ((Writer)bufferedWriter).flush();
                    event = this.m_handlers;
                    synchronized (event) {
                        for (EventHandler hdl3 : this.m_handlers) {
                            try {
                                hdl3.receiptSent(receipt);
                            }
                            catch (Throwable t) {
                                LOG.warn("An exception occured while processing an event receipt.", t);
                            }
                        }
                    }
                    if (!LOG.isDebugEnabled()) continue;
                    try {
                        StringWriter swriter = new StringWriter();
                        JaxbUtils.marshal((Object)receipt, (Writer)swriter);
                        LOG.debug("Sent Event Receipt {");
                        LOG.debug(swriter.getBuffer().toString());
                        LOG.debug("}");
                    }
                    catch (Throwable e5) {
                        LOG.error("An error occured during marshalling of event receipt for the log.", e5);
                    }
                    continue;
                }
                catch (IOException iOException) {
                    LOG.warn("Failed to send event-receipt XML document.", (Throwable)iOException);
                    break;
                }
            }
            LOG.debug("The agent sent an empty event stream");
        }
        try {
            LOG.debug("stopping record handler");
            chunker.stop();
            LOG.debug("record handler stopped");
        }
        catch (InterruptedException e) {
            LOG.warn("The thread was interrupted while trying to close the record handler.", (Throwable)e);
        }
        try {
            LOG.debug("closing connnection");
            this.m_connection.close();
            LOG.debug("connnection closed ");
        }
        catch (IOException e) {
            LOG.warn("An I/O exception occured while closing the TCP/IP connection.", (Throwable)e);
        }
        LOG.debug("Thread exiting");
    }
}

