package org.opennms.netmgt.syslogd;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.opennms.core.concurrent.ExecutorFactory;
import org.opennms.core.concurrent.ExecutorFactoryJavaImpl;
import org.opennms.core.logging.Logging;
import org.opennms.core.utils.InetAddressUtils;
import org.opennms.netmgt.config.SyslogdConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/syslogd/SyslogReceiverNioDisruptorImpl.class */
/**
 * {@link SyslogReceiver} that reads syslog datagrams from a {@link DatagramChannel} into
 * pre-allocated {@link ByteBuffer}s held in an LMAX Disruptor ring buffer.
 *
 * <p>{@link #run()} opens the channel and submits {@link #SOCKET_RECEIVER_THREADS} receiver
 * tasks. Each task claims a ring-buffer slot, receives one datagram into the slot's buffer,
 * and hands it to an asynchronous pipeline: a {@link SyslogConnection} is evaluated on the
 * "syslogConnections" executor and its result is invoked on the "syslogProcessors" executor.
 * The slot is cleared and published once the first stage has finished.</p>
 *
 * <p>NOTE(review): the Disruptor is started without any event handlers, so it appears to be
 * used purely as a pool of reusable buffers; confirm that slot reuse cannot overlap with a
 * datagram that is still being processed.</p>
 */
