/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.plugins.dbnotifier;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opennms.plugins.dbnotifier.DatabaseChangeNotifier;
import org.opennms.plugins.dbnotifier.DbNotification;
import org.opennms.plugins.dbnotifier.DbNotificationClient;
import org.opennms.plugins.dbnotifier.NotificationClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DbNotificationClient} that buffers incoming database notifications in a bounded
 * queue and hands them, on a dedicated consumer thread, to the {@link NotificationClient}
 * registered for the notification's channel name.
 *
 * <p>Lifecycle: configure the notifier (and optionally the queue length and channel
 * handlers), call {@link #init()} once, and call {@link #destroy()} to stop. The consumer
 * thread is created once per instance and a {@link Thread} cannot be started twice, so an
 * instance cannot be re-initialised after it has been destroyed.
 */
public class DbNotificationClientQueueImpl
implements DbNotificationClient {
    private static final Logger LOG = LoggerFactory.getLogger(DbNotificationClientQueueImpl.class);

    /** Source of notifications; must be set before {@link #init()}. */
    private DatabaseChangeNotifier databaseChangeNotifier;

    /** Capacity used for the queue created by {@link #init()}. */
    private Integer maxQueueLength = 1000;

    /** Bounded buffer between the notifier and the consumer thread; null until {@link #init()}. */
    private LinkedBlockingQueue<DbNotification> queue = null;

    /** True between {@link #init()} and {@link #destroy()}; polled by the consumer loop. */
    private final AtomicBoolean clientRunning = new AtomicBoolean(false);

    private final RemovingConsumer removingConsumer = new RemovingConsumer();
    private final Thread removingConsumerThread = new Thread(this.removingConsumer);

    /** Channel name to handler. Plain HashMap: populate it before calling {@link #init()}. */
    private final Map<String, NotificationClient> channelHandlingClients = new HashMap<String, NotificationClient>();

    /**
     * Adds the supplied channel handlers to the handlers already registered. Existing entries
     * with the same channel name are replaced; other entries are kept.
     *
     * @param channelHandlingClients map of channel name to handling client; must not be null
     */
    public void setChannelHandlingClients(Map<String, NotificationClient> channelHandlingClients) {
        this.channelHandlingClients.putAll(channelHandlingClients);
    }

    /**
     * @param databaseChangeNotifier notifier this client registers with in {@link #init()}
     */
    @Override
    public void setDatabaseChangeNotifier(DatabaseChangeNotifier databaseChangeNotifier) {
        this.databaseChangeNotifier = databaseChangeNotifier;
    }

    /**
     * @return the notifier this client is configured with, or null if none has been set
     */
    @Override
    public DatabaseChangeNotifier getDatabaseChangeNotifier() {
        return this.databaseChangeNotifier;
    }

    /**
     * @return the capacity that {@link #init()} uses when it creates the queue
     */
    public Integer getMaxQueueLength() {
        return this.maxQueueLength;
    }

    /**
     * Sets the queue capacity. Only takes effect if called before {@link #init()}, because
     * the queue is created there.
     *
     * @param maxQueueLength capacity of the notification queue
     */
    public void setMaxQueueLength(Integer maxQueueLength) {
        this.maxQueueLength = maxQueueLength;
    }

    /**
     * Creates the queue, starts the consumer thread and registers this client with the
     * notifier.
     *
     * @throws IllegalStateException if no notifier or no queue length has been set
     * @throws IllegalArgumentException if the queue length is not greater than zero
     *         (thrown by {@link LinkedBlockingQueue})
     */
    @Override
    public void init() {
        LOG.debug("initialising dbNotificationClientQueue with queue size {}", this.maxQueueLength);
        if (this.databaseChangeNotifier == null) {
            throw new IllegalStateException("databaseChangeNotifier cannot be null");
        }
        if (this.maxQueueLength == null) {
            // Fail with a clear message instead of an unboxing NullPointerException.
            throw new IllegalStateException("maxQueueLength cannot be null");
        }
        this.queue = new LinkedBlockingQueue<DbNotification>(this.maxQueueLength);
        this.clientRunning.set(true);
        // Start consuming before registering so the queue is drained from the first notification.
        this.removingConsumerThread.start();
        this.databaseChangeNotifier.addDbNotificationClient(this);
    }

    /**
     * Unregisters this client from the notifier and stops the consumer thread.
     *
     * @throws IllegalStateException if no notifier is set; the consumer thread is still
     *         asked to stop in that case
     */
    @Override
    public void destroy() {
        LOG.debug("shutting down client");
        try {
            if (this.databaseChangeNotifier == null) {
                throw new IllegalStateException("databaseChangeNotifier cannot be null");
            }
            this.databaseChangeNotifier.removeDbNotificationClient(this);
        }
        finally {
            // Always stop the consumer, even when unregistering failed, so the thread is not leaked.
            this.clientRunning.set(false);
            this.removingConsumerThread.interrupt();
        }
    }

    /**
     * Queues a notification for asynchronous delivery. Never blocks: if the queue is full,
     * or the client has not been initialised yet, the notification is dropped and a warning
     * is logged.
     *
     * @param dbNotification notification to queue
     */
    @Override
    public void sendDbNotification(DbNotification dbNotification) {
        LinkedBlockingQueue<DbNotification> targetQueue = this.queue;
        if (targetQueue == null) {
            LOG.warn("Cannot queue dbNotification. dbNotification queue has not been initialised");
            return;
        }
        LOG.debug("client received notification - adding notification to queue");
        if (!targetQueue.offer(dbNotification)) {
            LOG.warn("Cannot queue any more dbNotification. dbNotification queue full. size={}", targetQueue.size());
        }
    }

    /**
     * Consumer loop: takes notifications off the queue and dispatches each one to the handler
     * registered for its channel until the client is stopped or the thread is interrupted.
     */
    private class RemovingConsumer
    implements Runnable {
        private RemovingConsumer() {
        }

        @Override
        public void run() {
            while (DbNotificationClientQueueImpl.this.clientRunning.get()) {
                DbNotification dbNotification;
                try {
                    dbNotification = DbNotificationClientQueueImpl.this.queue.take();
                }
                catch (InterruptedException e) {
                    // Interruption is the stop signal sent by destroy(): keep the interrupt
                    // status visible and leave the loop instead of swallowing it.
                    LOG.debug("notification consumer thread interrupted");
                    Thread.currentThread().interrupt();
                    break;
                }
                try {
                    this.dispatch(dbNotification);
                }
                catch (Exception e) {
                    // One bad notification or failing handler must not kill the consumer thread.
                    LOG.error("problem processing dbNotification:", e);
                }
            }
            LOG.debug("shutting down notification consumer thread");
        }

        /** Delivers one notification to the handler registered for its channel, if there is one. */
        private void dispatch(DbNotification dbNotification) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Notification received from queue by consumer thread :\n processId:{}\n channelName:{}\n payload:{}",
                        dbNotification.getProcessId(), dbNotification.getChannelName(), dbNotification.getPayload());
            }
            if (DbNotificationClientQueueImpl.this.channelHandlingClients.isEmpty()) {
                LOG.warn("no channel handing clients have been set to receive notification");
                return;
            }
            NotificationClient channelHandlingClient = DbNotificationClientQueueImpl.this.channelHandlingClients.get(dbNotification.getChannelName());
            if (channelHandlingClient == null) {
                LOG.warn("no channel handing client has been set for channel:{}", dbNotification.getChannelName());
                return;
            }
            channelHandlingClient.sendDbNotification(dbNotification);
        }
    }
}

