/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.features.kafka.producer;

import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opennms.features.kafka.producer.NodeCache;
import org.opennms.features.kafka.producer.ProtobufMapper;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.alarmd.api.AlarmLifecycleListener;
import org.opennms.netmgt.alarmd.api.AlarmLifecycleSubscriptionService;
import org.opennms.netmgt.events.api.EventListener;
import org.opennms.netmgt.events.api.EventSubscriptionService;
import org.opennms.netmgt.model.OnmsAlarm;
import org.opennms.netmgt.model.OnmsNode;
import org.opennms.netmgt.xml.event.Event;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;

/**
 * Publishes events, alarms and nodes to Kafka topics.
 *
 * <p>Each object is mapped with the injected {@link ProtobufMapper}, serialized with
 * {@code toByteArray()} and sent as the value of a {@link ProducerRecord} with a
 * {@code String} key (event UEI, alarm reduction key, or node criteria). A record with a
 * {@code null} value is sent when an alarm or node is deleted.
 *
 * <p>Forwarding of each object type is enabled by configuring a non-empty topic name via
 * {@link #setEventTopic(String)}, {@link #setAlarmTopic(String)} and
 * {@link #setNodeTopic(String)}. Listener registration only happens in {@link #init()}, so
 * the topics must be set before it is called.
 */
