/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.plugins.elasticsearch.rest;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opennms.netmgt.events.api.EventForwarder;
import org.opennms.netmgt.xml.event.Event;
import org.opennms.netmgt.xml.event.Log;
import org.opennms.plugins.elasticsearch.rest.ElasticSearchInitialiser;
import org.opennms.plugins.elasticsearch.rest.EventToIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventForwarder} that decouples event producers from Elasticsearch indexing.
 *
 * <p>{@link #sendNow(Event)} places events on a bounded {@link LinkedBlockingQueue}; a single
 * consumer thread started by {@link #init()} takes them off the queue and passes each one to
 * {@link EventToIndex#forwardEvent}. Events are only queued once the configured
 * {@link ElasticSearchInitialiser} reports that it is initialised.
 *
 * <p>Lifecycle: set the collaborators and options through the setters, call {@link #init()} once
 * to create the queue and start the consumer, and call {@link #destroy()} to stop it.
 */
public class EventForwarderQueueImpl
implements EventForwarder {
    private static final Logger LOG = LoggerFactory.getLogger(EventForwarderQueueImpl.class);

    /** Capacity used for the queue created by {@link #init()}. */
    private Integer maxQueueLength = 1000;

    /** When true a full queue blocks the producer; when false the event is discarded. */
    private boolean blockWhenFull = true;

    /** Created by {@link #init()}; null until then. */
    private LinkedBlockingQueue<Event> queue = null;

    /** Loop condition for the consumer thread; set by init(), cleared by destroy(). */
    private final AtomicBoolean clientRunning = new AtomicBoolean(false);
    private final RemovingConsumer removingConsumer = new RemovingConsumer();
    private final Thread removingConsumerThread = new Thread(this.removingConsumer);
    private EventToIndex eventToIndex = null;
    private ElasticSearchInitialiser elasticSearchInitialiser = null;

    /** @return the target that queued events are forwarded to, or null if not set */
    public EventToIndex getEventToIndex() {
        return this.eventToIndex;
    }

    /** @param eventToIndex the target that the consumer thread forwards queued events to */
    public void setEventToIndex(EventToIndex eventToIndex) {
        this.eventToIndex = eventToIndex;
    }

    /** @return the capacity that {@link #init()} uses when it creates the queue */
    public Integer getMaxQueueLength() {
        return this.maxQueueLength;
    }

    /**
     * @param maxQueueLength capacity for the queue; only read by {@link #init()}, so changing it
     *        afterwards does not resize an existing queue
     */
    public void setMaxQueueLength(Integer maxQueueLength) {
        this.maxQueueLength = maxQueueLength;
    }

    /** @return true if producers block on a full queue, false if events are discarded instead */
    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    /** @param blockWhenFull true to block producers on a full queue, false to discard events */
    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    /**
     * Queues an event for asynchronous forwarding.
     *
     * <p>The event is dropped (with a log message) when no initialiser is set or it is not yet
     * initialised, when {@link #init()} has not created the queue, when the queue is full and
     * {@code blockWhenFull} is false, or when the calling thread is interrupted while blocked
     * on a full queue.
     *
     * @param event the event to queue
     */
    public void sendNow(Event event) {
        if (this.elasticSearchInitialiser == null || !this.elasticSearchInitialiser.isInitialised()) {
            LOG.debug("Not sending event received Elasticsearch not initialised\n   event:{}", event);
            return;
        }

        // Snapshot the field so the null check and the later use see the same queue.
        LinkedBlockingQueue<Event> eventQueue = this.queue;
        if (eventQueue == null) {
            // init() has not run yet; discard rather than fail with a NullPointerException.
            LOG.warn("Elasticsearch interface discarding event dbid={} Event queue not yet created by init()", event.getDbid());
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Event received: queue.size() " + eventQueue.size() + " queue.remainingCapacity() " + eventQueue.remainingCapacity() + "\n   event:" + event);
        }

        if (this.blockWhenFull) {
            try {
                eventQueue.put(event);
            }
            catch (InterruptedException e) {
                LOG.warn("Elasticsearch interface discarding event dbid={} Interrupted exception while trying to add event to queue, size={}", event.getDbid(), eventQueue.size());
                // Restore the interrupt status so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        } else if (!eventQueue.offer(event)) {
            LOG.warn("Elasticsearch interface discarding event dbid={} Cannot queue any more events. Event queue full. size={}", event.getDbid(), eventQueue.size());
        }
    }

    /**
     * Does nothing: events delivered as a {@link Log} are not queued by this implementation.
     *
     * @param eventLog ignored
     */
    public void sendNow(Log eventLog) {
    }

    /**
     * Not supported by this forwarder.
     *
     * @throws UnsupportedOperationException always
     */
    public void sendNowSync(Event event) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this forwarder.
     *
     * @throws UnsupportedOperationException always
     */
    public void sendNowSync(Log eventLog) {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates the queue with capacity {@code maxQueueLength} and starts the consumer thread.
     * The thread object is created once with this instance, so this method can only be called
     * once per instance ({@link Thread#start()} rejects a second start).
     */
    public void init() {
        LOG.debug("initialising EventFowarderQueue with queue size " + this.maxQueueLength);
        this.queue = new LinkedBlockingQueue<Event>(this.maxQueueLength);
        this.clientRunning.set(true);
        this.removingConsumerThread.start();
    }

    /**
     * Stops the consumer thread: clears the running flag first, then interrupts the thread so a
     * blocked {@code take()} wakes up and the loop condition is re-evaluated.
     */
    public void destroy() {
        LOG.debug("shutting down EventFowarderQueue");
        this.clientRunning.set(false);
        this.removingConsumerThread.interrupt();
    }

    /** @return the initialiser consulted before events are queued, or null if not set */
    public ElasticSearchInitialiser getElasticSearchInitialiser() {
        return this.elasticSearchInitialiser;
    }

    /** @param elasticSearchInitialiser the initialiser whose state gates {@link #sendNow(Event)} */
    public void setElasticSearchInitialiser(ElasticSearchInitialiser elasticSearchInitialiser) {
        this.elasticSearchInitialiser = elasticSearchInitialiser;
    }

    /**
     * Consumer loop: takes events off the queue and forwards them to {@code eventToIndex}
     * for as long as {@code clientRunning} is true.
     */
    private class RemovingConsumer
    implements Runnable {
        @Override
        public void run() {
            while (EventForwarderQueueImpl.this.clientRunning.get()) {
                Event event;
                try {
                    event = EventForwarderQueueImpl.this.queue.take();
                }
                catch (InterruptedException e) {
                    // destroy() clears clientRunning before interrupting, so re-checking the
                    // loop condition ends the thread; any other interrupt just resumes waiting.
                    LOG.debug("event consumer thread interrupted while waiting for an event");
                    continue;
                }

                LOG.debug("Event received from queue by consumer thread :\n event:{}", event);

                EventToIndex target = EventForwarderQueueImpl.this.eventToIndex;
                if (target == null) {
                    LOG.error("cannot send event eventToIndex is null");
                    continue;
                }

                try {
                    target.forwardEvent(event);
                }
                catch (RuntimeException e) {
                    // A failure on one event must not kill the only consumer thread: the queue
                    // would fill up and producers using blockWhenFull would then block forever.
                    LOG.error("Elasticsearch interface failed to forward event dbid=" + event.getDbid(), e);
                }
            }
            LOG.debug("shutting down event consumer thread");
        }
    }
}

