/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.flows.elastic;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import io.searchbox.action.Action;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.client.JestResultHandler;
import io.searchbox.core.Bulk;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.opennms.netmgt.dao.api.NodeDao;
import org.opennms.netmgt.dao.api.SnmpInterfaceDao;
import org.opennms.netmgt.flows.api.Conversation;
import org.opennms.netmgt.flows.api.ConversationKey;
import org.opennms.netmgt.flows.api.Directional;
import org.opennms.netmgt.flows.api.Flow;
import org.opennms.netmgt.flows.api.FlowException;
import org.opennms.netmgt.flows.api.FlowRepository;
import org.opennms.netmgt.flows.api.FlowSource;
import org.opennms.netmgt.flows.api.TrafficSummary;
import org.opennms.netmgt.flows.classification.ClassificationEngine;
import org.opennms.netmgt.flows.classification.ClassificationRequest;
import org.opennms.netmgt.flows.classification.persistence.api.Protocol;
import org.opennms.netmgt.flows.classification.persistence.api.Protocols;
import org.opennms.netmgt.flows.elastic.ConversationKeyUtils;
import org.opennms.netmgt.flows.elastic.Direction;
import org.opennms.netmgt.flows.elastic.DocumentEnricher;
import org.opennms.netmgt.flows.elastic.FlowDocument;
import org.opennms.netmgt.flows.elastic.PersistenceException;
import org.opennms.netmgt.flows.elastic.ProportionalSumAggregation;
import org.opennms.netmgt.flows.elastic.SearchQueryProvider;
import org.opennms.netmgt.flows.elastic.TableUtils;
import org.opennms.netmgt.flows.elastic.index.IndexSelector;
import org.opennms.netmgt.flows.filter.api.Filter;
import org.opennms.netmgt.flows.filter.api.TimeRangeFilter;
import org.opennms.netmgt.model.OnmsNode;
import org.opennms.netmgt.model.OnmsSnmpInterface;
import org.opennms.plugins.elasticsearch.rest.bulk.BulkException;
import org.opennms.plugins.elasticsearch.rest.bulk.BulkRequest;
import org.opennms.plugins.elasticsearch.rest.bulk.BulkWrapper;
import org.opennms.plugins.elasticsearch.rest.index.IndexStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionOperations;

/**
 * {@link FlowRepository} implementation that stores flows as documents in Elasticsearch (through
 * the Jest client) and answers "top N" traffic queries using aggregations.
 *
 * <p>After flows have been persisted, the exporting nodes and the SNMP interfaces referenced by
 * the flows are marked as "having flows" through the DAOs. An in-memory cache of what has already
 * been marked is kept so that the database is only touched for nodes/interfaces that are seen for
 * the first time.
 */
public class ElasticFlowRepository implements FlowRepository {
    public static final String OTHER_APPLICATION_NAME = "Other";
    public static final String UNKNOWN_APPLICATION_NAME = "Unknown";
    private static final Logger LOG = LoggerFactory.getLogger(ElasticFlowRepository.class);
    private static final String TYPE = "netflow";

    /** Names of the aggregations read back from the search results. */
    private static final String AGG_GROUPED_BY = "grouped_by";
    private static final String AGG_DIRECTION = "direction";
    private static final String AGG_BYTES = "bytes";

    /**
     * The first "top N" query asks for this many times more terms than requested and the result is
     * truncated to N afterwards (presumably to improve the accuracy of the terms aggregation).
     */
    private static final int TOP_N_MULTIPLIER = 2;

    private final JestClient client;
    private final IndexStrategy indexStrategy;
    private final DocumentEnricher documentEnricher;
    private final SearchQueryProvider searchQueryProvider = new SearchQueryProvider();
    private final ClassificationEngine classificationEngine;
    private final int bulkRetryCount;
    private final Meter flowsPersistedMeter;
    private final Timer logConversionTimer;
    private final Timer logEnrichmentTimer;
    private final Timer logPersistingTimer;
    private final Timer logMarkingTimer;
    private final Histogram flowsPerLog;
    private final IndexSelector indexSelector;
    private final TransactionOperations transactionOperations;
    private final NodeDao nodeDao;
    private final SnmpInterfaceDao snmpInterfaceDao;

