/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.features.kafka.producer.datasync;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.opennms.core.ipc.common.kafka.Utils;
import org.opennms.features.kafka.producer.AlarmEqualityChecker;
import org.opennms.features.kafka.producer.OpennmsKafkaProducer;
import org.opennms.features.kafka.producer.ProtobufMapper;
import org.opennms.features.kafka.producer.datasync.AlarmDataStore;
import org.opennms.features.kafka.producer.datasync.AlarmSyncResults;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.alarmd.api.AlarmCallbackStateTracker;
import org.opennms.netmgt.model.OnmsAlarm;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAlarmDataSync
implements AlarmDataStore,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAlarmDataSync.class);
    private static final String ALARM_STORE_NAME = "alarm_store";
    public static final String KAFKA_STREAMS_PID = "org.opennms.features.kafka.producer.streams";
    private final ConfigurationAdmin configAdmin;
    private final OpennmsKafkaProducer kafkaProducer;
    private final ProtobufMapper protobufMapper;
    private final AtomicBoolean closed = new AtomicBoolean(true);
    private String alarmTopic;
    private boolean alarmSync;
    private boolean startWithCleanState = false;
    private KafkaStreams streams;
    private ScheduledExecutorService scheduler;
    private KTable<String, byte[]> alarmBytesKtable;
    private KTable<String, OpennmsModelProtos.Alarm> alarmKtable;
    private final AlarmEqualityChecker alarmEqualityChecker = AlarmEqualityChecker.with(AlarmEqualityChecker.Exclusions::defaultExclusions);
    private boolean suppressIncrementalAlarms;

    public KafkaAlarmDataSync(ConfigurationAdmin configAdmin, OpennmsKafkaProducer kafkaProducer, ProtobufMapper protobufMapper) {
        this.configAdmin = Objects.requireNonNull(configAdmin);
        this.kafkaProducer = Objects.requireNonNull(kafkaProducer);
        this.protobufMapper = Objects.requireNonNull(protobufMapper);
    }

    @Override
    public void init() throws IOException {
        if (!this.isEnabled()) {
            LOG.info("Alarm synchronization disabled. Skipping initialization.");
            return;
        }
        Properties streamProperties = this.loadStreamsProperties();
        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable alarmBytesKtable = builder.globalTable(this.alarmTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.ByteArray()), Materialized.as((String)ALARM_STORE_NAME));
        Topology topology = builder.build();
        this.streams = (KafkaStreams)Utils.runWithGivenClassLoader(() -> new KafkaStreams(topology, streamProperties), (ClassLoader)KStream.class.getClassLoader());
        this.streams.setUncaughtExceptionHandler((t, e) -> LOG.error(String.format("Stream error on thread: %s", t.getName()), e));
        this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-producer-alarm-datasync-%d").build());
        this.closed.set(false);
        this.scheduler.execute(this);
    }

    @Override
    public void run() {
        try {
            if (this.kafkaProducer.getAlarmForwardedLatch().await(2L, TimeUnit.MINUTES)) {
                LOG.debug("Triggered: An alarm was successfully forwarded to the topic.");
            } else {
                LOG.debug("Triggered: Timeout reached before an alarm was successfully forwarded to the topic.");
            }
        }
        catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for alarm to be forwarded. Synchronization will not be performed.");
            return;
        }
        try {
            if (this.startWithCleanState) {
                LOG.info("Performing stream state cleanup.");
                this.streams.cleanUp();
            }
            LOG.info("Starting alarm datasync stream.");
            this.streams.start();
            LOG.info("Starting alarm datasync started.");
        }
        catch (IllegalStateException | StreamsException e) {
            LOG.error("Failed to start alarm datasync stream. Synchronization will not be performed.", e);
        }
        LOG.info("Waiting for alarm data store to be ready.");
        while (!this.closed.get() && !this.isReady()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                LOG.info("Interrupted while waiting for store to be ready. Synchronization will not be performed.");
                return;
            }
        }
        LOG.info("Alarm data store is ready!");
    }

    @Override
    public void destroy() {
        this.closed.set(true);
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        if (this.streams != null) {
            this.streams.close(2L, TimeUnit.MINUTES);
        }
    }

    @Override
    public synchronized AlarmSyncResults handleAlarmSnapshot(List<OnmsAlarm> alarms) {
        AlarmSyncResults results;
        if (!this.isReady()) {
            LOG.debug("Alarm store is not ready yet. Skipping synchronization.");
            return null;
        }
        LOG.debug("Performing alarm synchronization with ktable.");
        try {
            Map<String, OpennmsModelProtos.Alarm> alarmsInKtableByReductionKey = this.getAlarms();
            Set<String> reductionKeysInKtable = alarmsInKtableByReductionKey.keySet();
            List<OnmsAlarm> alarmsInDb = alarms.stream().filter(this.kafkaProducer::shouldForwardAlarm).collect(Collectors.toList());
            Map<String, OnmsAlarm> alarmsInDbByReductionKey = alarmsInDb.stream().collect(Collectors.toMap(OnmsAlarm::getReductionKey, a -> a));
            Set<String> reductionKeysInDb = alarmsInDbByReductionKey.keySet();
            AlarmCallbackStateTracker stateTracker = this.kafkaProducer.getAlarmCallbackStateTracker();
            Set<String> reductionKeysNotInDb = Sets.difference(reductionKeysInKtable, reductionKeysInDb).stream().filter(reductionKey -> !stateTracker.wasAlarmWithReductionKeyUpdated(reductionKey)).collect(Collectors.toSet());
            reductionKeysNotInDb.forEach(rkey -> this.kafkaProducer.handleDeletedAlarm((int)((OpennmsModelProtos.Alarm)alarmsInKtableByReductionKey.get(rkey)).getId(), (String)rkey));
            Set<String> reductionKeysNotInKtable = Sets.difference(reductionKeysInDb, reductionKeysInKtable).stream().filter(reductionKey -> !stateTracker.wasAlarmWithReductionKeyDeleted(reductionKey)).collect(Collectors.toSet());
            reductionKeysNotInKtable.forEach(rkey -> this.kafkaProducer.handleNewOrUpdatedAlarm((OnmsAlarm)alarmsInDbByReductionKey.get(rkey)));
            LinkedHashSet<String> reductionKeysUpdated = new LinkedHashSet<String>();
            Sets.SetView commonReductionKeys = Sets.intersection(reductionKeysInKtable, reductionKeysInDb);
            commonReductionKeys.forEach(rkey -> {
                if (stateTracker.wasAlarmWithReductionKeyUpdated(rkey)) {
                    return;
                }
                OnmsAlarm dbAlarm = (OnmsAlarm)alarmsInDbByReductionKey.get(rkey);
                OpennmsModelProtos.Alarm.Builder mappedDbAlarm = this.protobufMapper.toAlarm(dbAlarm);
                OpennmsModelProtos.Alarm alarmFromKtable = (OpennmsModelProtos.Alarm)alarmsInKtableByReductionKey.get(rkey);
                OpennmsModelProtos.Alarm.Builder alarmBuilderFromKtable = ((OpennmsModelProtos.Alarm)alarmsInKtableByReductionKey.get(rkey)).toBuilder();
                if (this.suppressIncrementalAlarms && !this.alarmEqualityChecker.equalsExcludingOnBoth(mappedDbAlarm, alarmBuilderFromKtable) || !this.suppressIncrementalAlarms && !Objects.equals(mappedDbAlarm.build(), alarmFromKtable)) {
                    this.kafkaProducer.handleNewOrUpdatedAlarm(dbAlarm);
                    reductionKeysUpdated.add((String)rkey);
                }
            });
            results = new AlarmSyncResults(alarmsInKtableByReductionKey, alarmsInDb, alarmsInDbByReductionKey, reductionKeysNotInKtable, reductionKeysNotInDb, reductionKeysUpdated);
        }
        catch (Exception e) {
            LOG.error("An error occurred while performing alarm synchronization with the ktable. Will try again on next callback.", (Throwable)e);
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Done performing alarm synchronization with the ktable for {} alarms. Executed {} updates.", (Object)results.getAlarmsInDb().size(), (Object)(results.getReductionKeysAdded().size() + results.getReductionKeysDeleted().size() + results.getReductionKeysUpdated().size()));
            LOG.debug("Reduction keys added to ktable: {}", results.getReductionKeysAdded());
            LOG.debug("Reduction keys deleted from the ktable: {}", results.getReductionKeysDeleted());
            LOG.debug("Reduction keys updated in the ktable: {}", results.getReductionKeysUpdated());
        }
        return results;
    }

    private Properties loadStreamsProperties() throws IOException {
        Dictionary properties;
        Properties streamsProperties = new Properties();
        streamsProperties.put("application.id", "alarm-datasync");
        Path kafkaDir = Paths.get(System.getProperty("karaf.data"), "kafka");
        streamsProperties.put("state.dir", kafkaDir.toString());
        Dictionary clientProperties = this.configAdmin.getConfiguration("org.opennms.features.kafka.producer.client").getProperties();
        if (clientProperties != null) {
            streamsProperties.put("bootstrap.servers", clientProperties.get("bootstrap.servers"));
        }
        if ((properties = this.configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties()) != null) {
            Enumeration keys = properties.keys();
            while (keys.hasMoreElements()) {
                String key = (String)keys.nextElement();
                streamsProperties.put(key, properties.get(key));
            }
        }
        streamsProperties.put("default.key.serde", Serdes.String().getClass());
        streamsProperties.put("default.value.serde", Serdes.ByteArray().getClass());
        return streamsProperties;
    }

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
    }

    public void setAlarmSync(boolean alarmSync) {
        this.alarmSync = alarmSync;
    }

    @Override
    public void setStartWithCleanState(boolean startWithCleanState) {
        this.startWithCleanState = startWithCleanState;
    }

    private ReadOnlyKeyValueStore<String, byte[]> getAlarmTableNow() throws InvalidStateStoreException {
        return (ReadOnlyKeyValueStore)this.streams.store(ALARM_STORE_NAME, QueryableStoreTypes.keyValueStore());
    }

    @Override
    public boolean isEnabled() {
        return this.kafkaProducer.isForwardingAlarms() && this.alarmSync;
    }

    @Override
    public boolean isReady() {
        try {
            this.getAlarmTableNow();
            return true;
        }
        catch (InvalidStateStoreException ignored) {
            return false;
        }
    }

    @Override
    public Map<String, OpennmsModelProtos.Alarm> getAlarms() {
        LinkedHashMap<String, OpennmsModelProtos.Alarm> alarmsByReductionKey = new LinkedHashMap<String, OpennmsModelProtos.Alarm>();
        this.getAlarmTableNow().all().forEachRemaining(kv -> {
            try {
                alarmsByReductionKey.put((String)kv.key, kv.value != null ? OpennmsModelProtos.Alarm.parseFrom((byte[])kv.value) : null);
            }
            catch (InvalidProtocolBufferException e) {
                LOG.error("Failed to parse alarm for bytes at reduction key '{}'. Alarm will be empty in map.", kv.key);
                alarmsByReductionKey.put((String)kv.key, (OpennmsModelProtos.Alarm)null);
            }
        });
        return alarmsByReductionKey;
    }

    @Override
    public OpennmsModelProtos.Alarm getAlarm(String reductionKey) {
        byte[] alarmBytes = (byte[])this.getAlarmTableNow().get((Object)reductionKey);
        try {
            return OpennmsModelProtos.Alarm.parseFrom(alarmBytes);
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Failed to parse alarm for bytes at reduction key " + reductionKey, e);
        }
    }

    public void setSuppressIncrementalAlarms(boolean suppressIncrementalAlarms) {
        this.suppressIncrementalAlarms = suppressIncrementalAlarms;
    }
}

