package org.opennms.features.kafka.producer;

import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opennms.features.kafka.producer.datasync.KafkaAlarmDataSync;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.alarmd.api.AlarmLifecycleListener;
import org.opennms.netmgt.events.api.EventListener;
import org.opennms.netmgt.events.api.EventSubscriptionService;
import org.opennms.netmgt.model.OnmsAlarm;
import org.opennms.netmgt.xml.event.Event;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;

/* loaded from: input_file:org/opennms/features/kafka/producer/OpennmsKafkaProducer.class */
/**
 * Forwards OpenNMS events, alarms and nodes to Kafka topics.
 *
 * <p>Records are keyed by a string (event UEI, alarm reduction key, or node criteria) and
 * carry the serialized bytes of the corresponding {@link OpennmsModelProtos} message. A record
 * with a {@code null} value is sent when an alarm or node is deleted.
 *
 * <p>Forwarding of each object type is enabled by setting a non-empty topic name through the
 * matching setter. Events and alarms can additionally be restricted with SpEL filter
 * expressions; when a filter fails to evaluate, the object is forwarded anyway.
 */
public class OpennmsKafkaProducer implements AlarmLifecycleListener, EventListener {
    /** Configuration PID whose properties are passed to the Kafka producer. */
    public static final String KAFKA_CLIENT_PID = "org.opennms.features.kafka.producer.client";

    private static final Logger LOG = LoggerFactory.getLogger(OpennmsKafkaProducer.class);
    private static final ExpressionParser SPEL_PARSER = new SpelExpressionParser();

    private final ProtobufMapper protobufMapper;
    private final NodeCache nodeCache;
    private final ConfigurationAdmin configAdmin;
    private final EventSubscriptionService eventSubscriptionService;
    private KafkaAlarmDataSync dataSync;

    private String eventTopic;
    private String alarmTopic;
    private String nodeTopic;

    // Each flag is derived from its topic: true when the topic is non-empty.
    private boolean forwardEvents;
    private boolean forwardAlarms;
    private boolean forwardNodes;

    // A null expression means "no filter": everything is forwarded.
    private Expression eventFilterExpression;
    private Expression alarmFilterExpression;

    // Counted down once a record of the given type has been sent successfully.
    private final CountDownLatch forwardedEvent = new CountDownLatch(1);
    private final CountDownLatch forwardedAlarm = new CountDownLatch(1);
    private final CountDownLatch forwardedNode = new CountDownLatch(1);

    // Null until init() succeeds and again after destroy().
    private KafkaProducer<String, byte[]> producer;

    /**
     * Creates a producer that is not yet connected; call {@link #init()} to start it.
     *
     * @param protobufMapper converts model objects to protobuf builders
     * @param nodeCache decides when a node needs to be (re)sent
     * @param configurationAdmin source of the Kafka client configuration
     * @param eventSubscriptionService used to subscribe to events when event forwarding is enabled
     * @throws NullPointerException if any argument is {@code null}
     */
    public OpennmsKafkaProducer(ProtobufMapper protobufMapper, NodeCache nodeCache, ConfigurationAdmin configurationAdmin, EventSubscriptionService eventSubscriptionService) {
        this.protobufMapper = Objects.requireNonNull(protobufMapper);
        this.nodeCache = Objects.requireNonNull(nodeCache);
        this.configAdmin = Objects.requireNonNull(configurationAdmin);
        this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
    }

    /**
     * Builds the Kafka producer from the {@link #KAFKA_CLIENT_PID} configuration and, when
     * event forwarding is enabled, registers this instance as an event listener.
     *
     * <p>The key and value serializers are always overridden with the string and byte-array
     * serializers, regardless of what the configuration contains.
     *
     * @throws IOException if the configuration cannot be read
     */
    public void init() throws IOException {
        final Properties producerConfig = new Properties();
        final Dictionary<String, Object> clientProperties = configAdmin.getConfiguration(KAFKA_CLIENT_PID).getProperties();
        if (clientProperties != null) {
            final Enumeration<String> keys = clientProperties.keys();
            while (keys.hasMoreElements()) {
                final String key = keys.nextElement();
                producerConfig.put(key, clientProperties.get(key));
            }
        }
        // Set after copying so user-supplied values cannot replace the serializers.
        producerConfig.put("key.serializer", StringSerializer.class.getCanonicalName());
        producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());

