package my.com.iflix.core.data.kinesis;

import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.KinesisRecorder;
import com.amazonaws.util.StringUtils;
import com.google.gson.Gson;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import my.com.iflix.core.utils.RxHelpers;
import timber.log.Timber;

/* loaded from: classes.dex */
/**
 * Wraps a {@link KinesisRecorder} so that records can be saved before the recorder is available.
 *
 * <p>Objects passed to {@link #saveRecord(Object)} are serialised to JSON with the supplied
 * {@link Gson}. While no recorder has been set through {@code initialise}, the JSON is kept in a
 * bounded in-memory buffer (at most {@link #MAX_BUFFER_SIZE} entries, oldest dropped first). Once
 * {@code initialise} is called the buffer is written to the recorder and
 * {@link #submitRecords()} is triggered periodically on the interval scheduler.
 *
 * <p>Thread-safety: the buffer is a {@link Collections#synchronizedList(List)}; every compound
 * operation on it (cap-and-add, drain, "is the recorder set yet?" check) is performed while
 * holding the list's own monitor, which is the lock that wrapper uses internally.
 * NOTE(review): the threads on which the callables run are decided by
 * {@code RxHelpers.applyDefaultSchedulers()}, which is not visible here — confirm.
 */
public class BatchedKinesisRecorder {
    /** Maximum number of JSON records kept in memory while no recorder is set. */
    public static final int MAX_BUFFER_SIZE = 10;
    /** Default period, in seconds, between automatic calls to {@link #submitRecords()}. */
    public static final int SUBMIT_DELAY_SECONDS = 60;
    List<String> bufferedRecords;
    final Gson gson;
    Scheduler intervalScheduler;
    // volatile: written in initialise() and read from Rx callables and getKinesisRecorder().
    volatile KinesisRecorder kinesisRecorder;
    final String loggingTag;
    Disposable recordSubmissionDisposable;
    // volatile: published together with kinesisRecorder in initialise().
    volatile String streamName;

    /**
     * Creates a recorder whose periodic submission runs on {@link Schedulers#computation()}.
     *
     * @param gson serialiser used to turn saved objects into JSON
     * @param str tag prepended to every log line emitted by this instance
     */
    public BatchedKinesisRecorder(Gson gson, String str) {
        this(gson, str, Schedulers.computation());
    }

    /**
     * Creates a recorder with an explicit scheduler for the periodic submission timer.
     *
     * @param gson serialiser used to turn saved objects into JSON
     * @param str tag prepended to every log line emitted by this instance
     * @param scheduler scheduler passed to {@code Observable.interval} for periodic submission
     */
    public BatchedKinesisRecorder(Gson gson, String str, Scheduler scheduler) {
        this.loggingTag = str;
        this.gson = gson;
        this.intervalScheduler = scheduler;
        this.bufferedRecords = Collections.synchronizedList(new ArrayList<String>(MAX_BUFFER_SIZE));
    }

    /**
     * Appends a JSON record to the in-memory buffer, evicting the oldest entry when the buffer
     * already holds {@link #MAX_BUFFER_SIZE} records.
     */
    private void bufferRecord(String str) {
        final List<String> buffer = this.bufferedRecords;
        // The size check, eviction and add must be one atomic step; the synchronized wrapper only
        // makes each individual call atomic.
        synchronized (buffer) {
            if (buffer.size() >= MAX_BUFFER_SIZE) {
                buffer.remove(0);
            }
            buffer.add(str);
        }
    }

    /** Asynchronously writes every buffered record to the recorder and logs the outcome. */
    private void saveAndClearBufferedRecords() {
        Observable.fromCallable(BatchedKinesisRecorder$$Lambda$8.lambdaFactory$(this)).compose(RxHelpers.applyDefaultSchedulers()).subscribe(BatchedKinesisRecorder$$Lambda$9.lambdaFactory$(this), BatchedKinesisRecorder$$Lambda$10.lambdaFactory$(this));
    }

    /**
     * (Re)starts the periodic submission timer, disposing any previously scheduled one.
     *
     * @param i initial delay and period, in seconds
     */
    private void scheduleRecordSubmission(int i) {
        if (this.recordSubmissionDisposable != null) {
            this.recordSubmissionDisposable.dispose();
        }
        this.recordSubmissionDisposable = Observable.interval(i, i, TimeUnit.SECONDS, this.intervalScheduler).subscribe(BatchedKinesisRecorder$$Lambda$7.lambdaFactory$(this));
    }

    /** Returns the recorder set by {@code initialise}, or {@code null} if none has been set. */
    public KinesisRecorder getKinesisRecorder() {
        return this.kinesisRecorder;
    }

    /**
     * Initialises with the default submission period of {@link #SUBMIT_DELAY_SECONDS} seconds.
     *
     * @param kinesisRecorder recorder to delegate to; the call is ignored when {@code null}
     * @param str Kinesis stream name; the call is ignored when {@code null}
     */
    public void initialise(KinesisRecorder kinesisRecorder, String str) {
        initialise(kinesisRecorder, str, SUBMIT_DELAY_SECONDS);
    }