public class OpennmsKafkaProducer implements AlarmLifecycleListener, EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(OpennmsKafkaProducer.class);

    /** PID of the configuration whose properties are handed to the Kafka producer. */
    public static final String KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client";

    private static final ExpressionParser SPEL_PARSER = new SpelExpressionParser();

    private final ProtobufMapper protobufMapper;
    private final NodeCache nodeCache;
    private final ConfigurationAdmin configAdmin;
    private final EventSubscriptionService eventSubscriptionService;
    private final AlarmLifecycleSubscriptionService alarmLifecycleSubscriptionService;

    private String eventTopic;
    private String alarmTopic;
    private String nodeTopic;

    // Derived from the topic setters: a type is forwarded only when its topic is non-empty.
    private boolean forwardEvents;
    private boolean forwardAlarms;
    private boolean forwardNodes;

    // Optional SpEL filters; null means "forward everything".
    private Expression eventFilterExpression;
    private Expression alarmFilterExpression;

    // Counted down once the first record of each kind has been acknowledged without error.
    private final CountDownLatch forwardedEvent = new CountDownLatch(1);
    private final CountDownLatch forwardedAlarm = new CountDownLatch(1);
    private final CountDownLatch forwardedNode = new CountDownLatch(1);

    // Written by init()/destroy() and read by listener callbacks, which may run on other
    // threads; volatile so that a callback observes the producer being cleared.
    private volatile KafkaProducer<String, byte[]> producer;

    /**
     * Creates the producer wrapper. No Kafka connection is made until {@link #init()}.
     *
     * @param protobufMapper maps model objects to their protobuf builders
     * @param nodeCache decides whether a node needs to be (re)sent
     * @param configAdmin source of the Kafka client configuration
     * @param eventSubscriptionService used to subscribe to events
     * @param alarmLifecycleSubscriptionService used to subscribe to alarm lifecycle callbacks
     * @throws NullPointerException if any argument is {@code null}
     */
    public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, ConfigurationAdmin configAdmin, EventSubscriptionService eventSubscriptionService, AlarmLifecycleSubscriptionService alarmLifecycleSubscriptionService) {
        this.protobufMapper = Objects.requireNonNull(protobufMapper);
        this.nodeCache = Objects.requireNonNull(nodeCache);
        this.configAdmin = Objects.requireNonNull(configAdmin);
        this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
        this.alarmLifecycleSubscriptionService = Objects.requireNonNull(alarmLifecycleSubscriptionService);
    }

    /**
     * Builds the Kafka producer from the {@link #KAFKA_CLIENT_PID} configuration and
     * registers this instance as event and/or alarm listener, depending on which topics
     * are configured.
     *
     * @throws IOException if the configuration cannot be retrieved
     */
    public void init() throws IOException {
        final Properties producerConfig = new Properties();
        final Dictionary<String, ?> properties = this.configAdmin.getConfiguration(KAFKA_CLIENT_PID).getProperties();
        if (properties != null) {
            final Enumeration<String> keys = properties.keys();
            while (keys.hasMoreElements()) {
                final String key = keys.nextElement();
                producerConfig.put(key, properties.get(key));
            }
        }
        // The serializers are fixed by this class and override anything from the configuration.
        producerConfig.put("key.serializer", StringSerializer.class.getCanonicalName());
        producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());

        // The context class loader is cleared while the producer is constructed and restored
        // afterwards. NOTE(review): presumably this makes the Kafka client load its classes
        // (e.g. the serializers) through its own class loader - confirm before changing.
        final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(null);
            this.producer = new KafkaProducer<>(producerConfig);
        } finally {
            Thread.currentThread().setContextClassLoader(currentClassLoader);
        }

        if (this.forwardEvents) {
            this.eventSubscriptionService.addEventListener(this);
        }
        if (this.forwardAlarms) {
            this.alarmLifecycleSubscriptionService.addAlarmLifecyleListener(this);
        }
    }

    /**
     * Unregisters the listeners and closes the Kafka producer.
     *
     * <p>Listeners are removed first so that no further callbacks are delivered while the
     * producer is shutting down; the producer is closed even if unregistering fails.
     */
    public void destroy() {
        try {
            if (this.forwardEvents) {
                this.eventSubscriptionService.removeEventListener(this);
            }
            if (this.forwardAlarms) {
                this.alarmLifecycleSubscriptionService.removeAlarmLifecycleListener(this);
            }
        } finally {
            // Clear the field before closing so in-flight callbacks skip sending.
            final KafkaProducer<String, byte[]> closingProducer = this.producer;
            this.producer = null;
            if (closingProducer != null) {
                closingProducer.close();
            }
        }
    }

    /**
     * Applies the event filter (if any), refreshes the event's node when node forwarding
     * is enabled, and sends the event keyed by its UEI.
     *
     * @param event the event to forward
     */
    private void forwardEvent(Event event) {
        boolean shouldForwardEvent = true;
        if (this.eventFilterExpression != null) {
            try {
                shouldForwardEvent = this.eventFilterExpression.getValue(event, Boolean.class);
            } catch (Exception e) {
                // Best effort: a broken filter (or a null result) must not drop events.
                LOG.error("Event filter '{}' failed to return a result for event: {}. The event will be forwarded anyways.", this.eventFilterExpression.getExpressionString(), event.toStringSimple(), e);
            }
        }
        if (!shouldForwardEvent) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Event {} not forwarded due to event filter: {}", event.toStringSimple(), this.eventFilterExpression.getExpressionString());
            }
            return;
        }

        final Long nodeId = event.getNodeid();
        if (this.forwardNodes && nodeId != null && nodeId != 0L) {
            this.maybeUpdateNode(nodeId);
        }

        this.sendRecord(() -> {
            final OpennmsModelProtos.Event mappedEvent = this.protobufMapper.toEvent(event).build();
            LOG.debug("Sending event with UEI: {}", mappedEvent.getUei());
            return new ProducerRecord<>(this.eventTopic, mappedEvent.getUei(), mappedEvent.toByteArray());
        }, recordMetadata -> this.forwardedEvent.countDown());
    }

    /**
     * Evaluates the alarm filter against the given alarm.
     *
     * @param alarm the alarm to test
     * @return the filter result; {@code true} when no filter is configured or when the
     *         filter fails to evaluate
     */
    public boolean shouldForwardAlarm(OnmsAlarm alarm) {
        if (this.alarmFilterExpression != null) {
            // Evaluation of the alarm filter is serialized on this instance.
            synchronized (this) {
                try {
                    final boolean shouldForwardAlarm = this.alarmFilterExpression.getValue(alarm, Boolean.class);
                    // Only report alarms that the filter actually rejected.
                    if (!shouldForwardAlarm && LOG.isTraceEnabled()) {
                        LOG.trace("Alarm {} not forwarded due to alarm filter: {}", alarm, this.alarmFilterExpression.getExpressionString());
                    }
                    return shouldForwardAlarm;
                } catch (Exception e) {
                    // Best effort: a broken filter (or a null result) must not drop alarms.
                    LOG.error("Alarm filter '{}' failed to return a result for alarm: {}. The alarm will be forwarded anyways.", this.alarmFilterExpression.getExpressionString(), alarm, e);
                }
            }
        }
        return true;
    }

    /**
     * Sends the current state of an alarm, or a {@code null}-valued record when the alarm
     * was deleted.
     *
     * @param reductionKey record key
     * @param alarm the alarm, or {@code null} if it was deleted
     */
    private void updateAlarm(String reductionKey, OnmsAlarm alarm) {
        if (alarm == null) {
            // Deletions bypass the alarm filter and are sent with a null value.
            this.sendRecord(() -> {
                LOG.debug("Deleting alarm with reduction key: {}", reductionKey);
                return new ProducerRecord<>(this.alarmTopic, reductionKey, null);
            }, recordMetadata -> this.forwardedAlarm.countDown());
            return;
        }
        if (!this.shouldForwardAlarm(alarm)) {
            return;
        }
        if (this.forwardNodes && alarm.getNodeId() != null) {
            this.maybeUpdateNode(alarm.getNodeId().intValue());
        }
        this.sendRecord(() -> {
            final OpennmsModelProtos.Alarm mappedAlarm = this.protobufMapper.toAlarm(alarm).build();
            LOG.debug("Sending alarm with reduction key: {}", reductionKey);
            return new ProducerRecord<>(this.alarmTopic, reductionKey, mappedAlarm.toByteArray());
        }, recordMetadata -> this.forwardedAlarm.countDown());
    }

    /**
     * Asks the node cache to trigger a node update; when it does, the node is sent keyed by
     * {@code foreignSource:foreignId} if both are set, otherwise by its numeric id. A
     * {@code null} node results in a {@code null}-valued record.
     *
     * @param nodeId id of the node referenced by an event or alarm
     */
    private void maybeUpdateNode(long nodeId) {
        this.nodeCache.triggerIfNeeded(nodeId, node -> {
            final String nodeCriteria;
            if (node != null && node.getForeignSource() != null && node.getForeignId() != null) {
                nodeCriteria = String.format("%s:%s", node.getForeignSource(), node.getForeignId());
            } else {
                nodeCriteria = Long.toString(nodeId);
            }

            if (node == null) {
                this.sendRecord(() -> {
                    LOG.debug("Deleting node with criteria: {}", nodeCriteria);
                    return new ProducerRecord<>(this.nodeTopic, nodeCriteria, null);
                });
                return;
            }

            this.sendRecord(() -> {
                final OpennmsModelProtos.Node mappedNode = this.protobufMapper.toNode(node).build();
                LOG.debug("Sending node with criteria: {}", nodeCriteria);
                return new ProducerRecord<>(this.nodeTopic, nodeCriteria, mappedNode.toByteArray());
            }, recordMetadata -> this.forwardedNode.countDown());
        });
    }

    /**
     * Sends a record without a success callback.
     *
     * @param callable builds the record to send
     */
    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable) {
        this.sendRecord(callable, null);
    }

    /**
     * Builds a record and hands it to the Kafka producer. Nothing is built or sent when
     * the producer is not available (before {@link #init()} or after {@link #destroy()}).
     *
     * @param callable builds the record to send; only invoked when a producer is available
     * @param callback invoked with the record metadata after a send that completed without
     *                 error; may be {@code null}
     * @throws RuntimeException wrapping any exception thrown by {@code callable}
     */
    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable, Consumer<RecordMetadata> callback) {
        // Read the field once: destroy() may clear it between the null check and the send.
        final KafkaProducer<String, byte[]> activeProducer = this.producer;
        if (activeProducer == null) {
            return;
        }

        final ProducerRecord<String, byte[]> record;
        try {
            record = callable.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        activeProducer.send(record, (recordMetadata, sendError) -> {
            if (sendError != null) {
                LOG.warn("Failed to send record to producer: {}.", record, sendError);
                return;
            }
            if (callback != null) {
                callback.accept(recordMetadata);
            }
        });
    }

    /**
     * Forwards a new or updated alarm, keyed by its reduction key.
     *
     * @param alarm the alarm that was created or updated
     */
    public void handleNewOrUpdatedAlarm(OnmsAlarm alarm) {
        this.updateAlarm(alarm.getReductionKey(), alarm);
    }

    /**
     * Forwards the deletion of an alarm.
     *
     * @param alarmId id of the deleted alarm (not used)
     * @param reductionKey reduction key of the deleted alarm
     */
    public void handleDeletedAlarm(int alarmId, String reductionKey) {
        this.handleDeletedAlarm(reductionKey);
    }

    /**
     * Forwards the deletion of an alarm as a {@code null}-valued record.
     *
     * @param reductionKey reduction key of the deleted alarm
     */
    public void handleDeletedAlarm(String reductionKey) {
        this.updateAlarm(reductionKey, null);
    }

    /**
     * @return the fully qualified name of this class
     */
    public String getName() {
        return OpennmsKafkaProducer.class.getName();
    }

    /**
     * Forwards a received event.
     *
     * @param event the received event
     */
    public void onEvent(Event event) {
        this.forwardEvent(event);
    }

    /**
     * Sets the topic for events; a {@code null} or empty value disables event forwarding.
     *
     * @param eventTopic topic name
     */
    public void setEventTopic(String eventTopic) {
        this.eventTopic = eventTopic;
        this.forwardEvents = !Strings.isNullOrEmpty(eventTopic);
    }

    /**
     * Sets the topic for alarms; a {@code null} or empty value disables alarm forwarding.
     *
     * @param alarmTopic topic name
     */
    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
        this.forwardAlarms = !Strings.isNullOrEmpty(alarmTopic);
    }

    /**
     * Sets the topic for nodes; a {@code null} or empty value disables node forwarding.
     *
     * @param nodeTopic topic name
     */
    public void setNodeTopic(String nodeTopic) {
        this.nodeTopic = nodeTopic;
        this.forwardNodes = !Strings.isNullOrEmpty(nodeTopic);
    }

    /**
     * Sets the SpEL expression used to filter events; {@code null} or empty removes the filter.
     *
     * @param eventFilter SpEL expression evaluated against each event
     */
    public void setEventFilter(String eventFilter) {
        this.eventFilterExpression = Strings.isNullOrEmpty(eventFilter) ? null : SPEL_PARSER.parseExpression(eventFilter);
    }

    /**
     * Sets the SpEL expression used to filter alarms; {@code null} or empty removes the filter.
     *
     * @param alarmFilter SpEL expression evaluated against each alarm
     */
    public void setAlarmFilter(String alarmFilter) {
        this.alarmFilterExpression = Strings.isNullOrEmpty(alarmFilter) ? null : SPEL_PARSER.parseExpression(alarmFilter);
    }

    /**
     * @return {@code true} if a non-empty alarm topic is configured
     */
    public boolean isForwardingAlarms() {
        return this.forwardAlarms;
    }

    /**
     * @return latch released after the first event record was sent without error
     */
    public CountDownLatch getEventForwardedLatch() {
        return this.forwardedEvent;
    }

    /**
     * @return latch released after the first alarm record was sent without error
     */
    public CountDownLatch getAlarmForwardedLatch() {
        return this.forwardedAlarm;
    }

    /**
     * @return latch released after the first node record was sent without error
     */
    public CountDownLatch getNodeForwardedLatch() {
        return this.forwardedNode;
    }
}

