package org.opennms.protocols.rt;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Category;
import org.apache.log4j.Logger;
import org.opennms.protocols.rt.Request;
import org.opennms.protocols.rt.Response;

/* loaded from: input_file:org/opennms/protocols/rt/RequestTracker.class */
public class RequestTracker<ReqT extends Request<?, ReqT, ReplyT>, ReplyT extends Response> {
    private static final Logger s_log = Logger.getLogger(RequestTracker.class);
    private RequestLocator<ReqT, ReplyT> m_requestLocator;
    private Messenger<ReqT, ReplyT> m_messenger;
    private Thread m_replyProcessor;
    private Thread m_timeoutProcessor;
    private static final int NEW = 0;
    private static final int STARTING = 1;
    private static final int STARTED = 2;
    private AtomicInteger m_state = new AtomicInteger(0);
    private BlockingQueue<ReplyT> m_pendingReplyQueue = new LinkedBlockingQueue();
    private DelayQueue<ReqT> m_timeoutQueue = new DelayQueue<>();

    public RequestTracker(String str, Messenger<ReqT, ReplyT> messenger, RequestLocator<ReqT, ReplyT> requestLocator) throws IOException {
        this.m_requestLocator = requestLocator;
        this.m_replyProcessor = new Thread(String.valueOf(str) + "-Reply-Processor") { // from class: org.opennms.protocols.rt.RequestTracker.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestTracker.this.processReplies();
                } catch (InterruptedException e) {
                    RequestTracker.this.errorf("Thread %s interrupted!", this);
                } catch (Throwable th) {
                    RequestTracker.this.errorf(th, "Unexpected exception on Thread %s!", this);
                }
            }
        };
        this.m_timeoutProcessor = new Thread(String.valueOf(str) + "-Timeout-Processor") { // from class: org.opennms.protocols.rt.RequestTracker.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestTracker.this.processTimeouts();
                } catch (InterruptedException e) {
                    RequestTracker.this.errorf("Thread %s interrupted!", this);
                } catch (Throwable th) {
                    RequestTracker.this.errorf(th, "Unexpected exception on Thread %s!", this);
                }
            }
        };
        this.m_messenger = messenger;
    }

    public synchronized void start() {
        if (this.m_state.compareAndSet(0, 1)) {
            this.m_messenger.start(this.m_pendingReplyQueue);
            this.m_timeoutProcessor.start();
            this.m_replyProcessor.start();
            this.m_state.set(2);
        }
    }

    public void assertStarted() {
        if (!(this.m_state.get() == 2)) {
            throw new IllegalStateException("RequestTracker not started!");
        }
    }

    public void sendRequest(ReqT reqt) throws Exception {
        assertStarted();
        if (this.m_requestLocator.trackRequest(reqt)) {
            this.m_messenger.sendRequest(reqt);
            debugf("Scheding timeout for request to %s in %d ms", reqt, Long.valueOf(reqt.getDelay(TimeUnit.MILLISECONDS)));
            this.m_timeoutQueue.offer((DelayQueue<ReqT>) reqt);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processReplies() throws InterruptedException {
        while (true) {
            ReplyT take = this.m_pendingReplyQueue.take();
            debugf("Found a reply to process: %s", take);
            ReqT locateMatchingRequest = locateMatchingRequest(take);
            if (locateMatchingRequest == null) {
                debugf("No request found for reply %s", take);
            } else if (processReply(take, locateMatchingRequest)) {
                this.m_requestLocator.requestComplete(locateMatchingRequest);
            }
        }
    }

    private ReqT locateMatchingRequest(ReplyT replyt) {
        try {
            return this.m_requestLocator.locateMatchingRequest(replyt);
        } catch (Throwable th) {
            errorf(th, "Unexpected error locating response to request %s. Discarding response!", replyt);
            return null;
        }
    }

    private boolean processReply(ReplyT replyt, ReqT reqt) {
        try {
            debugf("Processing reply %s for request %s", replyt, reqt);
            return reqt.processResponse(replyt);
        } catch (Throwable th) {
            errorf(th, "Unexpected error processingResponse to request: %s, reply is %s", reqt, replyt);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processTimeouts() throws InterruptedException {
        while (true) {
            try {
                processNextTimeout(this.m_timeoutQueue.take());
            } catch (Throwable th) {
                errorf(th, "Unexpected error processingTimeout!", new Object[0]);
            }
        }
    }

    private void processNextTimeout(ReqT reqt) {
        if (reqt.isProcessed()) {
            return;
        }
        debugf("Found a possibly timedout request: %s", reqt);
        ReqT requestTimedOut = this.m_requestLocator.requestTimedOut(reqt);
        if (requestTimedOut != reqt) {
            if (requestTimedOut != null) {
                String format = String.format("A pending request %s with the same id exists but is not the timeout request %s from the queue!", requestTimedOut, reqt);
                errorf(format, new Object[0]);
                reqt.processError(new IllegalStateException(format));
                return;
            }
            return;
        }
        ReqT processTimeout = processTimeout(reqt);
        if (processTimeout != null) {
            try {
                sendRequest(processTimeout);
            } catch (Exception e) {
                processTimeout.processError(e);
            }
        }
    }

    private ReqT processTimeout(ReqT reqt) {
        try {
            debugf("Processing timeout for: %s", reqt);
            return (ReqT) reqt.processTimeout();
        } catch (Throwable th) {
            errorf(th, "Unexpected error processingTimout to request: %s", reqt);
            return null;
        }
    }

    private Category log() {
        return s_log;
    }

    private void debugf(String str, Object... objArr) {
        if (log().isDebugEnabled()) {
            log().debug(String.format(str, objArr));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void errorf(String str, Object... objArr) {
        log().error(String.format(str, objArr));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void errorf(Throwable th, String str, Object... objArr) {
        log().error(String.format(str, objArr), th);
    }
}
