/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.telemetry.daemon;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.core.ipc.sink.api.MessageConsumer;
import org.opennms.core.ipc.sink.api.MessageConsumerManager;
import org.opennms.core.ipc.sink.api.MessageDispatcherFactory;
import org.opennms.core.ipc.sink.api.SinkModule;
import org.opennms.netmgt.daemon.DaemonTools;
import org.opennms.netmgt.daemon.SpringServiceDaemon;
import org.opennms.netmgt.events.api.annotations.EventHandler;
import org.opennms.netmgt.events.api.annotations.EventListener;
import org.opennms.netmgt.telemetry.api.receiver.Listener;
import org.opennms.netmgt.telemetry.api.registry.TelemetryRegistry;
import org.opennms.netmgt.telemetry.common.ipc.TelemetrySinkModule;
import org.opennms.netmgt.telemetry.config.api.ListenerDefinition;
import org.opennms.netmgt.telemetry.config.api.QueueDefinition;
import org.opennms.netmgt.telemetry.config.dao.TelemetrydConfigDao;
import org.opennms.netmgt.telemetry.config.model.AdapterConfig;
import org.opennms.netmgt.telemetry.config.model.ListenerConfig;
import org.opennms.netmgt.telemetry.config.model.QueueConfig;
import org.opennms.netmgt.telemetry.config.model.TelemetrydConfig;
import org.opennms.netmgt.telemetry.daemon.TelemetryMessageConsumer;
import org.opennms.netmgt.xml.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;

@EventListener(name="Telemetryd", logPrefix="telemetryd")
public class Telemetryd
implements SpringServiceDaemon {
    private static final Logger LOG = LoggerFactory.getLogger(Telemetryd.class);
    public static final String NAME = "Telemetryd";
    public static final String LOG_PREFIX = "telemetryd";
    @Autowired
    private TelemetrydConfigDao telemetrydConfigDao;
    @Autowired
    private MessageDispatcherFactory messageDispatcherFactory;
    @Autowired
    private MessageConsumerManager messageConsumerManager;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private TelemetryRegistry telemetryRegistry;
    private List<TelemetryMessageConsumer> consumers = new ArrayList<TelemetryMessageConsumer>();
    private List<Listener> listeners = new ArrayList<Listener>();

    public synchronized void start() throws Exception {
        if (this.consumers.size() > 0) {
            throw new IllegalStateException("Telemetryd is already started.");
        }
        LOG.info("{} is starting.", (Object)NAME);
        TelemetrydConfig config = (TelemetrydConfig)this.telemetrydConfigDao.getContainer().getObject();
        AutowireCapableBeanFactory beanFactory = this.applicationContext.getAutowireCapableBeanFactory();
        for (QueueConfig queueConfig : config.getQueues()) {
            TelemetrySinkModule sinkModule = new TelemetrySinkModule((QueueDefinition)queueConfig);
            beanFactory.autowireBean((Object)sinkModule);
            beanFactory.initializeBean((Object)sinkModule, "sinkModule");
            List enabledAdapters = queueConfig.getAdapters().stream().filter(AdapterConfig::isEnabled).collect(Collectors.toList());
            if (!enabledAdapters.isEmpty()) {
                TelemetryMessageConsumer consumer = new TelemetryMessageConsumer((QueueDefinition)queueConfig, enabledAdapters, sinkModule);
                beanFactory.autowireBean((Object)consumer);
                beanFactory.initializeBean((Object)consumer, "consumer");
                this.consumers.add(consumer);
            } else {
                LOG.debug("Skipping consumer for queue: {} (no adapters enabled/defined)", (Object)queueConfig.getName());
            }
            AsyncDispatcher dispatcher = this.messageDispatcherFactory.createAsyncDispatcher((SinkModule)sinkModule);
            this.telemetryRegistry.registerDispatcher(queueConfig.getName(), dispatcher);
        }
        for (ListenerConfig listenerConfig : config.getListeners()) {
            if (!listenerConfig.isEnabled()) {
                LOG.debug("Skipping disabled listener: {}", (Object)listenerConfig.getName());
                continue;
            }
            if (listenerConfig.getParsers().isEmpty()) {
                LOG.debug("Skipping listener with no parsers: {}", (Object)listenerConfig.getName());
                continue;
            }
            Listener listener = this.telemetryRegistry.getListener((ListenerDefinition)listenerConfig);
            this.listeners.add(listener);
        }
        for (TelemetryMessageConsumer consumer : this.consumers) {
            LOG.info("Starting consumer for {} adapter.", (Object)consumer.getQueue().getName());
            this.messageConsumerManager.registerConsumer((MessageConsumer)consumer);
        }
        for (Listener listener : this.listeners) {
            LOG.info("Starting {} listener.", (Object)listener.getName());
            listener.start();
        }
        LOG.info("{} is started.", (Object)NAME);
    }

    public synchronized void destroy() {
        LOG.info("{} is stopping.", (Object)NAME);
        for (Listener listener : this.listeners) {
            try {
                LOG.info("Stopping {} listener.", (Object)listener.getName());
                listener.stop();
            }
            catch (InterruptedException e) {
                LOG.warn("Error while stopping listener.", (Throwable)e);
            }
        }
        this.listeners.clear();
        for (AsyncDispatcher dispatcher : this.telemetryRegistry.getDispatchers()) {
            try {
                LOG.info("Closing dispatcher.", (Object)dispatcher);
                dispatcher.close();
            }
            catch (Exception e) {
                LOG.warn("Error while closing dispatcher.", (Throwable)e);
            }
        }
        this.telemetryRegistry.clearDispatchers();
        AutowireCapableBeanFactory beanFactory = this.applicationContext.getAutowireCapableBeanFactory();
        for (TelemetryMessageConsumer consumer : this.consumers) {
            try {
                LOG.info("Stopping consumer for {} protocol.", (Object)consumer.getQueue().getName());
                this.messageConsumerManager.unregisterConsumer((MessageConsumer)consumer);
            }
            catch (Exception e) {
                LOG.error("Error while stopping consumer.", (Throwable)e);
            }
            beanFactory.destroyBean((Object)consumer);
        }
        this.consumers.clear();
        LOG.info("{} is stopped.", (Object)NAME);
    }

    public void afterPropertiesSet() {
    }

    private synchronized void handleConfigurationChanged() {
        this.destroy();
        try {
            this.start();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @EventHandler(uei="uei.opennms.org/internal/reloadDaemonConfig")
    public void handleReloadEvent(Event e) {
        DaemonTools.handleReloadEvent((Event)e, (String)NAME, event -> this.handleConfigurationChanged());
    }
}