    /** Node id -> ifIndexes of the interfaces already marked as having flows. */
    private final ConcurrentMap<Integer, Set<Integer>> markerCache = Maps.newConcurrentMap();

    /**
     * Creates the repository, registers its metrics and pre-loads the marker cache with all nodes
     * and interfaces that the DAOs already report as having flows.
     *
     * @param metricRegistry registry in which the meters, timers and histogram are created
     * @param jestClient client used for all Elasticsearch requests; must not be {@code null}
     * @param indexStrategy strategy used to derive index names; must not be {@code null}
     * @param documentEnricher enricher applied to documents before persisting; must not be {@code null}
     * @param classificationEngine engine used to classify conversations; must not be {@code null}
     * @param transactionOperations used to run DAO calls in a transaction; must not be {@code null}
     * @param nodeDao node DAO; must not be {@code null}
     * @param snmpInterfaceDao SNMP interface DAO; must not be {@code null}
     * @param bulkRetryCount retry count handed to every {@link BulkRequest}
     * @param maxFlowDurationMs handed to the {@link IndexSelector} used to pick indices for queries
     */
    public ElasticFlowRepository(MetricRegistry metricRegistry, JestClient jestClient, IndexStrategy indexStrategy, DocumentEnricher documentEnricher, ClassificationEngine classificationEngine, TransactionOperations transactionOperations, NodeDao nodeDao, SnmpInterfaceDao snmpInterfaceDao, int bulkRetryCount, long maxFlowDurationMs) {
        this.client = Objects.requireNonNull(jestClient);
        this.indexStrategy = Objects.requireNonNull(indexStrategy);
        this.documentEnricher = Objects.requireNonNull(documentEnricher);
        this.classificationEngine = Objects.requireNonNull(classificationEngine);
        this.transactionOperations = Objects.requireNonNull(transactionOperations);
        this.nodeDao = Objects.requireNonNull(nodeDao);
        this.snmpInterfaceDao = Objects.requireNonNull(snmpInterfaceDao);
        this.bulkRetryCount = bulkRetryCount;
        this.indexSelector = new IndexSelector(TYPE, indexStrategy, maxFlowDurationMs);
        this.flowsPersistedMeter = metricRegistry.meter("flowsPersisted");
        this.logConversionTimer = metricRegistry.timer("logConversion");
        this.logEnrichmentTimer = metricRegistry.timer("logEnrichment");
        this.logPersistingTimer = metricRegistry.timer("logPersisting");
        this.logMarkingTimer = metricRegistry.timer("logMarking");
        this.flowsPerLog = metricRegistry.histogram("flowsPerLog");
        // Seed the cache so that already-marked nodes/interfaces are not written again.
        this.transactionOperations.execute(cb -> {
            for (OnmsNode node : this.nodeDao.findAllHavingFlows()) {
                this.markerCache.put(node.getId(), this.snmpInterfaceDao.findAllHavingFlows(node.getId()).stream()
                        .map(OnmsSnmpInterface::getIfIndex)
                        .collect(Collectors.toCollection(Sets::newConcurrentHashSet)));
            }
            return null;
        });
    }

    /**
     * Converts the given flows to {@link FlowDocument}s and hands them to
     * {@link #enrichAndPersistFlows(List, FlowSource)}.
     *
     * @param flows flows to persist
     * @param source source of the flows
     * @throws FlowException if persisting the documents fails
     */
    public void persist(Collection<Flow> flows, FlowSource source) throws FlowException {
        LOG.debug("Converting {} flows from {} to flow documents.", flows.size(), source);
        List<FlowDocument> documents;
        try (Timer.Context ctx = this.logConversionTimer.time()) {
            documents = flows.stream().map(FlowDocument::from).collect(Collectors.toList());
        }
        this.enrichAndPersistFlows(documents, source);
    }

