package org.khanacademy.core.net.downloadmanager.okhttp;

import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import okhttp3.ResponseBody;
import okio.BufferedSource;
import okio.Okio;
import org.khanacademy.core.net.downloadmanager.ByteStreamProgress;
import org.khanacademy.core.net.downloadmanager.okhttp.ResponseBodyWritingEvent;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: classes.dex */
public class ResponseBodyWritingExecutor {
    /**
     * Throttle strategy used by the no-argument constructor.
     *
     * <p>Deliberately declared without an initializer: the static initializer block of this class
     * performs the one and only assignment. Assigning a {@code static final} field both here and
     * in that block is rejected by the compiler.
     */
    private static final Function<Observable<ByteStreamProgress>, Observable<ByteStreamProgress>> DEFAULT_THROTTLE_STRATEGY;

    /** Executor that runs the start/remove commands and backs {@link #mClientScheduler}. */
    private final ExecutorService mClientExecutorService;

    /** Scheduler built from {@link #mClientExecutorService}; progress events are observed on it. */
    private final Scheduler mClientScheduler;

    /**
     * Currently tracked writes, keyed by destination file. This is a plain (unsynchronized) hash
     * map; every visible access happens in a {@code lambda$...} method of this class.
     * NOTE(review): those methods appear to run only on the client executor - confirm before
     * touching this map from anywhere else.
     */
    private final Map<File, ExecutingResponseBodyWritingRunnable> mExecutingRunnables;

    /** Transformation applied to each runnable's progress observable before it is published. */
    private final Function<Observable<ByteStreamProgress>, Observable<ByteStreamProgress>> mThrottleStrategy;

    /** Merged view of every per-file stream pushed into {@link #mWritingEventSubject}. */
    private final Observable<ResponseBodyWritingEvent> mWritingEventObservable;

    /** Receives one inner observable per tracked write and one per emitted error. */
    private final PublishSubject<Observable<ResponseBodyWritingEvent>> mWritingEventSubject;

    /** Executor to which the write runnables themselves are submitted. */
    private final ExecutorService mWritingExecutorService;

    /* loaded from: classes.dex */
    /**
     * Pairs an {@link OkioWriteAllRunnable} with the {@link Future} obtained when that runnable is
     * submitted to an executor. An instance can be started at most once.
     */
    public static final class ExecutingResponseBodyWritingRunnable {
        /** Handle returned by the executor; {@code null} until {@link #start} has been called. */
        private Future<?> mFuture;

        /** The write task wrapped by this handle; never {@code null}. */
        private final OkioWriteAllRunnable mRunnable;

        private ExecutingResponseBodyWritingRunnable(OkioWriteAllRunnable okioWriteAllRunnable) {
            // checkNotNull is generic and already returns the argument's type, so no cast is needed.
            this.mRunnable = Preconditions.checkNotNull(okioWriteAllRunnable);
        }

        /* synthetic */ ExecutingResponseBodyWritingRunnable(OkioWriteAllRunnable okioWriteAllRunnable, AnonymousClass1 anonymousClass1) {
            this(okioWriteAllRunnable);
        }

        /**
         * Submits the wrapped runnable to {@code executorService} and remembers the resulting
         * future.
         *
         * @param executorService executor that will run the write
         * @throws IllegalStateException if this handle has already been started
         */
        public void start(ExecutorService executorService) {
            Preconditions.checkState(this.mFuture == null, "Runnable was already executed");
            this.mFuture = executorService.submit(this.mRunnable);
        }

