/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.collection.persistence.tcp;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.opennms.core.sysprops.SystemProperties;
import org.opennms.netmgt.collection.persistence.tcp.SimpleTcpOutputStrategy;
import org.opennms.netmgt.collection.persistence.tcp.TcpOutputStrategy;
import org.opennms.netmgt.rrd.tcp.RrdOutputSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TcpOutputStrategy} that decouples callers from the network by
 * placing each reading on a bounded in-memory queue. A background
 * {@link ConsumerThread} periodically drains the queue and forwards the
 * batch through an {@link RrdOutputSocket} aimed at the delegate's host and
 * port. When the queue stays full for longer than the offer wait time, the
 * reading is dropped and counted as skipped.
 *
 * <p>All statistics counters are atomic, because they are updated by the
 * producer/consumer threads and read (and reset) by the optional
 * {@link LogThread}.
 */
public class QueuingTcpOutputStrategy implements TcpOutputStrategy {
    /** Milliseconds the consumer sleeps when it finds the queue empty. */
    private static final long SLEEP_TIME = SystemProperties.getLong("org.opennms.netmgt.persistence.tcp.queuingTcpSleepTime", 1000);
    /** Milliseconds {@link #updateData} waits for queue space before dropping a reading. */
    private static final long OFFER_WAIT_TIME = SystemProperties.getLong("org.opennms.netmgt.persistence.tcp.queuingTcpOfferWaitTime", 500);
    /** Enables statistics collection and the periodic statistics log thread. */
    private static final boolean LOGGING = Boolean.getBoolean("org.opennms.netmgt.persistence.tcp.queuingTcpLogging");
    /** Milliseconds between two statistics log lines. */
    private static final long LOGGING_INTERVAL = SystemProperties.getLong("org.opennms.netmgt.persistence.tcp.queuingTcpLoggingInterval", 300000);
    private static final Logger LOG = LoggerFactory.getLogger(QueuingTcpOutputStrategy.class);

    private final BlockingQueue<PerformanceDataReading> m_queue;
    /** Readings dropped since the last successful offer. */
    private final AtomicInteger m_skippedReadings = new AtomicInteger();
    private final AtomicLong m_totalOffers = new AtomicLong();
    private final AtomicLong m_goodOffers = new AtomicLong();
    private final AtomicLong m_badOffers = new AtomicLong();

    /**
     * Creates the queue and starts the consumer thread (and, when logging is
     * enabled, the statistics thread).
     *
     * @param delegate  supplies the host and port the consumer sends to
     * @param queueSize maximum number of readings held in the queue
     */
    public QueuingTcpOutputStrategy(SimpleTcpOutputStrategy delegate, int queueSize) {
        this.m_queue = new LinkedBlockingQueue<PerformanceDataReading>(queueSize);
        ConsumerThread consumerThread = new ConsumerThread(delegate, this.m_queue);
        consumerThread.start();
        if (LOGGING) {
            LogThread logThread = new LogThread(this, consumerThread, this.m_queue);
            logThread.start();
        }
    }

    /**
     * Queues one reading for asynchronous delivery. Waits up to
     * {@code OFFER_WAIT_TIME} milliseconds for queue space; if none becomes
     * available the reading is dropped and counted. The number of dropped
     * readings is reported once the next offer succeeds.
     *
     * @throws Exception in particular {@link InterruptedException} if the
     *                   calling thread is interrupted while waiting for space
     */
    @Override
    public void updateData(String path, String owner, Long timestamp, List<Double> dblValues, List<String> strValues) throws Exception {
        PerformanceDataReading reading = new PerformanceDataReading(path, owner, timestamp, dblValues, strValues);
        boolean offerGood = this.m_queue.offer(reading, OFFER_WAIT_TIME, TimeUnit.MILLISECONDS);
        if (offerGood) {
            // Read-and-reset atomically so concurrent callers never report the same skips twice.
            int skipped = this.m_skippedReadings.getAndSet(0);
            if (skipped > 0) {
                LOG.warn("Skipped {} performance data message(s) because of queue overflow", skipped);
            }
        } else {
            this.m_skippedReadings.incrementAndGet();
        }
        if (LOGGING) {
            this.countOfferStats(offerGood);
        }
    }

    /**
     * Records the outcome of one queue offer.
     *
     * @param goodOffer {@code true} if the reading was accepted by the queue
     */
    public void countOfferStats(boolean goodOffer) {
        this.m_totalOffers.incrementAndGet();
        if (goodOffer) {
            this.m_goodOffers.incrementAndGet();
        } else {
            this.m_badOffers.incrementAndGet();
        }
    }

    /** Resets all offer counters to zero. */
    public void clearOfferStats() {
        this.m_totalOffers.set(0L);
        this.m_goodOffers.set(0L);
        this.m_badOffers.set(0L);
    }

    /** @return number of offers counted since the last clear */
    public long getTotalOffers() {
        return this.m_totalOffers.get();
    }

    /** @return number of accepted offers counted since the last clear */
    public long getGoodOffers() {
        return this.m_goodOffers.get();
    }

    /** @return number of rejected offers counted since the last clear */
    public long getBadOffers() {
        return this.m_badOffers.get();
    }

    /**
     * Periodically logs and then resets the offer and drain statistics.
     * Counts recorded between the read and the reset of a counter are not
     * reported; the figures are diagnostic only.
     */
    private static class LogThread extends Thread {
        private final BlockingQueue<PerformanceDataReading> m_myQueue;
        private final QueuingTcpOutputStrategy m_strategy;
        private final ConsumerThread m_consumer;