    /**
     * Enriches the given documents, writes them to Elasticsearch in a bulk request and finally
     * marks the referenced nodes and interfaces as having flows. Each phase is timed separately.
     * An empty list is only recorded in the {@code flowsPerLog} histogram.
     *
     * @param flowDocuments documents to enrich and persist
     * @param source source of the flows, passed to the enricher
     * @throws FlowException if the bulk request fails
     */
    public void enrichAndPersistFlows(List<FlowDocument> flowDocuments, FlowSource source) throws FlowException {
        this.flowsPerLog.update(flowDocuments.size());
        if (flowDocuments.isEmpty()) {
            LOG.info("Received empty flows. Nothing to do.");
            return;
        }

        LOG.debug("Enriching {} flow documents.", flowDocuments.size());
        try (Timer.Context ctx = this.logEnrichmentTimer.time()) {
            this.documentEnricher.enrich(flowDocuments, source);
        }

        LOG.debug("Persisting {} flow documents.", flowDocuments.size());
        try (Timer.Context ctx = this.logPersistingTimer.time()) {
            this.persistDocuments(flowDocuments);
        }

        try (Timer.Context ctx = this.logMarkingTimer.time()) {
            this.markNodesAndInterfaces(flowDocuments);
        }
    }

    /** Indexes all documents with a single (retried) bulk request and updates the persisted meter. */
    private void persistDocuments(List<FlowDocument> flowDocuments) throws FlowException {
        final BulkRequest<FlowDocument> bulkRequest = new BulkRequest<>(this.client, flowDocuments, documents -> {
            final Bulk.Builder bulkBuilder = new Bulk.Builder();
            for (FlowDocument flowDocument : documents) {
                // The target index is derived from the timestamp of each individual document.
                final String index = this.indexStrategy.getIndex(TYPE, Instant.ofEpochMilli(flowDocument.getTimestamp()));
                final Index.Builder indexBuilder = new Index.Builder(flowDocument).index(index).type(TYPE);
                bulkBuilder.addAction(indexBuilder.build());
            }
            return new BulkWrapper(bulkBuilder);
        }, this.bulkRetryCount);
        try {
            bulkRequest.execute();
        } catch (BulkException ex) {
            throw new PersistenceException(ex.getMessage(), ex.getBulkResult().getFailedDocuments());
        } catch (IOException ex) {
            LOG.error("An error occurred while executing the given request: {}", ex.getMessage(), ex);
            throw new FlowException(ex.getMessage(), ex);
        }
        this.flowsPersistedMeter.mark(flowDocuments.size());
    }

    /**
     * Marks every exporter node and input/output interface that is not yet present in the marker
     * cache as having flows. The cache is updated first; the DAOs are only called (in one
     * transaction) when something new was seen.
     */
    private void markNodesAndInterfaces(List<FlowDocument> flowDocuments) {
        final List<Integer> nodesToUpdate = Lists.newArrayListWithExpectedSize(flowDocuments.size());
        final Map<Integer, List<Integer>> interfacesToUpdate = Maps.newHashMap();
        for (FlowDocument flow : flowDocuments) {
            if (flow.getNodeExporter() == null || flow.getNodeExporter().getNodeId() == null) {
                continue;
            }
            final Integer nodeId = flow.getNodeExporter().getNodeId();
            Set<Integer> ifaceMarkerCache = this.markerCache.get(nodeId);
            if (ifaceMarkerCache == null) {
                // putIfAbsent keeps the set of a concurrent writer instead of overwriting it.
                final Set<Integer> created = Sets.newConcurrentHashSet();
                ifaceMarkerCache = this.markerCache.putIfAbsent(nodeId, created);
                if (ifaceMarkerCache == null) {
                    ifaceMarkerCache = created;
                    nodesToUpdate.add(nodeId);
                }
            }
            trackInterface(ifaceMarkerCache, interfacesToUpdate, nodeId, flow.getInputSnmp());
            trackInterface(ifaceMarkerCache, interfacesToUpdate, nodeId, flow.getOutputSnmp());
        }
        if (nodesToUpdate.isEmpty() && interfacesToUpdate.isEmpty()) {
            return;
        }
        this.transactionOperations.execute(cb -> {
            if (!nodesToUpdate.isEmpty()) {
                this.nodeDao.markHavingFlows(nodesToUpdate);
            }
            for (Map.Entry<Integer, List<Integer>> e : interfacesToUpdate.entrySet()) {
                this.snmpInterfaceDao.markHavingFlows(e.getKey(), e.getValue());
            }
            return null;
        });
    }

