package org.opennms.netmgt.newts;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.math.DoubleMath;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import org.joda.time.Duration;
import org.opennms.core.logging.Logging;
import org.opennms.netmgt.newts.support.NewtsUtils;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleRepository;
import org.opennms.newts.api.search.Indexer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/opennms/netmgt/newts/NewtsWriter.class */
/**
 * Hands batches of {@link Sample}s to a pool of worker threads through an LMAX Disruptor
 * ring buffer.
 *
 * <p>Callers publish batches with {@link #insert(List)} or {@link #index(List)}. Publishing
 * never blocks: when the ring buffer is full the batch is dropped, an error is logged (rate
 * limited) and the {@code ring-buffer.dropped-samples} meter is marked. Worker threads pick
 * the batches up in {@link #onEvent(SampleBatchEvent)}, split them into chunks of at most
 * {@code newts.max_batch_size} samples and pass each chunk to the {@link SampleRepository}
 * or the {@link Indexer}.
 *
 * <p>The ring buffer is created in multi-producer mode and this single instance is registered
 * as the handler for every worker thread.
 */
public class NewtsWriter implements WorkHandler<SampleBatchEvent>, DisposableBean {

    private static final Logger LOG = LoggerFactory.getLogger(NewtsWriter.class);

    /** Logger for failure paths: at most 5 messages every 30 seconds. */
    private static final RateLimitedLog RATE_LIMITED_LOGGER = RateLimitedLog.withRateLimit(LOG).maxRate(5).every(Duration.standardSeconds(30)).build();

    /** Fills a ring buffer slot with a batch that is to be inserted into the sample repository. */
    private static final EventTranslatorOneArg<SampleBatchEvent, List<Sample>> TRANSLATOR = new EventTranslatorOneArg<SampleBatchEvent, List<Sample>>() {
        @Override
        public void translateTo(SampleBatchEvent event, long sequence, List<Sample> samples) {
            event.setIndexOnly(false);
            event.setSamples(samples);
        }
    };

    /** Fills a ring buffer slot with a batch that is flagged as index-only. */
    private static final EventTranslatorOneArg<SampleBatchEvent, List<Sample>> INDEX_ONLY_TRANSLATOR = new EventTranslatorOneArg<SampleBatchEvent, List<Sample>>() {
        @Override
        public void translateTo(SampleBatchEvent event, long sequence, List<Sample> samples) {
            event.setIndexOnly(true);
            event.setSamples(samples);
        }
    };

    @Autowired
    private SampleRepository m_sampleRepository;

    @Autowired
    private Indexer m_indexer;

    private WorkerPool<SampleBatchEvent> m_workerPool;
    private RingBuffer<SampleBatchEvent> m_ringBuffer;

    /** Executor that runs the consumer threads; kept so that it can be shut down in {@link #destroy()}. */
    private ExecutorService m_executor;

    private final int m_maxBatchSize;
    private final int m_ringBufferSize;
    private final int m_numWriterThreads;
    private final Meter m_droppedSamples;

    /** Number of batches published to the ring buffer that have not yet been picked up by a worker. */
    private final AtomicLong m_numEntriesOnRingBuffer = new AtomicLong();

