package org.apache.tika.pipes.async;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesClient;
import org.apache.tika.pipes.PipesException;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/tika-core-2.1.0.jar:org/apache/tika/pipes/async/AsyncProcessor.class */
/**
 * Runs a pool of {@link FetchEmitWorker}s and {@link AsyncEmitter}s plus one watcher task.
 *
 * <p>Callers hand {@link FetchEmitTuple}s to the processor through the {@code offer} methods.
 * Each worker takes tuples off the shared queue, processes them with its own
 * {@link PipesClient} and, for successful parses, forwards the resulting {@link EmitData}
 * to the emitter queue. {@link #finished()} tells the workers that no more tuples will
 * arrive; {@link #checkActive()} reports whether any worker or emitter is still running.
 */
public class AsyncProcessor implements Closeable {
    static final int PARSER_FUTURE_CODE = 1;
    static final int WATCHER_FUTURE_CODE = 3;

    // NOTE(review): presumably the completion code returned by AsyncEmitter -- confirm there.
    private static final int EMITTER_FUTURE_CODE = 2;

    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);

    /** Upper bound, in milliseconds, for every internal blocking offer to a queue. */
    private static final long MAX_OFFER_WAIT_MS = 120000;

    /** Interval, in milliseconds, between two liveness checks of the watcher task. */
    private static final long WATCHER_SLEEP_MS = 500L;

    /** Interval, in milliseconds, between two attempts to enqueue a list of tuples. */
    private static final long OFFER_RETRY_SLEEP_MS = 100L;

    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
    private final ArrayBlockingQueue<EmitData> emitData = new ArrayBlockingQueue<>(100);
    private final ExecutorCompletionService<Integer> executorCompletionService;
    private final ExecutorService executorService;
    private final AsyncConfig asyncConfig;
    private final AtomicLong totalProcessed = new AtomicLong(0);

    // Only mutated inside the synchronized checkActive(); volatile for unsynchronized readers.
    private volatile int numParserThreadsFinished = 0;
    private volatile int numEmitterThreadsFinished = 0;

    // Guarded by the monitor of this object (only touched in checkActive()).
    private boolean addedEmitterSemaphores = false;

    // Written by the unsynchronized close(), read by the offer methods.
    volatile boolean isShuttingDown = false;

    /**
     * Worker task: pulls tuples from the fetch/emit queue until it sees
     * {@link PipesIterator#COMPLETED_SEMAPHORE}, processing each with a private
     * {@link PipesClient}.
     */
    private class FetchEmitWorker implements Callable<Integer> {
        private final AsyncConfig asyncConfig;
        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
        private final ArrayBlockingQueue<EmitData> emitDataQueue;

        private FetchEmitWorker(AsyncConfig asyncConfig,
                                ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
                                ArrayBlockingQueue<EmitData> emitDataQueue) {
            this.asyncConfig = asyncConfig;
            this.fetchEmitTuples = fetchEmitTuples;
            this.emitDataQueue = emitDataQueue;
        }

        /**
         * Processes tuples until the completed semaphore is polled.
         *
         * @return {@link #PARSER_FUTURE_CODE} once the completed semaphore has been seen
         * @throws Exception if polling is interrupted, the emit data cannot be queued in
         *                   time, or reporting/closing the client fails
         */
        @Override
        public Integer call() throws Exception {
            // try-with-resources closes the client on every exit path, normal or exceptional.
            try (PipesClient pipesClient = new PipesClient(this.asyncConfig)) {
                while (true) {
                    FetchEmitTuple tuple = this.fetchEmitTuples.poll(1L, TimeUnit.SECONDS);
                    if (tuple == null) {
                        LOG.trace("null fetch emit tuple");
                        continue;
                    }
                    if (tuple == PipesIterator.COMPLETED_SEMAPHORE) {
                        LOG.trace("hit completed semaphore");
                        return PARSER_FUTURE_CODE;
                    }
                    processTuple(pipesClient, tuple);
                }
            }
        }

        /** Processes one tuple, queues its emit data on success, and reports the result. */
        private void processTuple(PipesClient pipesClient, FetchEmitTuple tuple)
                throws Exception {
            long processStart = System.currentTimeMillis();
            PipesResult pipesResult;
            try {
                pipesResult = pipesClient.process(tuple);
            } catch (IOException e) {
                // An I/O failure while talking to the client is reported as a crash
                // rather than killing this worker.
                LOG.warn("pipes client threw an IOException; reporting unspecified crash", e);
                pipesResult = PipesResult.UNSPECIFIED_CRASH;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("timer -- pipes client process: {} ms",
                        System.currentTimeMillis() - processStart);
            }

            long offerStart = System.currentTimeMillis();
            boolean parsed = pipesResult.getStatus() == PipesResult.STATUS.PARSE_SUCCESS
                    || pipesResult.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION;
            if (parsed && !this.emitDataQueue.offer(pipesResult.getEmitData(),
                    MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Couldn't offer emit data to queue within "
                        + MAX_OFFER_WAIT_MS + " ms");
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("timer -- offered: {} ms", System.currentTimeMillis() - offerStart);
            }

            this.asyncConfig.getPipesReporter()
                    .report(tuple, pipesResult, System.currentTimeMillis() - processStart);
            AsyncProcessor.this.totalProcessed.incrementAndGet();
        }
    }

    /**
     * Loads the configuration from {@code path} and starts the watcher, the workers and the
     * emitters.
     *
     * @param path configuration file handed to {@code AsyncConfig.load} and
     *             {@code EmitterManager.load}
     * @throws TikaException if loading the configuration fails
     * @throws IOException   if loading the configuration fails
     */
    public AsyncProcessor(Path path) throws TikaException, IOException {
        this.asyncConfig = AsyncConfig.load(path);
        // Load everything that can throw BEFORE any thread is started, so a bad
        // configuration cannot leave an orphaned thread pool behind.
        EmitterManager emitterManager = EmitterManager.load(path);

        int numClients = this.asyncConfig.getNumClients();
        int numEmitters = this.asyncConfig.getNumEmitters();

        this.fetchEmitTuples = new ArrayBlockingQueue<>(this.asyncConfig.getQueueSize());
        // One thread per worker, one per emitter, plus one for the watcher.
        this.executorService = Executors.newFixedThreadPool(numClients + numEmitters + 1);
        this.executorCompletionService = new ExecutorCompletionService<>(this.executorService);

        // Watcher: periodically harvests finished futures until it is interrupted.
        this.executorCompletionService.submit(() -> {
            while (true) {
                try {
                    Thread.sleep(WATCHER_SLEEP_MS);
                    checkActive();
                } catch (InterruptedException e) {
                    return WATCHER_FUTURE_CODE;
                }
            }
        });
        for (int i = 0; i < numClients; i++) {
            this.executorCompletionService.submit(
                    new FetchEmitWorker(this.asyncConfig, this.fetchEmitTuples, this.emitData));
        }
        for (int i = 0; i < numEmitters; i++) {
            this.executorCompletionService.submit(
                    new AsyncEmitter(this.asyncConfig, this.emitData, emitterManager));
        }
    }

    /**
     * Tries to enqueue all tuples of {@code list}, retrying every 100 ms until {@code j}
     * milliseconds have elapsed.
     *
     * <p>A list larger than the configured queue size is rejected up front with
     * {@code OfferLargerThanQueueSize}.
     *
     * @param list tuples to enqueue
     * @param j    maximum time to keep trying, in milliseconds
     * @return {@code true} if the list was enqueued, {@code false} if the time ran out
     * @throws IllegalStateException if {@link #close()} has already been called
     * @throws InterruptedException  if the thread is interrupted while waiting to retry
     */
    public synchronized boolean offer(List<FetchEmitTuple> list, long j) throws PipesException, InterruptedException {
        if (this.isShuttingDown) {
            throw new IllegalStateException("Can't call offer after calling close() or shutdownNow()");
        }
        if (list.size() > this.asyncConfig.getQueueSize()) {
            throw new OfferLargerThanQueueSize(list.size(), this.asyncConfig.getQueueSize());
        }
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start < j) {
            // ">=": exactly enough free slots is sufficient. With ">" a list of exactly
            // queueSize elements passed the guard above but could never be accepted.
            if (this.fetchEmitTuples.remainingCapacity() >= list.size()) {
                try {
                    this.fetchEmitTuples.addAll(list);
                    return true;
                } catch (IllegalStateException e) {
                    // The queue filled up between the capacity check and addAll (e.g. a
                    // concurrent finished()). NOTE(review): addAll may already have added a
                    // prefix of the list at this point; the retry re-offers the whole list.
                    LOG.warn("couldn't add full list of fetch emit tuples; will retry", e);
                }
            }
            Thread.sleep(OFFER_RETRY_SLEEP_MS);
        }
        return false;
    }

    /**
     * @return the number of tuples that can currently be added to the fetch/emit queue
     */
    public int getCapacity() {
        return this.fetchEmitTuples.remainingCapacity();
    }

    /**
     * Offers a single tuple to the fetch/emit queue, waiting up to {@code j} milliseconds
     * for space.
     *
     * @param fetchEmitTuple tuple to enqueue
     * @param j              maximum time to wait, in milliseconds
     * @return {@code true} if the tuple was enqueued, {@code false} if the wait timed out
     * @throws IllegalStateException if {@link #close()} has already been called
     * @throws InterruptedException  if the thread is interrupted while waiting
     */
    public synchronized boolean offer(FetchEmitTuple fetchEmitTuple, long j) throws PipesException, InterruptedException {
        if (this.isShuttingDown) {
            throw new IllegalStateException("Can't call offer after calling close() or shutdownNow()");
        }
        checkActive();
        return this.fetchEmitTuples.offer(fetchEmitTuple, j, TimeUnit.MILLISECONDS);
    }

    /**
     * Signals that no more tuples will be offered by queueing one
     * {@link PipesIterator#COMPLETED_SEMAPHORE} per worker.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for queue space
     * @throws RuntimeException     if a semaphore cannot be queued within the maximum wait
     */
    public void finished() throws InterruptedException {
        for (int i = 0; i < this.asyncConfig.getNumClients(); i++) {
            if (!this.fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE, MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Couldn't offer completed semaphore within " + MAX_OFFER_WAIT_MS + " ms");
            }
        }
    }

    /**
     * Harvests at most one finished task, and once all workers have finished, asks the
     * emitters to stop.
     *
     * @return {@code false} once every worker and every emitter has finished,
     *         {@code true} otherwise
     * @throws RuntimeException if a harvested task failed, or a stop semaphore could not
     *                          be queued
     */
    public synchronized boolean checkActive() {
        Future<Integer> finishedFuture = this.executorCompletionService.poll();
        if (finishedFuture != null) {
            recordFinished(finishedFuture);
        }

        boolean parsersDone = this.numParserThreadsFinished == this.asyncConfig.getNumClients();
        if (parsersDone && !this.addedEmitterSemaphores) {
            signalEmittersToStop();
            this.addedEmitterSemaphores = true;
        }
        boolean emittersDone = this.numEmitterThreadsFinished == this.asyncConfig.getNumEmitters();
        return !(parsersDone && emittersDone);
    }

    /** Updates the finished-thread counters from the completion code of a finished task. */
    private void recordFinished(Future<Integer> finishedFuture) {
        Integer code;
        try {
            code = finishedFuture.get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            LOG.error("interrupted while reading the result of a finished task", e);
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            LOG.error("a worker, emitter or watcher task failed", e);
            throw new RuntimeException(e);
        }
        switch (code.intValue()) {
            case PARSER_FUTURE_CODE:
                this.numParserThreadsFinished++;
                LOG.debug("fetchEmitWorker finished, total {}", this.numParserThreadsFinished);
                break;
            case EMITTER_FUTURE_CODE:
                this.numEmitterThreadsFinished++;
                LOG.debug("emitter thread finished, total {}", this.numEmitterThreadsFinished);
                break;
            case WATCHER_FUTURE_CODE:
                LOG.debug("watcher thread finished");
                break;
            default:
                throw new IllegalArgumentException("Don't recognize this future code: " + code);
        }
    }

    /** Queues one stop semaphore per emitter on the emit data queue. */
    private void signalEmittersToStop() {
        for (int i = 0; i < this.asyncConfig.getNumEmitters(); i++) {
            try {
                if (!this.emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE, MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS)) {
                    throw new RuntimeException("Couldn't offer emit data stop semaphore within " + MAX_OFFER_WAIT_MS + " ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Marks the processor as shutting down, interrupts all running tasks via
     * {@code shutdownNow()} and closes the pipes reporter.
     *
     * <p>After this call both {@code offer} methods throw {@link IllegalStateException}.
     *
     * @throws IOException if closing the pipes reporter fails
     */
    @Override
    public void close() throws IOException {
        // Set first so that no new offers are accepted while the pool is being torn down.
        this.isShuttingDown = true;
        this.executorService.shutdownNow();
        this.asyncConfig.getPipesReporter().close();
    }

    /**
     * @return the number of tuples the workers have processed and reported so far
     */
    public long getTotalProcessed() {
        return this.totalProcessed.get();
    }
}