public class SyslogReceiverNioDisruptorImpl implements SyslogReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(SyslogReceiverNioDisruptorImpl.class);
    private static final MetricRegistry METRICS = new MetricRegistry();

    /** Socket timeout, in milliseconds, applied to the channel's socket. */
    private static final int SOCKET_TIMEOUT = 500;

    /** Capacity, in bytes, of every receive buffer in the ring. */
    public static final int MAX_PACKET_SIZE = 4096;

    /** Number of slots in the ring buffer of receive buffers. */
    public static final int SOCKET_BYTE_BUFFER_QUEUE_SIZE = 65536;

    public static final int EVENT_CONVERSION_TASK_QUEUE_SIZE = 65536;

    public static final int SOCKET_RECEIVER_THREADS = Runtime.getRuntime().availableProcessors();
    public static final int EVENT_PARSER_THREADS = Runtime.getRuntime().availableProcessors();
    public static final int EVENT_SENDER_THREADS = Runtime.getRuntime().availableProcessors();

    private ExecutorFactory m_executorFactory = new ExecutorFactoryJavaImpl();
    private final SyslogdConfig m_config;
    private final Disruptor<ByteBufferMessage> m_byteBuffers;
    private final RingBuffer<ByteBufferMessage> m_ringBuffer;

    // The fields below are written by the thread executing run() and read by the caller of
    // stop() and by the receiver tasks, hence volatile.
    private volatile boolean m_stop;
    private volatile DatagramChannel m_channel;
    private volatile Thread m_context;
    private volatile ExecutorService m_socketReceivers;
    private volatile ExecutorService m_syslogConnectionExecutor;
    private volatile ExecutorService m_syslogProcessorExecutor;
    private volatile ConsoleReporter m_reporter;

    /** Ring-buffer entry wrapping one reusable receive buffer of {@link #MAX_PACKET_SIZE} bytes. */
    public static class ByteBufferMessage {
        public final ByteBuffer buffer;

        private ByteBufferMessage() {
            this.buffer = ByteBuffer.allocate(MAX_PACKET_SIZE);
        }
    }

    /**
     * Opens a datagram channel with address reuse enabled and binds it to the configured
     * syslog port, on the configured listen address or, when none is configured, on the
     * wildcard address.
     *
     * @param syslogdConfig configuration supplying the listen address and port
     * @return the bound channel; the caller is responsible for closing it
     * @throws SocketException if the socket cannot be configured or bound
     * @throws IOException if the channel cannot be opened
     */
    public static DatagramChannel openChannel(SyslogdConfig syslogdConfig) throws SocketException, IOException {
        final DatagramChannel channel = DatagramChannel.open();
        try {
            channel.socket().setReuseAddress(true);
            final String listenAddress = syslogdConfig.getListenAddress();
            final int port = syslogdConfig.getSyslogPort();
            final InetSocketAddress bindAddress = (listenAddress == null || listenAddress.length() == 0)
                    ? new InetSocketAddress(port)
                    : new InetSocketAddress(InetAddressUtils.addr(listenAddress), port);
            channel.socket().bind(bindAddress);
            return channel;
        } catch (IOException | RuntimeException e) {
            // Do not leak the freshly opened channel when configuring or binding it fails.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Creates the receiver and allocates and starts the ring buffer of receive buffers.
     * The socket itself is not opened until {@link #run()}.
     *
     * @param syslogdConfig syslogd configuration; must not be {@code null}
     * @throws IllegalArgumentException if {@code syslogdConfig} is {@code null}
     * @throws SocketException declared for caller compatibility
     * @throws IOException declared for caller compatibility
     */
    public SyslogReceiverNioDisruptorImpl(SyslogdConfig syslogdConfig) throws SocketException, IOException {
        if (syslogdConfig == null) {
            throw new IllegalArgumentException("Config cannot be null");
        }
        this.m_config = syslogdConfig;
        // The (Executor) cast selects the Executor-based Disruptor constructor for the null argument.
        this.m_byteBuffers = new Disruptor<>(ByteBufferMessage::new, SOCKET_BYTE_BUFFER_QUEUE_SIZE, (Executor) null);
        this.m_byteBuffers.start();
        this.m_ringBuffer = this.m_byteBuffers.getRingBuffer();
    }

    /**
     * @return the simple class name followed by {@code [address:port]}, where the address is
     *         {@code 0.0.0.0} when no listen address is configured
     */
    @Override
    public String getName() {
        final String configured = m_config.getListenAddress();
        final String address = (configured == null || configured.length() <= 0) ? "0.0.0.0" : configured;
        return getClass().getSimpleName() + " [" + address + ":" + m_config.getSyslogPort() + "]";
    }

    /** @return the factory used by {@link #run()} to create the worker executors */
    public ExecutorFactory getExecutorFactory() {
        return m_executorFactory;
    }

    /** @param executorFactory the factory {@link #run()} should use to create the worker executors */
    public void setExecutorFactory(ExecutorFactory executorFactory) {
        m_executorFactory = executorFactory;
    }

    /**
     * Sets the stop flag, shuts down the worker executors, closes the channel, stops the
     * metrics reporter and finally interrupts and joins the thread that executed
     * {@link #run()}. Resources that were never created (for example when this is called
     * before {@code run()} or after the channel failed to open) are skipped.
     *
     * @throws InterruptedException if interrupted while joining the {@code run()} thread
     */
    @Override
    public void stop() throws InterruptedException {
        m_stop = true;

        shutdown(m_socketReceivers);
        shutdown(m_syslogConnectionExecutor);
        shutdown(m_syslogProcessorExecutor);

        final DatagramChannel channel = m_channel;
        m_channel = null;
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                LOG.warn("Exception while closing syslog channel: {}", e.getMessage(), e);
            }
        }

        final ConsoleReporter reporter = m_reporter;
        m_reporter = null;
        if (reporter != null) {
            reporter.stop();
        }

        final Thread context = m_context;
        if (context != null) {
            LOG.debug("Stopping and joining thread context {}", context.getName());
            context.interrupt();
            context.join();
            LOG.debug("Thread context stopped and joined");
        }
    }

    /**
     * Creates the worker executors, opens and configures the syslog channel, starts a
     * once-per-second console metrics reporter and submits the receiver tasks. Returns
     * without submitting any task when the stop flag is already set or when the channel
     * cannot be opened.
     */
    @Override
    public void run() {
        m_context = Thread.currentThread();
        Logging.putPrefix(Syslogd.LOG4J_CATEGORY);

        final Meter packetMeter = METRICS.meter(MetricRegistry.name(getClass(), "packets"));
        final Meter processorMeter = METRICS.meter(MetricRegistry.name(getClass(), "processors"));
        final Meter connectionMeter = METRICS.meter(MetricRegistry.name(getClass(), "connections"));
        final Histogram packetSizeHistogram = METRICS.histogram(MetricRegistry.name(getClass(), "packetSize"));

        if (m_stop) {
            LOG.debug("Stop flag set before thread started, exiting");
            return;
        }
        LOG.debug("Thread context started");

        m_socketReceivers = m_executorFactory.newExecutor(SOCKET_RECEIVER_THREADS, Integer.MAX_VALUE, "OpenNMS.Syslogd", "socketReceivers");
        m_syslogConnectionExecutor = m_executorFactory.newExecutor(EVENT_PARSER_THREADS, Integer.MAX_VALUE, "OpenNMS.Syslogd", "syslogConnections");
        m_syslogProcessorExecutor = m_executorFactory.newExecutor(EVENT_SENDER_THREADS, Integer.MAX_VALUE, "OpenNMS.Syslogd", "syslogProcessors");

        final DatagramChannel channel;
        try {
            LOG.debug("Opening syslog channel...");
            channel = openChannel(m_config);
        } catch (IOException e) {
            // Nothing can be received without a channel; the executors are left for stop().
            LOG.error("An I/O error occurred while trying to open the syslog channel", e);
            return;
        }
        m_channel = channel;

        try {
            LOG.debug("Setting socket timeout to {}ms", SOCKET_TIMEOUT);
            channel.socket().setSoTimeout(SOCKET_TIMEOUT);
        } catch (SocketException e) {
            LOG.warn("An I/O error occured while trying to set the socket timeout", e);
        }

        try {
            // Request the largest possible buffer; the value actually granted is logged below.
            LOG.debug("Attempting to set receive buffer size to {}", Integer.MAX_VALUE);
            channel.socket().setReceiveBufferSize(Integer.MAX_VALUE);
            LOG.debug("Actual receive buffer size is {}", channel.socket().getReceiveBufferSize());
        } catch (SocketException e) {
            LOG.info("Failed to set the receive buffer to {}", Integer.MAX_VALUE, e);
        }

        // Started only once the receiver is really going to run, and remembered so that
        // stop() can terminate it again.
        final ConsoleReporter reporter = ConsoleReporter.forRegistry(METRICS)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(1L, TimeUnit.SECONDS);
        m_reporter = reporter;

        for (int i = 0; i < SOCKET_RECEIVER_THREADS; i++) {
            m_socketReceivers.execute(() ->
                    receiveLoop(channel, packetMeter, processorMeter, connectionMeter, packetSizeHistogram));
        }
    }

    /** Shuts the executor down if it has been created. */
    private static void shutdown(ExecutorService executor) {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** Resets the slot's buffer for the next datagram and publishes the slot's sequence. */
    private void releaseSlot(long sequence, ByteBufferMessage message) {
        message.buffer.clear();
        if (sequence % 50 == 0) {
            LOG.debug("Released 50 more datagrams");
        }
        m_ringBuffer.publish(sequence);
    }

    /**
     * Receives datagrams until the stop flag is set, the {@code run()} thread is interrupted
     * or the channel reports an I/O error, dispatching each datagram to the asynchronous
     * conversion and processing stages.
     */
    private void receiveLoop(DatagramChannel channel, Meter packetMeter, Meter processorMeter,
            Meter connectionMeter, Histogram packetSizeHistogram) {
        while (!m_stop) {
            if (m_context.isInterrupted()) {
                LOG.debug("Thread context interrupted");
                break;
            }

            final long sequence = m_ringBuffer.next();
            final ByteBufferMessage message = m_ringBuffer.get(sequence);
            // Becomes true once the async pipeline has taken over responsibility for the slot.
            boolean handedOff = false;
            try {
                final InetSocketAddress source = (InetSocketAddress) channel.receive(message.buffer);
                packetMeter.mark();
                message.buffer.flip();
                packetSizeHistogram.update(message.buffer.remaining());

                final SyslogConnection connection =
                        new SyslogConnection(source.getAddress(), source.getPort(), message.buffer, m_config);
                CompletableFuture
                        .supplyAsync(connection::call2, m_syslogConnectionExecutor)
                        .whenComplete((processor, failure) -> {
                            // Release the slot even when conversion failed; otherwise the buffer
                            // would stay flipped and truncate the next datagram written into it.
                            releaseSlot(sequence, message);
                            if (failure == null) {
                                processorMeter.mark();
                            }
                        })
                        .thenAcceptAsync(processor -> processor.call(), m_syslogProcessorExecutor)
                        .whenComplete((ignored, failure) -> {
                            if (failure == null) {
                                connectionMeter.mark();
                            } else {
                                LOG.warn("Failed to process syslog datagram from {}", source, failure);
                            }
                        });
                handedOff = true;
            } catch (InterruptedIOException e) {
                // Timeout (SocketTimeoutException is a subclass) or interrupted I/O: poll again
                // so that the stop flag is re-checked.
            } catch (IOException e) {
                LOG.error("An I/O exception occured on the datagram receipt port, exiting", e);
                break;
            } finally {
                if (!handedOff) {
                    // Every sequence claimed with next() has to be published, also on failure.
                    releaseSlot(sequence, message);
                }
            }
        }
        LOG.debug("Thread context exiting");
    }
}
