package org.opennms.netmgt.telemetry.protocols.netflow.parser;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opennms.core.concurrent.LogPreservingThreadFactory;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.distributed.core.api.Identity;
import org.opennms.netmgt.dnsresolver.api.DnsResolver;
import org.opennms.netmgt.events.api.EventForwarder;
import org.opennms.netmgt.model.events.EventBuilder;
import org.opennms.netmgt.telemetry.api.receiver.Parser;
import org.opennms.netmgt.telemetry.api.receiver.TelemetryMessage;
import org.opennms.netmgt.telemetry.protocols.netflow.parser.ie.RecordProvider;
import org.opennms.netmgt.telemetry.protocols.netflow.parser.ie.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/netflow/parser/ParserBase.class */
public abstract class ParserBase implements Parser {
    private static final Logger LOG = LoggerFactory.getLogger(ParserBase.class);
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private static final long DEFAULT_CLOCK_SKEW_EVENT_RATE_SECONDS = TimeUnit.HOURS.toSeconds(1);
    public static final String CLOCK_SKEW_EVENT_UEI = "uei.opennms.org/internal/telemetry/clockSkewDetected";
    private final Protocol protocol;
    private final String name;
    private final AsyncDispatcher<TelemetryMessage> dispatcher;
    private final EventForwarder eventForwarder;
    private final Identity identity;
    private final DnsResolver dnsResolver;
    private final Meter recordsDispatched;
    private final Timer recordEnrichmentTimer;
    private final ThreadFactory threadFactory;
    private LoadingCache<InetAddress, Optional<Instant>> eventCache;
    private ExecutorService executor;
    private final ThreadLocal<Boolean> isParserThread = new ThreadLocal<>();
    private int threads = DEFAULT_NUM_THREADS;
    private long maxClockSkew = 0;
    private long clockSkewEventRate = 0;
    private boolean dnsLookupsEnabled = true;

