package org.opennms.newts.persistence.cassandra;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Batch;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import javax.inject.Inject;
import javax.inject.Named;
import org.opennms.newts.aggregate.IntervalGenerator;
import org.opennms.newts.aggregate.ResultProcessor;
import org.opennms.newts.api.Duration;
import org.opennms.newts.api.Measurement;
import org.opennms.newts.api.Resource;
import org.opennms.newts.api.Results;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleProcessorService;
import org.opennms.newts.api.SampleRepository;
import org.opennms.newts.api.Timestamp;
import org.opennms.newts.api.ValueType;
import org.opennms.newts.api.query.ResultDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/newts/persistence/cassandra/CassandraSampleRepository.class */
/**
 * {@link SampleRepository} implementation backed by a Cassandra session.
 *
 * <p>Rows are written to and read from {@code SchemaConstants.T_SAMPLES}. Each row's partition
 * value is the sample timestamp floored to the resource shard duration, so a time-range query is
 * issued as one asynchronous statement per shard that overlaps the requested range.
 */
public class CassandraSampleRepository implements SampleRepository {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSampleRepository.class);

    /** Look-back window, in seconds (24 hours), used when a query supplies no start time. */
    private static final long DEFAULT_QUERY_WINDOW_SECONDS = 86400L;

    private final Session m_session;
    private final MetricRegistry m_registry;
    private final SampleProcessorService m_processorService;
    private final PreparedStatement m_selectStatement;
    private final int m_ttl;

    /** Width of one partition; mutable only through {@link #setResourceShard(Duration)}. */
    private Duration m_resourceShard = Duration.seconds(600000);

    /**
     * Connects to Cassandra and prepares the range-select statement used by the query methods.
     *
     * <p>All arguments are validated before any connection is opened, so an invalid argument can
     * never leave a connected cluster behind. If connecting or preparing the statement fails, the
     * cluster is closed before the failure is propagated.
     *
     * @param keyspace keyspace to connect to; must not be null
     * @param host contact point host; must not be null
     * @param port port passed to the cluster builder
     * @param ttl TTL applied to every inserted row; must not be negative
     * @param metricRegistry metric registry; must not be null
     * @param sampleProcessorService optional service that receives every inserted batch; may be
     *     null, in which case inserted samples are not forwarded anywhere
     * @throws NullPointerException if {@code keyspace}, {@code host} or {@code metricRegistry} is
     *     null
     * @throws IllegalArgumentException if {@code ttl} is negative
     */
    @Inject
    public CassandraSampleRepository(@Named("samples.cassandra.keyspace") String keyspace, @Named("samples.cassandra.host") String host, @Named("samples.cassandra.port") int port, @Named("samples.cassandra.time-to-live") int ttl, MetricRegistry metricRegistry, SampleProcessorService sampleProcessorService) {
        Preconditions.checkNotNull(keyspace, "Cassandra keyspace argument");
        Preconditions.checkNotNull(host, "Cassandra host argument");
        Preconditions.checkArgument(ttl >= 0, "Negative Cassandra column TTL");
        // Validate the registry *before* connecting so a bad argument cannot leak a live cluster.
        this.m_registry = Preconditions.checkNotNull(metricRegistry, "metric registry argument");
        this.m_ttl = ttl;
        this.m_processorService = sampleProcessorService;

        Select select = QueryBuilder.select().from(SchemaConstants.T_SAMPLES);
        select.where(QueryBuilder.eq(SchemaConstants.F_PARTITION, QueryBuilder.bindMarker(SchemaConstants.F_PARTITION)));
        select.where(QueryBuilder.eq(SchemaConstants.F_RESOURCE, QueryBuilder.bindMarker(SchemaConstants.F_RESOURCE)));
        select.where(QueryBuilder.gte(SchemaConstants.F_COLLECTED, QueryBuilder.bindMarker("start")));
        select.where(QueryBuilder.lte(SchemaConstants.F_COLLECTED, QueryBuilder.bindMarker("end")));

        Cluster cluster = Cluster.builder().withPort(port).addContactPoint(host).build();
        Session session;
        PreparedStatement prepared;
        try {
            session = cluster.connect(keyspace);
            prepared = session.prepare(select);
        } catch (RuntimeException e) {
            // The cluster reference is not retained anywhere else; release it on failure.
            cluster.close();
            throw e;
        }
        this.m_session = session;
        this.m_selectStatement = prepared;
    }

    /**
     * Queries samples for a resource and runs them through a {@link ResultProcessor}.
     *
     * <p>The database is queried from {@code start - duration} so that rows preceding the
     * requested start are available to the processor; the processor itself is given the
     * unshifted start.
     *
     * @param resource resource to query
     * @param optional start of the range; when absent, 24 hours before the end
     * @param optional2 end of the range; when absent, the current time
     * @param resultDescriptor descriptor handed to the result processor; its source names are
     *     passed to the driver adapter
     * @param duration resolution handed to the result processor, also used to widen the query
     * @return the processed measurements
     * @throws IllegalArgumentException if both bounds are present and start is after end
     */
    @Override // org.opennms.newts.api.SampleRepository
    public Results<Measurement> select(Resource resource, Optional<Timestamp> optional, Optional<Timestamp> optional2, ResultDescriptor resultDescriptor, Duration duration) {
        validateSelect(optional, optional2);
        Timestamp end = resolveEnd(optional2);
        Timestamp start = resolveStart(optional, end);
        Timestamp queryStart = start.minus(duration);
        LOG.debug("Querying database for resource {}, from {} to {}", resource, queryStart, end);
        DriverAdapter driverAdapter = new DriverAdapter(cassandraSelect(resource, queryStart, end), resultDescriptor.getSourceNames());
        Results<Measurement> measurements = new ResultProcessor(resource, start, end, resultDescriptor, duration).process(driverAdapter);
        LOG.debug("{} results returned from database", driverAdapter.getResultCount());
        return measurements;
    }

    /**
     * Queries the raw samples stored for a resource.
     *
     * @param resource resource to query
     * @param optional start of the range; when absent, 24 hours before the end
     * @param optional2 end of the range; when absent, the current time
     * @return the rows produced by the driver adapter, in iteration order
     * @throws IllegalArgumentException if both bounds are present and start is after end
     */
    @Override // org.opennms.newts.api.SampleRepository
    public Results<Sample> select(Resource resource, Optional<Timestamp> optional, Optional<Timestamp> optional2) {
        validateSelect(optional, optional2);
        Timestamp end = resolveEnd(optional2);
        Timestamp start = resolveStart(optional, end);
        LOG.debug("Querying database for resource {}, from {} to {}", resource, start, end);
        Results<Sample> samples = new Results<>();
        DriverAdapter driverAdapter = new DriverAdapter(cassandraSelect(resource, start, end));
        for (Results.Row<Sample> row : driverAdapter) {
            samples.addRow(row);
        }
        LOG.debug("{} results returned from database", driverAdapter.getResultCount());
        return samples;
    }

    /**
     * Writes the samples in a single unlogged batch, then forwards them to the processor service
     * when one was configured.
     *
     * <p>An empty collection skips the database round trip; the processor service is still
     * invoked with it.
     *
     * @param collection samples to store; must not be null
     */
    @Override // org.opennms.newts.api.SampleRepository
    public void insert(Collection<Sample> collection) {
        if (!collection.isEmpty()) {
            Batch batch = QueryBuilder.unloggedBatch();
            for (Sample sample : collection) {
                Timestamp collected = sample.getTimestamp();
                batch.add(QueryBuilder.insertInto(SchemaConstants.T_SAMPLES)
                        // Partition value: the collection time floored to the shard width, in seconds.
                        .value(SchemaConstants.F_PARTITION, collected.stepFloor(this.m_resourceShard).asSeconds())
                        .value(SchemaConstants.F_RESOURCE, sample.getResource().getId())
                        .value(SchemaConstants.F_COLLECTED, collected.asMillis())
                        .value(SchemaConstants.F_METRIC_NAME, sample.getName())
                        .value("value", ValueType.decompose(sample.getValue()))
                        .value(SchemaConstants.F_ATTRIBUTES, sample.getAttributes())
                        .using(QueryBuilder.ttl(this.m_ttl)));
            }
            this.m_session.execute(batch);
        }
        if (this.m_processorService != null) {
            this.m_processorService.submit(collection);
        }
    }

    /**
     * Issues one asynchronous select per shard between the floored start and floored end
     * timestamps and wraps the pending results in a single row iterator.
     */
    // The element type expected by ConcurrentResultWrapper is not visible in this file, so the
    // list of pending results is deliberately left raw rather than guessing a type argument.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Iterator<Row> cassandraSelect(Resource resource, Timestamp timestamp, Timestamp timestamp2) {
        ArrayList futures = Lists.newArrayList();
        Timestamp firstShard = timestamp.stepFloor(this.m_resourceShard);
        Timestamp lastShard = timestamp2.stepFloor(this.m_resourceShard);
        for (Timestamp shard : new IntervalGenerator(firstShard, lastShard, this.m_resourceShard)) {
            BoundStatement bound = this.m_selectStatement.bind();
            // NOTE(review): the partition is bound as an int here while insert() supplies a long
            // seconds value; the narrowing cast is kept as-is — confirm against the table schema.
            bound.setInt(SchemaConstants.F_PARTITION, (int) shard.asSeconds());
            bound.setString(SchemaConstants.F_RESOURCE, resource.getId());
            bound.setDate("start", timestamp.asDate());
            bound.setDate("end", timestamp2.asDate());
            futures.add(this.m_session.executeAsync(bound));
        }
        return new ConcurrentResultWrapper(futures);
    }

    /** Returns the requested end timestamp, or the current time when none was given. */
    private static Timestamp resolveEnd(Optional<Timestamp> end) {
        return end.isPresent() ? end.get() : Timestamp.now();
    }

    /** Returns the requested start timestamp, or 24 hours before {@code end} when none was given. */
    private static Timestamp resolveStart(Optional<Timestamp> start, Timestamp end) {
        return start.isPresent() ? start.get() : end.minus(Duration.seconds(DEFAULT_QUERY_WINDOW_SECONDS));
    }

    /**
     * Rejects a range whose start is after its end. A missing bound, or equal bounds, pass.
     *
     * @throws IllegalArgumentException if both bounds are present and start is greater than end
     */
    private void validateSelect(Optional<Timestamp> optional, Optional<Timestamp> optional2) {
        if (optional.isPresent() && optional2.isPresent() && optional.get().gt(optional2.get())) {
            throw new IllegalArgumentException("start time must be less than end time");
        }
    }

    /** Overrides the shard width used for both inserts and queries (package-private). */
    void setResourceShard(Duration duration) {
        this.m_resourceShard = duration;
    }
}