    /** Queues the interface for marking if it is set, non-zero and was not in the cache before. */
    private static void trackInterface(Set<Integer> ifaceMarkerCache, Map<Integer, List<Integer>> interfacesToUpdate, Integer nodeId, Integer ifIndex) {
        if (ifIndex == null || ifIndex == 0) {
            return;
        }
        // Set.add reports whether the value was new, which replaces a separate contains() check.
        if (ifaceMarkerCache.add(ifIndex)) {
            interfacesToUpdate.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(ifIndex);
        }
    }

    /**
     * Counts the flows matching the given filters.
     *
     * @param filters filters to apply; a time range filter, if present, limits the searched indices
     * @return future completed with the total reported by the search
     */
    public CompletableFuture<Long> getFlowCount(List<Filter> filters) {
        final String query = this.searchQueryProvider.getFlowCountQuery(filters);
        return this.searchAsync(query, extractTimeRangeFilter(filters)).thenApply(SearchResult::getTotal);
    }

    /**
     * Returns the traffic totals of the top N applications.
     *
     * @param N number of applications to return
     * @param includeOther whether to add an {@value #OTHER_APPLICATION_NAME} entry for the remaining traffic
     * @param filters filters to apply; must contain a time range filter
     * @return future completed with one summary per application
     */
    public CompletableFuture<List<TrafficSummary<String>>> getTopNApplications(int N, boolean includeOther, List<Filter> filters) {
        return this.getTotalBytesFromTopN(N, "netflow.application", UNKNOWN_APPLICATION_NAME, includeOther, filters);
    }

    /**
     * Returns the traffic series of the top N applications.
     *
     * @param N number of applications to return
     * @param step step of the series, passed to the query provider
     * @param includeOther whether to add an {@value #OTHER_APPLICATION_NAME} row for the remaining traffic
     * @param filters filters to apply; must contain a time range filter
     * @return future completed with a table (row: application and direction, column: time) in
     *         which missing cells are filled with {@code NaN}
     */
    public CompletableFuture<Table<Directional<String>, Long, Double>> getTopNApplicationsSeries(int N, long step, boolean includeOther, List<Filter> filters) {
        return this.getSeriesFromTopN(N, step, "netflow.application", UNKNOWN_APPLICATION_NAME, includeOther, filters)
                .thenApply(res -> mapTable(res, s -> s));
    }

    /**
     * Returns the traffic totals of the top N conversations, each classified by application.
     *
     * @param N number of conversations to return
     * @param filters filters to apply; must contain a time range filter
     * @return future completed with one summary per conversation
     */
    public CompletableFuture<List<TrafficSummary<Conversation>>> getTopNConversations(int N, List<Filter> filters) {
        return this.getTotalBytesFromTopN(N, "netflow.convo_key", null, false, filters)
                .thenApply(res -> res.stream().map(this::toConversationSummary).collect(Collectors.toList()));
    }

    /**
     * Returns the traffic series of the top N conversations, each classified by application.
     *
     * @param N number of conversations to return
     * @param step step of the series, passed to the query provider
     * @param filters filters to apply; must contain a time range filter
     * @return future completed with a table (row: conversation and direction, column: time) in
     *         which missing cells are filled with {@code NaN}
     */
    public CompletableFuture<Table<Directional<Conversation>, Long, Double>> getTopNConversationsSeries(int N, long step, List<Filter> filters) {
        return this.getSeriesFromTopN(N, step, "netflow.convo_key", null, false, filters)
                .thenApply(res -> mapTable(res, this::toConversation));
    }

    /** Parses the JSON conversation key and attaches the classified application. */
    private Conversation toConversation(String convoKeyJson) {
        final ConversationKey convo = ConversationKeyUtils.fromJsonString(convoKeyJson);
        return new Conversation(convo, this.classify(convo));
    }

