package org.opennms.newts.stress;

import ch.qos.logback.core.pattern.color.ANSIConstants;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleProcessorService;
import org.opennms.newts.api.SampleRepository;
import org.opennms.newts.persistence.cassandra.CassandraSampleRepository;
import org.opennms.newts.persistence.cassandra.SchemaConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/newts/stress/InsertDispatcher.class */
/**
 * Dispatcher for the insert stress test: pulls samples from a set of
 * {@link SampleGenerator}s, groups them into batches, and hands the batches to
 * {@link Inserter} instances through a bounded blocking queue.
 */
class InsertDispatcher extends Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(InsertDispatcher.class);

    /** Prefix of generated resource names ("r0", "r1", ...). */
    private static final String RESOURCE_NAME_PREFIX = "r";

    /**
     * Prefix of generated metric names ("m0", "m1", ...). Kept as a local literal so
     * metric naming does not depend on an unrelated logging-library constant.
     */
    private static final String METRIC_NAME_PREFIX = "m";

    /** Name (relative to this class) of the meter that counts generated samples. */
    private static final String SAMPLES_METER_NAME = "samples";

    /** The queue holds up to this many pending batches per configured thread. */
    private static final int QUEUED_BATCHES_PER_THREAD = 10;

    private final InsertConfig m_config;
    private final SampleRepository m_repository;
    private final BlockingQueue<Collection<Sample>> m_samplesQueue;

    /**
     * Builds the Cassandra-backed repository and the bounded batch queue from the
     * given configuration.
     *
     * @param insertConfig insert settings (Cassandra connection, thread count, etc.)
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public InsertDispatcher(InsertConfig insertConfig) throws InterruptedException {
        super(insertConfig);
        this.m_config = insertConfig;
        this.m_repository = new CassandraSampleRepository(
                insertConfig.getCassandraKeyspace(),
                insertConfig.getCassandraHost(),
                insertConfig.getCassandraPort(),
                Config.CASSANDRA_TTL,
                new MetricRegistry(),
                new SampleProcessorService(1));
        this.m_samplesQueue = Queues.newArrayBlockingQueue(insertConfig.getThreads() * QUEUED_BATCHES_PER_THREAD);
    }

    /**
     * Creates one {@link Inserter} per configured thread and stores it in the
     * {@code m_threads} array (not declared in this class; presumably inherited
     * and sized to the configured thread count — verify against Dispatcher).
     */
    private void createThreads() {
        int threadCount = this.m_config.getThreads();
        for (int i = 0; i < threadCount; i++) {
            this.m_threads[i] = new Inserter(i, this.m_repository, this.m_samplesQueue);
        }
    }

    /**
     * Creates one generator per (resource, metric) pair, ordered resource-major.
     *
     * @return array of {@code numResources * numMetrics} generators; empty when
     *         either count is zero
     */
    private SampleGenerator[] getSampleGenerators() {
        int numResources = this.m_config.getNumResources();
        int numMetrics = this.m_config.getNumMetrics();
        SampleGenerator[] generators = new SampleGenerator[numResources * numMetrics];
        int index = 0;
        for (int resource = 0; resource < numResources; resource++) {
            for (int metric = 0; metric < numMetrics; metric++) {
                generators[index++] = new SampleGenerator(
                        RESOURCE_NAME_PREFIX + resource,
                        METRIC_NAME_PREFIX + metric,
                        this.m_config.getStart(),
                        this.m_config.getEnd(),
                        this.m_config.getInterval());
            }
        }
        return generators;
    }

    /**
     * Round-robins over the generators, queuing a batch whenever it reaches the
     * configured batch size. As soon as any generator reports that it has no more
     * samples, the remaining (possibly empty) batch is queued, {@code shutdown()}
     * is invoked and the method returns.
     *
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    @Override // org.opennms.newts.stress.Dispatcher
    public void go() throws InterruptedException {
        createThreads();
        SampleGenerator[] generators = getSampleGenerators();
        Collection<Sample> batch = Lists.newArrayList();
        Meter meter = this.m_metricRegistry.meter(MetricRegistry.name(getClass(), SAMPLES_METER_NAME));

        // With zero generators the round-robin loop below would never reach an
        // exhausted generator and would spin forever; finish immediately instead.
        if (generators.length == 0) {
            finish(batch);
            return;
        }

        int batchSize = this.m_config.getBatchSize();
        while (true) {
            for (SampleGenerator generator : generators) {
                if (!generator.hasNext()) {
                    finish(batch);
                    return;
                }
                Optional<Sample> next = generator.next();
                if (next.isPresent()) {
                    batch.add(next.get());
                    meter.mark();
                }
                if (batch.size() >= batchSize) {
                    enqueue(batch);
                    // The queued collection is handed off; start a fresh one.
                    batch = Lists.newArrayList();
                }
            }
        }
    }

    /** Logs and queues a batch, blocking while the queue is full. */
    private void enqueue(Collection<Sample> batch) throws InterruptedException {
        LOG.debug("Queuing {} samples for insert", batch.size());
        this.m_samplesQueue.put(batch);
    }

    /** Queues the final batch, then invokes {@code shutdown()}. */
    private void finish(Collection<Sample> batch) throws InterruptedException {
        enqueue(batch);
        shutdown();
        LOG.debug("Done.");
    }
}
