package org.opennms.newts.gsod;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Guice;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.file.Path;
import java.text.ParseException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.opennms.newts.api.MetricType;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleRepository;
import org.opennms.newts.api.Timestamp;
import org.opennms.newts.persistence.cassandra.SchemaConstants;
import org.opennms.newts.reporter.metrics.NewtsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.apache.http.ObservableHttp;
import rx.apache.http.ObservableHttpResponse;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Functions;
import rx.schedulers.Schedulers;

/* loaded from: input_file:org/opennms/newts/gsod/ImportRunner.class */
public class ImportRunner {
    // Directory holding the GSOD data files to import; assigned by setSource(File).
    private File m_source;
    // Created on first use by repository(); null until then.
    private SampleRepository m_repository;
    private static final Logger LOG = LoggerFactory.getLogger(ImportRunner.class);
    // Number of samples buffered into each batch before it is posted (option -n).
    private int m_samplesPerBatch = 1000;
    // REST endpoint to post batches to (option -u); null selects the direct repository path.
    private String m_restUrl = null;
    // Worker thread count for direct inserts (option -p); with 1 the batches are mapped without a thread pool.
    private int m_threadCount = 1;
    // Capacity of the worker queue (option -q); 0 means "use m_threadCount * 3".
    private int m_maxThreadQueueSize = 0;
    // Divisor applied to every sample timestamp (option -f).
    private double m_timescaleFactor = 1.0d;
    // Milliseconds added to every scaled sample timestamp (option -o).
    private long m_timeoffset = 0;

    /**
     * Rx operator that submits each incoming batch to an executor and emits the resulting
     * future downstream (decompiled from the anonymous class {@code ImportRunner$11}).
     *
     * <p>The executor is shut down once the upstream terminates, whether it completes or fails,
     * so its worker threads are not left running after an error.
     */
    public class AnonymousClass11 implements Observable.Operator<ListenableFuture<Boolean>, List<Sample>> {
        /** Executor that runs the insert function for every batch. */
        final ListeningExecutorService val$executor;
        /** Function applied to each batch on an executor thread. */
        final Func1<List<Sample>, Boolean> val$insert;

        AnonymousClass11(ListeningExecutorService listeningExecutorService, Func1<List<Sample>, Boolean> func1) {
            this.val$executor = listeningExecutorService;
            this.val$insert = func1;
        }

        /**
         * Wraps the downstream subscriber.
         *
         * @param subscriber the downstream subscriber that receives one future per batch
         * @return the subscriber to attach to the upstream batch source
         */
        @Override
        public Subscriber<? super List<Sample>> call(final Subscriber<? super ListenableFuture<Boolean>> subscriber) {
            return new Subscriber<List<Sample>>() {
                @Override
                public void onCompleted() {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                    } finally {
                        // No further batches will arrive; already-submitted tasks still run to completion.
                        AnonymousClass11.this.val$executor.shutdown();
                    }
                }

                @Override
                public void onError(Throwable th) {
                    try {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(th);
                        }
                    } finally {
                        // Release the worker threads on failure too, not only on normal completion.
                        AnonymousClass11.this.val$executor.shutdown();
                    }
                }