    /** Re-keys a summary from the JSON conversation key to the parsed {@link Conversation}. */
    private TrafficSummary<Conversation> toConversationSummary(TrafficSummary<String> summary) {
        final TrafficSummary<Conversation> out = new TrafficSummary<>(this.toConversation(summary.getEntity()));
        out.setBytesIn(summary.getBytesIn());
        out.setBytesOut(summary.getBytesOut());
        return out;
    }

    /**
     * Determines the keys of the top N terms.
     *
     * @return future completed with at most N keys; empty if {@code N < 1} or if the result has
     *         no {@code grouped_by} aggregation
     */
    private CompletableFuture<List<String>> getTopN(int N, String groupByTerm, String keyForMissingTerm, List<Filter> filters) {
        if (N < 1) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        final String query = this.searchQueryProvider.getTopNQuery(TOP_N_MULTIPLIER * N, groupByTerm, keyForMissingTerm, filters);
        return this.searchAsync(query, extractTimeRangeFilter(filters)).thenApply(res -> {
            final TermsAggregation groupedBy = res.getAggregations().getTermsAggregation(AGG_GROUPED_BY);
            if (groupedBy == null) {
                return Collections.emptyList();
            }
            return groupedBy.getBuckets().stream()
                    .map(TermsAggregation.Entry::getKey)
                    .limit(N)
                    .collect(Collectors.toList());
        });
    }

    /**
     * Builds the series table for the given, already determined, top N keys. Up to three searches
     * are started (top N terms, missing term, others); their results are merged into one table
     * whose rows are sorted according to {@code topN}.
     *
     * @throws IllegalArgumentException if the filters contain no time range filter
     */
    private CompletableFuture<Table<Directional<String>, Long, Double>> getSeriesFromTopN(List<String> topN, long step, String groupByTerm, String keyForMissingTerm, boolean includeOther, List<Filter> filters) {
        final TimeRangeFilter timeRangeFilter = getRequiredTimeRangeFilter(filters);
        final long start = timeRangeFilter.getStart();
        final long end = timeRangeFilter.getEnd();
        // Only written to from the chained callbacks below, which run one after the other.
        final ImmutableTable.Builder<Directional<String>, Long, Double> builder = ImmutableTable.builder();

        CompletableFuture<Void> seriesFuture;
        if (topN.isEmpty()) {
            seriesFuture = CompletableFuture.completedFuture(null);
        } else {
            final String seriesFromTopNQuery = this.searchQueryProvider.getSeriesFromTopNQuery(topN, step, start, end, groupByTerm, filters);
            seriesFuture = this.searchAsync(seriesFromTopNQuery, timeRangeFilter).thenAccept(res -> toTable(builder, res));
        }

        final boolean missingTermIncludedInTopN = keyForMissingTerm != null && topN.contains(keyForMissingTerm);
        if (missingTermIncludedInTopN) {
            final String seriesFromMissingQuery = this.searchQueryProvider.getSeriesFromMissingQuery(step, start, end, groupByTerm, keyForMissingTerm, filters);
            seriesFuture = seriesFuture.thenAcceptBoth(this.searchAsync(seriesFromMissingQuery, timeRangeFilter),
                    (ignored, res) -> toTable(builder, res));
        }

        if (includeOther) {
            final String seriesFromOthersQuery = this.searchQueryProvider.getSeriesFromOthersQuery(topN, step, start, end, groupByTerm, missingTermIncludedInTopN, filters);
            seriesFuture = seriesFuture.thenAcceptBoth(this.searchAsync(seriesFromOthersQuery, timeRangeFilter),
                    (ignored, res) -> addDirectionalSeries(builder, OTHER_APPLICATION_NAME,
                            res.getAggregations().getTermsAggregation(AGG_DIRECTION)));
        }

        return seriesFuture.thenApply(ignored -> TableUtils.sortTableByRowKeys(builder.build(), topN));
    }