    public ParserBase(Protocol protocol, String str, AsyncDispatcher<TelemetryMessage> asyncDispatcher, EventForwarder eventForwarder, Identity identity, DnsResolver dnsResolver, MetricRegistry metricRegistry) {
        this.protocol = (Protocol) Objects.requireNonNull(protocol);
        this.name = (String) Objects.requireNonNull(str);
        this.dispatcher = (AsyncDispatcher) Objects.requireNonNull(asyncDispatcher);
        this.eventForwarder = (EventForwarder) Objects.requireNonNull(eventForwarder);
        this.identity = (Identity) Objects.requireNonNull(identity);
        this.dnsResolver = (DnsResolver) Objects.requireNonNull(dnsResolver);
        Objects.requireNonNull(metricRegistry);
        final LogPreservingThreadFactory logPreservingThreadFactory = new LogPreservingThreadFactory("Telemetryd-" + protocol + "-" + str, Integer.MAX_VALUE);
        this.threadFactory = new ThreadFactory() { // from class: org.opennms.netmgt.telemetry.protocols.netflow.parser.ParserBase.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return logPreservingThreadFactory.newThread(() -> {
                    ParserBase.this.isParserThread.set(true);
                    runnable.run();
                });
            }
        };
        this.recordsDispatched = metricRegistry.meter(MetricRegistry.name("parsers", new String[]{str, "recordsDispatched"}));
        this.recordEnrichmentTimer = metricRegistry.timer(MetricRegistry.name("parsers", new String[]{str, "recordEnrichment"}));
        setClockSkewEventRate(DEFAULT_CLOCK_SKEW_EVENT_RATE_SECONDS);
        setThreads(DEFAULT_NUM_THREADS);
    }

    public void start(ScheduledExecutorService scheduledExecutorService) {
        this.executor = new ThreadPoolExecutor(1, this.threads, 60L, TimeUnit.SECONDS, new SynchronousQueue(true), this.threadFactory, (runnable, threadPoolExecutor) -> {
            try {
                if (!threadPoolExecutor.isShutdown()) {
                    threadPoolExecutor.getQueue().put(runnable);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Executor interrupted while waiting for capacity in the work queue.", e);
            }
        });
    }

    public void stop() {
        this.executor.shutdown();
    }

    public String getName() {
        return this.name;
    }

    public void setMaxClockSkew(long j) {
        this.maxClockSkew = j;
    }

    public long getMaxClockSkew() {
        return this.maxClockSkew;
    }

    public long getClockSkewEventRate() {
        return this.clockSkewEventRate;
    }

    public void setClockSkewEventRate(long j) {
        this.clockSkewEventRate = j;
        this.eventCache = CacheBuilder.newBuilder().expireAfterWrite(this.clockSkewEventRate, TimeUnit.SECONDS).build(new CacheLoader<InetAddress, Optional<Instant>>() { // from class: org.opennms.netmgt.telemetry.protocols.netflow.parser.ParserBase.2
            public Optional<Instant> load(InetAddress inetAddress) throws Exception {
                return Optional.empty();
            }
        });
    }

    public boolean getDnsLookupsEnabled() {
        return this.dnsLookupsEnabled;
    }

    public void setDnsLookupsEnabled(boolean z) {
        this.dnsLookupsEnabled = z;
    }

    public int getThreads() {
        return this.threads;
    }

    public void setThreads(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Threads must be >= 1");
        }
        this.threads = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<?> transmit(RecordProvider recordProvider, InetSocketAddress inetSocketAddress) {
        LOG.trace("Got packet: {}", recordProvider);
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
            return (CompletableFuture[]) recordProvider.getRecords().map(iterable -> {
                CompletableFuture completableFuture = new CompletableFuture();
                Timer.Context time = this.recordEnrichmentTimer.time();
                new RecordEnricher(this.dnsResolver, getDnsLookupsEnabled()).enrich(iterable).whenComplete((recordEnrichment, th) -> {
                    time.close();
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                        return;
                    }
                    Runnable runnable = () -> {
                        this.dispatcher.send(new TelemetryMessage(inetSocketAddress, ByteBuffer.wrap(buildMessage(iterable, recordEnrichment)))).whenComplete((dispatchStatus, th) -> {
                            if (th != null) {
                                completableFuture.completeExceptionally(th);
                            } else {
                                completableFuture.complete(dispatchStatus);
                            }
                        });
                        this.recordsDispatched.mark();
                    };
                    if (Boolean.TRUE.equals(this.isParserThread.get())) {
                        runnable.run();
                    } else {
                        this.executor.execute(runnable);
                    }
                });
                return completableFuture;
            }).toArray(i -> {
                return new CompletableFuture[i];
            });
        }, this.executor);
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        supplyAsync.whenComplete((completableFutureArr, th) -> {
            if (th == null) {
                CompletableFuture.allOf(completableFutureArr).whenComplete((r5, th) -> {
                    if (th == null) {
                        completableFuture.complete(r5);
                    } else {
                        LOG.warn("One or more of the records were not successfully dispatched.", th);
                        completableFuture.completeExceptionally(th);
                    }
                });
            } else {
                LOG.warn("Error preparing records for dispatch.", th);
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    protected abstract byte[] buildMessage(Iterable<Value<?>> iterable, RecordEnrichment recordEnrichment);

    /* JADX INFO: Access modifiers changed from: protected */
    public void detectClockSkew(long j, InetAddress inetAddress) {
        if (getMaxClockSkew() > 0) {
            long abs = Math.abs(j - System.currentTimeMillis());
            if (abs > getMaxClockSkew() * 1000) {
                Optional optional = (Optional) this.eventCache.getUnchecked(inetAddress);
                if (!optional.isPresent() || Duration.between((Temporal) optional.get(), Instant.now()).getSeconds() > getClockSkewEventRate()) {
                    this.eventCache.put(inetAddress, Optional.of(Instant.now()));
                    this.eventForwarder.sendNow(new EventBuilder().setUei(CLOCK_SKEW_EVENT_UEI).setTime(new Date()).setSource(getName()).setInterface(inetAddress).setDistPoller(this.identity.getId()).addParam("monitoringSystemId", this.identity.getId()).addParam("monitoringSystemLocation", this.identity.getLocation()).setParam("delta", (int) abs).setParam("clockSkewEventRate", (int) getClockSkewEventRate()).setParam("maxClockSkew", (int) getMaxClockSkew()).getEvent());
                }
            }
        }
    }
}