        // The context class loader is cleared while the producer is constructed and always
        // restored afterwards.
        // NOTE(review): presumably a class-loading workaround so Kafka resolves the serializer
        // classes through its own class loader - confirm.
        final Thread currentThread = Thread.currentThread();
        final ClassLoader originalClassLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(null);
            producer = new KafkaProducer<>(producerConfig);
        } finally {
            currentThread.setContextClassLoader(originalClassLoader);
        }

        if (forwardEvents) {
            eventSubscriptionService.addEventListener(this);
        }
    }

    /**
     * Stops forwarding: unregisters the event listener (when event forwarding is enabled) and
     * closes the Kafka producer.
     *
     * <p>The listener is removed first so that no new events are accepted while the producer is
     * shutting down, and the producer is closed even if unregistering fails.
     */
    public void destroy() {
        try {
            if (forwardEvents) {
                eventSubscriptionService.removeEventListener(this);
            }
        } finally {
            final KafkaProducer<String, byte[]> activeProducer = producer;
            // Clear the field before closing so sendRecord() stops picking the producer up.
            producer = null;
            if (activeProducer != null) {
                activeProducer.close();
            }
        }
    }

    /**
     * Sends the event to the event topic unless the event filter rejects it. When node
     * forwarding is enabled and the event carries a non-zero node id, the node is offered to
     * the node cache first.
     *
     * @param event the event to forward
     */
    private void forwardEvent(Event event) {
        // Read the field once so a concurrent setEventFilter() cannot swap it mid-method.
        final Expression filter = eventFilterExpression;
        boolean shouldForward = true;
        if (filter != null) {
            try {
                // A null result fails on unboxing and is handled like any other evaluation error.
                shouldForward = filter.getValue(event, Boolean.class);
            } catch (Exception e) {
                LOG.error("Event filter '{}' failed to return a result for event: {}. The event will be forwarded anyways.",
                        filter.getExpressionString(), event.toStringSimple(), e);
            }
        }

        if (!shouldForward) {
            // shouldForward can only be false when a filter was evaluated, so filter is non-null.
            if (LOG.isTraceEnabled()) {
                LOG.trace("Event {} not forwarded due to event filter: {}", event.toStringSimple(), filter.getExpressionString());
            }
            return;
        }

        if (forwardNodes && event.getNodeid() != null && event.getNodeid().longValue() != 0) {
            maybeUpdateNode(event.getNodeid().longValue());
        }

        sendRecord(() -> {
            final OpennmsModelProtos.Event mappedEvent = protobufMapper.toEvent(event).build();
            LOG.debug("Sending event with UEI: {}", mappedEvent.getUei());
            return new ProducerRecord<>(eventTopic, mappedEvent.getUei(), mappedEvent.toByteArray());
        }, recordMetadata -> forwardedEvent.countDown());
    }

    /**
     * Evaluates the alarm filter against the given alarm.
     *
     * @param onmsAlarm the alarm to test
     * @return {@code true} when no filter is set, when the filter accepts the alarm, or when
     *         the filter fails to evaluate; {@code false} only when the filter rejects the alarm
     */
    public boolean shouldForwardAlarm(OnmsAlarm onmsAlarm) {
        // Read the field once so a concurrent setAlarmFilter() cannot null it between the
        // check and the evaluation.
        final Expression filter = alarmFilterExpression;
        if (filter == null) {
            return true;
        }
        // Evaluation is serialized on this instance.
        // NOTE(review): presumably because expression evaluation is not assumed thread-safe - confirm.
        synchronized (this) {
            try {
                // A null result fails on unboxing and is handled like any other evaluation error.
                final boolean shouldForward = filter.getValue(onmsAlarm, Boolean.class);
                // Only report the alarm as filtered when the filter actually rejected it.
                if (!shouldForward && LOG.isTraceEnabled()) {
                    LOG.trace("Alarm {} not forwarded due to alarm filter: {}", onmsAlarm, filter.getExpressionString());
                }
                return shouldForward;
            } catch (Exception e) {
                LOG.error("Alarm filter '{}' failed to return a result for alarm: {}. The alarm will be forwarded anyways.",
                        filter.getExpressionString(), onmsAlarm, e);
                return true;
            }
        }
    }

    /**
     * Sends an alarm update, or an alarm deletion when {@code alarm} is {@code null}.
     *
     * <p>Deletions are sent as a record with a {@code null} value and are never filtered.
     * Updates are sent only when {@link #shouldForwardAlarm(OnmsAlarm)} accepts the alarm.
     *
     * @param reductionKey the alarm's reduction key, used as the record key
     * @param alarm the alarm to send, or {@code null} to signal deletion
     */
    private void updateAlarm(String reductionKey, OnmsAlarm alarm) {
        if (alarm == null) {
            sendRecord(() -> {
                LOG.debug("Deleting alarm with reduction key: {}", reductionKey);
                return new ProducerRecord<>(alarmTopic, reductionKey, (byte[]) null);
            }, recordMetadata -> forwardedAlarm.countDown());
            return;
        }

        if (!shouldForwardAlarm(alarm)) {
            return;
        }

        if (forwardNodes && alarm.getNodeId() != null) {
            maybeUpdateNode(alarm.getNodeId().intValue());
        }

        sendRecord(() -> {
            // NOTE(review): m45build looks like a decompiler-renamed build() - confirm against the
            // generated protobuf class before changing it.
            final OpennmsModelProtos.Alarm mappedAlarm = protobufMapper.toAlarm(alarm).m45build();
            LOG.debug("Sending alarm with reduction key: {}", reductionKey);
            return new ProducerRecord<>(alarmTopic, reductionKey, mappedAlarm.toByteArray());
        }, recordMetadata -> forwardedAlarm.countDown());
    }

    /**
     * Asks the node cache to trigger a node update if it considers one necessary, and sends
     * the resulting node (or its deletion) to the node topic.
     *
     * <p>The record key is {@code foreignSource:foreignId} when both are known and the node id
     * otherwise. A {@code null} node from the cache is sent as a deletion.
     *
     * @param nodeId id of the node to refresh
     */
    private void maybeUpdateNode(long nodeId) {
        nodeCache.triggerIfNeeded(nodeId, node -> {
            final String nodeCriteria;
            if (node != null && node.getForeignSource() != null && node.getForeignId() != null) {
                nodeCriteria = String.format("%s:%s", node.getForeignSource(), node.getForeignId());
            } else {
                nodeCriteria = Long.toString(nodeId);
            }

            if (node == null) {
                // Deletions do not count down the node latch.
                sendRecord(() -> {
                    LOG.debug("Deleting node with criteria: {}", nodeCriteria);
                    return new ProducerRecord<>(nodeTopic, nodeCriteria, (byte[]) null);
                });
            } else {
                sendRecord(() -> {
                    final OpennmsModelProtos.Node mappedNode = protobufMapper.toNode(node).build();
                    LOG.debug("Sending node with criteria: {}", nodeCriteria);
                    return new ProducerRecord<>(nodeTopic, nodeCriteria, mappedNode.toByteArray());
                }, recordMetadata -> forwardedNode.countDown());
            }
        });
    }

    /**
     * Sends a record without a success callback.
     *
     * @param callable builds the record to send
     */
    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable) {
        sendRecord(callable, null);
    }

    /**
     * Builds a record and hands it to the Kafka producer. Does nothing, and does not invoke
     * {@code callable}, when no producer is available.
     *
     * @param callable builds the record to send
     * @param consumer invoked with the record metadata after a successful send; may be {@code null}
     * @throws RuntimeException wrapping any exception thrown while building or sending the record
     */
    private void sendRecord(Callable<ProducerRecord<String, byte[]>> callable, Consumer<RecordMetadata> consumer) {
        // Read the field once: destroy() may null it between the check and the send.
        final KafkaProducer<String, byte[]> activeProducer = producer;
        if (activeProducer == null) {
            return;
        }
        try {
            final ProducerRecord<String, byte[]> producerRecord = callable.call();
            activeProducer.send(producerRecord, (recordMetadata, sendFailure) -> {
                if (sendFailure != null) {
                    LOG.warn("Failed to send record to producer: {}.", producerRecord, sendFailure);
                } else if (consumer != null) {
                    consumer.accept(recordMetadata);
                }
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Passes an alarm snapshot to the data sync, when alarm forwarding is enabled and a data
     * sync has been set.
     *
     * @param list the alarms in the snapshot
     */
    public void handleAlarmSnapshot(List<OnmsAlarm> list) {
        if (forwardAlarms && dataSync != null) {
            dataSync.handleAlarmSnapshot(list);
        }
    }

    /**
     * Forwards a new or updated alarm, keyed by its reduction key, when alarm forwarding is
     * enabled.
     *
     * @param onmsAlarm the alarm that was created or changed
     */
    public void handleNewOrUpdatedAlarm(OnmsAlarm onmsAlarm) {
        if (forwardAlarms) {
            updateAlarm(onmsAlarm.getReductionKey(), onmsAlarm);
        }
    }

    /**
     * Forwards an alarm deletion when alarm forwarding is enabled.
     *
     * @param i the alarm id; not used
     * @param str the reduction key of the deleted alarm
     */
    public void handleDeletedAlarm(int i, String str) {
        if (forwardAlarms) {
            handleDeletedAlarm(str);
        }
    }

    /**
     * Forwards an alarm deletion for the given reduction key. Unlike
     * {@link #handleDeletedAlarm(int, String)}, this does not check whether alarm forwarding
     * is enabled.
     *
     * @param str the reduction key of the deleted alarm
     */
    public void handleDeletedAlarm(String str) {
        updateAlarm(str, null);
    }

    /**
     * @return the fully qualified name of this class
     */
    public String getName() {
        return OpennmsKafkaProducer.class.getName();
    }

    /**
     * Event callback: forwards the event, subject to the event filter.
     *
     * @param event the received event
     */
    public void onEvent(Event event) {
        forwardEvent(event);
    }

    /**
     * Sets the event topic. Event forwarding is enabled only when the topic is non-empty.
     *
     * @param str the topic name; {@code null} or empty disables event forwarding
     */
    public void setEventTopic(String str) {
        this.eventTopic = str;
        this.forwardEvents = !Strings.isNullOrEmpty(str);
    }

    /**
     * Sets the alarm topic. Alarm forwarding is enabled only when the topic is non-empty.
     *
     * @param str the topic name; {@code null} or empty disables alarm forwarding
     */
    public void setAlarmTopic(String str) {
        this.alarmTopic = str;
        this.forwardAlarms = !Strings.isNullOrEmpty(str);
    }

    /**
     * Sets the node topic. Node forwarding is enabled only when the topic is non-empty.
     *
     * @param str the topic name; {@code null} or empty disables node forwarding
     */
    public void setNodeTopic(String str) {
        this.nodeTopic = str;
        this.forwardNodes = !Strings.isNullOrEmpty(str);
    }

    /**
     * Sets the SpEL expression used to filter events.
     *
     * @param str the expression; {@code null} or empty removes the filter
     */
    public void setEventFilter(String str) {
        this.eventFilterExpression = Strings.isNullOrEmpty(str) ? null : SPEL_PARSER.parseExpression(str);
    }

    /**
     * Sets the SpEL expression used to filter alarms.
     *
     * @param str the expression; {@code null} or empty removes the filter
     */
    public void setAlarmFilter(String str) {
        this.alarmFilterExpression = Strings.isNullOrEmpty(str) ? null : SPEL_PARSER.parseExpression(str);
    }

    /**
     * Sets the data sync that receives alarm snapshots.
     *
     * @param kafkaAlarmDataSync the data sync; may be {@code null}
     * @return this instance
     */
    public OpennmsKafkaProducer setDataSync(KafkaAlarmDataSync kafkaAlarmDataSync) {
        this.dataSync = kafkaAlarmDataSync;
        return this;
    }

    /**
     * @return {@code true} when a non-empty alarm topic has been set
     */
    public boolean isForwardingAlarms() {
        return forwardAlarms;
    }

    /**
     * @return latch that is counted down when an event record has been sent successfully
     */
    public CountDownLatch getEventForwardedLatch() {
        return forwardedEvent;
    }

    /**
     * @return latch that is counted down when an alarm record (update or deletion) has been
     *         sent successfully
     */
    public CountDownLatch getAlarmForwardedLatch() {
        return forwardedAlarm;
    }

    /**
     * @return latch that is counted down when a node update record has been sent successfully
     */
    public CountDownLatch getNodeForwardedLatch() {
        return forwardedNode;
    }
}