    /** Adds the series of every {@code grouped_by} bucket of the result to the table builder. */
    private static void toTable(ImmutableTable.Builder<Directional<String>, Long, Double> builder, SearchResult res) {
        final TermsAggregation groupedBy = res.getAggregations().getTermsAggregation(AGG_GROUPED_BY);
        for (TermsAggregation.Entry bucket : groupedBy.getBuckets()) {
            addDirectionalSeries(builder, bucket.getKey(), bucket.getTermsAggregation(AGG_DIRECTION));
        }
    }

    /** Adds one table row per direction bucket, with one cell per date histogram bucket. */
    private static void addDirectionalSeries(ImmutableTable.Builder<Directional<String>, Long, Double> builder, String rowName, TermsAggregation directionAgg) {
        for (TermsAggregation.Entry directionBucket : directionAgg.getBuckets()) {
            final boolean isIngress = isIngress(directionBucket);
            final ProportionalSumAggregation sumAgg = directionBucket.getAggregation(AGG_BYTES, ProportionalSumAggregation.class);
            for (ProportionalSumAggregation.DateHistogram dateHistogram : sumAgg.getBuckets()) {
                builder.put(new Directional<>(rowName, isIngress), dateHistogram.getTime(), dateHistogram.getValue());
            }
        }
    }

    /** Determines the top N keys first and then builds the series table for them. */
    private CompletableFuture<Table<Directional<String>, Long, Double>> getSeriesFromTopN(int N, long step, String groupByTerm, String keyForMissingTerm, boolean includeOther, List<Filter> filters) {
        return this.getTopN(N, groupByTerm, keyForMissingTerm, filters)
                .thenCompose(topN -> this.getSeriesFromTopN(topN, step, groupByTerm, keyForMissingTerm, includeOther, filters));
    }

    /**
     * Computes the traffic totals for the given, already determined, top N keys. The series
     * queries are reused with a step spanning the whole time range, so that every direction
     * yields exactly one bucket. The result lists the top N entries in the order of {@code topN},
     * followed by any remaining entries (e.g. "Other").
     *
     * @throws IllegalArgumentException if the filters contain no time range filter
     */
    private CompletableFuture<List<TrafficSummary<String>>> getTotalBytesFromTopN(List<String> topN, String groupByTerm, String keyForMissingTerm, boolean includeOther, List<Filter> filters) {
        final TimeRangeFilter timeRangeFilter = getRequiredTimeRangeFilter(filters);
        final long start = timeRangeFilter.getStart();
        final long end = Math.max(timeRangeFilter.getStart(), timeRangeFilter.getEnd() - 1L);
        final long step = timeRangeFilter.getEnd() - timeRangeFilter.getStart();

        CompletableFuture<Map<String, TrafficSummary<String>>> summariesFuture;
        if (topN.isEmpty()) {
            final Map<String, TrafficSummary<String>> empty = new LinkedHashMap<>();
            summariesFuture = CompletableFuture.completedFuture(empty);
        } else {
            final String bytesFromTopNQuery = this.searchQueryProvider.getSeriesFromTopNQuery(topN, step, start, end, groupByTerm, filters);
            summariesFuture = this.searchAsync(bytesFromTopNQuery, timeRangeFilter).thenApply(ElasticFlowRepository::toTrafficSummaries);
        }

        final boolean missingTermIncludedInTopN = keyForMissingTerm != null && topN.contains(keyForMissingTerm);
        if (missingTermIncludedInTopN) {
            final String bytesFromMissingQuery = this.searchQueryProvider.getSeriesFromMissingQuery(step, start, end, groupByTerm, keyForMissingTerm, filters);
            summariesFuture = summariesFuture.thenCombine(this.searchAsync(bytesFromMissingQuery, timeRangeFilter), (summaries, results) -> {
                summaries.putAll(toTrafficSummaries(results));
                return summaries;
            });
        }

        if (includeOther) {
            final String bytesFromOthersQuery = this.searchQueryProvider.getSeriesFromOthersQuery(topN, step, start, end, groupByTerm, missingTermIncludedInTopN, filters);
            summariesFuture = summariesFuture.thenCombine(this.searchAsync(bytesFromOthersQuery, timeRangeFilter), (summaries, results) -> {
                final TermsAggregation directionAgg = results.getAggregations().getTermsAggregation(AGG_DIRECTION);
                summaries.put(OTHER_APPLICATION_NAME, toTrafficSummary(OTHER_APPLICATION_NAME, directionAgg));
                return summaries;
            });
        }

        return summariesFuture.thenApply(summaries -> {
            final List<TrafficSummary<String>> ordered = new ArrayList<>(topN.size());
            for (String topNEntry : topN) {
                final TrafficSummary<String> summary = summaries.remove(topNEntry);
                if (summary != null) {
                    ordered.add(summary);
                }
            }
            // Whatever is left was not part of the top N and is appended at the end.
            ordered.addAll(summaries.values());
            return ordered;
        });
    }

