package org.opennms.netmgt.telemetry.ipc;

import java.util.Objects;
import org.opennms.core.ipc.sink.api.AggregationPolicy;
import org.opennms.core.ipc.sink.api.AsyncPolicy;
import org.opennms.core.ipc.sink.xml.AbstractXmlSinkModule;
import org.opennms.netmgt.dao.api.DistPollerDao;
import org.opennms.netmgt.telemetry.listeners.api.TelemetryMessage;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/opennms/netmgt/telemetry/ipc/TelemetrySinkModule.class */
/**
 * Sink module for the telemetry messages of a single {@link ProtocolDefinition}.
 *
 * <p>The module id is {@code "Telemetry-"} followed by the protocol name. Thread count, batch
 * size, batch interval and queue size are read from the protocol definition each time they are
 * requested; the defaults declared in this class apply whenever the protocol leaves a value unset.
 */
public class TelemetrySinkModule extends AbstractXmlSinkModule<TelemetryMessage, TelemetryMessageLogDTO> {
    private static final String MODULE_ID_PREFIX = "Telemetry-";
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final int DEFAULT_BATCH_INTERVAL_MS = 500;
    private static final int DEFAULT_QUEUE_SIZE = 10000;

    @Autowired
    private DistPollerDao distPollerDao;
    private final ProtocolDefinition protocol;
    private final String moduleId;

    /**
     * Creates the module for the given protocol.
     *
     * @param protocolDefinition protocol whose name and tuning values drive this module
     * @throws NullPointerException if {@code protocolDefinition} is {@code null}
     */
    public TelemetrySinkModule(ProtocolDefinition protocolDefinition) {
        super(TelemetryMessageLogDTO.class);
        this.protocol = Objects.requireNonNull(protocolDefinition);
        this.moduleId = MODULE_ID_PREFIX + protocolDefinition.getName();
    }

    /**
     * @return the module id computed at construction time
     */
    public String getId() {
        return moduleId;
    }

    /**
     * @return the protocol's thread count, or {@code DEFAULT_NUM_THREADS} when it is unset
     */
    public int getNumConsumerThreads() {
        return effectiveNumThreads();
    }

    /**
     * Builds a policy that groups messages by their source and collects them into one
     * {@link TelemetryMessageLogDTO} per group.
     *
     * <p>The system id and location are read from the {@link DistPollerDao} once, when this method
     * is called, and are reused for every log created by the returned policy.
     *
     * @return a new aggregation policy bound to this module's protocol settings
     */
    public AggregationPolicy<TelemetryMessage, TelemetryMessageLogDTO> getAggregationPolicy() {
        final String systemId = distPollerDao.whoami().getId();
        final String systemLocation = distPollerDao.whoami().getLocation();

        return new AggregationPolicy<TelemetryMessage, TelemetryMessageLogDTO>() {
            public int getCompletionSize() {
                return effectiveBatchSize();
            }

            public int getCompletionIntervalMs() {
                return effectiveBatchIntervalMs();
            }

            public Object key(TelemetryMessage message) {
                return message.getSource();
            }

            public TelemetryMessageLogDTO aggregate(TelemetryMessageLogDTO accumulator, TelemetryMessage message) {
                // A null accumulator marks the first message of a batch: start a fresh log for it.
                final TelemetryMessageLogDTO log = (accumulator != null)
                        ? accumulator
                        : new TelemetryMessageLogDTO(systemLocation, systemId, message.getSource());
                final TelemetryMessageDTO entry = new TelemetryMessageDTO(message.getBuffer());
                log.getMessages().add(entry);
                return log;
            }
        };
    }

    /**
     * Builds the asynchronous dispatch policy for this module.
     *
     * @return a new policy exposing the protocol's queue size and thread count (or their defaults)
     *         that always reports blocking when full
     */
    public AsyncPolicy getAsyncPolicy() {
        return new AsyncPolicy() {
            public int getQueueSize() {
                return effectiveQueueSize();
            }

            public int getNumThreads() {
                return effectiveNumThreads();
            }

            public boolean isBlockWhenFull() {
                return true;
            }
        };
    }

    /**
     * @return the DAO currently used to look up the local system's id and location
     */
    public DistPollerDao getDistPollerDao() {
        return distPollerDao;
    }

    /**
     * @param distPollerDao the DAO used to look up the local system's id and location
     */
    public void setDistPollerDao(DistPollerDao distPollerDao) {
        this.distPollerDao = distPollerDao;
    }

    /** Thread count from the protocol, falling back to {@code DEFAULT_NUM_THREADS}. */
    private int effectiveNumThreads() {
        return protocol.getNumThreads().orElse(DEFAULT_NUM_THREADS);
    }

    /** Batch size from the protocol, falling back to {@code DEFAULT_BATCH_SIZE}. */
    private int effectiveBatchSize() {
        return protocol.getBatchSize().orElse(DEFAULT_BATCH_SIZE);
    }

    /** Batch interval in milliseconds from the protocol, falling back to {@code DEFAULT_BATCH_INTERVAL_MS}. */
    private int effectiveBatchIntervalMs() {
        return protocol.getBatchIntervalMs().orElse(DEFAULT_BATCH_INTERVAL_MS);
    }

    /** Queue size from the protocol, falling back to {@code DEFAULT_QUEUE_SIZE}. */
    private int effectiveQueueSize() {
        return protocol.getQueueSize().orElse(DEFAULT_QUEUE_SIZE);
    }
}