                @Override
                public void onNext(final List<Sample> list) {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    try {
                        subscriber.onNext(AnonymousClass11.this.val$executor.submit(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                return AnonymousClass11.this.val$insert.call(list);
                            }
                        }));
                    } catch (Throwable th) {
                        // A failed submit (or a failing downstream onNext) is reported as a stream error.
                        onError(th);
                    }
                }
            };
        }
    }

    /**
     * Fails fast when an argument precondition does not hold.
     *
     * @param z   the outcome of the precondition check
     * @param str the message of the exception raised when {@code z} is false
     * @throws IllegalArgumentException if {@code z} is false
     */
    private void checkArgument(boolean z, String str) {
        if (z) {
            return;
        }
        throw new IllegalArgumentException(str);
    }

    /**
     * Sets how many samples are grouped into one batch.
     *
     * @param i the batch size; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    @Option(name = "-n", aliases = {"--samples-per-batch"}, metaVar = "sample-count", usage = "the maximum number of samples to include in each post to the repository (default: 1000)")
    public void setSamplesPerBatch(int i) {
        checkArgument(i > 0, "samples per batch must be greater than zero!");
        this.m_samplesPerBatch = i;
    }

    /**
     * Selects REST publishing by recording the server URL.
     *
     * @param str the REST endpoint; must be neither null nor empty
     * @throws IllegalArgumentException if {@code str} is null or empty
     */
    @Option(name = "-u", aliases = {"--url"}, metaVar = "url", usage = "publish data via a Newts REST server at the given url (default: use direct access via Newts API)")
    public void setURL(String str) {
        final boolean hasText = str != null && !str.isEmpty();
        checkArgument(hasText, "the url must not be empty");
        this.m_restUrl = str;
    }

    /**
     * Sets the number of worker threads used when inserting directly.
     *
     * @param i the thread count; must be at least 1
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    @Option(name = "-p", aliases = {"--parallelism"}, metaVar = "thread-count", usage = "when using direct the size of the thread pool that posts the results.  (defaults to 1 ie no parallelism)")
    public void setParallelism(int i) {
        final boolean atLeastOne = i >= 1;
        checkArgument(atLeastOne, "thread count must be at least 1.");
        this.m_threadCount = i;
    }

    /**
     * Sets the capacity of the work queue that feeds the worker threads.
     *
     * @param i the queue capacity in batches; must be at least 1
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    @Option(name = "-q", aliases = {"--max-work-queue-size"}, metaVar = "batch-count", usage = "when using direct the max size of the work-queue (defaults to thread-count * 3)")
    public void setMaxThreadQueueSize(int i) {
        final boolean atLeastOne = i >= 1;
        checkArgument(atLeastOne, "max thread queue size must be at least 1.");
        this.m_maxThreadQueueSize = i;
    }

    /**
     * Sets the factor by which sample timestamps are divided.
     *
     * <p>The factor is used as a divisor, so zero, negative, NaN and infinite values are rejected
     * in the same way the other option setters reject invalid input.
     *
     * @param d the divisor applied to each sample timestamp; must be positive and finite
     * @throws IllegalArgumentException if {@code d} is not a positive, finite number
     */
    @Option(name = "-f", aliases = {"--time-scale-factor"}, metaVar = "long", usage = "to scale down the date we compress time dividing time by this factor")
    public void setTimescaleFactor(double d) {
        // NaN fails the "> 0" comparison, so only infinity needs an explicit check.
        checkArgument(d > 0.0d && !Double.isInfinite(d), "time scale factor must be a positive, finite number.");
        this.m_timescaleFactor = d;
    }

    /**
     * Sets the offset added to every sample timestamp.
     *
     * @param str either the literal {@code now} (the current wall-clock time is used) or a
     *            whole number of seconds, which is converted to milliseconds
     * @throws NumberFormatException if {@code str} is neither {@code now} nor a parsable long
     */
    @Option(name = "-o", aliases = {"--time-offset"}, metaVar = "timestamp", usage = "adjust epoch time in seconds to be <time-offset>. defaults to no offset.  'now' is allowed.")
    public void setTimeoffset(String str) {
        final long offsetMillis;
        if (!str.equals("now")) {
            // The option is given in seconds; the field is kept in milliseconds.
            offsetMillis = Long.parseLong(str) * 1000;
        } else {
            offsetMillis = System.currentTimeMillis();
        }
        this.m_timeoffset = offsetMillis;
    }

    /**
     * Sets the directory that is scanned for data files.
     *
     * @param file the source directory; must exist and be a directory
     * @throws IllegalArgumentException if {@code file} does not exist or is not a directory
     */
    @Argument(metaVar = "sourceDir", required = true, usage = "the source directory that contains gsod data to import. These must be gzip'd files")
    public void setSource(File file) {
        final boolean present = file.exists();
        checkArgument(present, "the source directory " + file + " does not exist");
        final boolean directory = file.isDirectory();
        checkArgument(directory, "the source directory must be a directory");
        this.m_source = file;
    }

    /**
     * Command-line entry point: creates a runner and executes it with the given arguments.
     *
     * @param strArr the raw command-line arguments
     * @throws Exception whatever {@link #execute(String...)} throws
     */
    public static void main(String... strArr) throws Exception {
        ImportRunner runner = new ImportRunner();
        runner.execute(strArr);
    }

    /**
     * Parses the command line, assembles the import pipeline and blocks until it terminates.
     *
     * <p>If the arguments cannot be parsed, the error and the usage text are printed to stderr and
     * the method returns. When the import completes, a final metrics report is printed and the JVM
     * is exited with status 0. When the import fails, the error is printed, the subscription is
     * cancelled and the waiting latch is released, so this method returns instead of blocking
     * forever.
     *
     * @param strArr the raw command-line arguments
     * @throws Exception if setting up or waiting for the import fails
     */
    public void execute(String... strArr) throws Exception {
        CmdLineParser cmdLineParser = new CmdLineParser(this);
        try {
            cmdLineParser.parseArgument(strArr);
            MetricRegistry metrics = new MetricRegistry();
            final long startMillis = System.currentTimeMillis();
            metrics.register("elapsed-seconds", new Gauge<Double>() {
                @Override
                public Double getValue() {
                    return Double.valueOf((System.currentTimeMillis() - startMillis) / 1000.0d);
                }
            });
            final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metrics)
                    .outputTo(System.err)
                    .convertRatesTo(TimeUnit.SECONDS)
                    .convertDurationsTo(TimeUnit.MILLISECONDS)
                    .build();
            consoleReporter.start(10L, TimeUnit.SECONDS);
            if (this.m_restUrl == null) {
                // Direct mode only: additionally report the importer's own metrics through the repository.
                NewtsReporter.forRegistry(metrics)
                        .name("importer")
                        .convertRatesTo(TimeUnit.SECONDS)
                        .convertDurationsTo(TimeUnit.MILLISECONDS)
                        .build(repository())
                        .start(1L, TimeUnit.SECONDS);
            }
            LOG.debug("Scanning {} for GSOD data files...", this.m_source);
            // files -> lines (header lines removed) -> samples -> time-adjusted samples -> batches
            Observable<List<Sample>> batches = FileObservable.fileTreeWalker(this.m_source.toPath())
                    .subscribeOn(Schedulers.io())
                    .map(meter(metrics.meter("files"), Path.class))
                    .map(reportFile())
                    .mergeMap(FileObservable.lines())
                    .filter(exclude("YEARMODA"))
                    .mergeMap(samples())
                    .map(adjustTime())
                    .map(meter(metrics.meter(SchemaConstants.T_SAMPLES), Sample.class))
                    .buffer(this.m_samplesPerBatch);
            Observable<Boolean> doImport = this.m_restUrl != null ? restPoster(batches, metrics) : directPoster(batches, metrics);
            System.err.println("doImport = " + doImport);
            final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
            final AtomicBoolean failed = new AtomicBoolean(false);
            final CountDownLatch done = new CountDownLatch(1);
            Subscription subscription = doImport.subscribe(new Observer<Boolean>() {
                @Override
                public void onCompleted() {
                    System.err.println("Finished Importing Everything!");
                    consoleReporter.report();
                    done.countDown();
                    System.exit(0);
                }

                @Override
                public void onError(Throwable th) {
                    failed.set(true);
                    System.err.println("Error importing!");
                    th.printStackTrace();
                    try {
                        // May still be null if the error arrives before subscribe() has returned.
                        Subscription current = subscriptionRef.get();
                        if (current != null) {
                            current.unsubscribe();
                        }
                    } catch (Exception e) {
                        System.err.println("Failed to close httpClient!");
                        e.printStackTrace();
                    } finally {
                        // Without this the caller would wait on the latch forever after a failure.
                        done.countDown();
                    }
                }

                @Override
                public void onNext(Boolean bool) {
                    System.err.println("Received a boolen: " + bool);
                }
            });
            subscriptionRef.set(subscription);
            if (failed.get()) {
                // The error was delivered before the reference was published; cancel here instead.
                subscription.unsubscribe();
            }
            System.err.println("Return from Subscribe!");
            done.await();
        } catch (CmdLineException e) {
            System.err.println(e.getMessage());
            cmdLineParser.printUsage(System.err);
        }
    }

    /**
     * Builds the mapping that rewrites each sample's timestamp: the original epoch milliseconds
     * are divided by the time scale factor, rounded, and shifted by the time offset. All other
     * sample attributes are copied unchanged.
     *
     * @return a function producing a new sample with the adjusted timestamp
     */
    private Func1<? super Sample, ? extends Sample> adjustTime() {
        return new Func1<Sample, Sample>() {
            @Override
            public Sample call(Sample original) {
                long scaledMillis = Math.round(original.getTimestamp().asMillis() / ImportRunner.this.m_timescaleFactor);
                Timestamp adjusted = Timestamp.fromEpochMillis(ImportRunner.this.m_timeoffset + scaledMillis);
                return new Sample(adjusted, original.getResource(), original.getName(), original.getType(), original.getValue());
            }
        };
    }

    /**
     * Returns the sample repository, creating it through Guice on first use and caching it.
     *
     * @return the cached repository instance
     */
    private SampleRepository repository() {
        if (this.m_repository != null) {
            return this.m_repository;
        }
        SampleRepository created = (SampleRepository) Guice.createInjector(new Config()).getInstance(SampleRepository.class);
        this.m_repository = created;
        return this.m_repository;
    }

    /**
     * Builds the pipeline stage that writes batches straight into the sample repository.
     *
     * <p>Each insert is timed with the {@code writes} timer, and the batch size is added to the
     * {@code samples-completed} meter whether or not the insert succeeds. With a thread count of 1
     * the batches are mapped directly; otherwise they are dispatched through {@code parMap}.
     *
     * @param observable     the stream of sample batches
     * @param metricRegistry the registry that supplies the timer and meter
     * @return an observable emitting whether every batch reported {@code true}
     */
    private Observable<Boolean> directPoster(Observable<List<Sample>> observable, MetricRegistry metricRegistry) {
        final SampleRepository repository = repository();
        final Timer timer = metricRegistry.timer("writes");
        final Meter meter = metricRegistry.meter("samples-completed");
        Func1<List<Sample>, Boolean> insert = new Func1<List<Sample>, Boolean>() {
            @Override
            public Boolean call(List<Sample> list) {
                int size = list.size();
                // try-with-resources stops the timer even when the insert throws.
                try (Timer.Context ignored = timer.time()) {
                    repository.insert(list);
                    return Boolean.TRUE;
                } finally {
                    meter.mark(size);
                }
            }
        };
        Observable<Boolean> results = this.m_threadCount == 1
                ? observable.map(insert)
                : parMap(observable, metricRegistry, insert);
        return results.all(Functions.<Boolean>identity());
    }

    /**
     * Runs the given function over the batches on a fixed-size thread pool.
     *
     * <p>The pool's work queue is bounded (the configured max queue size, or three times the
     * thread count when none was configured). Its {@code offer} and {@code add} are overridden to
     * block via {@code put}, and the time spent blocked is recorded in the {@code wait-time}
     * timer. Gauges for the pool and queue state are registered as well.
     *
     * @param observable     the stream of sample batches
     * @param metricRegistry the registry that receives the timer and gauges
     * @param func1          the function applied to each batch on a pool thread
     * @return an observable of the per-batch results
     */
    private Observable<Boolean> parMap(Observable<List<Sample>> observable, MetricRegistry metricRegistry, Func1<List<Sample>, Boolean> func1) {
        final Timer timer = metricRegistry.timer("wait-time");
        final int capacity = this.m_maxThreadQueueSize == 0 ? this.m_threadCount * 3 : this.m_maxThreadQueueSize;
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(capacity) {
            @Override
            public boolean offer(Runnable runnable) {
                return putAndTime(runnable);
            }

            @Override
            public boolean add(Runnable runnable) {
                return putAndTime(runnable);
            }

            /** Blocks until there is room for the task, timing the wait; always returns true. */
            private boolean putAndTime(Runnable runnable) {
                // try-with-resources stops the timer even when the wait is interrupted.
                try (Timer.Context ignored = timer.time()) {
                    put(runnable);
                    return true;
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller before converting to an unchecked exception.
                    Thread.currentThread().interrupt();
                    throw Exceptions.propagate(e);
                }
            }
        };
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.m_threadCount, this.m_threadCount, 0L, TimeUnit.MILLISECONDS, workQueue);
        metricRegistry.register("active-threads", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return Integer.valueOf(threadPoolExecutor.getActiveCount());
            }
        });
        metricRegistry.register("pool-size", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return Integer.valueOf(threadPoolExecutor.getPoolSize());
            }
        });
        metricRegistry.register("largest-pool-size", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return Integer.valueOf(threadPoolExecutor.getLargestPoolSize());
            }
        });
        metricRegistry.register("work-queue-size", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return Integer.valueOf(workQueue.size());
            }
        });
        return parMap(observable, threadPoolExecutor, metricRegistry, func1);
    }

    /**
     * Applies the function to every batch on the given executor and emits the results.
     *
     * <p>Each batch is turned into a future by the lifting operator; the futures are then awaited
     * on the io scheduler, and any failure while waiting is rethrown through
     * {@code Exceptions.propagate}.
     *
     * @param observable      the stream of sample batches
     * @param executorService the executor that runs the function
     * @param metricRegistry  not used by this overload
     * @param func1           the function applied to each batch
     * @return an observable of the per-batch results
     */
    private Observable<Boolean> parMap(Observable<List<Sample>> observable, ExecutorService executorService, MetricRegistry metricRegistry, Func1<List<Sample>, Boolean> func1) {
        ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executorService);
        Func1<ListenableFuture<Boolean>, Boolean> awaitResult = new Func1<ListenableFuture<Boolean>, Boolean>() {
            @Override
            public Boolean call(ListenableFuture<Boolean> pending) {
                try {
                    return pending.get();
                } catch (Throwable failure) {
                    throw Exceptions.propagate(failure);
                }
            }
        };
        Observable<ListenableFuture<Boolean>> submitted = observable.lift(new AnonymousClass11(listeningExecutor, func1));
        return submitted.observeOn(Schedulers.io()).map(awaitResult);
    }

    /**
     * Builds the pipeline stage that publishes batches to the REST endpoint.
     *
     * <p>Each batch is serialised to JSON and posted with an asynchronous HTTP client. The
     * {@code posts}, {@code responses} and {@code samples-completed} meters are updated along the
     * way, and the client is closed when the stream completes.
     *
     * @param observable     the stream of sample batches
     * @param metricRegistry the registry that supplies the meters
     * @return an observable emitting whether every response was successful
     */
    private Observable<Boolean> restPoster(Observable<List<Sample>> observable, MetricRegistry metricRegistry) {
        final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        httpClient.start();

        Observable<String> payloads = observable
                .map(toJSON())
                .map(meter(metricRegistry.meter("posts"), String.class));

        Observable<ObservableHttpResponse> responses = payloads
                .mergeMap(postJSON(this.m_restUrl, httpClient))
                .map(meter(metricRegistry.meter("responses"), ObservableHttpResponse.class))
                .map(meter(metricRegistry.meter("samples-completed"), this.m_samplesPerBatch, ObservableHttpResponse.class));

        Observable<Boolean> allSuccessful = responses.all(successful());

        return allSuccessful.doOnCompleted(new Action0() {
            @Override
            public void call() {
                try {
                    httpClient.close();
                } catch (IOException closeFailure) {
                    System.err.println("Failed to close httpClient!");
                    closeFailure.printStackTrace();
                }
            }
        });
    }

    /**
     * Builds a pass-through function that announces each file on stderr.
     *
     * @return a function that prints the path and returns it unchanged
     */
    private static Func1<? super Path, ? extends Path> reportFile() {
        Func1<Path, Path> announce = new Func1<Path, Path>() {
            @Override
            public Path call(Path file) {
                System.err.println("Begin Processing: " + file);
                return file;
            }
        };
        return announce;
    }

    /**
     * Builds the function that turns one input line into an observable of the samples parsed
     * from it. A single {@code LineParser} is created here and shared by all invocations of the
     * returned function.
     *
     * @return a function mapping a line to its samples; a {@link ParseException} raised by the
     *         parser is rethrown through {@code Exceptions.propagate}
     */
    public static Func1<String, Observable<Sample>> samples() {
        final LineParser lineParser = new LineParser();
        return new Func1<String, Observable<Sample>>() {
            @Override
            public Observable<Sample> call(String str) {
                try {
                    // Use the captured parser; "LineParser.this" is not valid here because
                    // LineParser is not an enclosing class.
                    return Observable.from((Iterable) lineParser.parseLine(str));
                } catch (ParseException e) {
                    throw Exceptions.propagate(e);
                }
            }
        };
    }

    /**
     * Tells whether a sample is a gauge whose value is NaN.
     *
     * @param sample the sample to inspect
     * @return {@code true} only for GAUGE samples with a NaN value
     */
    public static boolean isNaN(Sample sample) {
        if (sample.getType() != MetricType.GAUGE) {
            return false;
        }
        return Double.isNaN(sample.getValue().doubleValue());
    }

    /**
     * Builds the function that serialises a batch of samples to JSON text.
     *
     * <p>Gauge samples whose value is NaN are left out. Gauge values are written as doubles,
     * all other values as longs.
     *
     * @return a function mapping a batch to the string produced by {@code JSONBuilder}
     */
    public static Func1<List<Sample>, String> toJSON() {
        return new Func1<List<Sample>, String>() {
            @Override
            public String call(List<Sample> batch) {
                JSONBuilder json = new JSONBuilder();
                for (Sample current : batch) {
                    if (ImportRunner.isNaN(current)) {
                        continue;
                    }
                    json.newObject();
                    json.attr("timestamp", current.getTimestamp().asMillis());
                    json.attr(SchemaConstants.F_RESOURCE, current.getResource().getId());
                    json.attr("name", current.getName());
                    json.attr("type", current.getType().name());
                    if (current.getType() != MetricType.GAUGE) {
                        json.attr("value", current.getValue().longValue());
                    } else {
                        json.attr("value", current.getValue().doubleValue());
                    }
                }
                return json.toString();
            }
        };
    }

    /**
     * Builds the predicate that accepts an HTTP response.
     *
     * @return a function returning {@code true} for status codes below 400 and throwing a
     *         {@link RuntimeException} that carries the status line otherwise
     */
    private static Func1<ObservableHttpResponse, Boolean> successful() {
        return new Func1<ObservableHttpResponse, Boolean>() {
            @Override
            public Boolean call(ObservableHttpResponse response) {
                int statusCode = response.getResponse().getStatusLine().getStatusCode();
                if (statusCode < 400) {
                    return true;
                }
                throw new RuntimeException("Failed to post samples: " + response.getResponse().getStatusLine());
            }
        };
    }

    /**
     * Builds the function that posts a JSON document to the given URL.
     *
     * @param str                      the target URL; parsed once, when this method is called
     * @param closeableHttpAsyncClient the client used to send every request
     * @return a function mapping a JSON string to the observable response of its POST; an
     *         {@link UnsupportedEncodingException} is rethrown through {@code Exceptions.propagate}
     */
    public static Func1<String, Observable<ObservableHttpResponse>> postJSON(String str, final CloseableHttpAsyncClient closeableHttpAsyncClient) {
        final URI endpoint = URI.create(str);
        return new Func1<String, Observable<ObservableHttpResponse>>() {
            @Override
            public Observable<ObservableHttpResponse> call(String json) {
                try {
                    return ObservableHttp
                            .createRequest(HttpAsyncMethods.createPost(endpoint, json, ContentType.APPLICATION_JSON), closeableHttpAsyncClient)
                            .toObservable();
                } catch (UnsupportedEncodingException unsupported) {
                    throw Exceptions.propagate(unsupported);
                }
            }
        };
    }

    /**
     * Builds a filter predicate that rejects lines containing the given text.
     *
     * @param str the text that marks a line for exclusion
     * @return a function returning {@code false} for lines that contain {@code str}
     */
    public static Func1<String, Boolean> exclude(final String str) {
        return new Func1<String, Boolean>() {
            @Override
            public Boolean call(String line) {
                boolean marked = line.contains(str);
                return !marked;
            }
        };
    }

    /**
     * Builds a pass-through function that marks the meter once per item.
     *
     * @param meter the meter to mark
     * @param cls   the item type; only used to fix the type parameter
     * @param <T>   the item type
     * @return a function that marks the meter by 1 and returns its argument unchanged
     */
    public static <T> Func1<T, T> meter(Meter meter, Class<T> cls) {
        return meter(meter, 1, cls);
    }

    /**
     * Builds a pass-through function that marks the meter by a fixed amount per item.
     *
     * @param meter the meter to mark
     * @param i     the amount added to the meter for every item
     * @param cls   the item type; only used to fix the type parameter
     * @param <T>   the item type
     * @return a function that marks the meter by {@code i} and returns its argument unchanged
     */
    public static <T> Func1<T, T> meter(final Meter meter, final int i, Class<T> cls) {
        return new Func1<T, T>() {
            @Override
            public T call(T t) {
                // Use the captured parameter; "Meter.this" is not valid here because
                // Meter is not an enclosing class.
                meter.mark(i);
                return t;
            }
        };
    }
}
