/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.newts;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.math.DoubleMath;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import org.opennms.core.logging.Logging;
import org.opennms.netmgt.newts.SampleBatchEvent;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;

public class NewtsWriter
implements WorkHandler<SampleBatchEvent>,
DisposableBean {
    private static final Logger LOG = LoggerFactory.getLogger(NewtsWriter.class);
    @Autowired
    private SampleRepository m_sampleRepository;
    private WorkerPool<SampleBatchEvent> m_workerPool;
    private RingBuffer<SampleBatchEvent> m_ringBuffer;
    private final int m_maxBatchSize;
    private final int m_ringBufferSize;
    private final int m_numWriterThreads;
    private final AtomicLong m_numSamplesOnRingBuffer = new AtomicLong();
    private static final EventTranslatorOneArg<SampleBatchEvent, List<Sample>> TRANSLATOR = new EventTranslatorOneArg<SampleBatchEvent, List<Sample>>(){

        public void translateTo(SampleBatchEvent event, long sequence, List<Sample> samples) {
            event.setSamples(samples);
        }
    };

    @Inject
    public NewtsWriter(@Named(value="newts.max_batch_size") Integer maxBatchSize, @Named(value="newts.ring_buffer_size") Integer ringBufferSize, @Named(value="newts.writer_threads") Integer numWriterThreads, MetricRegistry registry) {
        Preconditions.checkArgument((maxBatchSize > 0 ? 1 : 0) != 0, (Object)"maxBatchSize must be strictly positive");
        Preconditions.checkArgument((ringBufferSize > 0 ? 1 : 0) != 0, (Object)"ringBufferSize must be positive");
        Preconditions.checkArgument((boolean)DoubleMath.isMathematicalInteger((double)(Math.log(ringBufferSize.intValue()) / Math.log(2.0))), (Object)"ringBufferSize must be a power of two");
        Preconditions.checkArgument((numWriterThreads > 0 ? 1 : 0) != 0, (Object)"numWriterThreads must be positive");
        Preconditions.checkNotNull((Object)registry, (Object)"metric registry");
        this.m_maxBatchSize = maxBatchSize;
        this.m_ringBufferSize = ringBufferSize;
        this.m_numWriterThreads = numWriterThreads;
        this.m_numSamplesOnRingBuffer.set(0L);
        registry.register(MetricRegistry.name((String)"ring-buffer", (String[])new String[]{"size"}), (Metric)new Gauge<Long>(){

            public Long getValue() {
                return NewtsWriter.this.m_numSamplesOnRingBuffer.get();
            }
        });
        registry.register(MetricRegistry.name((String)"ring-buffer", (String[])new String[]{"max-size"}), (Metric)new Gauge<Long>(){

            public Long getValue() {
                return NewtsWriter.this.m_ringBufferSize;
            }
        });
        LOG.debug("Using max_batch_size: {} and ring_buffer_size: {}", (Object)maxBatchSize, (Object)this.m_ringBufferSize);
        this.setUpWorkerPool();
    }

    private void setUpWorkerPool() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("NewtsWriter-Consumer-%d").build();
        ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory);
        WorkHandler[] handlers = new WorkHandler[this.m_numWriterThreads];
        for (int i = 0; i < this.m_numWriterThreads; ++i) {
            handlers[i] = this;
        }
        this.m_ringBuffer = RingBuffer.createMultiProducer(SampleBatchEvent::new, (int)this.m_ringBufferSize);
        this.m_workerPool = new WorkerPool(this.m_ringBuffer, this.m_ringBuffer.newBarrier(new Sequence[0]), (ExceptionHandler)new FatalExceptionHandler(), handlers);
        this.m_ringBuffer.addGatingSequences(this.m_workerPool.getWorkerSequences());
        this.m_workerPool.start((Executor)executor);
    }

    public void destroy() throws Exception {
        if (this.m_workerPool != null) {
            this.m_workerPool.drainAndHalt();
        }
    }

    public void insert(List<Sample> samples) {
        if (!this.m_ringBuffer.tryPublishEvent(TRANSLATOR, samples)) {
            String uniqueResourceIds = samples.stream().map(s -> s.getResource().getId()).distinct().collect(Collectors.joining(", "));
            LOG.error("The ring buffer is full. {} samples associated with resource ids {} will be dropped.", (Object)samples.size(), (Object)uniqueResourceIds);
            return;
        }
        this.m_numSamplesOnRingBuffer.addAndGet(samples.size());
    }

    public void onEvent(SampleBatchEvent event) throws Exception {
        Logging.putPrefix((String)"collectd");
        List<Sample> samples = event.getSamples();
        this.m_numSamplesOnRingBuffer.addAndGet(-samples.size());
        for (List batch : Lists.partition(samples, (int)this.m_maxBatchSize)) {
            try {
                LOG.debug("Inserting {} samples", (Object)batch.size());
                this.m_sampleRepository.insert((Collection)batch);
                if (!LOG.isDebugEnabled()) continue;
                String uniqueResourceIds = batch.stream().map(s -> s.getResource().getId()).distinct().collect(Collectors.joining(", "));
                LOG.debug("Successfully inserted samples for resources with ids {}", (Object)uniqueResourceIds);
            }
            catch (Throwable t) {
                LOG.error("An error occurred while inserting the samples. They will be lost.", t);
            }
        }
    }

    @VisibleForTesting
    public void setSampleRepository(SampleRepository sampleRepository) {
        this.m_sampleRepository = sampleRepository;
    }
}

