/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.telemetry.ipc;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.opennms.core.ipc.sink.api.AggregationPolicy;
import org.opennms.core.ipc.sink.api.AsyncPolicy;
import org.opennms.core.ipc.sink.api.SinkModule;
import org.opennms.netmgt.dao.api.DistPollerDao;
import org.opennms.netmgt.telemetry.config.api.Protocol;
import org.opennms.netmgt.telemetry.ipc.TelemetryProtos;
import org.opennms.netmgt.telemetry.listeners.api.TelemetryMessage;
import org.springframework.beans.factory.annotation.Autowired;

public class TelemetrySinkModule
implements SinkModule<TelemetryMessage, TelemetryProtos.TelemetryMessageLog> {
    private static final String MODULE_ID_PREFIX = "Telemetry-";
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final int DEFAULT_BATCH_INTERVAL_MS = 500;
    private static final int DEFAULT_QUEUE_SIZE = 10000;
    @Autowired
    private DistPollerDao distPollerDao;
    private final Protocol protocol;
    private final String moduleId;

    public TelemetrySinkModule(Protocol protocol) {
        this.protocol = Objects.requireNonNull(protocol);
        this.moduleId = MODULE_ID_PREFIX + protocol.getName();
    }

    public String getId() {
        return this.moduleId;
    }

    public int getNumConsumerThreads() {
        return this.protocol.getNumThreads().orElse(DEFAULT_NUM_THREADS);
    }

    public byte[] marshal(TelemetryProtos.TelemetryMessageLog message) {
        return message.toByteArray();
    }

    public TelemetryProtos.TelemetryMessageLog unmarshal(byte[] bytes) {
        try {
            return TelemetryProtos.TelemetryMessageLog.parseFrom(bytes);
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }

    public AggregationPolicy<TelemetryMessage, TelemetryProtos.TelemetryMessageLog, TelemetryProtos.TelemetryMessageLog.Builder> getAggregationPolicy() {
        final String systemId = this.distPollerDao.whoami().getId();
        final String systemLocation = this.distPollerDao.whoami().getLocation();
        return new AggregationPolicy<TelemetryMessage, TelemetryProtos.TelemetryMessageLog, TelemetryProtos.TelemetryMessageLog.Builder>(){

            public int getCompletionSize() {
                return TelemetrySinkModule.this.protocol.getBatchSize().orElse(1000);
            }

            public int getCompletionIntervalMs() {
                return TelemetrySinkModule.this.protocol.getBatchIntervalMs().orElse(500);
            }

            public Object key(TelemetryMessage telemetryMessage) {
                return telemetryMessage.getSource();
            }

            public TelemetryProtos.TelemetryMessageLog.Builder aggregate(TelemetryProtos.TelemetryMessageLog.Builder accumulator, TelemetryMessage message) {
                if (accumulator == null) {
                    accumulator = TelemetryProtos.TelemetryMessageLog.newBuilder().setLocation(systemLocation).setSystemId(systemId).setSourceAddress(message.getSource().getHostString()).setSourcePort(message.getSource().getPort());
                }
                TelemetryProtos.TelemetryMessage messageDto = TelemetryProtos.TelemetryMessage.newBuilder().setTimestamp(message.getReceivedAt().getTime()).setBytes(ByteString.copyFrom((ByteBuffer)message.getBuffer())).build();
                accumulator.addMessage(messageDto);
                return accumulator;
            }

            public TelemetryProtos.TelemetryMessageLog build(TelemetryProtos.TelemetryMessageLog.Builder accumulator) {
                return accumulator.build();
            }
        };
    }

    public AsyncPolicy getAsyncPolicy() {
        return new AsyncPolicy(){

            public int getQueueSize() {
                return TelemetrySinkModule.this.protocol.getQueueSize().orElse(10000);
            }

            public int getNumThreads() {
                return TelemetrySinkModule.this.protocol.getNumThreads().orElse(DEFAULT_NUM_THREADS);
            }

            public boolean isBlockWhenFull() {
                return true;
            }
        };
    }

    public DistPollerDao getDistPollerDao() {
        return this.distPollerDao;
    }

    public void setDistPollerDao(DistPollerDao distPollerDao) {
        this.distPollerDao = distPollerDao;
    }
}

