package org.opennms.netmgt.rtc;

import java.io.InputStream;
import java.io.Reader;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.opennms.core.concurrent.LogPreservingThreadFactory;
import org.opennms.core.fiber.Fiber;
import org.opennms.netmgt.config.RTCConfigFactory;
import org.opennms.netmgt.events.api.annotations.EventHandler;
import org.opennms.netmgt.events.api.annotations.EventListener;
import org.opennms.netmgt.poller.monitors.HttpMonitor;
import org.opennms.netmgt.rtc.datablock.HttpPostInfo;
import org.opennms.netmgt.rtc.datablock.RTCCategory;
import org.opennms.netmgt.rtc.utils.PipedMarshaller;
import org.opennms.netmgt.xml.event.Event;
import org.opennms.netmgt.xml.event.Parm;
import org.opennms.netmgt.xml.event.Value;
import org.opennms.netmgt.xml.rtc.EuiLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EventListener(name = "RTC:DataSender", logPrefix = "rtc")
/* loaded from: input_file:org/opennms/netmgt/rtc/DataSender.class */
public class DataSender implements Fiber {
    private static final Logger LOG = LoggerFactory.getLogger(DataSender.class);
    private final Map<String, Set<HttpPostInfo>> m_catUrlMap = new HashMap();
    final BlockingQueue<Runnable> m_queue;
    private final ExecutorService m_dsrPool;
    private final int POST_ERROR_LIMIT;
    private int m_status;
    private final AvailabilityService m_dataMgr;

    /* loaded from: input_file:org/opennms/netmgt/rtc/DataSender$SendRequest.class */
    private class SendRequest implements Runnable {
        private SendRequest() {
        }

        @Override // java.lang.Runnable
        public void run() {
            DataSender.this.sendData();
        }
    }

    public DataSender(AvailabilityService availabilityService, RTCConfigFactory rTCConfigFactory) {
        this.m_dataMgr = availabilityService;
        this.m_queue = new LinkedBlockingDeque(Math.max(4 * rTCConfigFactory.getSenders(), 32));
        this.m_dsrPool = new ThreadPoolExecutor(1, rTCConfigFactory.getSenders(), 30L, TimeUnit.SECONDS, this.m_queue, (ThreadFactory) new LogPreservingThreadFactory(getClass().getSimpleName(), rTCConfigFactory.getSenders()));
        this.POST_ERROR_LIMIT = rTCConfigFactory.getErrorsBeforeUrlUnsubscribe();
    }

    public synchronized void start() {
        this.m_status = 2;
    }

    public synchronized void stop() {
        this.m_status = 3;
        LOG.info("DataSender - shutting down the data sender pool");
        try {
            this.m_dsrPool.shutdown();
        } catch (Throwable th) {
            LOG.error("Error shutting down data sender pool", th);
        }
        this.m_status = 4;
        LOG.info("DataSender shutdown complete");
    }

    public String getName() {
        return "OpenNMS.RTC.DataSender";
    }

    public int getStatus() {
        return this.m_status;
    }

