/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.flows.elastic.agg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
import io.searchbox.client.JestClient;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.SumAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.opennms.features.jest.client.SearchResultUtils;
import org.opennms.features.jest.client.index.IndexSelector;
import org.opennms.netmgt.flows.api.BytesInOut;
import org.opennms.netmgt.flows.api.Conversation;
import org.opennms.netmgt.flows.api.Directional;
import org.opennms.netmgt.flows.api.Host;
import org.opennms.netmgt.flows.api.TrafficSummary;
import org.opennms.netmgt.flows.elastic.ElasticFlowQueryService;
import org.opennms.netmgt.flows.elastic.ProportionalSumAggregation;
import org.opennms.netmgt.flows.elastic.agg.AggregatedSearchQueryProvider;
import org.opennms.netmgt.flows.elastic.agg.GroupedBy;
import org.opennms.netmgt.flows.elastic.agg.Types;
import org.opennms.netmgt.flows.filter.api.Filter;
import org.opennms.netmgt.flows.filter.api.TimeRangeFilter;

public class AggregatedFlowQueryService
extends ElasticFlowQueryService {
    public static final String INDEX_NAME = "netflow_agg";
    private final AggregatedSearchQueryProvider searchQueryProvider = new AggregatedSearchQueryProvider();

    public AggregatedFlowQueryService(JestClient client, IndexSelector indexSelector) {
        super(client, indexSelector);
    }

    public CompletableFuture<List<TrafficSummary<String>>> getTopNApplicationSummaries(int N, boolean includeOther, List<Filter> filters) {
        return this.getTopNSummary(N, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_APPLICATION, Types.APPLICATION);
    }

    public CompletableFuture<Table<Directional<String>, Long, Double>> getTopNApplicationSeries(int N, long step, boolean includeOther, List<Filter> filters) {
        return this.getTopNSeries(N, step, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_APPLICATION, Types.APPLICATION);
    }

    public CompletableFuture<List<TrafficSummary<Conversation>>> getTopNConversationSummaries(int N, boolean includeOther, List<Filter> filters) {
        return this.getTopNSummary(N, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_CONVERSATION, Types.CONVERSATION);
    }

    public CompletableFuture<Table<Directional<Conversation>, Long, Double>> getTopNConversationSeries(int N, long step, boolean includeOther, List<Filter> filters) {
        return this.getTopNSeries(N, step, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_CONVERSATION, Types.CONVERSATION);
    }

    public CompletableFuture<List<TrafficSummary<Host>>> getTopNHostSummaries(int N, boolean includeOther, List<Filter> filters) {
        return this.getTopNSummary(N, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_HOST, Types.HOST);
    }

    public CompletableFuture<Table<Directional<Host>, Long, Double>> getTopNHostSeries(int N, long step, boolean includeOther, List<Filter> filters) {
        return this.getTopNSeries(N, step, includeOther, filters, GroupedBy.EXPORTER_INTERFACE_HOST, Types.HOST);
    }

    private <T> CompletableFuture<Table<Directional<T>, Long, Double>> getTopNSeries(int N, long step, boolean includeOther, List<Filter> filters, GroupedBy groupedBy, Types.Type<T> type) {
        CompletionStage<Object> seriesFuture;
        TimeRangeFilter timeRangeFilter = (TimeRangeFilter)Filter.find(filters, TimeRangeFilter.class).orElseThrow(() -> new IllegalArgumentException("Time range filter is required to derive time series."));
        ImmutableTable.Builder builder = ImmutableTable.builder();
        if (N > 0) {
            String seriesFromTopNQuery = this.searchQueryProvider.getSeriesFromTopNQuery(N, groupedBy, type.getKey(), step, timeRangeFilter.getStart(), timeRangeFilter.getEnd(), filters);
            seriesFuture = this.searchAsync(seriesFromTopNQuery, timeRangeFilter).thenApply(res -> {
                AggregatedFlowQueryService.toTableFromBuckets(builder, type::toEntity, res);
                return null;
            });
        } else {
            seriesFuture = CompletableFuture.completedFuture(null);
        }
        if (includeOther) {
            String seriesFromTotalQuery = this.searchQueryProvider.getSeriesFromTotalsQuery(groupedBy.getParent(), step, timeRangeFilter.getStart(), timeRangeFilter.getEnd(), filters);
            ImmutableTable.Builder totalsTableBuilder = ImmutableTable.builder();
            CompletionStage totalSeriesFuture = this.searchAsync(seriesFromTotalQuery, timeRangeFilter).thenApply(res -> {
                AggregatedFlowQueryService.toTableFromTotals(totalsTableBuilder, type.getOtherEntity(), res);
                return null;
            });
            seriesFuture = ((CompletableFuture)seriesFuture).thenCombine(totalSeriesFuture, (topN, totals) -> {
                ImmutableTable topNTable = builder.build();
                ImmutableTable totalsTable = totalsTableBuilder.build();
                TreeSet timestamps = new TreeSet();
                timestamps.addAll(topNTable.columnKeySet());
                timestamps.addAll(totalsTable.columnKeySet());
                for (Long ts : timestamps) {
                    ImmutableMap entries = topNTable.column((Object)ts);
                    BytesInOut bytesFromTopN = entries != null ? BytesInOut.sum((ImmutableSet)entries.entrySet()) : new BytesInOut();
                    entries = totalsTable.column((Object)ts);
                    BytesInOut totalBytes = entries != null ? BytesInOut.sum((ImmutableSet)entries.entrySet()) : new BytesInOut();
                    BytesInOut otherBytes = totalBytes.minus(bytesFromTopN);
                    builder.put((Object)new Directional(type.getOtherEntity(), true), (Object)ts, (Object)otherBytes.getBytesIn());
                    builder.put((Object)new Directional(type.getOtherEntity(), false), (Object)ts, (Object)otherBytes.getBytesOut());
                }
                return null;
            });
        }
        return ((CompletableFuture)seriesFuture).thenApply(ignored -> builder.build());
    }

    private static <T> void toTableFromBuckets(ImmutableTable.Builder<Directional<T>, Long, Double> builder, Function<String, T> keyToEntity, SearchResult res) {
        MetricAggregation aggs = res.getAggregations();
        if (aggs == null) {
            return;
        }
        TermsAggregation byKeyAgg = aggs.getTermsAggregation("by_key");
        if (byKeyAgg == null) {
            return;
        }
        for (TermsAggregation.Entry bucket : byKeyAgg.getBuckets()) {
            ProportionalSumAggregation bytesInAgg = (ProportionalSumAggregation)bucket.getAggregation("bytes_in", ProportionalSumAggregation.class);
            for (ProportionalSumAggregation.DateHistogram dateHistogram : bytesInAgg.getBuckets()) {
                builder.put((Object)new Directional(keyToEntity.apply(bucket.getKey()), true), (Object)dateHistogram.getTime(), (Object)dateHistogram.getValue());
            }
            ProportionalSumAggregation bytesOutAgg = (ProportionalSumAggregation)bucket.getAggregation("bytes_out", ProportionalSumAggregation.class);
            for (ProportionalSumAggregation.DateHistogram dateHistogram : bytesOutAgg.getBuckets()) {
                builder.put((Object)new Directional(keyToEntity.apply(bucket.getKey()), false), (Object)dateHistogram.getTime(), (Object)dateHistogram.getValue());
            }
        }
    }

    private static <T> void toTableFromTotals(ImmutableTable.Builder<Directional<T>, Long, Double> builder, T otherEntity, SearchResult res) {
        MetricAggregation aggs = res.getAggregations();
        if (aggs == null) {
            return;
        }
        ProportionalSumAggregation bytesInAgg = (ProportionalSumAggregation)aggs.getAggregation("bytes_in", ProportionalSumAggregation.class);
        for (ProportionalSumAggregation.DateHistogram dateHistogram : bytesInAgg.getBuckets()) {
            builder.put((Object)new Directional(otherEntity, true), (Object)dateHistogram.getTime(), (Object)dateHistogram.getValue());
        }
        ProportionalSumAggregation bytesOutAgg = (ProportionalSumAggregation)aggs.getAggregation("bytes_out", ProportionalSumAggregation.class);
        for (ProportionalSumAggregation.DateHistogram dateHistogram : bytesOutAgg.getBuckets()) {
            builder.put((Object)new Directional(otherEntity, false), (Object)dateHistogram.getTime(), (Object)dateHistogram.getValue());
        }
    }

    private <T> CompletableFuture<List<TrafficSummary<T>>> getTopNSummary(int N, boolean includeOther, List<Filter> filters, GroupedBy groupedBy, Types.Type<T> type) {
        CompletionStage<List<Object>> summaryFutures;
        if (N > 0) {
            String query = this.searchQueryProvider.getTopNQuery(N, groupedBy, type.getKey(), filters);
            summaryFutures = this.searchAsync(query, Filter.find(filters, TimeRangeFilter.class).orElse(null)).thenApply(searchResult -> {
                MetricAggregation aggs = searchResult.getAggregations();
                if (aggs == null) {
                    return Collections.emptyList();
                }
                TermsAggregation byKeyAgg = aggs.getTermsAggregation("by_key");
                if (byKeyAgg == null) {
                    return Collections.emptyList();
                }
                ArrayList<TrafficSummary> trafficSummaries = new ArrayList<TrafficSummary>(N);
                for (TermsAggregation.Entry bucket : byKeyAgg.getBuckets()) {
                    SumAggregation ingress = bucket.getSumAggregation("bytes_ingress");
                    SumAggregation egress = bucket.getSumAggregation("bytes_egress");
                    trafficSummaries.add(TrafficSummary.builder().withEntity(type.toEntity(bucket.getKeyAsString())).withBytesIn(ingress.getSum().longValue()).withBytesOut(egress.getSum().longValue()).build());
                }
                return trafficSummaries;
            });
        } else {
            summaryFutures = CompletableFuture.completedFuture(Collections.emptyList());
        }
        if (!includeOther) {
            return summaryFutures;
        }
        CompletableFuture<TrafficSummary<T>> totalTrafficFuture = this.getOtherTraffic(groupedBy.getParent(), type.getOtherEntity(), filters);
        return summaryFutures.thenCombine(totalTrafficFuture, (topK, total) -> {
            BytesInOut totalBytes = total.getBytesInOut();
            BytesInOut bytesFromTopK = BytesInOut.sum((List)topK);
            BytesInOut otherBytes = totalBytes.minus(bytesFromTopK);
            ArrayList<TrafficSummary> newTopK = new ArrayList<TrafficSummary>((Collection<TrafficSummary>)topK);
            newTopK.add(TrafficSummary.builder().withEntity(type.getOtherEntity()).withBytesIn(otherBytes.getBytesIn()).withBytesOut(otherBytes.getBytesOut()).build());
            return newTopK;
        });
    }

    private <T> CompletableFuture<TrafficSummary<T>> getOtherTraffic(GroupedBy groupedBy, T entity, List<Filter> filters) {
        String query = this.searchQueryProvider.getSumQuery(groupedBy, filters);
        return this.searchAsync(query, Filter.find(filters, TimeRangeFilter.class).orElse(null)).thenApply(searchResult -> {
            MetricAggregation aggs = searchResult.getAggregations();
            SumAggregation ingress = aggs.getSumAggregation("bytes_ingress");
            SumAggregation egress = aggs.getSumAggregation("bytes_egress");
            return TrafficSummary.builder().withEntity(entity).withBytesIn(ingress != null ? ingress.getSum().longValue() : 0L).withBytesOut(egress != null ? egress.getSum().longValue() : 0L).build();
        });
    }

    public CompletableFuture<Long> getFlowCount(List<Filter> filters) {
        String query = this.searchQueryProvider.getFlowCountQuery(filters);
        return this.searchAsync(query, Filter.find(filters, TimeRangeFilter.class).orElse(null)).thenApply(SearchResultUtils::getTotal);
    }

    public CompletableFuture<List<String>> getApplications(String matchingPrefix, long limit, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating applications is not supported.");
    }

    public CompletableFuture<List<String>> getConversations(String locationPattern, String protocolPattern, String lowerIPPattern, String upperIPPattern, String applicationPattern, long limit, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating conversations is not supported.");
    }

    public CompletableFuture<List<String>> getHosts(String regex, long limit, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating hosts is not supported.");
    }

    public CompletableFuture<List<TrafficSummary<String>>> getApplicationSummaries(Set<String> applications, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific application summaries is not supported.");
    }

    public CompletableFuture<Table<Directional<String>, Long, Double>> getApplicationSeries(Set<String> applications, long step, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific application series is not supported.");
    }

    public CompletableFuture<List<TrafficSummary<Conversation>>> getConversationSummaries(Set<String> conversations, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific conversation summaries is not supported.");
    }

    public CompletableFuture<Table<Directional<Conversation>, Long, Double>> getConversationSeries(Set<String> conversations, long step, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific conversation series is not supported.");
    }

    public CompletableFuture<List<TrafficSummary<Host>>> getHostSummaries(Set<String> hosts, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific host summaries is not supported.");
    }

    public CompletableFuture<Table<Directional<Host>, Long, Double>> getHostSeries(Set<String> hosts, long step, boolean includeOther, List<Filter> filters) {
        throw new UnsupportedOperationException("Enumerating specific conversation series is not supported.");
    }
}

