/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.timeseries.integration;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.math.DoubleMath;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import org.opennms.core.logging.Logging;
import org.opennms.integration.api.v1.timeseries.Metric;
import org.opennms.integration.api.v1.timeseries.StorageException;
import org.opennms.integration.api.v1.timeseries.Tag;
import org.opennms.integration.api.v1.timeseries.immutables.ImmutableMetric;
import org.opennms.integration.api.v1.timeseries.immutables.ImmutableSample;
import org.opennms.integration.api.v1.timeseries.immutables.ImmutableTag;
import org.opennms.netmgt.timeseries.impl.TimeseriesStorageManager;
import org.opennms.netmgt.timeseries.integration.SampleBatchEvent;
import org.opennms.netmgt.timeseries.meta.MetaData;
import org.opennms.netmgt.timeseries.meta.TimeSeriesMetaDataDao;
import org.opennms.newts.api.MetricType;
import org.opennms.newts.api.Sample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;

public class TimeseriesWriter
implements WorkHandler<SampleBatchEvent>,
DisposableBean {
    private static final Logger LOG = LoggerFactory.getLogger(TimeseriesWriter.class);
    private static final RateLimitedLog RATE_LIMITED_LOGGER = RateLimitedLog.withRateLimit((Logger)LOG).maxRate(5).every(Duration.ofSeconds(30L)).build();
    private WorkerPool<SampleBatchEvent> workerPool;
    private RingBuffer<SampleBatchEvent> ringBuffer;
    private final int ringBufferSize;
    private final int numWriterThreads;
    private final Meter droppedSamples;
    @Autowired
    private TimeseriesStorageManager storage;
    @Autowired
    private TimeSeriesMetaDataDao timeSeriesMetaDataDao;
    private final AtomicLong numEntriesOnRingBuffer = new AtomicLong();
    private static final EventTranslatorOneArg<SampleBatchEvent, List<Sample>> TRANSLATOR = new EventTranslatorOneArg<SampleBatchEvent, List<Sample>>(){

        public void translateTo(SampleBatchEvent event, long sequence, List<Sample> samples) {
            event.setIndexOnly(false);
            event.setSamples(samples);
        }
    };
    private static final EventTranslatorOneArg<SampleBatchEvent, List<Sample>> INDEX_ONLY_TRANSLATOR = new EventTranslatorOneArg<SampleBatchEvent, List<Sample>>(){

        public void translateTo(SampleBatchEvent event, long sequence, List<Sample> samples) {
            event.setIndexOnly(true);
            event.setSamples(samples);
        }
    };

    @Inject
    public TimeseriesWriter(@Named(value="timeseries.max_batch_size") Integer maxBatchSize, @Named(value="timeseries.ring_buffer_size") Integer ringBufferSize, @Named(value="timeseries.writer_threads") Integer numWriterThreads, @Named(value="timeseriesMetricRegistry") MetricRegistry registry) {
        Preconditions.checkArgument((maxBatchSize > 0 ? 1 : 0) != 0, (Object)"maxBatchSize must be strictly positive");
        Preconditions.checkArgument((ringBufferSize > 0 ? 1 : 0) != 0, (Object)"ringBufferSize must be positive");
        Preconditions.checkArgument((boolean)DoubleMath.isMathematicalInteger((double)(Math.log(ringBufferSize.intValue()) / Math.log(2.0))), (Object)"ringBufferSize must be a power of two");
        Preconditions.checkArgument((numWriterThreads > 0 ? 1 : 0) != 0, (Object)"numWriterThreads must be positive");
        Preconditions.checkNotNull((Object)registry, (Object)"metric registry");
        this.ringBufferSize = ringBufferSize;
        this.numWriterThreads = numWriterThreads;
        this.numEntriesOnRingBuffer.set(0L);
        registry.register(MetricRegistry.name((String)"ring-buffer", (String[])new String[]{"size"}), (com.codahale.metrics.Metric)new Gauge<Long>(){

            public Long getValue() {
                return TimeseriesWriter.this.numEntriesOnRingBuffer.get();
            }
        });
        registry.register(MetricRegistry.name((String)"ring-buffer", (String[])new String[]{"max-size"}), (com.codahale.metrics.Metric)new Gauge<Long>(){

            public Long getValue() {
                return TimeseriesWriter.this.ringBufferSize;
            }
        });
        this.droppedSamples = registry.meter(MetricRegistry.name((String)"ring-buffer", (String[])new String[]{"dropped-samples"}));
        LOG.debug("Using max_batch_size: {} and ring_buffer_size: {}", (Object)maxBatchSize, (Object)this.ringBufferSize);
        this.setUpWorkerPool();
    }

    private void setUpWorkerPool() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("TimeseriesWriter-Consumer-%d").build();
        ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory);
        WorkHandler[] handlers = new WorkHandler[this.numWriterThreads];
        for (int i = 0; i < this.numWriterThreads; ++i) {
            handlers[i] = this;
        }
        this.ringBuffer = RingBuffer.createMultiProducer(SampleBatchEvent::new, (int)this.ringBufferSize);
        this.workerPool = new WorkerPool(this.ringBuffer, this.ringBuffer.newBarrier(new Sequence[0]), (ExceptionHandler)new FatalExceptionHandler(), handlers);
        this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
        this.workerPool.start((Executor)executor);
    }

    public void destroy() throws Exception {
        if (this.workerPool != null) {
            this.workerPool.drainAndHalt();
        }
    }

    public void insert(List<Sample> samples) {
        this.pushToRingBuffer(samples, TRANSLATOR);
    }

    public void index(List<Sample> samples) {
        this.pushToRingBuffer(samples, INDEX_ONLY_TRANSLATOR);
    }

    private void pushToRingBuffer(final List<Sample> samples, EventTranslatorOneArg<SampleBatchEvent, List<Sample>> translator) {
        if (!this.ringBuffer.tryPublishEvent(translator, samples)) {
            RATE_LIMITED_LOGGER.error("The ring buffer is full. {} samples associated with resource ids {} will be dropped.", (Object)samples.size(), new Object(){

                public String toString() {
                    return samples.stream().map(s -> s.getResource().getId()).distinct().collect(Collectors.joining(", "));
                }
            });
            this.droppedSamples.mark((long)samples.size());
            return;
        }
        this.numEntriesOnRingBuffer.incrementAndGet();
    }

    public void onEvent(SampleBatchEvent event) throws Exception {
        Logging.putPrefix((String)"collectd");
        this.numEntriesOnRingBuffer.decrementAndGet();
        try {
            if (event.isIndexOnly()) {
                this.storeMetadata(event);
            } else {
                this.storeTimeseriesData(event);
                this.storeMetadata(event);
            }
        }
        catch (Throwable t) {
            RATE_LIMITED_LOGGER.error("An error occurred while inserting samples. Some sample may be lost.", t);
        }
    }

    private void storeTimeseriesData(SampleBatchEvent event) throws StorageException {
        List samples = event.getSamples().stream().map(this::toApiSample).collect(Collectors.toList());
        this.storage.get().store(samples);
    }

    private void storeMetadata(SampleBatchEvent event) throws SQLException {
        HashSet<MetaData> metaData = new HashSet<MetaData>();
        for (Sample sample : event.getSamples()) {
            if (!sample.getResource().getAttributes().isPresent()) continue;
            ((Map)sample.getResource().getAttributes().get()).forEach((key, value) -> metaData.add(new MetaData(sample.getResource().getId(), (String)key, (String)value)));
        }
        this.timeSeriesMetaDataDao.store(metaData);
    }

    private org.opennms.integration.api.v1.timeseries.Sample toApiSample(Sample sample) {
        ImmutableMetric.MetricBuilder builder = ImmutableMetric.builder().intrinsicTag("resourceId", sample.getResource().getId()).intrinsicTag("name", sample.getName()).metaTag(this.typeToTag(sample.getType()));
        if (sample.getResource().getAttributes().isPresent()) {
            ((Map)sample.getResource().getAttributes().get()).forEach((arg_0, arg_1) -> ((ImmutableMetric.MetricBuilder)builder).metaTag(arg_0, arg_1));
        }
        ImmutableMetric metric = builder.build();
        Instant time = Instant.ofEpochMilli(sample.getTimestamp().asMillis());
        Double value = sample.getValue().doubleValue();
        return ImmutableSample.builder().metric((Metric)metric).time(time).value(value).build();
    }

    private Tag typeToTag(MetricType type) {
        Metric.Mtype mtype;
        if (type == MetricType.GAUGE) {
            mtype = Metric.Mtype.gauge;
        } else if (type == MetricType.COUNTER) {
            mtype = Metric.Mtype.count;
        } else {
            throw new IllegalArgumentException(String.format("I can't find a matching %s for %s", Metric.Mtype.class.getSimpleName(), type.toString()));
        }
        return new ImmutableTag("mtype", mtype.name());
    }

    public void setTimeSeriesStorage(TimeseriesStorageManager timeseriesStorage) {
        this.storage = timeseriesStorage;
    }

    public void setTimeSeriesMetaDataDao(TimeSeriesMetaDataDao timeSeriesMetaDataDao) {
        this.timeSeriesMetaDataDao = timeSeriesMetaDataDao;
    }
}

