package org.opennms.netmgt.telemetry.protocols.sflow.parser;

import io.netty.buffer.ByteBuf;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.bson.BsonBinaryWriter;
import org.bson.BsonWriter;
import org.bson.io.BasicOutputBuffer;
import org.opennms.core.concurrent.LogPreservingThreadFactory;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.netmgt.dnsresolver.api.DnsResolver;
import org.opennms.netmgt.telemetry.api.receiver.TelemetryMessage;
import org.opennms.netmgt.telemetry.listeners.Dispatchable;
import org.opennms.netmgt.telemetry.listeners.UdpParser;
import org.opennms.netmgt.telemetry.listeners.utils.BufferUtils;
import org.opennms.netmgt.telemetry.protocols.sflow.parser.proto.flows.DatagramVersion;
import org.opennms.netmgt.telemetry.protocols.sflow.parser.proto.flows.SampleDatagram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/sflow/parser/SFlowUdpParser.class */
public class SFlowUdpParser implements UdpParser, Dispatchable {
    private static final Logger LOG = LoggerFactory.getLogger(SFlowUdpParser.class);
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private final ThreadFactory threadFactory;
    private final String name;
    private final AsyncDispatcher<TelemetryMessage> dispatcher;
    private final SampleDatagramEnricher enricher;
    private ExecutorService executor;
    private final ThreadLocal<Boolean> isParserThread = new ThreadLocal<>();
    private int threads = DEFAULT_NUM_THREADS;
    private boolean dnsLookupsEnabled = true;

    public SFlowUdpParser(String str, AsyncDispatcher<TelemetryMessage> asyncDispatcher, DnsResolver dnsResolver) {
        this.name = (String) Objects.requireNonNull(str);
        this.dispatcher = (AsyncDispatcher) Objects.requireNonNull(asyncDispatcher);
        final LogPreservingThreadFactory logPreservingThreadFactory = new LogPreservingThreadFactory("Telemetryd-sFlow-" + str, Integer.MAX_VALUE);
        this.threadFactory = new ThreadFactory() { // from class: org.opennms.netmgt.telemetry.protocols.sflow.parser.SFlowUdpParser.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return logPreservingThreadFactory.newThread(() -> {
                    SFlowUdpParser.this.isParserThread.set(true);
                    runnable.run();
                });
            }
        };
        this.enricher = new SampleDatagramEnricher(dnsResolver, getDnsLookupsEnabled());
    }

    public boolean handles(ByteBuf byteBuf) {
        return BufferUtils.uint32(byteBuf) == ((long) DatagramVersion.VERSION5.value);
    }

    public CompletableFuture<?> parse(ByteBuf byteBuf, InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) throws Exception {
        SampleDatagram sampleDatagram = new SampleDatagram(byteBuf);
        LOG.trace("Got packet: {}", sampleDatagram);
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        this.executor.execute(() -> {
            this.enricher.enrich(sampleDatagram).whenComplete((sampleDatagramEnrichment, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                    return;
                }
                Runnable runnable = () -> {
                    BasicOutputBuffer basicOutputBuffer = new BasicOutputBuffer();
                    BsonWriter bsonBinaryWriter = new BsonBinaryWriter(basicOutputBuffer);
                    Throwable th = null;
                    try {
                        try {
                            bsonBinaryWriter.writeStartDocument();
                            bsonBinaryWriter.writeName("time");
                            bsonBinaryWriter.writeInt64(System.currentTimeMillis());
                            bsonBinaryWriter.writeName("data");
                            sampleDatagram.version.datagram.writeBson(bsonBinaryWriter, sampleDatagramEnrichment);
                            bsonBinaryWriter.writeEndDocument();
                            if (bsonBinaryWriter != null) {
                                if (0 != 0) {
                                    try {
                                        bsonBinaryWriter.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    bsonBinaryWriter.close();
                                }
                            }
                            this.dispatcher.send(new TelemetryMessage(inetSocketAddress, ((org.bson.ByteBuf) basicOutputBuffer.getByteBuffers().get(0)).asNIO())).whenComplete((telemetryMessage, th3) -> {
                                if (th3 != null) {
                                    completableFuture.completeExceptionally(th3);
                                }
                                completableFuture.complete(telemetryMessage);
                            });
                        } finally {
                        }
                    } catch (Throwable th4) {
                        if (bsonBinaryWriter != null) {
                            if (th != null) {
                                try {
                                    bsonBinaryWriter.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                bsonBinaryWriter.close();
                            }
                        }
                        throw th4;
                    }
                };
                if (Boolean.TRUE.equals(this.isParserThread.get())) {
                    runnable.run();
                } else {
                    this.executor.execute(runnable);
                }
            });
        });
        return completableFuture;
    }

    public boolean getDnsLookupsEnabled() {
        return this.dnsLookupsEnabled;
    }

    public void setDnsLookupsEnabled(boolean z) {
        this.dnsLookupsEnabled = z;
    }

    public String getName() {
        return this.name;
    }

    public void start(ScheduledExecutorService scheduledExecutorService) {
        this.executor = new ThreadPoolExecutor(1, this.threads, 60L, TimeUnit.SECONDS, new SynchronousQueue(true), this.threadFactory, (runnable, threadPoolExecutor) -> {
            try {
                if (!threadPoolExecutor.isShutdown()) {
                    threadPoolExecutor.getQueue().put(runnable);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Executor interrupted while waiting for capacity in the work queue.", e);
            }
        });
    }

    public void stop() {
        this.executor.shutdown();
    }

    public int getThreads() {
        return this.threads;
    }

    public void setThreads(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Threads must be >= 1");
        }
        this.threads = i;
    }
}