        /** Returns a debug string listing the future and the wrapped runnable. */
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("future", this.mFuture).add("runnable", this.mRunnable).toString();
        }
    }

    static {
        // Bind the default throttle strategy to the shared `instance` of the
        // ResponseBodyWritingExecutor$$Lambda$9 class.
        DEFAULT_THROTTLE_STRATEGY = ResponseBodyWritingExecutor$$Lambda$9.instance;
    }

    /**
     * Creates an executor that uses {@code DEFAULT_THROTTLE_STRATEGY} for progress updates and
     * the executors chosen by the one-argument constructor.
     */
    public ResponseBodyWritingExecutor() {
        this(DEFAULT_THROTTLE_STRATEGY);
    }

    /**
     * Creates an executor with a caller-supplied throttle strategy, a fresh single-thread
     * executor for client commands and a fresh thread pool (see
     * {@code createThreadPoolExecutor()}) for the writes themselves.
     *
     * @param function transformation applied to each runnable's progress observable
     */
    ResponseBodyWritingExecutor(Function<Observable<ByteStreamProgress>, Observable<ByteStreamProgress>> function) {
        this(function, Executors.newSingleThreadExecutor(), createThreadPoolExecutor());
    }

    /**
     * Creates an executor with every collaborator supplied by the caller.
     *
     * @param function transformation applied to each runnable's progress observable
     * @param executorService executor for client commands; also wrapped as the scheduler on which
     *     progress events are observed
     * @param executorService2 executor to which the write runnables are submitted
     * @throws NullPointerException if any argument is {@code null}
     */
    ResponseBodyWritingExecutor(Function<Observable<ByteStreamProgress>, Observable<ByteStreamProgress>> function, ExecutorService executorService, ExecutorService executorService2) {
        // Preconditions.checkNotNull is generic (<T> T checkNotNull(T)), so the values can be
        // assigned directly; the raw (Function) cast used previously discarded the type arguments.
        this.mThrottleStrategy = Preconditions.checkNotNull(function);
        this.mClientExecutorService = Preconditions.checkNotNull(executorService);
        this.mClientScheduler = Schedulers.from(this.mClientExecutorService);
        this.mWritingExecutorService = Preconditions.checkNotNull(executorService2);
        this.mExecutingRunnables = Maps.newHashMap();
        this.mWritingEventSubject = PublishSubject.create();
        // Subscribers see a single flat stream made of every inner observable pushed to the subject.
        this.mWritingEventObservable = Observable.merge(this.mWritingEventSubject);
    }

    /**
     * Builds the runnable that copies {@code bufferedSource} into {@code file} through a
     * byte-counting sink.
     *
     * @param file destination file; a sink on it is opened immediately
     * @param j total number of bytes expected, or {@code -1} when the total is unknown
     * @param bufferedSource source to read from
     * @return a runnable created with or without a known total, depending on {@code j}
     * @throws FileNotFoundException if the sink for {@code file} cannot be opened
     */
    private static OkioWriteAllRunnable createRunnable(File file, long j, BufferedSource bufferedSource) throws FileNotFoundException {
        final ByteCountingSink countingSink = new ByteCountingSink(Okio.sink(file));
        if (j == -1) {
            return OkioWriteAllRunnable.createRunnableWithUnknownTotal(countingSink, bufferedSource);
        }
        return OkioWriteAllRunnable.createRunnableWithKnownTotal(countingSink, bufferedSource, j);
    }

    /**
     * Creates the default pool for write runnables: no core threads, no upper bound on the number
     * of threads, idle threads kept for five seconds, and a {@link SynchronousQueue} hand-off so
     * a submitted task is never queued behind another one.
     *
     * @return a new, independent thread pool
     */
    private static ThreadPoolExecutor createThreadPoolExecutor() {
        // Explicit element type instead of the raw SynchronousQueue to avoid an unchecked conversion.
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    /**
     * Publishes an error event for {@code file} that carries no progress information.
     *
     * @param file file the error relates to
     * @param th cause of the error
     */
    private void emitError(File file, Throwable th) {
        // A typed null selects the ByteStreamProgress overload.
        final ByteStreamProgress noProgress = null;
        emitError(file, th, noProgress);
    }

    /**
     * Publishes an error event for {@code file} as a one-element stream on the event subject.
     *
     * @param file file the error relates to
     * @param th cause of the error
     * @param byteStreamProgress progress to attach to the event; may be {@code null}
     */
    private void emitError(File file, Throwable th, ByteStreamProgress byteStreamProgress) {
        final ResponseBodyWritingEvent errorEvent = ResponseBodyWritingEvent.errorWithThrowable(file, byteStreamProgress, th);
        this.mWritingEventSubject.onNext(Observable.just(errorEvent));
    }

    /**
     * Publishes an error event for {@code file}, attaching the current writing progress of
     * {@code okioWriteAllRunnable} when one is given.
     *
     * @param file file the error relates to
     * @param th cause of the error
     * @param okioWriteAllRunnable runnable whose progress is reported; may be {@code null}
     */
    private void emitError(File file, Throwable th, OkioWriteAllRunnable okioWriteAllRunnable) {
        ByteStreamProgress progress = null;
        if (okioWriteAllRunnable != null) {
            progress = okioWriteAllRunnable.getWritingProgress();
        }
        emitError(file, th, progress);
    }

    /**
     * Wraps a progress update for {@code file} in a {@code RECEIVED_DATA} event.
     *
     * @param file file being written
     * @param byteStreamProgress progress to carry in the event
     * @return the new event
     */
    public static /* synthetic */ ResponseBodyWritingEvent lambda$observeRunnable$233(File file, ByteStreamProgress byteStreamProgress) {
        final ResponseBodyWritingEvent.Type eventType = ResponseBodyWritingEvent.Type.RECEIVED_DATA;
        return new ResponseBodyWritingEvent(eventType, file, byteStreamProgress);
    }

    /**
     * Publishes the event stream of {@code okioWriteAllRunnable} on the event subject.
     *
     * <p>The runnable's progress observable is passed through the throttle strategy, moved onto
     * the client scheduler and mapped to events. The stream stops at the first event accepted by
     * the {@code $$Lambda$6} predicate, recovers from errors through {@code $$Lambda$7}, and runs
     * {@code $$Lambda$8} on completion.
     * NOTE(review): the lambda bodies are not visible here; {@code $$Lambda$8} presumably removes
     * {@code file} from the tracked runnables - confirm.
     *
     * @param file destination file of the write
     * @param okioWriteAllRunnable runnable whose progress is observed
     */
    private void observeRunnable(File file, OkioWriteAllRunnable okioWriteAllRunnable) {
        Observable<ByteStreamProgress> throttledProgress = this.mThrottleStrategy.apply(okioWriteAllRunnable.getWritingProgressObservable());
        // Concrete element type instead of the undeclared type variable `R`.
        Observable<ResponseBodyWritingEvent> events = throttledProgress.observeOn(this.mClientScheduler).map(ResponseBodyWritingExecutor$$Lambda$5.lambdaFactory$(file));
        Func1 stopPredicate = ResponseBodyWritingExecutor$$Lambda$6.instance;
        Observable<ResponseBodyWritingEvent> untilStopped = events.takeUntil((Func1<? super ResponseBodyWritingEvent, Boolean>) stopPredicate);
        this.mWritingEventSubject.onNext(untilStopped.onErrorResumeNext(ResponseBodyWritingExecutor$$Lambda$7.lambdaFactory$(file, okioWriteAllRunnable)).doOnCompleted(ResponseBodyWritingExecutor$$Lambda$8.lambdaFactory$(this, file)));
    }

    /**
     * Hands {@code runnable} to the client executor service for execution.
     *
     * @param runnable command to run
     */
    private void runClientCommand(Runnable runnable) {
        this.mClientExecutorService.execute(runnable);
    }

    /**
     * Returns the observable on which all writing events of this executor are delivered; it is
     * the merge of every per-file stream pushed to the internal subject.
     *
     * @return the shared event observable (the same instance on every call)
     */
    public Observable<ResponseBodyWritingEvent> getWritingEventObservable() {
        return this.mWritingEventObservable;
    }

    /**
     * Stops tracking {@code file}; removing a file that is not tracked is a silent no-op.
     * NOTE(review): presumably the body of the completion callback registered in
     * observeRunnable - confirm against the generated lambda class.
     *
     * @param file file whose entry is dropped
     */
    public /* synthetic */ void lambda$observeRunnable$236(File file) {
        this.mExecutingRunnables.remove(file);
    }

    /**
     * Stops tracking {@code file} and publishes an error event when no runnable was tracked
     * for it.
     *
     * @param file file whose entry is dropped
     */
    public /* synthetic */ void lambda$removeRunnable$230(File file) {
        final ExecutingResponseBodyWritingRunnable removed = this.mExecutingRunnables.remove(file);
        if (removed != null) {
            return;
        }
        emitError(file, new IllegalStateException("No runnable exists for file: " + file));
    }

    /**
     * Starts tracking and executing {@code okioWriteAllRunnable} for {@code file}.
     *
     * <p>When a runnable is already tracked for {@code file}, nothing is started; an error event
     * carrying the progress of the already-tracked runnable is published instead.
     * NOTE(review): on that path the rejected {@code okioWriteAllRunnable} is simply dropped;
     * whether its sink/source need closing depends on OkioWriteAllRunnable - verify.
     *
     * @param file destination file of the write
     * @param okioWriteAllRunnable runnable to track and submit
     */
    public /* synthetic */ void lambda$start$232(File file, OkioWriteAllRunnable okioWriteAllRunnable) {
        final ExecutingResponseBodyWritingRunnable existing = this.mExecutingRunnables.get(file);
        if (existing == null) {
            // Publish the event stream before the runnable is submitted for execution.
            observeRunnable(file, okioWriteAllRunnable);
            final ExecutingResponseBodyWritingRunnable handle = new ExecutingResponseBodyWritingRunnable(okioWriteAllRunnable);
            this.mExecutingRunnables.put(file, handle);
            handle.start(this.mWritingExecutorService);
            return;
        }
        emitError(file, new IllegalStateException("Runnable already exists for file: " + file), existing.mRunnable);
    }

    /**
     * Schedules, on the client executor, the command built by {@code $$Lambda$2} for
     * {@code file}.
     * NOTE(review): that command presumably invokes lambda$removeRunnable$230, i.e. drops the
     * tracking entry for the file - confirm against the generated lambda class.
     *
     * @param file file whose runnable should be removed
     * @throws NullPointerException if {@code file} is {@code null}
     */
    public void removeRunnable(File file) {
        Preconditions.checkNotNull(file);
        final Runnable removal = ResponseBodyWritingExecutor$$Lambda$2.lambdaFactory$(this, file);
        runClientCommand(removal);
    }

    /**
     * Schedules, on the client executor, the command built by {@code $$Lambda$4} for the given
     * file and runnable.
     * NOTE(review): that command presumably invokes lambda$start$232 - confirm against the
     * generated lambda class.
     *
     * @param file destination file of the write
     * @param okioWriteAllRunnable runnable that performs the write
     * @throws NullPointerException if either argument is {@code null}
     */
    void start(File file, OkioWriteAllRunnable okioWriteAllRunnable) {
        Preconditions.checkNotNull(file);
        Preconditions.checkNotNull(okioWriteAllRunnable);
        final Runnable startCommand = ResponseBodyWritingExecutor$$Lambda$4.lambdaFactory$(this, file, okioWriteAllRunnable);
        runClientCommand(startCommand);
    }

    /**
     * Starts writing {@code responseBody} to {@code file}.
     *
     * <p>If the destination cannot be opened, no write is started: the response body is closed
     * and an error event carrying the initial progress for the body's content length is
     * published.
     * NOTE(review): that error is published from the calling thread, unlike events emitted by
     * commands that go through the client executor - confirm this is acceptable for subscribers.
     *
     * @param responseBody body to read from
     * @param file destination file
     * @throws NullPointerException if either argument is {@code null}
     */
    public void start(ResponseBody responseBody, File file) {
        Preconditions.checkNotNull(responseBody);
        Preconditions.checkNotNull(file);
        long contentLength = responseBody.contentLength();
        try {
            start(file, createRunnable(file, contentLength, responseBody.source()));
        } catch (FileNotFoundException e) {
            // No runnable was created, so nothing else will ever consume or close the body.
            // Release it here so the underlying connection is not leaked.
            responseBody.close();
            emitError(file, e, OkHttpUtils.getInitialProgress(contentLength));
        }
    }
}
