/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.features.kafka.producer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.opennms.features.kafka.producer.AlarmEqualityChecker;
import org.opennms.features.kafka.producer.NodeCache;
import org.opennms.features.kafka.producer.ProtobufMapper;
import org.opennms.features.kafka.producer.datasync.KafkaAlarmDataSync;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.alarmd.api.AlarmLifecycleListener;
import org.opennms.netmgt.events.api.EventListener;
import org.opennms.netmgt.events.api.EventSubscriptionService;
import org.opennms.netmgt.model.OnmsAlarm;
import org.opennms.netmgt.model.OnmsNode;
import org.opennms.netmgt.xml.event.Event;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public class OpennmsKafkaProducer
implements AlarmLifecycleListener,
EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(OpennmsKafkaProducer.class);
    private static final RateLimitedLog RATE_LIMITED_LOGGER = RateLimitedLog.withRateLimit((Logger)LOG).maxRate(5).every(Duration.standardSeconds((long)30L)).build();
    public static final String KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client";
    private static final ExpressionParser SPEL_PARSER = new SpelExpressionParser();
    private final ProtobufMapper protobufMapper;
    private final NodeCache nodeCache;
    private final ConfigurationAdmin configAdmin;
    private final EventSubscriptionService eventSubscriptionService;
    private KafkaAlarmDataSync dataSync;
    private String eventTopic;
    private String alarmTopic;
    private String nodeTopic;
    private boolean forwardEvents;
    private boolean forwardAlarms;
    private boolean suppressIncrementalAlarms;
    private boolean forwardNodes;
    private Expression eventFilterExpression;
    private Expression alarmFilterExpression;
    private final CountDownLatch forwardedEvent = new CountDownLatch(1);
    private final CountDownLatch forwardedAlarm = new CountDownLatch(1);
    private final CountDownLatch forwardedNode = new CountDownLatch(1);
    private KafkaProducer<String, byte[]> producer;
    private final Map<String, OpennmsModelProtos.Alarm> outstandingAlarms = new ConcurrentHashMap<String, OpennmsModelProtos.Alarm>();
    private final AlarmEqualityChecker alarmEqualityChecker = AlarmEqualityChecker.with(AlarmEqualityChecker.Exclusions::defaultExclusions);
    private int kafkaSendQueueCapacity;
    private BlockingQueue<KafkaRecord> kafkaSendQueue;
    private final ExecutorService kafkaSendQueueExecutor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "KafkaSendQueueProcessor"));

    public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, ConfigurationAdmin configAdmin, EventSubscriptionService eventSubscriptionService) {
        this.protobufMapper = Objects.requireNonNull(protobufMapper);
        this.nodeCache = Objects.requireNonNull(nodeCache);
        this.configAdmin = Objects.requireNonNull(configAdmin);
        this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init() throws IOException {
        Properties producerConfig = new Properties();
        Dictionary properties = this.configAdmin.getConfiguration(KAFKA_CLIENT_PID).getProperties();
        if (properties != null) {
            Enumeration keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = (String)keys.nextElement();
                producerConfig.put(key, properties.get(key));
            }
        }
        producerConfig.put("key.serializer", StringSerializer.class.getCanonicalName());
        producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
        ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(null);
            this.producer = new KafkaProducer(producerConfig);
        }
        finally {
            Thread.currentThread().setContextClassLoader(currentClassLoader);
        }
        if (this.kafkaSendQueueCapacity <= 0) {
            this.kafkaSendQueueCapacity = 1000;
            LOG.info("Defaulted the 'kafkaSendQueueCapacity' to 1000 since no property was set");
        }
        this.kafkaSendQueue = new LinkedBlockingQueue<KafkaRecord>(this.kafkaSendQueueCapacity);
        this.kafkaSendQueueExecutor.execute(this::processKafkaSendQueue);
        if (this.forwardEvents) {
            this.eventSubscriptionService.addEventListener((EventListener)this);
        }
    }

    public void destroy() {
        this.kafkaSendQueueExecutor.shutdownNow();
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
        if (this.forwardEvents) {
            this.eventSubscriptionService.removeEventListener((EventListener)this);
        }
    }

    private void forwardEvent(Event event) {
        boolean shouldForwardEvent = true;
        if (this.eventFilterExpression != null) {
            try {
                shouldForwardEvent = (Boolean)this.eventFilterExpression.getValue((Object)event, Boolean.class);
            }
            catch (Exception e) {
                LOG.error("Event filter '{}' failed to return a result for event: {}. The event will be forwarded anyways.", new Object[]{this.eventFilterExpression.getExpressionString(), event.toStringSimple(), e});
            }
        }
        if (!shouldForwardEvent) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Event {} not forwarded due to event filter: {}", (Object)event.toStringSimple(), (Object)this.eventFilterExpression.getExpressionString());
            }
            return;
        }
        if (this.forwardNodes && event.getNodeid() != null && event.getNodeid() != 0L) {
            this.maybeUpdateNode(event.getNodeid());
        }
        this.sendRecord(() -> {
            OpennmsModelProtos.Event mappedEvent = this.protobufMapper.toEvent(event).build();
            LOG.debug("Sending event with UEI: {}", (Object)mappedEvent.getUei());
            return new ProducerRecord(this.eventTopic, (Object)mappedEvent.getUei(), (Object)mappedEvent.toByteArray());
        }, recordMetadata -> this.forwardedEvent.countDown());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean shouldForwardAlarm(OnmsAlarm alarm) {
        if (this.alarmFilterExpression != null) {
            OpennmsKafkaProducer opennmsKafkaProducer = this;
            synchronized (opennmsKafkaProducer) {
                try {
                    boolean shouldForwardAlarm = (Boolean)this.alarmFilterExpression.getValue((Object)alarm, Boolean.class);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Alarm {} not forwarded due to event filter: {}", (Object)alarm, (Object)this.alarmFilterExpression.getExpressionString());
                    }
                    return shouldForwardAlarm;
                }
                catch (Exception e) {
                    LOG.error("Alarm filter '{}' failed to return a result for event: {}. The alarm will be forwarded anyways.", new Object[]{this.alarmFilterExpression.getExpressionString(), alarm, e});
                }
            }
        }
        return true;
    }

    private boolean isIncrementalAlarm(String reductionKey, OnmsAlarm alarm) {
        OpennmsModelProtos.Alarm existingAlarm = this.outstandingAlarms.get(reductionKey);
        return existingAlarm != null && this.alarmEqualityChecker.equalsExcludingOnFirst(this.protobufMapper.toAlarm(alarm), existingAlarm);
    }

    private void recordIncrementalAlarm(String reductionKey, OnmsAlarm alarm) {
        this.outstandingAlarms.put(reductionKey, AlarmEqualityChecker.Exclusions.defaultExclusions(this.protobufMapper.toAlarm(alarm)).build());
    }

    private void updateAlarm(String reductionKey, OnmsAlarm alarm) {
        if (alarm == null) {
            this.outstandingAlarms.remove(reductionKey);
            this.sendRecord(() -> {
                LOG.debug("Deleting alarm with reduction key: {}", (Object)reductionKey);
                return new ProducerRecord(this.alarmTopic, (Object)reductionKey, null);
            }, recordMetadata -> this.forwardedAlarm.countDown());
            return;
        }
        if (!this.shouldForwardAlarm(alarm)) {
            return;
        }
        if (this.suppressIncrementalAlarms && this.isIncrementalAlarm(reductionKey, alarm)) {
            return;
        }
        if (this.forwardNodes && alarm.getNodeId() != null) {
            this.maybeUpdateNode(alarm.getNodeId().intValue());
        }
        this.sendRecord(() -> {
            OpennmsModelProtos.Alarm mappedAlarm = this.protobufMapper.toAlarm(alarm).build();
            LOG.debug("Sending alarm with reduction key: {}", (Object)reductionKey);
            if (this.suppressIncrementalAlarms) {
                this.recordIncrementalAlarm(reductionKey, alarm);
            }
            return new ProducerRecord(this.alarmTopic, (Object)reductionKey, (Object)mappedAlarm.toByteArray());
        }, recordMetadata -> this.forwardedAlarm.countDown());
    }

    private void maybeUpdateNode(long nodeId) {
        this.nodeCache.triggerIfNeeded(nodeId, node -> {
            String nodeCriteria = node != null && node.getForeignSource() != null && node.getForeignId() != null ? String.format("%s:%s", node.getForeignSource(), node.getForeignId()) : Long.toString(nodeId);
            if (node == null) {
                this.sendRecord(() -> {
                    LOG.debug("Deleting node with criteria: {}", (Object)nodeCriteria);
                    return new ProducerRecord(this.nodeTopic, (Object)nodeCriteria, null);
                });
                return;
            }
            this.sendRecord(() -> {
                OpennmsModelProtos.Node mappedNode = this.protobufMapper.toNode((OnmsNode)node).build();
                LOG.debug("Sending node with criteria: {}", (Object)nodeCriteria);
                return new ProducerRecord(this.nodeTopic, (Object)nodeCriteria, (Object)mappedNode.toByteArray());
            }, recordMetadata -> this.forwardedNode.countDown());
        });
    }

    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable) {
        this.sendRecord(callable, null);
    }

    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable, Consumer<RecordMetadata> callback) {
        ProducerRecord<String, byte[]> record;
        if (this.producer == null) {
            return;
        }
        try {
            record = callable.call();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (!this.kafkaSendQueue.offer(new KafkaRecord(record, callback))) {
            RATE_LIMITED_LOGGER.warn("Dropped a Kafka record due to queue capacity being full.");
        }
    }

    private void processKafkaSendQueue() {
        block4: while (true) {
            try {
                while (true) {
                    KafkaRecord kafkaRecord = this.kafkaSendQueue.take();
                    ProducerRecord<String, byte[]> producerRecord = kafkaRecord.getProducerRecord();
                    Consumer<RecordMetadata> consumer = kafkaRecord.getConsumer();
                    try {
                        this.producer.send(producerRecord, (recordMetadata, e) -> {
                            if (e != null) {
                                LOG.warn("Failed to send record to producer: {}.", (Object)producerRecord, (Object)e);
                                return;
                            }
                            if (consumer != null) {
                                consumer.accept(recordMetadata);
                            }
                        });
                        continue block4;
                    }
                    catch (RuntimeException e2) {
                        LOG.warn("Failed to send record to producer: {}.", producerRecord, (Object)e2);
                        continue;
                    }
                    break;
                }
            }
            catch (InterruptedException ignore) {
                return;
            }
        }
    }

    public void handleAlarmSnapshot(List<OnmsAlarm> alarms) {
        if (!this.forwardAlarms || this.dataSync == null) {
            return;
        }
        Set reductionKeysInSnapshot = alarms.stream().map(OnmsAlarm::getReductionKey).collect(Collectors.toSet());
        this.outstandingAlarms.keySet().removeIf(reductionKey -> !reductionKeysInSnapshot.contains(reductionKey));
        this.dataSync.handleAlarmSnapshot(alarms);
    }

    public void handleNewOrUpdatedAlarm(OnmsAlarm alarm) {
        if (!this.forwardAlarms) {
            return;
        }
        this.updateAlarm(alarm.getReductionKey(), alarm);
    }

    public void handleDeletedAlarm(int alarmId, String reductionKey) {
        if (!this.forwardAlarms) {
            return;
        }
        this.handleDeletedAlarm(reductionKey);
    }

    public void handleDeletedAlarm(String reductionKey) {
        this.updateAlarm(reductionKey, null);
    }

    public String getName() {
        return OpennmsKafkaProducer.class.getName();
    }

    public void onEvent(Event event) {
        this.forwardEvent(event);
    }

    public void setEventTopic(String eventTopic) {
        this.eventTopic = eventTopic;
        this.forwardEvents = !Strings.isNullOrEmpty((String)eventTopic);
    }

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
        this.forwardAlarms = !Strings.isNullOrEmpty((String)alarmTopic);
    }

    public void setNodeTopic(String nodeTopic) {
        this.nodeTopic = nodeTopic;
        this.forwardNodes = !Strings.isNullOrEmpty((String)nodeTopic);
    }

    public void setEventFilter(String eventFilter) {
        this.eventFilterExpression = Strings.isNullOrEmpty((String)eventFilter) ? null : SPEL_PARSER.parseExpression(eventFilter);
    }

    public void setAlarmFilter(String alarmFilter) {
        this.alarmFilterExpression = Strings.isNullOrEmpty((String)alarmFilter) ? null : SPEL_PARSER.parseExpression(alarmFilter);
    }

    public OpennmsKafkaProducer setDataSync(KafkaAlarmDataSync dataSync) {
        this.dataSync = dataSync;
        return this;
    }

    public boolean isForwardingAlarms() {
        return this.forwardAlarms;
    }

    public CountDownLatch getEventForwardedLatch() {
        return this.forwardedEvent;
    }

    public CountDownLatch getAlarmForwardedLatch() {
        return this.forwardedAlarm;
    }

    public CountDownLatch getNodeForwardedLatch() {
        return this.forwardedNode;
    }

    public void setSuppressIncrementalAlarms(boolean suppressIncrementalAlarms) {
        this.suppressIncrementalAlarms = suppressIncrementalAlarms;
    }

    @VisibleForTesting
    KafkaAlarmDataSync getDataSync() {
        return this.dataSync;
    }

    public void setKafkaSendQueueCapacity(int kafkaSendQueueCapacity) {
        this.kafkaSendQueueCapacity = kafkaSendQueueCapacity;
    }

    private static final class KafkaRecord {
        private final ProducerRecord<String, byte[]> producerRecord;
        private final Consumer<RecordMetadata> consumer;

        KafkaRecord(ProducerRecord<String, byte[]> producerRecord, Consumer<RecordMetadata> consumer) {
            this.producerRecord = producerRecord;
            this.consumer = consumer;
        }

        ProducerRecord<String, byte[]> getProducerRecord() {
            return this.producerRecord;
        }

        Consumer<RecordMetadata> getConsumer() {
            return this.consumer;
        }
    }
}