    public synchronized void subscribe(final String str, final String str2, final String str3, final String str4) {
        final RTCCategory rTCCategory = this.m_dataMgr.getCategories().get(str2);
        if (rTCCategory == null) {
            LOG.warn("RTC: No information available for category: {}", str2);
            return;
        }
        try {
            final HttpPostInfo httpPostInfo = new HttpPostInfo(str, str2, str3, str4);
            Set<HttpPostInfo> set = this.m_catUrlMap.get(str2);
            if (set == null) {
                set = new HashSet();
                this.m_catUrlMap.put(str2, set);
            }
            if (set.add(httpPostInfo)) {
                LOG.debug("Subscribed to URL: {}\tcatlabel: {}\tuser:{}", new Object[]{str, str2, str3});
            } else {
                LOG.debug("Already subscribed to URL: {}\tcatlabel: {}\tuser: {} - IGNORING LATEST subscribe event", new Object[]{str, str2, str3});
            }
            try {
                this.m_dsrPool.execute(new Runnable() { // from class: org.opennms.netmgt.rtc.DataSender.1
                    @Override // java.lang.Runnable
                    public void run() {
                        Reader reader = null;
                        InputStream inputStream = null;
                        try {
                            try {
                                DataSender.LOG.debug("DataSender: posting data to: {}", str);
                                reader = new PipedMarshaller(DataSender.this.m_dataMgr.getEuiLevel(rTCCategory)).getReader();
                                inputStream = HttpUtils.post(httpPostInfo.getURL(), reader, str3, str4, 8192, 60000);
                                byte[] bArr = new byte[HttpUtils.DEFAULT_POST_BUFFER_SIZE];
                                while (true) {
                                    int read = inputStream.read(bArr);
                                    if (read == -1) {
                                        DataSender.LOG.debug("DataSender: posted data for category: {}", str2);
                                        IOUtils.closeQuietly(inputStream);
                                        IOUtils.closeQuietly(reader);
                                        return;
                                    } else if (DataSender.LOG.isDebugEnabled() && read > 0) {
                                        DataSender.LOG.debug("DataSender: post response: {}", new String(bArr, 0, read));
                                    }
                                }
                            } catch (ConnectException e) {
                                DataSender.LOG.warn("DataSender:  Unable to send category '{}' to URL '{}': {}", new Object[]{str2, str, e.getMessage()});
                                IOUtils.closeQuietly(inputStream);
                                IOUtils.closeQuietly(reader);
                            } catch (Throwable th) {
                                DataSender.LOG.warn("DataSender:  Unable to send category '{}' to URL '{}'", new Object[]{str2, str, th});
                                IOUtils.closeQuietly(inputStream);
                                IOUtils.closeQuietly(reader);
                            }
                        } catch (Throwable th2) {
                            IOUtils.closeQuietly(inputStream);
                            IOUtils.closeQuietly(reader);
                            throw th2;
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                LOG.warn("Unable to queue datasender. The task was rejected by the pool. Current queue size: {}.", Integer.valueOf(this.m_queue.size()), e);
            }
        } catch (MalformedURLException e2) {
            LOG.warn("ERROR subscribing: Invalid URL '{}' - Data WILL NOT be SENT to the specified url", str);
        }
    }

    public synchronized void unsubscribe(String str) {
        try {
            URL url = new URL(str);
            Iterator<String> it = this.m_catUrlMap.keySet().iterator();
            while (it.hasNext()) {
                Set<HttpPostInfo> set = this.m_catUrlMap.get(it.next());
                if (set != null) {
                    Iterator<HttpPostInfo> it2 = set.iterator();
                    while (it2.hasNext()) {
                        if (url.toExternalForm().equals(it2.next().getURL().toExternalForm())) {
                            it2.remove();
                        }
                    }
                }
            }
            LOG.debug("Unsubscribed URL: {}", url);
        } catch (MalformedURLException e) {
            LOG.warn("ERROR unsubscribing: Invalid URL: {}", str);
        }
    }

    public synchronized void sendData() {
        LOG.debug("In DataSender sendData()");
        for (RTCCategory rTCCategory : this.m_dataMgr.getCategories().values()) {
            String label = rTCCategory.getLabel();
            LOG.debug("DataSender:sendData(): Category '{}'", label);
            Set<HttpPostInfo> set = this.m_catUrlMap.get(label);
            if (set == null || set.size() <= 0) {
                LOG.debug("DataSender: category '{}' has no listeners", label);
            } else {
                LOG.debug("DataSender: category '{}' has listeners - converting to xml...", label);
                try {
                    EuiLevel euiLevel = this.m_dataMgr.getEuiLevel(rTCCategory);
                    if (set != null && set.size() > 0) {
                        Iterator<HttpPostInfo> it = set.iterator();
                        while (it.hasNext()) {
                            HttpPostInfo next = it.next();
                            Reader reader = null;
                            InputStream inputStream = null;
                            try {
                                try {
                                    reader = new PipedMarshaller(euiLevel).getReader();
                                    LOG.debug("DataSender: posting data to: {}", next.getURLString());
                                    inputStream = HttpUtils.post(next.getURL(), reader, next.getUser(), next.getPassword(), 8192, -1);
                                    LOG.debug("DataSender: posted data for category: {}", label);
                                    byte[] bArr = new byte[HttpUtils.DEFAULT_POST_BUFFER_SIZE];
                                    while (true) {
                                        int read = inputStream.read(bArr);
                                        if (read == -1) {
                                            break;
                                        } else if (LOG.isDebugEnabled() && read > 0) {
                                            LOG.debug("DataSender: post response: {}", new String(bArr, 0, read));
                                        }
                                    }
                                    next.clearErrors();
                                    IOUtils.closeQuietly(inputStream);
                                    IOUtils.closeQuietly(reader);
                                } catch (Throwable th) {
                                    LOG.warn("DataSender: unable to send data for category: {} due to {}: {}", new Object[]{label, th.getClass().getName(), th.getMessage(), th});
                                    next.incrementErrors();
                                    IOUtils.closeQuietly(inputStream);
                                    IOUtils.closeQuietly(reader);
                                }
                                if (this.POST_ERROR_LIMIT > 0 && next.getErrors() >= this.POST_ERROR_LIMIT) {
                                    it.remove();
                                    LOG.warn("URL {} UNSUBSCRIBED due to reaching error limit {}", next.getURLString(), Integer.valueOf(next.getErrors()));
                                }
                            } catch (Throwable th2) {
                                IOUtils.closeQuietly(inputStream);
                                IOUtils.closeQuietly(reader);
                                throw th2;
                            }
                        }
                    }
                } catch (Throwable th3) {
                    LOG.warn("DataSender: unable to convert data to xml for category: '{}'", label, th3);
                }
            }
        }
    }

    public synchronized void notifyToSend() {
        try {
            this.m_dsrPool.execute(new SendRequest());
        } catch (RejectedExecutionException e) {
            LOG.warn("Unable to queue datasender. The task was rejected by the pool. Current queue size: {}.", Integer.valueOf(this.m_queue.size()), e);
        }
    }

    @EventHandler(uei = "uei.opennms.org/internal/rtc/subscribe")
    public void handleRtcSubscribe(Event event) {
        List<Parm> parmCollection = event.getParmCollection();
        if (parmCollection == null) {
            LOG.warn("{} ignored - info incomplete (null event parms)", event.getUei());
            return;
        }
        String str = null;
        String str2 = null;
        String str3 = null;
        String str4 = null;
        for (Parm parm : parmCollection) {
            String parmName = parm.getParmName();
            Value value = parm.getValue();
            if (value != null) {
                String content = value.getContent();
                if (parmName.equals("url")) {
                    str = content;
                } else if (parmName.equals("catlabel")) {
                    str2 = content;
                } else if (parmName.equals(HttpMonitor.PARAMETER_USER)) {
                    str3 = content;
                } else if (parmName.equals("passwd")) {
                    str4 = content;
                }
            }
        }
        if (str == null || str2 == null || str3 == null || str4 == null) {
            LOG.warn("{} did not have all required information. Values contained url: {} catlabel: {} user: {} passwd: {}", new Object[]{event.getUei(), str, str2, str3, str4});
        } else {
            subscribe(str, str2, str3, str4);
            LOG.debug("{} subscribed {}: {}: {}", new Object[]{event.getUei(), str, str2, str3});
        }
    }

    @EventHandler(uei = "uei.opennms.org/internal/rtc/unsubscribe")
    public void handleRtcUnsubscribe(Event event) {
        List<Parm> parmCollection = event.getParmCollection();
        if (parmCollection == null) {
            LOG.warn("{} ignored - info incomplete (null event parms)", event.getUei());
            return;
        }
        String str = null;
        for (Parm parm : parmCollection) {
            String parmName = parm.getParmName();
            Value value = parm.getValue();
            if (value != null) {
                String content = value.getContent();
                if (parmName.equals("url")) {
                    str = content;
                }
            }
        }
        if (str == null) {
            LOG.warn("{} did not have required information.  Value of url: {}", event.getUei(), str);
        } else {
            unsubscribe(str);
            LOG.debug("{} unsubscribed {}", event.getUei(), str);
        }
    }
}
