package org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp.proto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/bmp/adapter/openbmp/BmpKafkaProducer.class */
/**
 * {@link BmpMessageHandler} that serializes each OpenBMP message to text and
 * publishes it to a Kafka topic.
 *
 * <p>The destination topic is the topic of the message's type, prefixed with
 * {@code "<prefix>."} when a prefix was supplied at construction time. The
 * record key is the collector hash id of the message.
 */
public class BmpKafkaProducer implements BmpMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(BmpKafkaProducer.class);

    /** Either the empty string or the configured prefix followed by a dot. */
    private final String topicPrefix;
    private final KafkaProducer<String, String> producer;

    /**
     * Creates the handler and the underlying Kafka producer.
     *
     * @param topicPrefix prefix prepended (with a trailing dot) to every topic name;
     *                    {@code null} means no prefix
     * @param kafkaConfig Kafka producer properties; not modified by this class
     * @throws NullPointerException if {@code kafkaConfig} is {@code null}
     */
    public BmpKafkaProducer(String topicPrefix, Map<String, Object> kafkaConfig) {
        this.topicPrefix = (topicPrefix != null) ? topicPrefix + "." : "";
        this.producer = buildProducer(kafkaConfig);
    }

    /**
     * Builds a string/string Kafka producer from the given properties, filling in
     * defaults for batch size, maximum request size, retries and retry back-off
     * when the caller did not specify them.
     *
     * @param kafkaConfig caller-supplied producer properties
     * @return a new producer using {@link StringSerializer} for keys and values
     */
    private static KafkaProducer<String, String> buildProducer(Map<String, Object> kafkaConfig) {
        Objects.requireNonNull(kafkaConfig, "kafkaConfig");

        // Work on a private copy: applying defaults must neither leak into the
        // caller's map nor fail when the caller passes an unmodifiable map.
        final Map<String, Object> effectiveConfig = new HashMap<>(kafkaConfig);
        effectiveConfig.putIfAbsent("batch.size", 100);
        effectiveConfig.putIfAbsent("max.request.size", 1000000);
        effectiveConfig.putIfAbsent("retries", 2);
        effectiveConfig.putIfAbsent("retry.backoff.ms", 100);

        return new KafkaProducer<>(effectiveConfig, new StringSerializer(), new StringSerializer());
    }

    /**
     * Serializes the message and hands it to the Kafka producer.
     *
     * <p>The outcome of the send is reported through the producer callback: a
     * failure is logged at WARN level, a success at TRACE level.
     *
     * @param message the OpenBMP message to publish
     * @param context handler context; not used by this implementation
     */
    @Override // org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp.BmpMessageHandler
    public void handle(Message message, Context context) {
        // Message#serialize is called with a StringBuffer in the original code, so that type is kept.
        final StringBuffer payload = new StringBuffer();
        message.serialize(payload);

        final String topic = this.topicPrefix + message.getType().getTopic();
        final ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, message.getCollectorHashId(), payload.toString());

        this.producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOG.warn("Failed to send OpenBMP message", exception);
            } else {
                LOG.trace("Send OpenBMP message: {} = {}@{}", metadata.topic(), metadata.offset(), metadata.partition());
            }
        });
    }

    /** Closes the underlying Kafka producer. */
    @Override // org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp.BmpMessageHandler
    public void close() {
        this.producer.close();
    }
}
