package org.opennms.protocols.rt;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opennms.protocols.rt.Request;
import org.opennms.protocols.rt.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/protocols/rt/RequestTracker.class */
/**
 * Sends requests through a {@link Messenger}, matches incoming replies to the
 * requests registered with a {@link RequestLocator}, and processes requests
 * whose delay expires.
 *
 * <p>Two threads are created by the constructor and launched by {@link #start()}:
 * one drains the queue of received replies, the other drains the
 * {@link DelayQueue} of requests waiting for their timeout.</p>
 *
 * @param <ReqT> the request type handled by this tracker
 * @param <ReplyT> the reply type handled by this tracker
 */
public class RequestTracker<ReqT extends Request<?, ReqT, ReplyT>, ReplyT extends Response> implements ReplyHandler<ReplyT> {
    private static final Logger s_log = LoggerFactory.getLogger(RequestTracker.class);

    // Lifecycle states stored in m_state.
    private static final int NEW = 0;
    private static final int STARTING = 1;
    private static final int STARTED = 2;

    private final RequestLocator<ReqT, ReplyT> m_requestLocator;
    private final Messenger<ReqT, ReplyT> m_messenger;
    private final Thread m_replyProcessor;
    private final Thread m_timeoutProcessor;
    private final AtomicInteger m_state = new AtomicInteger(NEW);

    /** Replies handed over by the messenger and not yet processed. */
    private final BlockingQueue<ReplyT> m_pendingReplyQueue = new LinkedBlockingQueue<>();

    /**
     * Request ids for which a reply has been received but not yet processed.
     * The timeout processor consults this map so that it does not time out a
     * request whose reply is already waiting in {@link #m_pendingReplyQueue}.
     */
    private final Map<Object, Boolean> m_requestIdsWithPendingReplies = new ConcurrentHashMap<>();

    /** Requests that were sent and are waiting for their delay to expire. */
    private final DelayQueue<ReqT> m_timeoutQueue = new DelayQueue<>();

    /**
     * Creates a tracker; no thread is started until {@link #start()} is called.
     *
     * @param str prefix used for the names of the two worker threads
     * @param messenger transport used to send requests and to deliver replies
     * @param requestLocator registry used to track and look up pending requests
     * @throws IOException declared for compatibility with existing callers
     */
    public RequestTracker(String str, Messenger<ReqT, ReplyT> messenger, RequestLocator<ReqT, ReplyT> requestLocator) throws IOException {
        this.m_requestLocator = requestLocator;
        this.m_replyProcessor = new Thread(str + "-Reply-Processor") {
            @Override
            public void run() {
                try {
                    RequestTracker.this.processReplies();
                } catch (InterruptedException e) {
                    RequestTracker.s_log.error("Thread {} interrupted!", this);
                } catch (Throwable th) {
                    RequestTracker.s_log.error("Unexpected exception on Thread " + this + "!", th);
                }
            }
        };
        this.m_timeoutProcessor = new Thread(str + "-Timeout-Processor") {
            @Override
            public void run() {
                try {
                    RequestTracker.this.processTimeouts();
                } catch (InterruptedException e) {
                    RequestTracker.s_log.error("Thread {} interrupted!", this);
                } catch (Throwable th) {
                    RequestTracker.s_log.error("Unexpected exception on Thread " + this + "!", th);
                }
            }
        };
        this.m_messenger = messenger;
    }

    /**
     * Starts the messenger and both worker threads. Only the first call has
     * any effect; later calls find the state already past {@code NEW} and
     * return immediately.
     */
    public synchronized void start() {
        if (!this.m_state.compareAndSet(NEW, STARTING)) {
            return;
        }
        this.m_messenger.start(this);
        this.m_timeoutProcessor.start();
        this.m_replyProcessor.start();
        this.m_state.set(STARTED);
    }

    /**
     * Verifies that {@link #start()} has completed.
     *
     * @throws IllegalStateException if the tracker is not in the started state
     */
    public void assertStarted() {
        if (this.m_state.get() != STARTED) {
            throw new IllegalStateException("RequestTracker not started!");
        }
    }

    /**
     * Registers the request with the locator and, if the locator accepts it,
     * sends it and schedules its timeout.
     *
     * @param reqt the request to send
     * @throws IllegalStateException if the tracker has not been started
     * @throws Exception whatever the locator or the messenger throws
     */
    public void sendRequest(ReqT reqt) throws Exception {
        assertStarted();
        if (!this.m_requestLocator.trackRequest(reqt)) {
            return;
        }
        this.m_messenger.sendRequest(reqt);
        s_log.debug("Scheding timeout for request to {} in {} ms", reqt, Long.valueOf(reqt.getDelay(TimeUnit.MILLISECONDS)));
        this.m_timeoutQueue.offer(reqt);
    }

    /**
     * Accepts a reply from the messenger and queues it for the reply
     * processor thread.
     *
     * <p>For replies that implement {@code ResponseWithId} the request id is
     * recorded as "reply pending" <em>before</em> the reply is queued.
     * Recording it afterwards would let the reply processor consume the reply
     * and clear the marker first, leaving a stale marker behind that would
     * suppress later timeouts for the same id.</p>
     *
     * @param replyt the reply that was received
     */
    @Override
    public void handleReply(ReplyT replyt) {
        if (replyt instanceof ResponseWithId) {
            Object requestId = ((ResponseWithId) replyt).getRequestId();
            // ConcurrentHashMap rejects null keys; skip the marker rather than lose the reply.
            if (requestId != null) {
                this.m_requestIdsWithPendingReplies.put(requestId, Boolean.TRUE);
            }
        }
        this.m_pendingReplyQueue.offer(replyt);
    }

