package org.opennms.netmgt.telemetry.protocols.flows;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.opennms.core.rpc.utils.mate.ContextKey;
import org.opennms.netmgt.flows.api.Converter;
import org.opennms.netmgt.flows.api.DetailedFlowException;
import org.opennms.netmgt.flows.api.FlowException;
import org.opennms.netmgt.flows.api.FlowRepository;
import org.opennms.netmgt.flows.api.FlowSource;
import org.opennms.netmgt.flows.api.UnrecoverableFlowException;
import org.opennms.netmgt.telemetry.api.adapter.Adapter;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLog;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLogEntry;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/flows/AbstractFlowAdapter.class */
/**
 * Base class for telemetry adapters that parse the entries of a {@link TelemetryMessageLog} into
 * protocol-specific packets, convert each packet with a {@link Converter} and hand the converted
 * results to a {@link FlowRepository}.
 *
 * <p>Subclasses supply the protocol-specific decoding by implementing
 * {@link #parse(TelemetryMessageLogEntry)}.
 *
 * @param <P> the packet type produced by {@link #parse(TelemetryMessageLogEntry)} and consumed by
 *            the converter
 */
public abstract class AbstractFlowAdapter<P> implements Adapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlowAdapter.class);

    private final FlowRepository flowRepository;
    private final Converter<P> converter;

    /** Raw lookup string as last passed to {@link #setMetaDataNodeLookup(String)}. */
    private String metaDataNodeLookup;

    /** Key derived from {@link #metaDataNodeLookup}; {@code null} when the lookup is null or empty. */
    private ContextKey contextKey;

    /** Times the parse-and-convert phase of each handled message log. */
    private final Timer logParsingTimer;

    /** Records how many packets were successfully parsed from each handled message log. */
    private final Histogram packetsPerLogHistogram;

    /**
     * Creates the adapter and registers its metrics.
     *
     * @param str            adapter name; used as a component of the registered metric names
     * @param metricRegistry registry in which the {@code logParsing} timer and the
     *                       {@code packetsPerLog} histogram are registered
     * @param flowRepository repository that receives the converted results
     * @param converter      converter applied to every successfully parsed packet
     * @throws NullPointerException if any argument is {@code null}
     */
    public AbstractFlowAdapter(String str, MetricRegistry metricRegistry, FlowRepository flowRepository, Converter<P> converter) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(metricRegistry);
        this.flowRepository = Objects.requireNonNull(flowRepository);
        this.converter = Objects.requireNonNull(converter);
        this.logParsingTimer = metricRegistry.timer(MetricRegistry.name("adapters", str, "logParsing"));
        this.packetsPerLogHistogram = metricRegistry.histogram(MetricRegistry.name("adapters", str, "packetsPerLog"));
    }

    /**
     * Accepts the adapter definition. This implementation ignores it.
     *
     * @param adapterDefinition the definition; not used here
     */
    public void setConfig(AdapterDefinition adapterDefinition) {
    }

    /**
     * Parses every entry of the given log, converts the resulting packets and persists the
     * converted results through the {@link FlowRepository}.
     *
     * <p>Entries for which {@link #parse(TelemetryMessageLogEntry)} returns {@code null} are
     * skipped. The parsing timer covers the parse-and-convert phase only and its context is closed
     * exactly once, before persistence starts, so a failure while persisting can neither record a
     * second sample nor inflate the parsing time.
     *
     * <p>{@link DetailedFlowException}, {@link FlowException} and
     * {@link UnrecoverableFlowException} raised while persisting are logged and not rethrown; after
     * an {@link UnrecoverableFlowException} the method returns without logging completion. Any
     * other exception propagates to the caller.
     *
     * @param telemetryMessageLog the batch of messages to process
     */
    // The element type of the converted results is declared by Converter/FlowRepository and is not
    // imported by this file, so the accumulating list is kept raw.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void handleMessageLog(TelemetryMessageLog telemetryMessageLog) {
        LOG.debug("Received {} telemetry messages", telemetryMessageLog.getMessageList().size());

        final List flows = new ArrayList();
        // Only the number of parsed packets is needed afterwards, so the packets are not retained.
        int packetCount = 0;

        try (Timer.Context ignored = this.logParsingTimer.time()) {
            for (final TelemetryMessageLogEntry entry : telemetryMessageLog.getMessageList()) {
                LOG.trace("Parsing packet: {}", entry);
                final P packet = parse(entry);
                if (packet != null) {
                    packetCount++;
                    flows.addAll(this.converter.convert(packet));
                }
            }
            this.packetsPerLogHistogram.update(packetCount);
        }

        try {
            LOG.debug("Persisting {} packets, {} flows.", packetCount, flows.size());
            this.flowRepository.persist(flows, new FlowSource(telemetryMessageLog.getLocation(), telemetryMessageLog.getSourceAddress(), this.contextKey));
        } catch (DetailedFlowException e) {
            LOG.error("Error while persisting flows: {}", e.getMessage(), e);
            for (final String detail : e.getDetailedLogMessages()) {
                LOG.error(detail);
            }
        } catch (FlowException e) {
            LOG.error("Error while persisting flows: {}", e.getMessage(), e);
        } catch (UnrecoverableFlowException e) {
            // The trailing exception argument is logged as the throwable, not as a placeholder value.
            LOG.error("Error while persisting flows. Cannot recover: {}. {} messages are lost.", e.getMessage(), telemetryMessageLog.getMessageList().size(), e);
            return;
        }

        LOG.debug("Completed processing {} telemetry messages.", telemetryMessageLog.getMessageList().size());
    }

    /**
     * Decodes a single log entry into a protocol-specific packet.
     *
     * @param telemetryMessageLogEntry the entry to decode
     * @return the decoded packet, or {@code null} to have the entry skipped
     */
    protected abstract P parse(TelemetryMessageLogEntry telemetryMessageLogEntry);

    /** Releases adapter resources. This implementation holds none and does nothing. */
    public void destroy() {
    }

    /**
     * Returns the lookup string last set via {@link #setMetaDataNodeLookup(String)}.
     *
     * @return the lookup string; {@code null} if it was never set or was set to {@code null}
     */
    public String getMetaDataNodeLookup() {
        return this.metaDataNodeLookup;
    }

    /**
     * Stores the lookup string and rebuilds the {@link ContextKey} that is passed along with
     * persisted results.
     *
     * @param str the lookup string; {@code null} or empty clears the context key
     */
    public void setMetaDataNodeLookup(String str) {
        this.metaDataNodeLookup = str;
        this.contextKey = Strings.isNullOrEmpty(str) ? null : new ContextKey(str);
    }
}
