package org.opennms.netmgt.telemetry.protocols.collection;

import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.File;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.opennms.core.sysprops.SystemProperties;
import org.opennms.netmgt.collection.api.CollectionAgent;
import org.opennms.netmgt.collection.api.CollectionSet;
import org.opennms.netmgt.collection.api.PersisterFactory;
import org.opennms.netmgt.collection.api.ServiceParameters;
import org.opennms.netmgt.filter.api.FilterDao;
import org.opennms.netmgt.rrd.RrdRepository;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLog;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLogEntry;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.opennms.netmgt.telemetry.config.api.PackageDefinition;
import org.opennms.netmgt.threshd.api.ThresholdInitializationException;
import org.opennms.netmgt.threshd.api.ThresholdingService;
import org.opennms.netmgt.threshd.api.ThresholdingSession;
import org.osgi.framework.BundleContext;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/collection/AbstractCollectionAdapter.class */
public abstract class AbstractCollectionAdapter extends AbstractAdapter {
    private static final ServiceParameters EMPTY_SERVICE_PARAMETERS = new ServiceParameters(Collections.emptyMap());
    private final LoadingCache<CacheKey, Optional<PackageDefinition>> cache;
    protected BundleContext bundleContext;

    @Autowired
    private FilterDao filterDao;

    @Autowired
    private PersisterFactory persisterFactory;

    @Autowired
    private ThresholdingService thresholdingService;
    private AtomicBoolean isThresholdingEnabled;
    private Integer thresholdingSessionTtlMinutes;
    private Cache<String, ThresholdingSession> agentThresholdingSessions;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opennms/netmgt/telemetry/protocols/collection/AbstractCollectionAdapter$CacheKey.class */
    public static class CacheKey {
        private String protocol;
        private String hostAddress;

        public CacheKey(String str, String str2) {
            this.protocol = (String) Objects.requireNonNull(str);
            this.hostAddress = (String) Objects.requireNonNull(str2);
        }

        public String getProtocol() {
            return this.protocol;
        }

        public String getHostAddress() {
            return this.hostAddress;
        }

        public int hashCode() {
            return Objects.hash(this.hostAddress, this.protocol);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            CacheKey cacheKey = (CacheKey) obj;
            return Objects.equals(this.hostAddress, cacheKey.hostAddress) && Objects.equals(this.protocol, cacheKey.protocol);
        }
    }

    public AbstractCollectionAdapter(AdapterDefinition adapterDefinition, MetricRegistry metricRegistry) {
        super(adapterDefinition, metricRegistry);
        this.cache = CacheBuilder.newBuilder().maximumSize(SystemProperties.getLong("org.opennms.features.telemetry.cache.ipAddressFilter.maximumSize", 1000).longValue()).expireAfterWrite(SystemProperties.getLong("org.opennms.features.telemetry.cache.ipAddressFilter.expireAfterWrite", 120).longValue(), TimeUnit.SECONDS).build(new CacheLoader<CacheKey, Optional<PackageDefinition>>() { // from class: org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapter.1
            public Optional<PackageDefinition> load(CacheKey cacheKey) {
                for (PackageDefinition packageDefinition : AbstractCollectionAdapter.this.adapterConfig.getPackages()) {
                    if (packageDefinition.getFilterRule() != null && !AbstractCollectionAdapter.this.filterDao.isValid(cacheKey.getHostAddress(), packageDefinition.getFilterRule())) {
                    }
                    return Optional.of(packageDefinition);
                }
                return Optional.empty();
            }
        });
        this.isThresholdingEnabled = new AtomicBoolean(true);
        this.thresholdingSessionTtlMinutes = SystemProperties.getInteger("org.opennms.netmgt.telemetry.protocols.collection.thresholdingSessionTtlMinutes", 1440);
        this.agentThresholdingSessions = CacheBuilder.newBuilder().expireAfterAccess(this.thresholdingSessionTtlMinutes.intValue(), TimeUnit.MINUTES).build();
    }

