package org.opennms.netmgt.flows.elastic;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import io.opentracing.Scope;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.searchbox.client.JestClient;
import io.searchbox.core.Bulk;
import io.searchbox.core.Index;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.opennms.core.tracing.api.TracerRegistry;
import org.opennms.distributed.core.api.Identity;
import org.opennms.features.jest.client.bulk.BulkException;
import org.opennms.features.jest.client.bulk.BulkRequest;
import org.opennms.features.jest.client.bulk.BulkWrapper;
import org.opennms.features.jest.client.index.IndexStrategy;
import org.opennms.features.jest.client.template.IndexSettings;
import org.opennms.netmgt.dao.api.NodeDao;
import org.opennms.netmgt.dao.api.SessionUtils;
import org.opennms.netmgt.dao.api.SnmpInterfaceDao;
import org.opennms.netmgt.flows.api.Conversation;
import org.opennms.netmgt.flows.api.Directional;
import org.opennms.netmgt.flows.api.EnrichedFlowForwarder;
import org.opennms.netmgt.flows.api.Flow;
import org.opennms.netmgt.flows.api.FlowException;
import org.opennms.netmgt.flows.api.FlowRepository;
import org.opennms.netmgt.flows.api.FlowSource;
import org.opennms.netmgt.flows.api.Host;
import org.opennms.netmgt.flows.api.TrafficSummary;
import org.opennms.netmgt.flows.filter.api.Filter;
import org.opennms.netmgt.model.OnmsNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/flows/elastic/ElasticFlowRepository.class */
public class ElasticFlowRepository implements FlowRepository {
    /** Operation name used for the tracing span opened around the bulk persist in {@code persist}. */
    public static final String TRACER_FLOW_MODULE = "ElasticFlow";
    private static final Logger LOG = LoggerFactory.getLogger(ElasticFlowRepository.class);
    /** Index name constant; same value as the name handed to the index strategy in {@code persist}. */
    private static final String INDEX_NAME = "netflow";
    /** Client the bulk index requests are executed against. */
    private final JestClient client;
    /** Resolves the concrete index name for a document from settings, base name and timestamp. */
    private final IndexStrategy indexStrategy;
    /** Turns incoming flows into {@code FlowDocument}s before forwarding/persisting. */
    private final DocumentEnricher documentEnricher;
    /** Metric "flowsPersisted": marked with the number of documents after a successful bulk request. */
    private final Meter flowsPersistedMeter;
    /** Metric "logEnrichment": times the enrichment call. */
    private final Timer logEnrichementTimer;
    /** Metric "logPersisting": times the persistence phase. */
    private final Timer logPersistingTimer;
    /** Metric "logMarking": times the marking of nodes/interfaces as having flows. */
    private final Timer logMarkingTimer;
    /** Metric "flowsPerLog": updated with the size of every collection passed to {@code persist}. */
    private final Histogram flowsPerLog;
    /** Used to run DAO calls inside a transaction. */
    private final SessionUtils sessionUtils;
    private final NodeDao nodeDao;
    private final SnmpInterfaceDao snmpInterfaceDao;
    /** May be null (not null-checked in the constructor); {@code start()} is a no-op without it. */
    private final Identity identity;
    /** May be null (not null-checked in the constructor); {@code getTracer()} then falls back to the global tracer. */
    private final TracerRegistry tracerRegistry;
    private final IndexSettings indexSettings;
    /** Target of all query methods of this repository. */
    private final SmartQueryService smartQueryService;
    /** Receives enriched flows when flow forwarding is enabled; not null-checked in the constructor. */
    private final EnrichedFlowForwarder enrichedFlowForwarder;
    /** Wrapper around {@link #LOG} limited to 5 messages every 30 seconds. */
    private final RateLimitedLog RATE_LIMITED_LOGGER = RateLimitedLog.withRateLimit(LOG).maxRate(5).every(Duration.ofSeconds(30)).build();
    /** When true, {@code persist} hands every enriched flow to the forwarder. Off by default. */
    private boolean enableFlowForwarding = false;
    /** Retry count passed to the bulk request. Defaults to 1. */
    private int bulkRetryCount = 1;
    /** When true, {@code persist} drops documents after enrichment/forwarding instead of indexing them. Off by default. */
    private boolean skipElasticsearchPersistence = false;
    /**
     * Per direction: node id -> set of interface indexes already marked as having flows.
     * The per-direction caches are created and pre-filled in the constructor (entries expire one hour after write).
     */
    private final Map<Direction, Cache<Integer, Set<Integer>>> markerCache = Maps.newEnumMap(Direction.class);

