/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.telemetry.daemon;

import java.util.ArrayList;
import java.util.List;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.core.ipc.sink.api.MessageConsumer;
import org.opennms.core.ipc.sink.api.MessageConsumerManager;
import org.opennms.core.ipc.sink.api.MessageDispatcherFactory;
import org.opennms.core.ipc.sink.api.SinkModule;
import org.opennms.netmgt.daemon.DaemonTools;
import org.opennms.netmgt.daemon.SpringServiceDaemon;
import org.opennms.netmgt.events.api.annotations.EventHandler;
import org.opennms.netmgt.events.api.annotations.EventListener;
import org.opennms.netmgt.telemetry.config.dao.TelemetrydConfigDao;
import org.opennms.netmgt.telemetry.config.model.Protocol;
import org.opennms.netmgt.telemetry.config.model.TelemetrydConfiguration;
import org.opennms.netmgt.telemetry.daemon.TelemetryMessageConsumer;
import org.opennms.netmgt.telemetry.ipc.ProtocolDefinition;
import org.opennms.netmgt.telemetry.ipc.TelemetrySinkModule;
import org.opennms.netmgt.telemetry.listeners.api.Listener;
import org.opennms.netmgt.telemetry.listeners.api.ListenerDefinition;
import org.opennms.netmgt.telemetry.utils.ListenerFactory;
import org.opennms.netmgt.xml.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;

@EventListener(name="Telemetryd", logPrefix="telemetryd")
public class Telemetryd
implements SpringServiceDaemon {
    private static final Logger LOG = LoggerFactory.getLogger(Telemetryd.class);
    public static final String NAME = "Telemetryd";
    public static final String LOG_PREFIX = "telemetryd";
    @Autowired
    private TelemetrydConfigDao telemetrydConfigDao;
    @Autowired
    private MessageDispatcherFactory messageDispatcherFactory;
    @Autowired
    private MessageConsumerManager messageConsumerManager;
    @Autowired
    private ApplicationContext applicationContext;
    private List<TelemetryMessageConsumer> consumers = new ArrayList<TelemetryMessageConsumer>();
    private List<AsyncDispatcher<?>> dispatchers = new ArrayList();
    private List<Listener> listeners = new ArrayList<Listener>();

    public synchronized void start() throws Exception {
        if (this.consumers.size() > 0) {
            throw new IllegalStateException("Telemetryd is already started.");
        }
        LOG.info("{} is starting.", (Object)NAME);
        TelemetrydConfiguration config = (TelemetrydConfiguration)this.telemetrydConfigDao.getContainer().getObject();
        AutowireCapableBeanFactory beanFactory = this.applicationContext.getAutowireCapableBeanFactory();
        for (Protocol protocol : config.getProtocols()) {
            if (!protocol.getEnabled().booleanValue()) {
                LOG.debug("Skipping disabled protocol: {}", (Object)protocol.getName());
                continue;
            }
            LOG.debug("Setting up protocol: {}", (Object)protocol.getName());
            TelemetrySinkModule sinkModule = new TelemetrySinkModule((ProtocolDefinition)protocol);
            beanFactory.autowireBean((Object)sinkModule);
            beanFactory.initializeBean((Object)sinkModule, "sinkModule");
            TelemetryMessageConsumer consumer = new TelemetryMessageConsumer(protocol, sinkModule);
            beanFactory.autowireBean((Object)consumer);
            beanFactory.initializeBean((Object)consumer, "consumer");
            this.consumers.add(consumer);
            AsyncDispatcher dispatcher = this.messageDispatcherFactory.createAsyncDispatcher((SinkModule)sinkModule);
            this.dispatchers.add(dispatcher);
            for (org.opennms.netmgt.telemetry.config.model.Listener listenerDef : protocol.getListeners()) {
                this.listeners.add(ListenerFactory.buildListener((ListenerDefinition)listenerDef, (AsyncDispatcher)dispatcher));
            }
        }
        for (TelemetryMessageConsumer consumer : this.consumers) {
            LOG.info("Starting consumer for {} protocol.", (Object)consumer.getProtocol().getName());
            this.messageConsumerManager.registerConsumer((MessageConsumer)consumer);
        }
        for (Listener listener : this.listeners) {
            LOG.info("Starting {} listener.", (Object)listener.getName());
            listener.start();
        }
        LOG.info("{} is started.", (Object)NAME);
    }

    public synchronized void destroy() {
        LOG.info("{} is stopping.", (Object)NAME);
        for (Listener listener : this.listeners) {
            try {
                LOG.info("Stopping {} listener.", (Object)listener.getName());
                listener.stop();
            }
            catch (InterruptedException e) {
                LOG.warn("Error while stopping listener.", (Throwable)e);
            }
        }
        this.listeners.clear();
        for (AsyncDispatcher asyncDispatcher : this.dispatchers) {
            try {
                LOG.info("Closing dispatcher.", (Object)asyncDispatcher);
                asyncDispatcher.close();
            }
            catch (Exception e) {
                LOG.warn("Error while closing dispatcher.", (Throwable)e);
            }
        }
        this.dispatchers.clear();
        for (TelemetryMessageConsumer telemetryMessageConsumer : this.consumers) {
            try {
                LOG.info("Stopping consumer for {} protocol.", (Object)telemetryMessageConsumer.getProtocol().getName());
                this.messageConsumerManager.unregisterConsumer((MessageConsumer)telemetryMessageConsumer);
            }
            catch (Exception e) {
                LOG.error("Error while stopping consumer.", (Throwable)e);
            }
        }
        this.consumers.clear();
        LOG.info("{} is stopped.", (Object)NAME);
    }

    public void afterPropertiesSet() {
    }

    private synchronized void handleConfigurationChanged() {
        this.destroy();
        try {
            this.start();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @EventHandler(uei="uei.opennms.org/internal/reloadDaemonConfig")
    public void handleReloadEvent(Event e) {
        LOG.info("Received a reload configuration event: {}", (Object)e);
        DaemonTools.handleReloadEvent((Event)e, (String)NAME, event -> this.handleConfigurationChanged());
    }
}

