package org.opennms.features.kafka.producer.datasync;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.opennms.core.ipc.common.kafka.Utils;
import org.opennms.features.kafka.producer.AlarmEqualityChecker;
import org.opennms.features.kafka.producer.OpennmsKafkaProducer;
import org.opennms.features.kafka.producer.ProtobufMapper;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.alarmd.api.AlarmCallbackStateTracker;
import org.opennms.netmgt.model.OnmsAlarm;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.class */
public class KafkaAlarmDataSync implements AlarmDataStore, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAlarmDataSync.class);
    private static final String ALARM_STORE_NAME = "alarm_store";
    public static final String KAFKA_STREAMS_PID = "org.opennms.features.kafka.producer.streams";
    private final ConfigurationAdmin configAdmin;
    private final OpennmsKafkaProducer kafkaProducer;
    private final ProtobufMapper protobufMapper;
    private String alarmTopic;
    private boolean alarmSync;
    private KafkaStreams streams;
    private ScheduledExecutorService scheduler;
    private KTable<String, byte[]> alarmBytesKtable;
    private KTable<String, OpennmsModelProtos.Alarm> alarmKtable;
    private boolean suppressIncrementalAlarms;
    private final AtomicBoolean closed = new AtomicBoolean(true);
    private boolean startWithCleanState = false;
    private final AlarmEqualityChecker alarmEqualityChecker = AlarmEqualityChecker.with(AlarmEqualityChecker.Exclusions::defaultExclusions);

    public KafkaAlarmDataSync(ConfigurationAdmin configurationAdmin, OpennmsKafkaProducer opennmsKafkaProducer, ProtobufMapper protobufMapper) {
        this.configAdmin = (ConfigurationAdmin) Objects.requireNonNull(configurationAdmin);
        this.kafkaProducer = (OpennmsKafkaProducer) Objects.requireNonNull(opennmsKafkaProducer);
        this.protobufMapper = (ProtobufMapper) Objects.requireNonNull(protobufMapper);
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public void init() throws IOException {
        if (!isEnabled()) {
            LOG.info("Alarm synchronization disabled. Skipping initialization.");
            return;
        }
        Properties loadStreamsProperties = loadStreamsProperties();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.globalTable(this.alarmTopic, Consumed.with(Serdes.String(), Serdes.ByteArray()), Materialized.as(ALARM_STORE_NAME));
        Topology build = streamsBuilder.build();
        this.streams = (KafkaStreams) Utils.runWithGivenClassLoader(() -> {
            return new KafkaStreams(build, loadStreamsProperties);
        }, KStream.class.getClassLoader());
        this.streams.setUncaughtExceptionHandler((thread, th) -> {
            LOG.error(String.format("Stream error on thread: %s", thread.getName()), th);
        });
        this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-producer-alarm-datasync-%d").build());
        this.closed.set(false);
        this.scheduler.execute(this);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            if (this.kafkaProducer.getAlarmForwardedLatch().await(2L, TimeUnit.MINUTES)) {
                LOG.debug("Triggered: An alarm was successfully forwarded to the topic.");
            } else {
                LOG.debug("Triggered: Timeout reached before an alarm was successfully forwarded to the topic.");
            }
            try {
                if (this.startWithCleanState) {
                    LOG.info("Performing stream state cleanup.");
                    this.streams.cleanUp();
                }
                LOG.info("Starting alarm datasync stream.");
                this.streams.start();
                LOG.info("Starting alarm datasync started.");
            } catch (StreamsException | IllegalStateException e) {
                LOG.error("Failed to start alarm datasync stream. Synchronization will not be performed.", e);
            }
            LOG.info("Waiting for alarm data store to be ready.");
            while (!this.closed.get() && !isReady()) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e2) {
                    LOG.info("Interrupted while waiting for store to be ready. Synchronization will not be performed.");
                    return;
                }
            }
            LOG.info("Alarm data store is ready!");
        } catch (InterruptedException e3) {
            LOG.info("Interrupted while waiting for alarm to be forwarded. Synchronization will not be performed.");
        }
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public void destroy() {
        this.closed.set(true);
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        if (this.streams != null) {
            this.streams.close(2L, TimeUnit.MINUTES);
        }
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public synchronized AlarmSyncResults handleAlarmSnapshot(List<OnmsAlarm> list) {
        if (!isReady()) {
            LOG.debug("Alarm store is not ready yet. Skipping synchronization.");
            return null;
        }
        LOG.debug("Performing alarm synchronization with ktable.");
        try {
            Map<String, OpennmsModelProtos.Alarm> alarms = getAlarms();
            Set<String> keySet = alarms.keySet();
            Stream<OnmsAlarm> stream = list.stream();
            OpennmsKafkaProducer opennmsKafkaProducer = this.kafkaProducer;
            opennmsKafkaProducer.getClass();
            List list2 = (List) stream.filter(opennmsKafkaProducer::shouldForwardAlarm).collect(Collectors.toList());
            Map map = (Map) list2.stream().collect(Collectors.toMap((v0) -> {
                return v0.getReductionKey();
            }, onmsAlarm -> {
                return onmsAlarm;
            }));
            Set keySet2 = map.keySet();
            AlarmCallbackStateTracker alarmCallbackStateTracker = this.kafkaProducer.getAlarmCallbackStateTracker();
            Set set = (Set) Sets.difference(keySet, keySet2).stream().filter(str -> {
                return !alarmCallbackStateTracker.wasAlarmWithReductionKeyUpdated(str);
            }).collect(Collectors.toSet());
            set.forEach(str2 -> {
                this.kafkaProducer.handleDeletedAlarm((int) ((OpennmsModelProtos.Alarm) alarms.get(str2)).getId(), str2);
            });
            Set set2 = (Set) Sets.difference(keySet2, keySet).stream().filter(str3 -> {
                return !alarmCallbackStateTracker.wasAlarmWithReductionKeyDeleted(str3);
            }).collect(Collectors.toSet());
            set2.forEach(str4 -> {
                this.kafkaProducer.handleNewOrUpdatedAlarm((OnmsAlarm) map.get(str4));
            });
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            Sets.intersection(keySet, keySet2).forEach(str5 -> {
                if (alarmCallbackStateTracker.wasAlarmWithReductionKeyUpdated(str5)) {
                    return;
                }
                OnmsAlarm onmsAlarm2 = (OnmsAlarm) map.get(str5);
                OpennmsModelProtos.Alarm.Builder alarm = this.protobufMapper.toAlarm(onmsAlarm2);
                OpennmsModelProtos.Alarm alarm2 = (OpennmsModelProtos.Alarm) alarms.get(str5);
                OpennmsModelProtos.Alarm.Builder m395toBuilder = ((OpennmsModelProtos.Alarm) alarms.get(str5)).m395toBuilder();
                if ((!this.suppressIncrementalAlarms || this.alarmEqualityChecker.equalsExcludingOnBoth(alarm, m395toBuilder)) && (this.suppressIncrementalAlarms || Objects.equals(alarm.m431build(), alarm2))) {
                    return;
                }
                this.kafkaProducer.handleNewOrUpdatedAlarm(onmsAlarm2);
                linkedHashSet.add(str5);
            });
            AlarmSyncResults alarmSyncResults = new AlarmSyncResults(alarms, list2, map, set2, set, linkedHashSet);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Done performing alarm synchronization with the ktable for {} alarms. Executed {} updates.", Integer.valueOf(alarmSyncResults.getAlarmsInDb().size()), Integer.valueOf(alarmSyncResults.getReductionKeysAdded().size() + alarmSyncResults.getReductionKeysDeleted().size() + alarmSyncResults.getReductionKeysUpdated().size()));
                LOG.debug("Reduction keys added to ktable: {}", alarmSyncResults.getReductionKeysAdded());
                LOG.debug("Reduction keys deleted from the ktable: {}", alarmSyncResults.getReductionKeysDeleted());
                LOG.debug("Reduction keys updated in the ktable: {}", alarmSyncResults.getReductionKeysUpdated());
            }
            return alarmSyncResults;
        } catch (Exception e) {
            LOG.error("An error occurred while performing alarm synchronization with the ktable. Will try again on next callback.", e);
            return null;
        }
    }

    private Properties loadStreamsProperties() throws IOException {
        Properties properties = new Properties();
        properties.put("application.id", "alarm-datasync");
        properties.put("state.dir", Paths.get(System.getProperty("karaf.data"), "kafka").toString());
        Dictionary properties2 = this.configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties();
        if (properties2 != null) {
            properties.put("bootstrap.servers", properties2.get("bootstrap.servers"));
        }
        Dictionary properties3 = this.configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties();
        if (properties3 != null) {
            Enumeration keys = properties3.keys();
            while (keys.hasMoreElements()) {
                String str = (String) keys.nextElement();
                properties.put(str, properties3.get(str));
            }
        }
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.ByteArray().getClass());
        return properties;
    }

    public void setAlarmTopic(String str) {
        this.alarmTopic = str;
    }

    public void setAlarmSync(boolean z) {
        this.alarmSync = z;
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public void setStartWithCleanState(boolean z) {
        this.startWithCleanState = z;
    }

    private ReadOnlyKeyValueStore<String, byte[]> getAlarmTableNow() throws InvalidStateStoreException {
        return (ReadOnlyKeyValueStore) this.streams.store(ALARM_STORE_NAME, QueryableStoreTypes.keyValueStore());
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public boolean isEnabled() {
        return this.kafkaProducer.isForwardingAlarms() && this.alarmSync;
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public boolean isReady() {
        try {
            getAlarmTableNow();
            return true;
        } catch (InvalidStateStoreException e) {
            return false;
        }
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public Map<String, OpennmsModelProtos.Alarm> getAlarms() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        getAlarmTableNow().all().forEachRemaining(keyValue -> {
            try {
                linkedHashMap.put(keyValue.key, keyValue.value != null ? OpennmsModelProtos.Alarm.parseFrom((byte[]) keyValue.value) : null);
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to parse alarm for bytes at reduction key '{}'. Alarm will be empty in map.", keyValue.key);
                linkedHashMap.put(keyValue.key, null);
            }
        });
        return linkedHashMap;
    }

    @Override // org.opennms.features.kafka.producer.datasync.AlarmDataStore
    public OpennmsModelProtos.Alarm getAlarm(String str) {
        try {
            return OpennmsModelProtos.Alarm.parseFrom((byte[]) getAlarmTableNow().get(str));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Failed to parse alarm for bytes at reduction key " + str, e);
        }
    }

    public void setSuppressIncrementalAlarms(boolean z) {
        this.suppressIncrementalAlarms = z;
    }
}