    /**
     * Sets the recorder and stream, flushes records buffered so far and starts periodic
     * submission.
     *
     * @param kinesisRecorder recorder to delegate to; the call is ignored when {@code null}
     * @param str Kinesis stream name; the call is ignored when {@code null}
     * @param i submission period in seconds; non-positive values fall back to
     *     {@link #SUBMIT_DELAY_SECONDS}
     */
    public void initialise(KinesisRecorder kinesisRecorder, String str, int i) {
        if (kinesisRecorder == null || str == null) {
            return;
        }
        final List<String> buffer = this.bufferedRecords;
        // Publish recorder and stream together under the buffer monitor. A concurrent save either
        // buffers its record before this point (and the drain below picks it up) or sees both
        // fields set; it can never buffer a record after the drain or see a null stream name.
        synchronized (buffer) {
            this.kinesisRecorder = kinesisRecorder;
            this.streamName = str;
        }
        saveAndClearBufferedRecords();
        scheduleRecordSubmission(i > 0 ? i : SUBMIT_DELAY_SECONDS);
    }

    /**
     * Callable body for {@code saveAndClearBufferedRecords}: saves each buffered record to the
     * recorder, empties the buffer and returns how many records were written.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ Integer lambda$saveAndClearBufferedRecords$7() throws Exception {
        final List<String> buffer = this.bufferedRecords;
        // Iterating a Collections.synchronizedList requires holding its monitor manually. Holding
        // it across size/iterate/clear also guarantees no record is added and then cleared unsaved.
        // If a save throws, clear() is skipped and the buffer is left intact, as before.
        synchronized (buffer) {
            final KinesisRecorder recorder = this.kinesisRecorder;
            final String stream = this.streamName;
            int size = buffer.size();
            for (String record : buffer) {
                recorder.saveRecord(record.getBytes(StringUtils.UTF8), stream);
            }
            buffer.clear();
            return Integer.valueOf(size);
        }
    }

    /** Success handler for the buffer flush: logs the number of records written. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$saveAndClearBufferedRecords$8(Integer num) throws Exception {
        Timber.d("[%s] Logged %d buffered records.", this.loggingTag, num);
    }

    /** Error handler for the buffer flush: logs the failure as a warning. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$saveAndClearBufferedRecords$9(Throwable th) throws Exception {
        Timber.w(th, "[%s] Error logging record", this.loggingTag);
    }

    /**
     * Callable body for {@link #saveRecord(Object)}: serialises {@code obj} to JSON and either
     * buffers it (no recorder yet) or saves it to the recorder.
     *
     * @return the JSON that was saved, prefixed with {@code "[Buffered] "} when it was buffered
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ String lambda$saveRecord$0(Object obj) throws Exception {
        String json = this.gson.toJson(obj);
        final List<String> buffer = this.bufferedRecords;
        final KinesisRecorder recorder;
        final String stream;
        // Decide "buffer or save" under the same monitor initialise() publishes under, so the
        // decision cannot interleave with the recorder being set and the buffer being drained.
        synchronized (buffer) {
            recorder = this.kinesisRecorder;
            stream = this.streamName;
            if (recorder == null) {
                bufferRecord(json);
                return "[Buffered] " + json;
            }
        }
        recorder.saveRecord(json.getBytes(StringUtils.UTF8), stream);
        return json;
    }

    /** Success handler for {@link #saveRecord(Object)}: logs the saved JSON. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$saveRecord$1(String str) throws Exception {
        Timber.d("[%s] Logged: %s", this.loggingTag, str);
    }

    /** Error handler for {@link #saveRecord(Object)}: logs the failure as a warning. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$saveRecord$2(Throwable th) throws Exception {
        Timber.w(th, "[%s] Error logging record", this.loggingTag);
    }

    /** Timer tick handler: triggers a submission; the tick counter itself is ignored. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$scheduleRecordSubmission$6(Long l) throws Exception {
        submitRecords();
    }

    /**
     * Callable body for {@link #submitRecords()}: submits all saved records and returns the
     * recorder's reported disk usage in bytes.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ Long lambda$submitRecords$3() throws Exception {
        // Read the field once so both calls go to the same recorder instance.
        final KinesisRecorder recorder = this.kinesisRecorder;
        recorder.submitAllRecords();
        return Long.valueOf(recorder.getDiskBytesUsed());
    }

    /** Success handler for {@link #submitRecords()}: logs the disk usage after submission. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$submitRecords$4(Long l) throws Exception {
        Timber.d("[%s] Record submitted. Disk used: %s bytes", this.loggingTag, l);
    }

    /** Error handler for {@link #submitRecords()}: logs the failure as a warning. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public /* synthetic */ void lambda$submitRecords$5(Throwable th) throws Exception {
        Timber.w(th, "[%s] Error submitting Records", this.loggingTag);
    }

    /**
     * Asynchronously serialises {@code obj} to JSON and saves it, buffering in memory when no
     * recorder has been set yet. Outcome is reported through the log only.
     *
     * @param obj object to serialise with Gson and record
     */
    public void saveRecord(Object obj) {
        Observable.fromCallable(BatchedKinesisRecorder$$Lambda$1.lambdaFactory$(this, obj)).compose(RxHelpers.applyDefaultSchedulers()).subscribe(BatchedKinesisRecorder$$Lambda$2.lambdaFactory$(this), BatchedKinesisRecorder$$Lambda$3.lambdaFactory$(this));
    }

    /**
     * Asynchronously submits all records saved to the recorder. Does nothing when no recorder has
     * been set yet, since there is nothing to submit to.
     */
    public void submitRecords() {
        if (this.kinesisRecorder == null) {
            // Not initialised: skip instead of letting the callable fail with a NullPointerException.
            return;
        }
        Observable.fromCallable(BatchedKinesisRecorder$$Lambda$4.lambdaFactory$(this)).compose(RxHelpers.applyDefaultSchedulers()).subscribe(BatchedKinesisRecorder$$Lambda$5.lambdaFactory$(this), BatchedKinesisRecorder$$Lambda$6.lambdaFactory$(this));
    }
}
