package org.opennms.netmgt.telemetry.daemon;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.core.ipc.sink.api.MessageConsumerManager;
import org.opennms.core.ipc.sink.api.MessageDispatcherFactory;
import org.opennms.netmgt.daemon.DaemonTools;
import org.opennms.netmgt.daemon.SpringServiceDaemon;
import org.opennms.netmgt.events.api.annotations.EventHandler;
import org.opennms.netmgt.events.api.annotations.EventListener;
import org.opennms.netmgt.telemetry.api.receiver.Listener;
import org.opennms.netmgt.telemetry.api.registry.TelemetryRegistry;
import org.opennms.netmgt.telemetry.common.ipc.TelemetrySinkModule;
import org.opennms.netmgt.telemetry.config.dao.TelemetrydConfigDao;
import org.opennms.netmgt.telemetry.config.model.ListenerConfig;
import org.opennms.netmgt.telemetry.config.model.QueueConfig;
import org.opennms.netmgt.telemetry.config.model.TelemetrydConfig;
import org.opennms.netmgt.xml.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;

/**
 * Daemon that wires up the telemetry pipeline described by the
 * {@link TelemetrydConfig}: one sink module, dispatcher and (optionally)
 * consumer per configured queue, plus one {@link Listener} per enabled
 * listener definition.
 *
 * <p>{@link #start()}, {@link #destroy()} and the reload handling are all
 * {@code synchronized} on this instance, so start/stop/reload never interleave.
 */
@EventListener(name = Telemetryd.NAME, logPrefix = Telemetryd.LOG_PREFIX)
public class Telemetryd implements SpringServiceDaemon {
    private static final Logger LOG = LoggerFactory.getLogger(Telemetryd.class);
    public static final String NAME = "Telemetryd";
    public static final String LOG_PREFIX = "telemetryd";

    @Autowired
    private TelemetrydConfigDao telemetrydConfigDao;

    @Autowired
    private MessageDispatcherFactory messageDispatcherFactory;

    @Autowired
    private MessageConsumerManager messageConsumerManager;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private TelemetryRegistry telemetryRegistry;

    /** Consumers created by the last {@link #start()}; emptied by {@link #destroy()}. */
    private final List<TelemetryMessageConsumer> consumers = new ArrayList<>();

    /** Listeners created by the last {@link #start()}; emptied by {@link #destroy()}. */
    private final List<Listener> listeners = new ArrayList<>();

    /**
     * Builds and starts all dispatchers, consumers and listeners from the current configuration.
     *
     * <p>Everything is created first and only then started: consumers are registered
     * before any listener is started.
     *
     * @throws IllegalStateException if consumers or listeners from a previous start are still present
     * @throws Exception if reading the configuration or starting a component fails
     */
    public synchronized void start() throws Exception {
        // Guard on both collections: a configuration without any enabled adapter
        // yields no consumers, yet listeners may still be running.
        if (!this.consumers.isEmpty() || !this.listeners.isEmpty()) {
            throw new IllegalStateException("Telemetryd is already started.");
        }
        LOG.info("{} is starting.", NAME);

        final TelemetrydConfig config = (TelemetrydConfig) this.telemetrydConfigDao.getContainer().getObject();
        final AutowireCapableBeanFactory beanFactory = this.applicationContext.getAutowireCapableBeanFactory();

        // One sink module + dispatcher per queue; a consumer only if at least one adapter is enabled.
        for (QueueConfig queueConfig : config.getQueues()) {
            final TelemetrySinkModule sinkModule = new TelemetrySinkModule(queueConfig);
            beanFactory.autowireBean(sinkModule);
            beanFactory.initializeBean(sinkModule, "sinkModule");

            // NOTE(review): kept as a raw List because the adapter element type is not
            // imported in this file — give it its proper type argument once confirmed.
            final List enabledAdapters = queueConfig.getAdapters().stream()
                    .filter(adapter -> adapter.isEnabled())
                    .collect(Collectors.toList());
            if (enabledAdapters.isEmpty()) {
                LOG.debug("Skipping consumer for queue: {} (no adapters enabled/defined)", queueConfig.getName());
            } else {
                final TelemetryMessageConsumer consumer =
                        new TelemetryMessageConsumer(queueConfig, enabledAdapters, sinkModule);
                beanFactory.autowireBean(consumer);
                beanFactory.initializeBean(consumer, "consumer");
                this.consumers.add(consumer);
            }

            // The dispatcher is registered even when the queue has no consumer.
            this.telemetryRegistry.registerDispatcher(
                    queueConfig.getName(),
                    this.messageDispatcherFactory.createAsyncDispatcher(sinkModule));
        }

        // Resolve listeners for every enabled definition that has at least one parser.
        for (ListenerConfig listenerConfig : config.getListeners()) {
            if (!listenerConfig.isEnabled()) {
                LOG.debug("Skipping disabled listener: {}", listenerConfig.getName());
            } else if (listenerConfig.getParsers().isEmpty()) {
                LOG.debug("Skipping listener with no parsers: {}", listenerConfig.getName());
            } else {
                this.listeners.add(this.telemetryRegistry.getListener(listenerConfig));
            }
        }

        // Register consumers before starting any listener.
        for (TelemetryMessageConsumer consumer : this.consumers) {
            LOG.info("Starting consumer for {} adapter.", consumer.getQueue().getName());
            this.messageConsumerManager.registerConsumer(consumer);
        }
        for (Listener listener : this.listeners) {
            LOG.info("Starting {} listener.", listener.getName());
            listener.start();
        }
        LOG.info("{} is started.", NAME);
    }