    public abstract Stream<CollectionSetWithAgent> handleCollectionMessage(TelemetryMessageLogEntry telemetryMessageLogEntry, TelemetryMessageLog telemetryMessageLog);

    @Override // org.opennms.netmgt.telemetry.protocols.collection.AbstractAdapter
    public final void handleMessage(TelemetryMessageLogEntry telemetryMessageLogEntry, TelemetryMessageLog telemetryMessageLog) {
        handleCollectionMessage(telemetryMessageLogEntry, telemetryMessageLog).forEach(collectionSetWithAgent -> {
            PackageDefinition packageFor = getPackageFor(this.adapterConfig, collectionSetWithAgent.getAgent());
            if (packageFor == null) {
                this.LOG.warn("No matching package found for message: {}. Dropping.", telemetryMessageLogEntry);
                return;
            }
            RrdRepository rrdRepository = new RrdRepository();
            rrdRepository.setStep(packageFor.getRrd().getStep().intValue());
            rrdRepository.setHeartBeat(rrdRepository.getStep() * 2);
            rrdRepository.setRraList(packageFor.getRrd().getRras());
            rrdRepository.setRrdBaseDir(new File(packageFor.getRrd().getBaseDir()));
            CollectionSet collectionSet = collectionSetWithAgent.getCollectionSet();
            this.LOG.trace("Persisting collection set: {} for message: {}", collectionSet, telemetryMessageLogEntry);
            collectionSet.visit(this.persisterFactory.createPersister(EMPTY_SERVICE_PARAMETERS, rrdRepository));
            try {
                if (this.isThresholdingEnabled.get()) {
                    getSessionForAgent(collectionSetWithAgent.getAgent(), rrdRepository).accept(collectionSet);
                }
            } catch (ThresholdInitializationException e) {
                this.LOG.warn("Failed Thresholding of CollectionSet : {} for agent: {}", e.getMessage(), collectionSetWithAgent.getAgent());
            }
        });
    }

    private ThresholdingSession getSessionForAgent(CollectionAgent collectionAgent, RrdRepository rrdRepository) throws ThresholdInitializationException {
        if (this.thresholdingService == null) {
            this.isThresholdingEnabled.set(false);
            throw new ThresholdInitializationException("No ThresholdingService available. No future Threshholding will be done");
        }
        int nodeId = collectionAgent.getNodeId();
        String hostAddress = collectionAgent.getHostAddress();
        String name = this.adapterConfig.getName();
        String sessionKey = getSessionKey(nodeId, hostAddress, name);
        ThresholdingSession thresholdingSession = (ThresholdingSession) this.agentThresholdingSessions.getIfPresent(sessionKey);
        if (thresholdingSession == null) {
            thresholdingSession = this.thresholdingService.createSession(nodeId, hostAddress, name, rrdRepository, EMPTY_SERVICE_PARAMETERS);
            this.agentThresholdingSessions.put(sessionKey, thresholdingSession);
        }
        return thresholdingSession;
    }

    private String getSessionKey(int i, String str, String str2) {
        return String.valueOf(i) + str + str2;
    }

    private PackageDefinition getPackageFor(AdapterDefinition adapterDefinition, CollectionAgent collectionAgent) {
        try {
            return (PackageDefinition) ((Optional) this.cache.get(new CacheKey(adapterDefinition.getName(), collectionAgent.getHostAddress()))).orElse(null);
        } catch (ExecutionException e) {
            this.LOG.error("Error while retrieving package from Cache: {}.", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    public void setFilterDao(FilterDao filterDao) {
        this.filterDao = filterDao;
    }

    public void setPersisterFactory(PersisterFactory persisterFactory) {
        this.persisterFactory = persisterFactory;
    }

    public ThresholdingService getThresholdingService() {
        return this.thresholdingService;
    }

    public void setThresholdingService(ThresholdingService thresholdingService) {
        this.thresholdingService = thresholdingService;
    }

    public void setBundleContext(BundleContext bundleContext) {
        this.bundleContext = bundleContext;
    }
}