    /** Converts every {@code grouped_by} bucket of the result to a summary, keyed by bucket key. */
    private static Map<String, TrafficSummary<String>> toTrafficSummaries(SearchResult res) {
        final Map<String, TrafficSummary<String>> summaries = new LinkedHashMap<>();
        final TermsAggregation groupedBy = res.getAggregations().getTermsAggregation(AGG_GROUPED_BY);
        for (TermsAggregation.Entry bucket : groupedBy.getBuckets()) {
            summaries.put(bucket.getKey(), toTrafficSummary(bucket.getKey(), bucket.getTermsAggregation(AGG_DIRECTION)));
        }
        return summaries;
    }

    /**
     * Builds the summary of one entity from its direction buckets.
     *
     * @throws IllegalStateException if a direction does not contain exactly one sum bucket
     */
    private static TrafficSummary<String> toTrafficSummary(String entity, TermsAggregation directionAgg) {
        final TrafficSummary<String> trafficSummary = new TrafficSummary<>(entity);
        for (TermsAggregation.Entry directionBucket : directionAgg.getBuckets()) {
            final boolean isIngress = isIngress(directionBucket);
            final ProportionalSumAggregation sumAgg = directionBucket.getAggregation(AGG_BYTES, ProportionalSumAggregation.class);
            final List<ProportionalSumAggregation.DateHistogram> sumBuckets = sumAgg.getBuckets();
            if (sumBuckets.size() != 1) {
                throw new IllegalStateException("Expected 1 bucket, but got: " + sumBuckets);
            }
            final Double sum = sumBuckets.iterator().next().getValue();
            if (isIngress) {
                trafficSummary.setBytesIn(sum.longValue());
            } else {
                trafficSummary.setBytesOut(sum.longValue());
            }
        }
        return trafficSummary;
    }

    /** Determines the top N keys first and then computes the traffic totals for them. */
    private CompletableFuture<List<TrafficSummary<String>>> getTotalBytesFromTopN(int N, String groupByTerm, String keyForMissingTerm, boolean includeOther, List<Filter> filters) {
        return this.getTopN(N, groupByTerm, keyForMissingTerm, filters)
                .thenCompose(topN -> this.getTotalBytesFromTopN(topN, groupByTerm, keyForMissingTerm, includeOther, filters));
    }

    /**
     * Classifies the conversation. The conversation is first classified as-is (source to
     * destination); if that yields nothing, source and destination are swapped and the
     * classification is attempted once more.
     *
     * @return the application name, or {@code null} if neither attempt matched
     */
    private String classify(ConversationKey convo) {
        final int protocolNumber = convo.getProtocol().intValue();
        Protocol protocol = Protocols.getProtocol(protocolNumber);
        if (protocol == null) {
            protocol = new Protocol(protocolNumber, null, null);
        }

        final ClassificationRequest request = new ClassificationRequest();
        request.setProtocol(protocol);
        request.setLocation(convo.getLocation());

        // First attempt: source -> destination.
        request.setSrcAddress(convo.getSrcIp());
        request.setSrcPort(convo.getSrcPort().intValue());
        request.setDstAddress(convo.getDstIp());
        request.setDstPort(convo.getDstPort().intValue());
        final String application = this.classificationEngine.classify(request);
        if (application != null) {
            return application;
        }

        // Second attempt: destination -> source.
        request.setSrcAddress(convo.getDstIp());
        request.setSrcPort(convo.getDstPort().intValue());
        request.setDstAddress(convo.getSrcIp());
        request.setDstPort(convo.getSrcPort().intValue());
        return this.classificationEngine.classify(request);
    }