    public ElasticFlowRepository(MetricRegistry metricRegistry, JestClient jestClient, IndexStrategy indexStrategy, DocumentEnricher documentEnricher, SessionUtils sessionUtils, NodeDao nodeDao, SnmpInterfaceDao snmpInterfaceDao, Identity identity, TracerRegistry tracerRegistry, EnrichedFlowForwarder enrichedFlowForwarder, IndexSettings indexSettings, SmartQueryService smartQueryService) {
        this.client = (JestClient) Objects.requireNonNull(jestClient);
        this.indexStrategy = (IndexStrategy) Objects.requireNonNull(indexStrategy);
        this.documentEnricher = (DocumentEnricher) Objects.requireNonNull(documentEnricher);
        this.sessionUtils = (SessionUtils) Objects.requireNonNull(sessionUtils);
        this.nodeDao = (NodeDao) Objects.requireNonNull(nodeDao);
        this.snmpInterfaceDao = (SnmpInterfaceDao) Objects.requireNonNull(snmpInterfaceDao);
        this.identity = identity;
        this.tracerRegistry = tracerRegistry;
        this.enrichedFlowForwarder = enrichedFlowForwarder;
        this.indexSettings = (IndexSettings) Objects.requireNonNull(indexSettings);
        this.smartQueryService = (SmartQueryService) Objects.requireNonNull(smartQueryService);
        this.flowsPersistedMeter = metricRegistry.meter("flowsPersisted");
        this.logEnrichementTimer = metricRegistry.timer("logEnrichment");
        this.logPersistingTimer = metricRegistry.timer("logPersisting");
        this.logMarkingTimer = metricRegistry.timer("logMarking");
        this.flowsPerLog = metricRegistry.histogram("flowsPerLog");
        this.markerCache.put(Direction.INGRESS, CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.HOURS).build());
        this.markerCache.put(Direction.EGRESS, CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.HOURS).build());
        this.sessionUtils.withTransaction(() -> {
            for (OnmsNode onmsNode : this.nodeDao.findAllHavingIngressFlows()) {
                this.markerCache.get(Direction.INGRESS).put(onmsNode.getId(), this.snmpInterfaceDao.findAllHavingIngressFlows(onmsNode.getId()).stream().map((v0) -> {
                    return v0.getIfIndex();
                }).collect(Collectors.toCollection(Sets::newConcurrentHashSet)));
            }
            for (OnmsNode onmsNode2 : this.nodeDao.findAllHavingEgressFlows()) {
                this.markerCache.get(Direction.EGRESS).put(onmsNode2.getId(), this.snmpInterfaceDao.findAllHavingEgressFlows(onmsNode2.getId()).stream().map((v0) -> {
                    return v0.getIfIndex();
                }).collect(Collectors.toCollection(Sets::newConcurrentHashSet)));
            }
            return null;
        });
    }

    /**
     * Enriches the given flows, optionally forwards them, indexes them with a bulk request and
     * finally marks the exporting nodes and their interfaces as having flows.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>The size of {@code collection} is recorded in the "flowsPerLog" histogram; an empty
     *       collection is logged and ignored.</li>
     *   <li>The flows are enriched into {@code FlowDocument}s (timed by "logEnrichment").</li>
     *   <li>If flow forwarding is enabled, each enriched flow is handed to the forwarder.</li>
     *   <li>If persistence is disabled, a rate-limited error is logged and the method returns.</li>
     *   <li>The documents are indexed via a bulk request inside a tracing span (timed by
     *       "logPersisting") and counted in "flowsPersisted".</li>
     *   <li>Nodes and interfaces seen for the first time are marked through the DAOs (timed by
     *       "logMarking").</li>
     * </ol>
     * Only a failure of the enrichment step is reported as "Failed to enrich one or more flows.";
     * failures of later steps keep their own exception type and message. Unchecked exceptions
     * raised while forwarding or marking propagate unchanged.
     *
     * @param collection the flows to persist
     * @param flowSource source of the flows; its location and source address are used for logging
     *                   and as span tags
     * @throws FlowException if enrichment fails or the bulk request fails with an I/O error; a
     *                       {@code PersistenceException} carrying the failed documents is thrown
     *                       when the bulk request reports a {@code BulkException}
     *                       (NOTE(review): assumed to be a FlowException subtype or unchecked - confirm)
     */
    public void persist(Collection<Flow> collection, FlowSource flowSource) throws FlowException {
        // Track the number of flows per call, including empty calls.
        this.flowsPerLog.update(collection.size());
        if (collection.isEmpty()) {
            LOG.info("Received empty flows from {} @ {}. Nothing to do.", flowSource.getSourceAddress(), flowSource.getLocation());
            return;
        }

        LOG.debug("Enriching {} flow documents.", collection.size());
        final List<FlowDocument> flowDocuments = enrichFlows(collection, flowSource);

        if (this.enableFlowForwarding) {
            LOG.debug("Forwarding {} flow documents.", flowDocuments.size());
            flowDocuments.stream().map(FlowDocument::buildEnrichedFlow).forEach(this.enrichedFlowForwarder::forward);
        }

        if (this.skipElasticsearchPersistence) {
            this.RATE_LIMITED_LOGGER.error("Flow persistence disabled. Dropping {} flow documents.", flowDocuments.size());
            return;
        }

        LOG.debug("Persisting {} flow documents.", flowDocuments.size());
        persistDocuments(flowDocuments, flowSource);
        markNodesAndInterfaces(flowDocuments);
    }

    /** Runs the document enricher under the enrichment timer; any failure becomes a FlowException. */
    private List<FlowDocument> enrichFlows(Collection<Flow> flows, FlowSource flowSource) throws FlowException {
        try (Timer.Context ignored = this.logEnrichementTimer.time()) {
            return this.documentEnricher.enrich(flows, flowSource);
        } catch (Exception e) {
            // The catch is deliberately limited to enrichment so the message stays truthful.
            throw new FlowException("Failed to enrich one or more flows.", e);
        }
    }

    /** Indexes the documents with one bulk request inside a tracing span, under the persisting timer. */
    private void persistDocuments(List<FlowDocument> flowDocuments, FlowSource flowSource) throws FlowException {
        final Tracer tracer = getTracer();
        // Resources close in reverse order: first the span scope, then the timer.
        try (Timer.Context ignored = this.logPersistingTimer.time();
             Scope scope = tracer.buildSpan(TRACER_FLOW_MODULE).startActive(true)) {
            scope.span().setTag("location", flowSource.getLocation());
            scope.span().setTag("sourceAddress", flowSource.getSourceAddress());
            scope.span().setTag("thread", Thread.currentThread().getName());

            try {
                final BulkRequest<FlowDocument> bulkRequest = new BulkRequest<>(this.client, flowDocuments, documents -> {
                    final Bulk.Builder bulkBuilder = new Bulk.Builder();
                    for (final FlowDocument flowDocument : documents) {
                        // The target index is derived per document from its timestamp.
                        final String index = this.indexStrategy.getIndex(this.indexSettings, INDEX_NAME, Instant.ofEpochMilli(flowDocument.getTimestamp()));
                        bulkBuilder.addAction(new Index.Builder(flowDocument).index(index).build());
                    }
                    return new BulkWrapper(bulkBuilder);
                }, this.bulkRetryCount);
                bulkRequest.execute();
            } catch (BulkException e) {
                final PersistenceException failure = new PersistenceException(e.getMessage(), e.getBulkResult().getFailedDocuments());
                // The constructor takes no cause; keep the original stack trace reachable.
                failure.addSuppressed(e);
                throw failure;
            } catch (IOException e) {
                LOG.error("An error occurred while executing the given request: {}", e.getMessage(), e);
                throw new FlowException(e.getMessage(), e);
            }
            this.flowsPersistedMeter.mark(flowDocuments.size());
        }
    }

    /** Marks nodes/interfaces not yet present in the marker cache as having flows, under the marking timer. */
    private void markNodesAndInterfaces(List<FlowDocument> flowDocuments) {
        try (Timer.Context ignored = this.logMarkingTimer.time()) {
            final Map<Direction, List<Integer>> nodesToUpdate = Maps.newEnumMap(Direction.class);
            final Map<Direction, Map<Integer, List<Integer>>> interfacesToUpdate = Maps.newEnumMap(Direction.class);
            nodesToUpdate.put(Direction.INGRESS, Lists.newArrayListWithExpectedSize(flowDocuments.size()));
            nodesToUpdate.put(Direction.EGRESS, Lists.newArrayListWithExpectedSize(flowDocuments.size()));
            interfacesToUpdate.put(Direction.INGRESS, Maps.newHashMap());
            interfacesToUpdate.put(Direction.EGRESS, Maps.newHashMap());

            for (final FlowDocument flowDocument : flowDocuments) {
                // Flows without a resolved exporter node cannot be attributed to anything.
                if (flowDocument.getNodeExporter() == null || flowDocument.getNodeExporter().getNodeId() == null) {
                    continue;
                }
                final Integer nodeId = flowDocument.getNodeExporter().getNodeId();
                final Direction direction = flowDocument.getDirection();
                final Cache<Integer, Set<Integer>> directionCache = this.markerCache.get(direction);

                Set<Integer> knownInterfaces = directionCache.getIfPresent(nodeId);
                if (knownInterfaces == null) {
                    // Node not cached for this direction yet: remember it and schedule it for marking.
                    knownInterfaces = Sets.newConcurrentHashSet();
                    directionCache.put(nodeId, knownInterfaces);
                    nodesToUpdate.get(direction).add(nodeId);
                }

                // Set.add() reports whether the index was new, which avoids a separate
                // contains()/add() pair on the shared concurrent set.
                final Integer inputSnmp = flowDocument.getInputSnmp();
                if (inputSnmp != null && inputSnmp.intValue() != 0 && knownInterfaces.add(inputSnmp)) {
                    interfacesToUpdate.get(direction).computeIfAbsent(nodeId, id -> Lists.newArrayList()).add(inputSnmp);
                }
                final Integer outputSnmp = flowDocument.getOutputSnmp();
                if (outputSnmp != null && outputSnmp.intValue() != 0 && knownInterfaces.add(outputSnmp)) {
                    interfacesToUpdate.get(direction).computeIfAbsent(nodeId, id -> Lists.newArrayList()).add(outputSnmp);
                }
            }

            final boolean nodesPending = !nodesToUpdate.get(Direction.INGRESS).isEmpty()
                    || !nodesToUpdate.get(Direction.EGRESS).isEmpty();
            final boolean interfacesPending = !interfacesToUpdate.get(Direction.INGRESS).isEmpty()
                    || !interfacesToUpdate.get(Direction.EGRESS).isEmpty();
            if (!nodesPending && !interfacesPending) {
                // Nothing new was seen; skip opening a transaction.
                return;
            }

            this.sessionUtils.withTransaction(() -> {
                if (nodesPending) {
                    this.nodeDao.markHavingFlows(nodesToUpdate.get(Direction.INGRESS), nodesToUpdate.get(Direction.EGRESS));
                }
                for (final Map.Entry<Integer, List<Integer>> entry : interfacesToUpdate.get(Direction.INGRESS).entrySet()) {
                    this.snmpInterfaceDao.markHavingIngressFlows(entry.getKey(), entry.getValue());
                }
                for (final Map.Entry<Integer, List<Integer>> entry : interfacesToUpdate.get(Direction.EGRESS).entrySet()) {
                    this.snmpInterfaceDao.markHavingEgressFlows(entry.getKey(), entry.getValue());
                }
                return null;
            });
        }
    }

    /**
     * Delegates to {@code smartQueryService.getFlowCount} and returns its future unchanged.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Long> getFlowCount(List<Filter> list) {
        return this.smartQueryService.getFlowCount(list);
    }

    /**
     * Delegates to {@code smartQueryService.getApplications} and returns its future unchanged.
     * <p>
     * NOTE(review): parameter names are decompiler-generated; presumably {@code str} is a match
     * pattern and {@code j} a result limit - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<String>> getApplications(String str, long j, List<Filter> list) {
        return this.smartQueryService.getApplications(str, j, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNApplicationSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): parameter names are decompiler-generated; presumably {@code i} is N and
     * {@code z} an "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<String>>> getTopNApplicationSummaries(int i, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNApplicationSummaries(i, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getApplicationSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds application names and {@code z} is an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<String>>> getApplicationSummaries(Set<String> set, boolean z, List<Filter> list) {
        return this.smartQueryService.getApplicationSummaries(set, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getApplicationSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds application names, {@code j} is the series step
     * and {@code z} an "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<String>, Long, Double>> getApplicationSeries(Set<String> set, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getApplicationSeries(set, j, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNApplicationSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code i} is N, {@code j} the series step and {@code z} an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<String>, Long, Double>> getTopNApplicationSeries(int i, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNApplicationSeries(i, j, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getConversations} and returns its future unchanged.
     * <p>
     * NOTE(review): the meaning of the five string arguments and of {@code j} is not visible
     * here (decompiler-generated names); all are passed through in the same order - confirm
     * against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<String>> getConversations(String str, String str2, String str3, String str4, String str5, long j, List<Filter> list) {
        return this.smartQueryService.getConversations(str, str2, str3, str4, str5, j, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNConversationSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code i} is N and {@code z} an "include other" flag - confirm
     * against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<Conversation>>> getTopNConversationSummaries(int i, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNConversationSummaries(i, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getConversationSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds conversation keys and {@code z} is an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<Conversation>>> getConversationSummaries(Set<String> set, boolean z, List<Filter> list) {
        return this.smartQueryService.getConversationSummaries(set, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getConversationSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds conversation keys, {@code j} is the series step
     * and {@code z} an "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<Conversation>, Long, Double>> getConversationSeries(Set<String> set, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getConversationSeries(set, j, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNConversationSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code i} is N, {@code j} the series step and {@code z} an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<Conversation>, Long, Double>> getTopNConversationSeries(int i, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNConversationSeries(i, j, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getHosts} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code str} is a match pattern and {@code j} a result limit -
     * confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<String>> getHosts(String str, long j, List<Filter> list) {
        return this.smartQueryService.getHosts(str, j, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNHostSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code i} is N and {@code z} an "include other" flag - confirm
     * against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<Host>>> getTopNHostSummaries(int i, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNHostSummaries(i, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getHostSummaries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds host identifiers and {@code z} is an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<List<TrafficSummary<Host>>> getHostSummaries(Set<String> set, boolean z, List<Filter> list) {
        return this.smartQueryService.getHostSummaries(set, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getHostSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code set} holds host identifiers, {@code j} is the series step
     * and {@code z} an "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<Host>, Long, Double>> getHostSeries(Set<String> set, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getHostSeries(set, j, z, list);
    }

    /**
     * Delegates to {@code smartQueryService.getTopNHostSeries} and returns its future unchanged.
     * <p>
     * NOTE(review): presumably {@code i} is N, {@code j} the series step and {@code z} an
     * "include other" flag - confirm against {@code FlowRepository}.
     *
     * @param list filters, passed through as-is
     * @return the future produced by the query service
     */
    public CompletableFuture<Table<Directional<Host>, Long, Double>> getTopNHostSeries(int i, long j, boolean z, List<Filter> list) {
        return this.smartQueryService.getTopNHostSeries(i, j, z, list);
    }

    /**
     * @return the identity passed to the constructor; may be null since the constructor does
     *         not null-check it
     */
    public Identity getIdentity() {
        return this.identity;
    }

    /**
     * @return the tracer registry passed to the constructor; may be null since the constructor
     *         does not null-check it
     */
    public TracerRegistry getTracerRegistry() {
        return this.tracerRegistry;
    }

    /**
     * Initialises the tracer registry with the id of this instance's identity.
     * Does nothing when either the tracer registry or the identity is missing.
     */
    public void start() {
        final boolean tracingAvailable = this.tracerRegistry != null && this.identity != null;
        if (tracingAvailable) {
            this.tracerRegistry.init(this.identity.getId());
        }
    }

    /**
     * Picks the tracer used for persistence spans.
     *
     * @return the registry's tracer when a tracer registry was supplied, otherwise the
     *         {@link GlobalTracer}
     */
    private Tracer getTracer() {
        if (this.tracerRegistry == null) {
            return GlobalTracer.get();
        }
        return this.tracerRegistry.getTracer();
    }

    /** @return whether {@code persist} forwards enriched flows to the forwarder (false unless set) */
    public boolean isEnableFlowForwarding() {
        return this.enableFlowForwarding;
    }

    /**
     * Switches forwarding of enriched flows in {@code persist} on or off.
     *
     * @param z true to forward every enriched flow to the forwarder
     */
    public void setEnableFlowForwarding(boolean z) {
        this.enableFlowForwarding = z;
    }

    /** @return the retry count handed to the bulk request in {@code persist} (1 unless set) */
    public int getBulkRetryCount() {
        return this.bulkRetryCount;
    }

    /**
     * Sets the retry count handed to the bulk request in {@code persist}.
     * The value is stored as given; no range check is performed here.
     *
     * @param i the retry count
     */
    public void setBulkRetryCount(int i) {
        this.bulkRetryCount = i;
    }

    /** @return whether {@code persist} drops documents instead of indexing them (false unless set) */
    public boolean isSkipElasticsearchPersistence() {
        return this.skipElasticsearchPersistence;
    }

    /**
     * Switches indexing in {@code persist} off or on. When skipping is enabled, flows are still
     * enriched (and forwarded, if enabled) but then dropped with a rate-limited error log.
     *
     * @param z true to skip persistence
     */
    public void setSkipElasticsearchPersistence(boolean z) {
        this.skipElasticsearchPersistence = z;
    }
}
