package org.opennms.plugins.dbnotifier;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/plugins/dbnotifier/DbNotificationClientQueueImpl.class */
/**
 * {@link DbNotificationClient} that decouples the thread delivering database notifications
 * from the code that handles them.
 *
 * <p>{@link #sendDbNotification(DbNotification)} places each notification on a bounded queue and
 * returns immediately. A single consumer thread, started by {@link #init()} and stopped by
 * {@link #destroy()}, takes notifications off the queue and forwards each one to the
 * {@link NotificationClient} registered for the notification's channel name.
 *
 * <p>The channel map is a plain {@link HashMap} that is read by the consumer thread; configure it
 * with {@link #setChannelHandlingClients(Map)} before calling {@link #init()}.
 */
public class DbNotificationClientQueueImpl implements DbNotificationClient {
    private static final Logger LOG = LoggerFactory.getLogger(DbNotificationClientQueueImpl.class);

    private DatabaseChangeNotifier databaseChangeNotifier;

    /** Capacity of the queue created by {@link #init()}. */
    private Integer maxQueueLength = 1000;

    /** Created by {@link #init()}; {@code null} until then. Read by producer threads, hence volatile. */
    private volatile LinkedBlockingQueue<DbNotification> queue = null;

    private final AtomicBoolean clientRunning = new AtomicBoolean(false);
    private final RemovingConsumer removingConsumer = new RemovingConsumer();

    /**
     * The consumer thread of the current init()/destroy() cycle. A java.lang.Thread can only be
     * started once, so a new one is created on every {@link #init()}.
     */
    private volatile Thread removingConsumerThread = null;

    private final Map<String, NotificationClient> channelHandlingClients = new HashMap<>();

    /**
     * Consumer loop: blocks on the queue and dispatches every notification it takes until the
     * client is stopped or this thread has been superseded by a newer consumer thread.
     */
    private class RemovingConsumer implements Runnable {
        private RemovingConsumer() {
        }

        @Override
        public void run() {
            // init() assigns the queue before starting this thread, so the snapshot is the
            // queue belonging to this run.
            final LinkedBlockingQueue<DbNotification> source = DbNotificationClientQueueImpl.this.queue;
            while (isCurrentConsumer()) {
                try {
                    DbNotification dbNotification = source.take();
                    try {
                        dispatch(dbNotification);
                    } catch (Exception e) {
                        // One bad notification or handler must not terminate the consumer thread.
                        LOG.error("problem processing dbNotification:", e);
                    }
                } catch (InterruptedException interrupted) {
                    // destroy() interrupts this thread to unblock take(); the loop condition
                    // decides whether to exit, so there is nothing else to do here.
                    LOG.debug("notification consumer thread interrupted");
                }
            }
            LOG.debug("shutting down notification consumer thread");
        }

        /** True while the client is running and the calling thread is the current consumer thread. */
        private boolean isCurrentConsumer() {
            return DbNotificationClientQueueImpl.this.clientRunning.get()
                    && Thread.currentThread() == DbNotificationClientQueueImpl.this.removingConsumerThread;
        }
    }

    /**
     * Forwards one notification to the client registered for its channel name, logging a warning
     * when no clients are registered at all or none is registered for that channel.
     */
    private void dispatch(DbNotification dbNotification) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notification received from queue by consumer thread :\n processId:" + dbNotification.getProcessId() + "\n channelName:" + dbNotification.getChannelName() + "\n payload:" + dbNotification.getPayload());
        }
        if (this.channelHandlingClients.isEmpty()) {
            LOG.warn("no channel handing clients have been set to receive notification");
            return;
        }
        NotificationClient notificationClient = this.channelHandlingClients.get(dbNotification.getChannelName());
        if (notificationClient == null) {
            LOG.warn("no channel handing client has been set for channel:{}", dbNotification.getChannelName());
            return;
        }
        notificationClient.sendDbNotification(dbNotification);
    }

    /**
     * Adds the given channel-name to client mappings to the ones already registered. Existing
     * entries are kept unless the supplied map contains the same key.
     *
     * @param map channel name to handling client; must not be {@code null}
     */
    public void setChannelHandlingClients(Map<String, NotificationClient> map) {
        this.channelHandlingClients.putAll(map);
    }

    @Override
    public void setDatabaseChangeNotifier(DatabaseChangeNotifier databaseChangeNotifier) {
        this.databaseChangeNotifier = databaseChangeNotifier;
    }

    @Override
    public DatabaseChangeNotifier getDatabaseChangeNotifier() {
        return this.databaseChangeNotifier;
    }

    public Integer getMaxQueueLength() {
        return this.maxQueueLength;
    }

    /**
     * Sets the capacity used for the queue created by the next {@link #init()}; it has no effect
     * on a queue that already exists.
     */
    public void setMaxQueueLength(Integer num) {
        this.maxQueueLength = num;
    }

    /**
     * Creates the bounded queue, starts the consumer thread and registers this client with the
     * database change notifier. May be called again after {@link #destroy()}; calling it while
     * the client is already running logs a warning and does nothing.
     *
     * @throws IllegalStateException if no database change notifier has been set
     */
    @Override
    public void init() {
        LOG.debug("initialising dbNotificationClientQueue with queue size {}", this.maxQueueLength);
        if (this.databaseChangeNotifier == null) {
            throw new IllegalStateException("databaseChangeNotifier cannot be null");
        }
        // Build the queue first so that a bad capacity fails before any state is changed.
        LinkedBlockingQueue<DbNotification> newQueue = new LinkedBlockingQueue<>(this.maxQueueLength.intValue());
        if (!this.clientRunning.compareAndSet(false, true)) {
            LOG.warn("dbNotificationClientQueue is already running - ignoring init()");
            return;
        }
        this.queue = newQueue;
        // A Thread cannot be restarted, so each init() gets a fresh one.
        Thread consumerThread = new Thread(this.removingConsumer);
        this.removingConsumerThread = consumerThread;
        consumerThread.start();
        this.databaseChangeNotifier.addDbNotificationClient(this);
    }

    /**
     * Unregisters this client from the database change notifier and stops the consumer thread.
     * The consumer is stopped even when unregistering fails.
     *
     * @throws IllegalStateException if no database change notifier has been set
     */
    @Override
    public void destroy() {
        LOG.debug("shutting down client");
        try {
            if (this.databaseChangeNotifier == null) {
                throw new IllegalStateException("databaseChangeNotifier cannot be null");
            }
            this.databaseChangeNotifier.removeDbNotificationClient(this);
        } finally {
            this.clientRunning.set(false);
            Thread consumerThread = this.removingConsumerThread;
            if (consumerThread != null) {
                // Wakes the consumer if it is blocked waiting on an empty queue.
                consumerThread.interrupt();
            }
        }
    }

    /**
     * Queues the notification for the consumer thread without blocking. The notification is
     * dropped with a warning when the queue is full or when {@link #init()} has not been called.
     */
    @Override
    public void sendDbNotification(DbNotification dbNotification) {
        LOG.debug("client received notification - adding notification to queue");
        LinkedBlockingQueue<DbNotification> target = this.queue;
        if (target == null) {
            LOG.warn("Cannot queue dbNotification. dbNotification queue has not been initialised.");
            return;
        }
        if (!target.offer(dbNotification)) {
            LOG.warn("Cannot queue any more dbNotification. dbNotification queue full. size={}", target.size());
        }
    }
}