    /**
     * Starts the search asynchronously.
     *
     * @param query query to execute
     * @param timeRangeFilter if not {@code null}, the search is limited to the indices selected
     *        for that range (unavailable ones are ignored); otherwise no index is set
     */
    private CompletableFuture<SearchResult> searchAsync(String query, TimeRangeFilter timeRangeFilter) {
        final Search.Builder builder = new Search.Builder(query);
        builder.addType(TYPE);
        if (timeRangeFilter != null) {
            final List<String> indices = this.indexSelector.getIndexNames(timeRangeFilter);
            builder.addIndices(indices);
            builder.setParameter("ignore_unavailable", "true");
            LOG.debug("Executing asynchronous query on {}: {}", indices, query);
        } else {
            LOG.debug("Executing asynchronous query on all indices: {}", query);
        }
        return this.executeAsync(builder.build());
    }

    /**
     * Bridges Jest's callback based API to a {@link CompletableFuture}. The future completes
     * exceptionally if the request fails or if the result is not flagged as succeeded.
     */
    private <T extends JestResult> CompletableFuture<T> executeAsync(Action<T> action) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        this.client.executeAsync(action, new JestResultHandler<T>() {

            public void completed(T result) {
                if (result.isSucceeded()) {
                    future.complete(result);
                } else {
                    future.completeExceptionally(new Exception(result.getErrorMessage()));
                }
            }

            public void failed(Exception ex) {
                future.completeExceptionally(ex);
            }
        });
        return future;
    }

    /**
     * Copies the table while converting the value of each row key with the given function. The
     * result contains a cell for every row/column combination; cells missing in the source are
     * set to {@code NaN}.
     */
    private static <T> Table<Directional<T>, Long, Double> mapTable(Table<Directional<String>, Long, Double> source, Function<String, T> fn) {
        final ImmutableTable.Builder<Directional<T>, Long, Double> target = ImmutableTable.builder();
        final Set<Long> columnKeys = source.columnKeySet();
        for (Directional<String> sourceRowKey : source.rowKeySet()) {
            final Directional<T> targetRowKey = new Directional<>(fn.apply(sourceRowKey.getValue()), sourceRowKey.isIngress());
            for (Long columnKey : columnKeys) {
                Double value = source.get(sourceRowKey, columnKey);
                if (value == null) {
                    value = Double.NaN;
                }
                target.put(targetRowKey, columnKey, value);
            }
        }
        return target.build();
    }

    /**
     * Like {@link #extractTimeRangeFilter(Collection)}, but fails if there is none.
     *
     * @throws IllegalArgumentException if the filters contain no time range filter
     */
    private static TimeRangeFilter getRequiredTimeRangeFilter(Collection<Filter> filters) {
        final TimeRangeFilter filter = extractTimeRangeFilter(filters);
        if (filter == null) {
            throw new IllegalArgumentException("Time range is required.");
        }
        return filter;
    }

    /** Returns the first {@link TimeRangeFilter} among the filters, or {@code null} if there is none. */
    private static TimeRangeFilter extractTimeRangeFilter(Collection<Filter> filters) {
        return filters.stream()
                .filter(TimeRangeFilter.class::isInstance)
                .map(TimeRangeFilter.class::cast)
                .findFirst()
                .orElse(null);
    }

    /**
     * Interprets the key of a direction bucket (case-insensitively).
     *
     * @return {@code true} for ingress, {@code false} for egress
     * @throws IllegalArgumentException if the key is neither ingress nor egress
     */
    private static boolean isIngress(TermsAggregation.Entry entry) {
        final String directionAsString = entry.getKeyAsString();
        if (Direction.INGRESS.name().equalsIgnoreCase(directionAsString)) {
            return true;
        }
        if (Direction.EGRESS.name().equalsIgnoreCase(directionAsString)) {
            return false;
        }
        throw new IllegalArgumentException("Unknown direction value: " + directionAsString);
    }
}