    /**
     * Reply processor loop: takes replies off the pending queue, matches each
     * one to a request and lets the request process it. Runs until the thread
     * is interrupted or an unexpected exception escapes.
     *
     * <p>Public only because the worker thread created in the constructor
     * calls it; not intended to be called by other code.</p>
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a reply
     */
    public void processReplies() throws InterruptedException {
        while (true) {
            ReplyT reply = this.m_pendingReplyQueue.take();
            s_log.debug("Found a reply to process: {}", reply);
            ReqT request = locateMatchingRequest(reply);
            try {
                if (request == null) {
                    s_log.debug("No request found for reply {}", reply);
                } else if (processReply(reply, request)) {
                    this.m_requestLocator.requestComplete(request);
                }
            } finally {
                // The reply is no longer pending, whether or not a request matched it.
                clearPendingReply(reply, request);
            }
        }
    }

    /**
     * Removes the "reply pending" markers associated with a reply that has
     * just been taken off the queue.
     *
     * @param reply the reply that was processed
     * @param request the matching request, or {@code null} if none was found
     */
    private void clearPendingReply(ReplyT reply, ReqT request) {
        if (request != null) {
            Object requestId = request.getId();
            if (requestId != null) {
                this.m_requestIdsWithPendingReplies.remove(requestId);
            }
        }
        if (reply instanceof ResponseWithId) {
            // Also clear by the reply's own id so that unmatched replies do not leave a marker behind.
            Object replyRequestId = ((ResponseWithId) reply).getRequestId();
            if (replyRequestId != null) {
                this.m_requestIdsWithPendingReplies.remove(replyRequestId);
            }
        }
    }

    /**
     * Asks the locator for the request matching a reply.
     *
     * @param replyt the reply to match
     * @return the matching request, or {@code null} if the locator returned
     *         {@code null} or threw (the failure is logged and the reply discarded)
     */
    private ReqT locateMatchingRequest(ReplyT replyt) {
        try {
            return this.m_requestLocator.locateMatchingRequest(replyt);
        } catch (Throwable th) {
            s_log.error("Unexpected error locating response to request " + replyt + ". Discarding response!", th);
            return null;
        }
    }

    /**
     * Lets the request process its reply.
     *
     * @param replyt the reply
     * @param reqt the request the reply was matched to
     * @return the value returned by {@code processResponse}, or {@code true}
     *         if it threw (the failure is logged)
     */
    private boolean processReply(ReplyT replyt, ReqT reqt) {
        try {
            s_log.debug("Processing reply {} for request {}", replyt, reqt);
            return reqt.processResponse(replyt);
        } catch (Throwable th) {
            s_log.error("Unexpected error processingResponse to request: " + reqt + ", reply is " + replyt, th);
            return true;
        }
    }

    /**
     * Timeout processor loop: waits for the next request whose delay has
     * expired and processes it. Failures while handling a single request are
     * logged and the loop continues; an interrupt ends the loop.
     *
     * <p>Public only because the worker thread created in the constructor
     * calls it; not intended to be called by other code.</p>
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the timeout queue
     */
    public void processTimeouts() throws InterruptedException {
        while (true) {
            try {
                processNextTimeout(this.m_timeoutQueue.take());
            } catch (InterruptedException e) {
                // Must not be swallowed by the generic handler below, otherwise the thread can never be stopped.
                throw e;
            } catch (Throwable th) {
                s_log.error("Unexpected error processingTimeout!", th);
            }
        }
    }

    /**
     * Handles one request whose delay has expired.
     *
     * <p>Nothing is done if the request is already processed, if the locator
     * no longer knows it, or if a reply for it is still waiting to be
     * processed. If the locator returns a different request for the same
     * timeout, the expired request is failed with an
     * {@link IllegalStateException}. Otherwise the request's timeout handler
     * runs and any request it returns is sent.</p>
     *
     * @param reqt the request taken from the timeout queue
     */
    private void processNextTimeout(ReqT reqt) {
        if (reqt.isProcessed()) {
            return;
        }
        s_log.debug("Found a possibly timed-out request: {}", reqt);
        ReqT pending = this.m_requestLocator.requestTimedOut(reqt);
        if (pending == null) {
            return;
        }
        if (pending != reqt) {
            String message = String.format("A pending request %s with the same id exists but is not the timeout request %s from the queue!", pending, reqt);
            s_log.error(message);
            reqt.processError(new IllegalStateException(message));
            return;
        }
        if (this.m_requestIdsWithPendingReplies.containsKey(reqt.getId())) {
            s_log.info("A timeout was issued while the reply is pending processing for: {}", reqt);
            return;
        }
        ReqT retry = processTimeout(reqt);
        if (retry == null) {
            return;
        }
        try {
            sendRequest(retry);
        } catch (Exception e) {
            retry.processError(e);
        }
    }

    /**
     * Runs the request's own timeout handler.
     *
     * @param reqt the request that timed out
     * @return the request returned by {@code processTimeout()}, or {@code null}
     *         if it returned {@code null} or threw (the failure is logged)
     */
    private ReqT processTimeout(ReqT reqt) {
        try {
            s_log.debug("Processing timeout for: {}", reqt);
            return (ReqT) reqt.processTimeout();
        } catch (Throwable th) {
            s_log.error("Unexpected error processingTimout to request: " + reqt, th);
            return null;
        }
    }
}