    /**
     * Validates the configuration, registers the ring buffer metrics and starts the worker pool.
     *
     * @param maxBatchSize     maximum number of samples passed to the repository/indexer per call; must be &gt; 0
     * @param ringBufferSize   number of slots in the ring buffer; must be a positive power of two
     * @param numWriterThreads number of worker threads consuming from the ring buffer; must be &gt; 0
     * @param metricRegistry   registry that receives the {@code ring-buffer.*} metrics
     * @throws NullPointerException     if any argument is {@code null}
     * @throws IllegalArgumentException if a numeric argument is out of range
     */
    @Inject
    public NewtsWriter(@Named("newts.max_batch_size") Integer maxBatchSize, @Named("newts.ring_buffer_size") Integer ringBufferSize, @Named("newts.writer_threads") Integer numWriterThreads, MetricRegistry metricRegistry) {
        // Fail with a descriptive message rather than a bare NPE from auto-unboxing.
        Preconditions.checkNotNull(maxBatchSize, "maxBatchSize");
        Preconditions.checkNotNull(ringBufferSize, "ringBufferSize");
        Preconditions.checkNotNull(numWriterThreads, "numWriterThreads");
        Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be strictly positive");
        Preconditions.checkArgument(ringBufferSize > 0, "ringBufferSize must be positive");
        // Exact integer test: a positive int is a power of two iff exactly one bit is set.
        // (Dividing floating-point logarithms is subject to rounding error and can reject valid sizes.)
        Preconditions.checkArgument(Integer.bitCount(ringBufferSize) == 1, "ringBufferSize must be a power of two");
        Preconditions.checkArgument(numWriterThreads > 0, "numWriterThreads must be positive");
        Preconditions.checkNotNull(metricRegistry, "metric registry");

        m_maxBatchSize = maxBatchSize;
        m_ringBufferSize = ringBufferSize;
        m_numWriterThreads = numWriterThreads;

        metricRegistry.register(MetricRegistry.name("ring-buffer", "size"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return m_numEntriesOnRingBuffer.get();
            }
        });
        metricRegistry.register(MetricRegistry.name("ring-buffer", "max-size"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return Long.valueOf(m_ringBufferSize);
            }
        });
        m_droppedSamples = metricRegistry.meter(MetricRegistry.name("ring-buffer", "dropped-samples"));

        LOG.debug("Using max_batch_size: {} and ring_buffer_size: {}", m_maxBatchSize, m_ringBufferSize);
        setUpWorkerPool();
    }

    /**
     * Creates the ring buffer and starts {@code m_numWriterThreads} workers that all delegate to
     * this instance.
     */
    private void setUpWorkerPool() {
        m_executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("NewtsWriter-Consumer-%d").build());

        // Generic arrays cannot be created directly; every element is 'this', so the cast is safe.
        @SuppressWarnings("unchecked")
        final WorkHandler<SampleBatchEvent>[] handlers = new WorkHandler[m_numWriterThreads];
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = this;
        }

        m_ringBuffer = RingBuffer.createMultiProducer(SampleBatchEvent::new, m_ringBufferSize);
        m_workerPool = new WorkerPool<>(m_ringBuffer, m_ringBuffer.newBarrier(), new FatalExceptionHandler(), handlers);
        // Gate the producers on the workers' sequences.
        m_ringBuffer.addGatingSequences(m_workerPool.getWorkerSequences());
        m_workerPool.start(m_executor);
    }

    /**
     * Drains and halts the worker pool, then shuts down the executor that hosted the workers so
     * its threads are not left behind.
     */
    @Override
    public void destroy() throws Exception {
        try {
            if (m_workerPool != null) {
                m_workerPool.drainAndHalt();
            }
        } finally {
            if (m_executor != null) {
                // Orderly shutdown: no new tasks are accepted, running workers are not interrupted.
                m_executor.shutdown();
            }
        }
    }

    /**
     * Queues a batch of samples for insertion into the sample repository.
     *
     * @param list the samples to insert; dropped (and counted as dropped) if the ring buffer is full
     */
    public void insert(List<Sample> list) {
        pushToRingBuffer(list, TRANSLATOR);
    }

    /**
     * Queues a batch of samples flagged as index-only.
     *
     * @param list the samples to index; dropped (and counted as dropped) if the ring buffer is full
     */
    public void index(List<Sample> list) {
        pushToRingBuffer(list, INDEX_ONLY_TRANSLATOR);
    }

    /**
     * Attempts a non-blocking publish of the batch; on failure the batch is dropped, logged and
     * counted on the dropped-samples meter.
     */
    private void pushToRingBuffer(final List<Sample> list, EventTranslatorOneArg<SampleBatchEvent, List<Sample>> eventTranslatorOneArg) {
        if (m_ringBuffer.tryPublishEvent(eventTranslatorOneArg, list)) {
            m_numEntriesOnRingBuffer.incrementAndGet();
            return;
        }

        // The resource ids are wrapped in an object whose toString() builds the string, so the
        // join only runs if the logger actually formats the message.
        final Object lazyResourceIds = new Object() {
            @Override
            public String toString() {
                return joinResourceIds(list);
            }
        };
        RATE_LIMITED_LOGGER.error("The ring buffer is full. {} samples associated with resource ids {} will be dropped.", list.size(), lazyResourceIds);
        m_droppedSamples.mark(list.size());
    }

    /**
     * Processes one batch taken from the ring buffer.
     *
     * <p>The batch is split into chunks of at most {@code m_maxBatchSize} samples. A chunk is sent
     * to the indexer only when the event is index-only and {@code NewtsUtils.DISABLE_INDEXING} is
     * false; in every other case it is inserted into the sample repository. A failure on one chunk
     * is logged (rate limited) and does not stop the remaining chunks from being processed.
     *
     * @param sampleBatchEvent the ring buffer entry carrying the samples
     */
    @Override
    public void onEvent(SampleBatchEvent sampleBatchEvent) throws Exception {
        Logging.putPrefix("collectd");

        final List<Sample> samples = sampleBatchEvent.getSamples();
        m_numEntriesOnRingBuffer.decrementAndGet();

        for (List<Sample> batch : Lists.partition(samples, m_maxBatchSize)) {
            try {
                if (sampleBatchEvent.isIndexOnly() && !NewtsUtils.DISABLE_INDEXING) {
                    LOG.debug("Indexing {} samples", batch.size());
                    m_indexer.update(batch);
                } else {
                    LOG.debug("Inserting {} samples", batch.size());
                    m_sampleRepository.insert(batch);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully inserted samples for resources with ids {}", joinResourceIds(batch));
                }
            } catch (Throwable th) {
                // Deliberately broad: a failing chunk must not abort the rest of the batch.
                RATE_LIMITED_LOGGER.error("An error occurred while inserting samples. Some sample may be lost.", th);
            }
        }
    }

    /** Returns the distinct resource ids of the given samples, in encounter order, joined by {@code ", "}. */
    private static String joinResourceIds(List<Sample> samples) {
        return samples.stream()
                .map(sample -> sample.getResource().getId())
                .distinct()
                .collect(Collectors.joining(", "));
    }

    /** Overrides the sample repository field that is otherwise populated through {@code @Autowired}. */
    public void setSampleRepository(SampleRepository sampleRepository) {
        m_sampleRepository = sampleRepository;
    }

    /** Overrides the indexer field that is otherwise populated through {@code @Autowired}. */
    public void setIndexer(Indexer indexer) {
        m_indexer = indexer;
    }
}