        public LogThread(QueuingTcpOutputStrategy strategy, ConsumerThread consumer, BlockingQueue<PerformanceDataReading> queue) {
            this.m_strategy = strategy;
            this.m_myQueue = queue;
            this.m_consumer = consumer;
            this.setName(this.getClass().getSimpleName());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    long totalOffers = this.m_strategy.getTotalOffers();
                    long badOffers = this.m_strategy.getBadOffers();
                    long goodOffers = this.m_strategy.getGoodOffers();
                    long queueChecks = this.m_consumer.getQueueChecks();
                    long queueDrains = this.m_consumer.getQueueDrains();
                    long queueSize = this.m_myQueue.size();
                    long queueRemaining = this.m_myQueue.remainingCapacity();
                    long sentReadings = this.m_consumer.getSentReadings();
                    LOG.info("Queue offers: {} total, {} good, {} bad; queue drains: {} checks, {} drains, {} readings; queue state: {} elements, {} remaining capacity",
                            totalOffers, goodOffers, badOffers, queueChecks, queueDrains, sentReadings, queueSize, queueRemaining);
                    this.m_strategy.clearOfferStats();
                    this.m_consumer.clearDrainStats();
                    Thread.sleep(LOGGING_INTERVAL);
                }
            }
            catch (InterruptedException e) {
                LOG.warn("InterruptedException caught in QueuingTcpOutputStrategy$LogThread, closing thread");
                // Preserve the interrupt status for anything inspecting this thread afterwards.
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                LOG.error("Unexpected exception caught in QueuingTcpOutputStrategy$LogThread, closing thread", e);
            }
        }
    }

    /**
     * Drains the queue in batches and hands each batch to a fresh
     * {@link RrdOutputSocket}. Sleeps for {@code SLEEP_TIME} milliseconds
     * whenever the queue is found empty.
     *
     * <p>NOTE(review): any exception escaping the loop terminates this thread
     * for good; after that the queue is never drained again and new readings
     * are dropped once it fills up.
     */
    private static class ConsumerThread extends Thread {
        private final BlockingQueue<PerformanceDataReading> m_myQueue;
        private final SimpleTcpOutputStrategy m_strategy;
        private final AtomicLong m_queueChecks = new AtomicLong();
        private final AtomicLong m_queueDrains = new AtomicLong();
        private final AtomicLong m_sentReadings = new AtomicLong();

        public ConsumerThread(SimpleTcpOutputStrategy strategy, BlockingQueue<PerformanceDataReading> queue) {
            this.m_strategy = strategy;
            this.m_myQueue = queue;
            this.setName(this.getClass().getSimpleName());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    boolean drain = false;
                    long sentReadings = 0L;
                    List<PerformanceDataReading> sendMe = new ArrayList<PerformanceDataReading>();
                    if (this.m_myQueue.drainTo(sendMe) > 0) {
                        drain = true;
                        sentReadings = sendMe.size();
                        RrdOutputSocket socket = new RrdOutputSocket(this.m_strategy.getHost(), this.m_strategy.getPort());
                        for (PerformanceDataReading reading : sendMe) {
                            socket.addData(reading.getFilename(), reading.getOwner(), reading.getTimestamp(), reading.getDblValues(), reading.getStrValues());
                        }
                        socket.writeData();
                    } else {
                        Thread.sleep(SLEEP_TIME);
                    }
                    if (LOGGING) {
                        this.countDrainStats(drain, sentReadings);
                    }
                }
            }
            catch (InterruptedException e) {
                LOG.warn("InterruptedException caught in QueuingTcpOutputStrategy$ConsumerThread, closing thread");
                // Preserve the interrupt status for anything inspecting this thread afterwards.
                Thread.currentThread().interrupt();
            }
            catch (Throwable e) {
                LOG.error("Unexpected exception caught in QueuingTcpOutputStrategy$ConsumerThread, closing thread", e);
            }
        }

        /**
         * Records one pass of the consumer loop.
         *
         * @param drain    {@code true} if the pass took readings off the queue
         * @param readings number of readings taken in this pass
         */
        public void countDrainStats(boolean drain, long readings) {
            this.m_queueChecks.incrementAndGet();
            if (drain) {
                this.m_queueDrains.incrementAndGet();
                this.m_sentReadings.addAndGet(readings);
            }
        }

        /** Resets all drain counters to zero. */
        public void clearDrainStats() {
            this.m_queueChecks.set(0L);
            this.m_queueDrains.set(0L);
            this.m_sentReadings.set(0L);
        }

        public long getQueueChecks() {
            return this.m_queueChecks.get();
        }

        public long getQueueDrains() {
            return this.m_queueDrains.get();
        }

        public long getSentReadings() {
            return this.m_sentReadings.get();
        }
    }

    /**
     * Holder for the arguments of one {@code updateData} call while it waits
     * in the queue. The value lists are stored by reference, not copied.
     */
    private static class PerformanceDataReading {
        private final String m_filename;
        private final String m_owner;
        private final Long m_timestamp;
        private final List<Double> m_dblValues;
        private final List<String> m_strValues;

        public PerformanceDataReading(String filename, String owner, Long timestamp, List<Double> dblValues, List<String> strValues) {
            this.m_filename = filename;
            this.m_owner = owner;
            this.m_timestamp = timestamp;
            this.m_dblValues = dblValues;
            this.m_strValues = strValues;
        }

        public String getFilename() {
            return this.m_filename;
        }

        public String getOwner() {
            return this.m_owner;
        }

        public Long getTimestamp() {
            return this.m_timestamp;
        }

        public List<Double> getDblValues() {
            return this.m_dblValues;
        }

        public List<String> getStrValues() {
            return this.m_strValues;
        }
    }
}

