/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.features.kafka.producer.datasync;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.opennms.features.kafka.producer.OpennmsKafkaProducer;
import org.opennms.features.kafka.producer.ProtobufMapper;
import org.opennms.features.kafka.producer.datasync.AlarmDataStore;
import org.opennms.features.kafka.producer.datasync.AlarmSyncResults;
import org.opennms.features.kafka.producer.model.OpennmsModelProtos;
import org.opennms.netmgt.dao.api.AlarmDao;
import org.opennms.netmgt.model.OnmsAlarm;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionOperations;

/**
 * Keeps the alarms published on the Kafka alarm topic consistent with the alarms
 * returned by the {@link AlarmDao}.
 *
 * <p>The alarm topic is materialized into a Kafka Streams global table backed by a
 * store named {@code alarm_store}. Once that store can be queried, a periodic task
 * compares its content with the database and asks the {@link OpennmsKafkaProducer}
 * to publish whatever additions, updates or deletions are needed to reconcile both.
 */
public class KafkaAlarmDataSync implements AlarmDataStore, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAlarmDataSync.class);
    private static final String ALARM_STORE_NAME = "alarm_store";
    public static final String KAFKA_STREAMS_PID = "org.opennms.features.kafka.producer.streams";
    private final ConfigurationAdmin configAdmin;
    private final OpennmsKafkaProducer kafkaProducer;
    private final TransactionOperations transactionOperations;
    private final AlarmDao alarmDao;
    private final ProtobufMapper protobufMapper;
    // True until init() has fully set up the stream, and again once destroy() was called.
    private final AtomicBoolean closed = new AtomicBoolean(true);
    private String alarmTopic;
    private long alarmSyncIntervalMs;
    private KafkaStreams streams;
    private ScheduledExecutorService scheduler;
    private KTable<String, byte[]> alarmBytesKtable;
    private KTable<String, OpennmsModelProtos.Alarm> alarmKtable;

    /**
     * Creates the synchronizer. Every collaborator is mandatory.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaAlarmDataSync(ConfigurationAdmin configAdmin, OpennmsKafkaProducer kafkaProducer, AlarmDao alarmDao, ProtobufMapper protobufMapper, TransactionOperations transactionOperations) {
        this.configAdmin = Objects.requireNonNull(configAdmin);
        this.kafkaProducer = Objects.requireNonNull(kafkaProducer);
        this.alarmDao = Objects.requireNonNull(alarmDao);
        this.protobufMapper = Objects.requireNonNull(protobufMapper);
        this.transactionOperations = Objects.requireNonNull(transactionOperations);
    }

    /**
     * Builds the Kafka Streams topology for the alarm topic and hands this instance to a
     * single-threaded scheduler, which then runs {@link #run()}.
     *
     * <p>Does nothing (besides logging) when {@link #isEnabled()} is {@code false}.
     *
     * @throws IOException if the streams configuration cannot be loaded
     */
    public void init() throws IOException {
        if (!isEnabled()) {
            LOG.info("Alarm synchronization disabled.");
            return;
        }
        final Properties streamProperties = loadStreamsProperties();
        final StreamsBuilder builder = new StreamsBuilder();
        // Registering the global table is all that is needed here; the content is read
        // later through the queryable store, so the returned handle is not kept.
        builder.globalTable(alarmTopic, Consumed.with(Serdes.String(), Serdes.ByteArray()), Materialized.as(ALARM_STORE_NAME));
        final Topology topology = builder.build();

        // KafkaStreams is constructed with the class loader of the Kafka Streams classes as
        // the thread context class loader, which is restored afterwards.
        // NOTE(review): presumably required for class resolution under OSGi - confirm.
        final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(KStream.class.getClassLoader());
            streams = new KafkaStreams(topology, streamProperties);
        } finally {
            Thread.currentThread().setContextClassLoader(currentClassLoader);
        }
        streams.setUncaughtExceptionHandler((t, e) -> LOG.error(String.format("Stream error on thread: %s", t.getName()), e));
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-producer-alarm-datasync-%d").build());
        closed.set(false);
        scheduler.execute(this);
    }

    /**
     * Starts the stream and schedules the periodic synchronization.
     *
     * <p>Waits up to two minutes for the producer's "alarm forwarded" latch, starts the
     * stream, polls until the alarm store can be queried and finally schedules
     * the synchronization task with a fixed delay of {@code alarmSyncIntervalMs}. The method
     * returns early, without scheduling anything, when the thread is interrupted, when the
     * stream fails to start, or when {@link #destroy()} was called in the meantime.
     */
    @Override
    public void run() {
        try {
            if (kafkaProducer.getAlarmForwardedLatch().await(2L, TimeUnit.MINUTES)) {
                LOG.debug("Triggered: An alarm was successfully forwarded to the topic.");
            } else {
                LOG.debug("Triggered: Timeout reached before an alarm was successfully forwarded to the topic.");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.info("Interrupted while waiting for alarm to be forwarded. Synchronization will not be performed.");
            return;
        }
        if (closed.get()) {
            LOG.info("Closed before the alarm datasync stream was started. Synchronization will not be performed.");
            return;
        }
        try {
            LOG.info("Starting alarm datasync stream.");
            streams.start();
            LOG.info("Starting alarm datasync started.");
        } catch (IllegalStateException | StreamsException e) {
            LOG.error("Failed to start alarm datasync stream. Synchronization will not be performed.", e);
            // Without a running stream the store never becomes ready; do not poll for it.
            return;
        }
        LOG.info("Waiting for alarm data store to be ready.");
        while (!closed.get() && !isReady()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.info("Interrupted while waiting for store to be ready. Synchronization will not be performed.");
                return;
            }
        }
        if (closed.get()) {
            // The loop ended because of destroy(): the scheduler is shut down and would
            // reject the periodic task.
            LOG.info("Closed while waiting for store to be ready. Synchronization will not be performed.");
            return;
        }
        LOG.info("Alarm data store is ready!");
        LOG.info("Scheduling periodic alarm synchronization every {}ms", alarmSyncIntervalMs);
        // The first run happens after one minute, or sooner if the interval is shorter.
        final long initialDelayMs = Math.min(TimeUnit.MINUTES.toMillis(1L), alarmSyncIntervalMs);
        scheduler.scheduleWithFixedDelay(this::doSynchronizeAlarmsWithDb, initialDelayMs, alarmSyncIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Marks this instance as closed, shuts the scheduler down and closes the stream,
     * waiting up to two minutes for the latter.
     */
    public void destroy() {
        closed.set(true);
        if (scheduler != null) {
            scheduler.shutdown();
        }
        if (streams != null) {
            streams.close(2L, TimeUnit.MINUTES);
        }
    }

    /**
     * Scheduled entry point: runs one synchronization pass and logs the outcome.
     * Exceptions are logged rather than propagated.
     */
    private void doSynchronizeAlarmsWithDb() {
        LOG.debug("Performing alarm synchronization with ktable.");
        try {
            final AlarmSyncResults results = synchronizeAlarmsWithDb();
            if (LOG.isDebugEnabled()) {
                final int numUpdates = results.getReductionKeysAdded().size()
                        + results.getReductionKeysDeleted().size()
                        + results.getReductionKeysUpdated().size();
                LOG.debug("Done performing alarm synchronization with the ktable. Executed {} updates.", numUpdates);
                LOG.debug("Reduction keys added to ktable: {}", results.getReductionKeysAdded());
                LOG.debug("Reduction keys deleted from the ktable: {}", results.getReductionKeysDeleted());
                LOG.debug("Reduction keys updated in the ktable: {}", results.getReductionKeysUpdated());
            }
        } catch (Exception e) {
            LOG.error("An error occurred while performing alarm synchronization with the ktable. Will try again after {} ms.", alarmSyncIntervalMs, e);
        }
    }

    /**
     * Compares the alarms in the store with the forwardable alarms in the database and
     * asks the producer to publish the differences.
     *
     * <ul>
     * <li>keys only present in the store are passed to {@code handleDeletedAlarm}</li>
     * <li>keys only present in the database are passed to {@code handleNewOrUpdatedAlarm}</li>
     * <li>keys present in both are re-published when the mapped database alarm does not
     *     equal the alarm read from the store</li>
     * </ul>
     *
     * The database work is executed through the configured {@link TransactionOperations}.
     *
     * @return the snapshot of both sides together with the added, deleted and updated keys
     */
    @Override
    public synchronized AlarmSyncResults synchronizeAlarmsWithDb() {
        final Map<String, OpennmsModelProtos.Alarm> alarmsInKtableByReductionKey = getAlarms();
        return transactionOperations.execute(status -> {
            final Set<String> reductionKeysInKtable = alarmsInKtableByReductionKey.keySet();

            final List<OnmsAlarm> alarmsInDb = alarmDao.findAll().stream()
                    .filter(kafkaProducer::shouldForwardAlarm)
                    .collect(Collectors.toList());
            final Map<String, OnmsAlarm> alarmsInDbByReductionKey = alarmsInDb.stream()
                    .collect(Collectors.toMap(OnmsAlarm::getReductionKey, a -> a));
            final Set<String> reductionKeysInDb = alarmsInDbByReductionKey.keySet();

            // In the store, but no longer in the database.
            final Set<String> reductionKeysNotInDb = Sets.difference(reductionKeysInKtable, reductionKeysInDb);
            reductionKeysNotInDb.forEach(kafkaProducer::handleDeletedAlarm);

            // In the database, but missing from the store.
            final Set<String> reductionKeysNotInKtable = Sets.difference(reductionKeysInDb, reductionKeysInKtable);
            reductionKeysNotInKtable.forEach(rkey -> kafkaProducer.handleNewOrUpdatedAlarm(alarmsInDbByReductionKey.get(rkey)));

            // On both sides: re-publish when the content differs.
            final Set<String> reductionKeysUpdated = new LinkedHashSet<>();
            final Set<String> commonReductionKeys = Sets.intersection(reductionKeysInKtable, reductionKeysInDb);
            commonReductionKeys.forEach(rkey -> {
                final OnmsAlarm dbAlarm = alarmsInDbByReductionKey.get(rkey);
                final OpennmsModelProtos.Alarm mappedDbAlarm = protobufMapper.toAlarm(dbAlarm).build();
                final OpennmsModelProtos.Alarm alarmFromKtable = alarmsInKtableByReductionKey.get(rkey);
                if (!Objects.equals(mappedDbAlarm, alarmFromKtable)) {
                    kafkaProducer.handleNewOrUpdatedAlarm(dbAlarm);
                    reductionKeysUpdated.add(rkey);
                }
            });

            return new AlarmSyncResults(alarmsInKtableByReductionKey, alarmsInDb, alarmsInDbByReductionKey,
                    reductionKeysNotInKtable, reductionKeysNotInDb, reductionKeysUpdated);
        });
    }

    /**
     * Assembles the Kafka Streams configuration.
     *
     * <p>Starts from a fixed application id and a state directory under
     * {@code ${karaf.data}/kafka}, copies {@code bootstrap.servers} from the producer
     * client configuration when present, applies every entry of the
     * {@link #KAFKA_STREAMS_PID} configuration, and finally forces the default
     * key/value serdes to String and byte array.
     *
     * @throws IOException if a configuration cannot be retrieved
     */
    private Properties loadStreamsProperties() throws IOException {
        final Properties streamsProperties = new Properties();
        streamsProperties.put("application.id", "alarm-datasync");
        final Path kafkaDir = Paths.get(System.getProperty("karaf.data"), "kafka");
        streamsProperties.put("state.dir", kafkaDir.toString());

        final Dictionary<String, Object> clientProperties = configAdmin.getConfiguration("org.opennms.features.kafka.producer.client").getProperties();
        if (clientProperties != null) {
            // Properties rejects null values, so only copy the setting when it exists.
            final Object bootstrapServers = clientProperties.get("bootstrap.servers");
            if (bootstrapServers != null) {
                streamsProperties.put("bootstrap.servers", bootstrapServers);
            }
        }

        final Dictionary<String, Object> properties = configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties();
        if (properties != null) {
            final Enumeration<String> keys = properties.keys();
            while (keys.hasMoreElements()) {
                final String key = keys.nextElement();
                streamsProperties.put(key, properties.get(key));
            }
        }

        // Applied last so that they cannot be overridden by the configuration above.
        streamsProperties.put("default.key.serde", Serdes.String().getClass());
        streamsProperties.put("default.value.serde", Serdes.ByteArray().getClass());
        return streamsProperties;
    }

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
    }

    public void setAlarmSyncIntervalMs(long intervalMs) {
        this.alarmSyncIntervalMs = intervalMs;
    }

    /**
     * Looks up the queryable key/value store that backs the alarm table.
     *
     * @throws InvalidStateStoreException if the store cannot be queried at this time
     */
    private ReadOnlyKeyValueStore<String, byte[]> getAlarmTableNow() throws InvalidStateStoreException {
        return streams.store(ALARM_STORE_NAME, QueryableStoreTypes.keyValueStore());
    }

    /**
     * @return {@code true} when the producer forwards alarms and a positive
     *         synchronization interval is configured; this is the condition under which
     *         {@link #init()} sets up the synchronization
     */
    @Override
    public boolean isEnabled() {
        return kafkaProducer.isForwardingAlarms() && alarmSyncIntervalMs > 0L;
    }

    /**
     * @return {@code true} when the alarm store can currently be queried; {@code false}
     *         when the stream was never created or the store is not available
     */
    @Override
    public boolean isReady() {
        if (streams == null) {
            // init() has not run, or synchronization is disabled.
            return false;
        }
        try {
            getAlarmTableNow();
            return true;
        } catch (InvalidStateStoreException ignored) {
            // Expected while the store is not queryable.
            return false;
        }
    }

    /**
     * Reads every entry of the alarm store.
     *
     * @return the alarms keyed by reduction key, in iteration order of the store; the
     *         value is {@code null} for entries without bytes or whose bytes cannot be parsed
     */
    @Override
    public Map<String, OpennmsModelProtos.Alarm> getAlarms() {
        final Map<String, OpennmsModelProtos.Alarm> alarmsByReductionKey = new LinkedHashMap<>();
        // Store iterators hold resources and must be closed once consumed.
        try (KeyValueIterator<String, byte[]> iterator = getAlarmTableNow().all()) {
            iterator.forEachRemaining(kv -> alarmsByReductionKey.put(kv.key, parseAlarmOrNull(kv.key, kv.value)));
        }
        return alarmsByReductionKey;
    }

    /**
     * Looks up a single alarm in the store.
     *
     * @param reductionKey the reduction key of the alarm
     * @return the alarm, or {@code null} when the store has no value for the key
     * @throws RuntimeException if the stored bytes cannot be parsed
     */
    @Override
    public OpennmsModelProtos.Alarm getAlarm(String reductionKey) {
        final byte[] alarmBytes = getAlarmTableNow().get(reductionKey);
        if (alarmBytes == null) {
            return null;
        }
        try {
            return OpennmsModelProtos.Alarm.parseFrom(alarmBytes);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Failed to parse alarm for bytes at reduction key " + reductionKey, e);
        }
    }

    /**
     * Parses the given bytes, returning {@code null} (and logging an error for
     * unparsable content) instead of failing.
     */
    private static OpennmsModelProtos.Alarm parseAlarmOrNull(String reductionKey, byte[] alarmBytes) {
        if (alarmBytes == null) {
            return null;
        }
        try {
            return OpennmsModelProtos.Alarm.parseFrom(alarmBytes);
        } catch (InvalidProtocolBufferException e) {
            LOG.error("Failed to parse alarm for bytes at reduction key '{}'. Alarm will be empty in map.", reductionKey, e);
            return null;
        }
    }
}

