/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.features.kafka.producer.shell;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.karaf.shell.api.action.Action;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.Option;
import org.apache.karaf.shell.api.action.lifecycle.Reference;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.opennms.netmgt.topologies.service.api.OnmsTopology;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyConsumer;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyDao;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyEdge;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyMessage;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyProtocol;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyRef;
import org.opennms.netmgt.topologies.service.api.OnmsTopologyVertex;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;

@Command(scope="opennms-kafka-producer", name="push-topology-edges", description="Pushes all of the related topology edges to the configured topic.")
@Service
public class PushTopologyEdges
implements Action {
    @Option(name="-p", aliases={"--protocol"}, description="Protocol", multiValued=true)
    private List<String> protocols;
    @Reference
    private OnmsTopologyDao onmsTopologyDao;
    @Reference
    private BundleContext bundleContext;

    public Object execute() throws InvalidSyntaxException {
        LinkedHashMap<OnmsTopologyProtocol, OnmsTopology> topologies;
        ServiceReference serviceRef = (ServiceReference)this.bundleContext.getServiceReferences(OnmsTopologyConsumer.class, "(type=kafkaProducer)").stream().findFirst().orElseThrow(() -> new IllegalStateException("Could not find reference to OnmsTopologyConsumer service exposed by the OpennmsKafkaProducer."));
        OnmsTopologyConsumer consumer = (OnmsTopologyConsumer)this.bundleContext.getService(serviceRef);
        System.out.println("Retrieving topologies...");
        if (this.protocols == null || this.protocols.isEmpty()) {
            topologies = this.onmsTopologyDao.getTopologies();
        } else {
            topologies = new LinkedHashMap<OnmsTopologyProtocol, OnmsTopology>();
            for (String string : this.protocols) {
                topologies.put(OnmsTopologyProtocol.create((String)string), this.onmsTopologyDao.getTopology(string));
            }
        }
        System.out.printf("Retrieved %d topologies.\n", topologies.size());
        for (Map.Entry entry : topologies.entrySet()) {
            OnmsTopologyProtocol protocol = (OnmsTopologyProtocol)entry.getKey();
            String protocolName = protocol.getId().toUpperCase();
            OnmsTopology topology = (OnmsTopology)entry.getValue();
            Set topologyVertices = topology.getVertices();
            Set topologyEdges = topology.getEdges();
            System.out.printf("%s: Pushing %d vertices and %d edges.\n", protocolName, topologyVertices.size(), topologyEdges.size());
            int numVerticesPushed = 0;
            for (OnmsTopologyVertex vertex : topology.getVertices()) {
                consumer.consume(OnmsTopologyMessage.update((OnmsTopologyRef)vertex, (OnmsTopologyProtocol)protocol));
                if (++numVerticesPushed > 0 && numVerticesPushed % 100 == 0) {
                    System.out.printf("%s: Pushed %d vertices.\n", protocolName, numVerticesPushed);
                }
                if (!Thread.interrupted()) continue;
                System.out.println("Interrupted. Aborting.");
            }
            int numEdgesPushed = 0;
            for (OnmsTopologyEdge edge : topology.getEdges()) {
                consumer.consume(OnmsTopologyMessage.update((OnmsTopologyRef)edge, (OnmsTopologyProtocol)protocol));
                if (++numEdgesPushed > 0 && numEdgesPushed % 100 == 0) {
                    System.out.printf("%s: Pushed %d edges.\n", protocolName, numEdgesPushed);
                }
                if (!Thread.interrupted()) continue;
                System.out.println("Interrupted. Aborting.");
            }
        }
        System.out.println("Done.");
        return null;
    }
}

