package org.opennms.netmgt.telemetry.distributed.minion;

import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.opennms.core.health.api.HealthCheck;
import org.opennms.core.ipc.sink.api.MessageDispatcherFactory;
import org.opennms.netmgt.dao.api.DistPollerDao;
import org.opennms.netmgt.telemetry.api.receiver.Listener;
import org.opennms.netmgt.telemetry.api.registry.TelemetryRegistry;
import org.opennms.netmgt.telemetry.common.ipc.TelemetrySinkModule;
import org.opennms.netmgt.telemetry.distributed.common.MapBasedListenerDef;
import org.opennms.netmgt.telemetry.distributed.common.PropertyTree;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.ManagedServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/telemetry/distributed/minion/ListenerManager.class */
public class ListenerManager implements ManagedServiceFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ListenerManager.class);
    private MessageDispatcherFactory messageDispatcherFactory;
    private DistPollerDao distPollerDao;
    private TelemetryRegistry telemetryRegistry;
    private Map<String, Entity> entities = new LinkedHashMap();
    private BundleContext bundleContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opennms/netmgt/telemetry/distributed/minion/ListenerManager$Entity.class */
    public static class Entity {
        private Listener listener;
        private Set<String> queueNames;
        private ServiceRegistration<HealthCheck> healthCheck;

        private Entity() {
            this.queueNames = new HashSet();
        }
    }

    public String getName() {
        return "Manages telemetry listener lifecycle.";
    }

    public void updated(String str, Dictionary<String, ?> dictionary) {
        if (this.entities.containsKey(str)) {
            LOG.info("Updating existing listener/dispatcher for pid: {}", str);
            deleted(str);
        } else {
            LOG.info("Creating new listener/dispatcher for pid: {}", str);
        }
        MapBasedListenerDef mapBasedListenerDef = new MapBasedListenerDef(PropertyTree.from(dictionary));
        ListenerHealthCheck listenerHealthCheck = new ListenerHealthCheck(mapBasedListenerDef);
        Entity entity = new Entity();
        entity.healthCheck = this.bundleContext.registerService(HealthCheck.class, listenerHealthCheck, (Dictionary) null);
        try {
            mapBasedListenerDef.getParsers().stream().forEach(mapBasedParserDef -> {
                if (this.telemetryRegistry.getDispatcher(mapBasedParserDef.getQueueName()) != null) {
                    throw new IllegalArgumentException("A queue with name " + mapBasedParserDef.getQueueName() + " is already defined. Bailing.");
                }
                TelemetrySinkModule telemetrySinkModule = new TelemetrySinkModule(mapBasedParserDef);
                telemetrySinkModule.setDistPollerDao(this.distPollerDao);
                this.telemetryRegistry.registerDispatcher((String) Objects.requireNonNull(mapBasedParserDef.getQueueName()), this.messageDispatcherFactory.createAsyncDispatcher(telemetrySinkModule));
                entity.queueNames.add(mapBasedParserDef.getQueueName());
            });
            entity.listener = this.telemetryRegistry.getListener(mapBasedListenerDef);
            entity.listener.start();
            listenerHealthCheck.markSucess();
            this.entities.put(str, entity);
        } catch (Exception e) {
            LOG.error("Failed to build listener.", e);
            listenerHealthCheck.markError(e);
            stopQueues(entity.queueNames);
        }
        LOG.info("Successfully started listener/dispatcher for pid: {}", str);
    }

    public void deleted(String str) {
        Entity remove = this.entities.remove(str);
        if (remove.healthCheck != null) {
            remove.healthCheck.unregister();
        }
        if (remove.listener != null) {
            LOG.info("Stopping listener for pid: {}", str);
            try {
                remove.listener.stop();
            } catch (InterruptedException e) {
                LOG.error("Error occurred while stopping listener for pid: {}", str, e);
            }
        }
        if (remove.queueNames != null) {
            stopQueues(remove.queueNames);
        }
    }

    public void init() {
        LOG.info("ListenerManager started.");
    }

    public void destroy() {
        new ArrayList(this.entities.keySet()).forEach(str -> {
            deleted(str);
        });
        LOG.info("ListenerManager stopped.");
    }

    public MessageDispatcherFactory getMessageDispatcherFactory() {
        return this.messageDispatcherFactory;
    }

    public void setMessageDispatcherFactory(MessageDispatcherFactory messageDispatcherFactory) {
        this.messageDispatcherFactory = messageDispatcherFactory;
    }

    public DistPollerDao getDistPollerDao() {
        return this.distPollerDao;
    }

    public void setDistPollerDao(DistPollerDao distPollerDao) {
        this.distPollerDao = distPollerDao;
    }

    public void setBundleContext(BundleContext bundleContext) {
        this.bundleContext = bundleContext;
    }

    public void setTelemetryRegistry(TelemetryRegistry telemetryRegistry) {
        this.telemetryRegistry = telemetryRegistry;
    }

    private void stopQueues(Set<String> set) {
        Objects.requireNonNull(set);
        for (String str : set) {
            try {
                try {
                    this.telemetryRegistry.getDispatcher(str).close();
                    this.telemetryRegistry.removeDispatcher(str);
                } catch (Exception e) {
                    LOG.error("Failed to close dispatcher.", e);
                    this.telemetryRegistry.removeDispatcher(str);
                }
            } catch (Throwable th) {
                this.telemetryRegistry.removeDispatcher(str);
                throw th;
            }
        }
    }
}