    /**
     * Stops everything created by {@link #start()}: listeners first, then dispatchers,
     * then consumers. Failures of individual components are logged and do not prevent
     * the remaining components from being shut down.
     *
     * <p>If the calling thread is interrupted while stopping a listener, the shutdown
     * still runs to completion and the interrupt status is restored afterwards.
     */
    public synchronized void destroy() {
        LOG.info("{} is stopping.", NAME);
        final boolean interrupted = stopListeners();
        try {
            closeDispatchers();
            stopConsumers();
        } finally {
            if (interrupted) {
                // Re-assert the interrupt only now, so it cannot cut the remaining shutdown short.
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("{} is stopped.", NAME);
    }

    /**
     * Stops and forgets all listeners.
     *
     * @return {@code true} if an {@link InterruptedException} was caught while stopping a listener
     */
    private boolean stopListeners() {
        boolean interrupted = false;
        for (Listener listener : this.listeners) {
            try {
                LOG.info("Stopping {} listener.", listener.getName());
                listener.stop();
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("Error while stopping listener.", e);
            }
        }
        this.listeners.clear();
        return interrupted;
    }

    /** Closes every dispatcher known to the registry, then clears the registry's dispatchers. */
    private void closeDispatchers() {
        for (AsyncDispatcher dispatcher : this.telemetryRegistry.getDispatchers()) {
            try {
                LOG.info("Closing dispatcher: {}", dispatcher);
                dispatcher.close();
            } catch (Exception e) {
                LOG.warn("Error while closing dispatcher.", e);
            }
        }
        this.telemetryRegistry.clearDispatchers();
    }

    /** Unregisters and destroys every consumer, then forgets them. */
    private void stopConsumers() {
        final AutowireCapableBeanFactory beanFactory = this.applicationContext.getAutowireCapableBeanFactory();
        for (TelemetryMessageConsumer consumer : this.consumers) {
            try {
                LOG.info("Stopping consumer for {} protocol.", consumer.getQueue().getName());
                this.messageConsumerManager.unregisterConsumer(consumer);
            } catch (Exception e) {
                LOG.error("Error while stopping consumer.", e);
            }
            // Destroy the bean even if unregistering failed; a failure here must not
            // keep the remaining consumers from being released.
            try {
                beanFactory.destroyBean(consumer);
            } catch (RuntimeException e) {
                LOG.error("Error while destroying consumer.", e);
            }
        }
        this.consumers.clear();
    }

    /** No initialization is performed here; all wiring happens in {@link #start()}. */
    public void afterPropertiesSet() {
    }

    /**
     * Restarts the daemon: {@link #destroy()} followed by {@link #start()}.
     *
     * @throws RuntimeException wrapping any exception thrown by {@link #start()}
     */
    private synchronized void handleConfigurationChanged() {
        destroy();
        try {
            start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Handles {@code reloadDaemonConfig} events by delegating to {@link DaemonTools},
     * passing {@link #NAME} and a callback that restarts this daemon.
     *
     * @param event the received reload event
     */
    @EventHandler(uei = "uei.opennms.org/internal/reloadDaemonConfig")
    public void handleReloadEvent(Event event) {
        DaemonTools.handleReloadEvent(event, NAME, reloadEvent -> handleConfigurationChanged());
    }
}
