package org.opennms.netmgt.telemetry.ipc;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Objects;
import org.opennms.core.ipc.sink.api.AggregationPolicy;
import org.opennms.core.ipc.sink.api.AsyncPolicy;
import org.opennms.core.ipc.sink.api.SinkModule;
import org.opennms.netmgt.dao.api.DistPollerDao;
import org.opennms.netmgt.telemetry.config.api.Protocol;
import org.opennms.netmgt.telemetry.ipc.TelemetryProtos;
import org.opennms.netmgt.telemetry.listeners.api.TelemetryMessage;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/opennms/netmgt/telemetry/ipc/TelemetrySinkModule.class */
public class TelemetrySinkModule implements SinkModule<TelemetryMessage, TelemetryProtos.TelemetryMessageLog> {
    private static final String MODULE_ID_PREFIX = "Telemetry-";
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final int DEFAULT_BATCH_INTERVAL_MS = 500;
    private static final int DEFAULT_QUEUE_SIZE = 10000;

    @Autowired
    private DistPollerDao distPollerDao;
    private final Protocol protocol;
    private final String moduleId;

    public TelemetrySinkModule(Protocol protocol) {
        this.protocol = (Protocol) Objects.requireNonNull(protocol);
        this.moduleId = MODULE_ID_PREFIX + protocol.getName();
    }

    public String getId() {
        return this.moduleId;
    }

    public int getNumConsumerThreads() {
        return ((Integer) this.protocol.getNumThreads().orElse(Integer.valueOf(DEFAULT_NUM_THREADS))).intValue();
    }

    public byte[] marshal(TelemetryProtos.TelemetryMessageLog telemetryMessageLog) {
        return telemetryMessageLog.toByteArray();
    }

    /* renamed from: unmarshal, reason: merged with bridge method [inline-methods] */
    public TelemetryProtos.TelemetryMessageLog m65unmarshal(byte[] bArr) {
        try {
            return TelemetryProtos.TelemetryMessageLog.parseFrom(bArr);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public byte[] marshalSingleMessage(TelemetryMessage telemetryMessage) {
        return marshal(((TelemetryProtos.TelemetryMessageLog.Builder) getAggregationPolicy().aggregate((Object) null, telemetryMessage)).build());
    }

    /* renamed from: unmarshalSingleMessage, reason: merged with bridge method [inline-methods] */
    public TelemetryMessage m64unmarshalSingleMessage(byte[] bArr) {
        TelemetryProtos.TelemetryMessageLog m65unmarshal = m65unmarshal(bArr);
        return new TelemetryMessage(new InetSocketAddress(m65unmarshal.getSourceAddress(), m65unmarshal.getSourcePort()), ByteBuffer.wrap(m65unmarshal.getMessage(0).getByteArray()), new Date(m65unmarshal.getMessage(0).getTimestamp()));
    }

    public AggregationPolicy<TelemetryMessage, TelemetryProtos.TelemetryMessageLog, TelemetryProtos.TelemetryMessageLog.Builder> getAggregationPolicy() {
        final String id = this.distPollerDao.whoami().getId();
        final String location = this.distPollerDao.whoami().getLocation();
        return new AggregationPolicy<TelemetryMessage, TelemetryProtos.TelemetryMessageLog, TelemetryProtos.TelemetryMessageLog.Builder>() { // from class: org.opennms.netmgt.telemetry.ipc.TelemetrySinkModule.1
            public int getCompletionSize() {
                return ((Integer) TelemetrySinkModule.this.protocol.getBatchSize().orElse(Integer.valueOf(TelemetrySinkModule.DEFAULT_BATCH_SIZE))).intValue();
            }

            public int getCompletionIntervalMs() {
                return ((Integer) TelemetrySinkModule.this.protocol.getBatchIntervalMs().orElse(Integer.valueOf(TelemetrySinkModule.DEFAULT_BATCH_INTERVAL_MS))).intValue();
            }

            public Object key(TelemetryMessage telemetryMessage) {
                return telemetryMessage.getSource();
            }

            public TelemetryProtos.TelemetryMessageLog.Builder aggregate(TelemetryProtos.TelemetryMessageLog.Builder builder, TelemetryMessage telemetryMessage) {
                if (builder == null) {
                    builder = TelemetryProtos.TelemetryMessageLog.newBuilder().setLocation(location).setSystemId(id).setSourceAddress(telemetryMessage.getSource().getHostString()).setSourcePort(telemetryMessage.getSource().getPort());
                }
                builder.addMessage(TelemetryProtos.TelemetryMessage.newBuilder().setTimestamp(telemetryMessage.getReceivedAt().getTime()).setBytes(ByteString.copyFrom(telemetryMessage.getBuffer())).m25build());
                return builder;
            }

            public TelemetryProtos.TelemetryMessageLog build(TelemetryProtos.TelemetryMessageLog.Builder builder) {
                return builder.build();
            }
        };
    }

    public AsyncPolicy getAsyncPolicy() {
        return new AsyncPolicy() { // from class: org.opennms.netmgt.telemetry.ipc.TelemetrySinkModule.2
            public int getQueueSize() {
                return ((Integer) TelemetrySinkModule.this.protocol.getQueueSize().orElse(Integer.valueOf(TelemetrySinkModule.DEFAULT_QUEUE_SIZE))).intValue();
            }

            public int getNumThreads() {
                return ((Integer) TelemetrySinkModule.this.protocol.getNumThreads().orElse(Integer.valueOf(TelemetrySinkModule.DEFAULT_NUM_THREADS))).intValue();
            }

            public boolean isBlockWhenFull() {
                return true;
            }
        };
    }

    public DistPollerDao getDistPollerDao() {
        return this.distPollerDao;
    }

    public void setDistPollerDao(DistPollerDao distPollerDao) {
        this.distPollerDao = distPollerDao;
    }
}
