/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp.BmpMessageHandler;
import org.opennms.netmgt.telemetry.protocols.bmp.adapter.openbmp.proto.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BmpMessageHandler} that publishes serialized OpenBMP messages to Kafka.
 *
 * <p>Each message is serialized to text and sent to the topic named by the message type,
 * optionally prefixed with {@code "<topicPrefix>."}. The collector hash id of the message
 * is used as the record key.
 */
public class BmpKafkaProducer
implements BmpMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(BmpKafkaProducer.class);

    /** Either the empty string or the configured prefix followed by a dot. */
    private final String topicPrefix;
    private final KafkaProducer<String, String> producer;

    /**
     * Creates the handler and the underlying Kafka producer.
     *
     * @param topicPrefix optional prefix for every topic name; when non-null, a {@code "."}
     *                    separator is appended, when {@code null} no prefix is used
     * @param kafkaConfig Kafka producer configuration; this map is not modified
     * @throws NullPointerException if {@code kafkaConfig} is {@code null}
     */
    public BmpKafkaProducer(String topicPrefix, Map<String, Object> kafkaConfig) {
        this.topicPrefix = topicPrefix != null ? topicPrefix + "." : "";
        this.producer = BmpKafkaProducer.buildProducer(kafkaConfig);
    }

    /**
     * Builds a string/string producer from the given configuration, filling in defaults
     * for any of the tuning keys the caller did not set.
     *
     * @param kafkaConfig caller-supplied producer configuration
     * @return a new producer using {@link StringSerializer} for keys and values
     */
    private static KafkaProducer<String, String> buildProducer(Map<String, Object> kafkaConfig) {
        // Work on a private copy: the defaults must not leak into the caller's map, and the
        // caller may legitimately hand in an unmodifiable map.
        final Map<String, Object> effectiveConfig = new HashMap<>(kafkaConfig);

        // Defaults only apply where the caller has not provided a value.
        effectiveConfig.putIfAbsent("batch.size", 100);
        effectiveConfig.putIfAbsent("max.request.size", 1000000);
        effectiveConfig.putIfAbsent("retries", 2);
        effectiveConfig.putIfAbsent("retry.backoff.ms", 100);

        return new KafkaProducer<>(effectiveConfig, new StringSerializer(), new StringSerializer());
    }

    /**
     * Serializes the message and hands it to the Kafka producer.
     *
     * <p>The outcome of the send is reported through the completion callback: a failure is
     * logged at WARN level, a success at TRACE level. No exception from the callback path is
     * propagated to the caller.
     *
     * @param message the OpenBMP message to publish
     */
    @Override
    public void handle(Message message) {
        // StringBuffer is kept because that is what Message.serialize is called with here.
        final StringBuffer buffer = new StringBuffer();
        message.serialize(buffer);

        final String topic = this.topicPrefix + message.getType().getTopic();
        final ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, message.getCollectorHashId(), buffer.toString());

        this.producer.send(record, (meta, err) -> {
            if (err != null) {
                LOG.warn("Failed to send OpenBMP message", err);
            } else {
                LOG.trace("Send OpenBMP message: {} = {}@{}", meta.topic(), meta.offset(), meta.partition());
            }
        });
    }

    /** Closes the underlying Kafka producer. */
    @Override
    public void close() {
        this.producer.close();
    }
}

