package com.zhulong.eduvideo.module_video.view.cc.view.live;

import com.baidu.mobstat.Config;
import com.blankj.utilcode.util.LogUtils;
import com.orhanobut.logger.Logger;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: classes3.dex */
public class RxSocket {
    private static final AtomicReference<RxSocket> INSTANCE = new AtomicReference<>();
    private BufferedReader bufferedReader;
    private BufferedWriter bufferedWriter;
    private Socket socket;
    private boolean connect = false;
    private boolean threadStart = false;

    /* renamed from: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket$2, reason: invalid class name */
    /* loaded from: classes3.dex */
    class AnonymousClass2 implements ObservableTransformer<Boolean, String> {
        AnonymousClass2() {
        }

        @Override // io.reactivex.ObservableTransformer
        public ObservableSource<String> apply(Observable<Boolean> observable) {
            return observable.flatMap(new Function<Boolean, ObservableSource<? extends String>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.2.1
                @Override // io.reactivex.functions.Function
                public ObservableSource<? extends String> apply(Boolean bool) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<String>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.2.1.1
                        @Override // io.reactivex.ObservableOnSubscribe
                        public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                            RxSocket.this.startThread(observableEmitter);
                        }
                    });
                }
            }).retry().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket$4, reason: invalid class name */
    /* loaded from: classes3.dex */
    public class AnonymousClass4 implements ObservableTransformer<Boolean, Boolean> {
        final /* synthetic */ String val$data;
        final /* synthetic */ int val$period;

        AnonymousClass4(int i, String str) {
            this.val$period = i;
            this.val$data = str;
        }

        @Override // io.reactivex.ObservableTransformer
        public ObservableSource<Boolean> apply(Observable<Boolean> observable) {
            return observable.flatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.4.1
                @Override // io.reactivex.functions.Function
                public ObservableSource<? extends Boolean> apply(Boolean bool) throws Exception {
                    return RxSocket.this.interval(AnonymousClass4.this.val$period).flatMap(new Function<Long, ObservableSource<? extends Boolean>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.4.1.1
                        @Override // io.reactivex.functions.Function
                        public ObservableSource<? extends Boolean> apply(Long l) throws Exception {
                            return RxSocket.this.send(AnonymousClass4.this.val$data);
                        }
                    });
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX INFO: Add missing generic type declarations: [T] */
    /* renamed from: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket$6, reason: invalid class name */
    /* loaded from: classes3.dex */
    public class AnonymousClass6<T> implements ObservableTransformer<T, String> {
        AnonymousClass6() {
        }

        @Override // io.reactivex.ObservableTransformer
        public ObservableSource<String> apply(Observable<T> observable) {
            return observable.flatMap(new Function<T, ObservableSource<? extends String>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.6.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // io.reactivex.functions.Function
                public ObservableSource<? extends String> apply(T t) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<String>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.6.1.1
                        @Override // io.reactivex.ObservableOnSubscribe
                        public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                            RxSocket.this.startThread(observableEmitter);
                        }
                    });
                }

                @Override // io.reactivex.functions.Function
                public /* bridge */ /* synthetic */ ObservableSource<? extends String> apply(Object obj) throws Exception {
                    return apply((AnonymousClass1) obj);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public class ReadThread extends Thread {
        private SocketCallBack callBack;

        private ReadThread(SocketCallBack socketCallBack) {
            this.callBack = socketCallBack;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                RxSocket.this.threadStart = true;
                if (RxSocket.this.socket.isConnected()) {
                    if (RxSocket.this.socket.getInputStream() != null) {
                        RxSocket.this.bufferedReader = new BufferedReader(new InputStreamReader(RxSocket.this.socket.getInputStream()));
                    }
                    if (RxSocket.this.socket.getOutputStream() != null) {
                        RxSocket.this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(RxSocket.this.socket.getOutputStream()));
                    }
                }
                while (RxSocket.this.connect) {
                    if (RxSocket.this.bufferedReader != null) {
                        this.callBack.onReceive(RxSocket.this.bufferedReader.readLine());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Logger.v((String) Objects.requireNonNull(e.getMessage()), new Object[0]);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public interface SocketCallBack {
        void onReceive(String str);
    }

    private Observable<Boolean> connect(final String str, final int i) {
        return Observable.create(new ObservableOnSubscribe<Boolean>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.3
            @Override // io.reactivex.ObservableOnSubscribe
            public void subscribe(ObservableEmitter<Boolean> observableEmitter) throws Exception {
                RxSocket.this.close();
                RxSocket.this.socket = new Socket(str, i);
                observableEmitter.onNext(Boolean.valueOf(RxSocket.this.connect = true));
                observableEmitter.onComplete();
            }
        }).retry();
    }

    public static RxSocket getInstance() {
        RxSocket rxSocket;
        do {
            RxSocket rxSocket2 = INSTANCE.get();
            if (rxSocket2 != null) {
                return rxSocket2;
            }
            rxSocket = new RxSocket();
        } while (!INSTANCE.compareAndSet(null, rxSocket));
        return rxSocket;
    }

    private ObservableTransformer<Boolean, Boolean> heartBeat(int i, String str) {
        return new AnonymousClass4(i, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Long> interval(int i) {
        return Observable.interval(0L, i, TimeUnit.SECONDS);
    }

    public static <T> ObservableTransformer<T, T> io_main() {
        return new ObservableTransformer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxSocket$K0bMfdz2bd1S0oxCdNIBkDoKK0w
            @Override // io.reactivex.ObservableTransformer
            public final ObservableSource apply(Observable observable) {
                ObservableSource observeOn;
                observeOn = observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
                return observeOn;
            }
        };
    }

    private <T> ObservableTransformer<T, String> read() {
        return new AnonymousClass6();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startThread(final ObservableEmitter<String> observableEmitter) {
        if (!this.connect || this.threadStart) {
            return;
        }
        Logger.d("socket-读取线程开启");
        observableEmitter.onNext(Config.TRACE_VISIT_FIRST);
        LogUtils.v("第一次进入或者重新连接");
        new ReadThread(new SocketCallBack() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxSocket$VhydVbY2LNGL70LLaEGjJw4kNAo
            @Override // com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.SocketCallBack
            public final void onReceive(String str) {
                ObservableEmitter.this.onNext(str);
            }
        }).start();
    }

    public void close() throws IOException {
        Logger.d("socke-初始化Socket");
        this.connect = false;
        this.threadStart = false;
        Socket socket = this.socket;
        if (socket != null) {
            socket.close();
            this.socket = null;
        }
        BufferedReader bufferedReader = this.bufferedReader;
        if (bufferedReader != null) {
            bufferedReader.close();
            this.bufferedReader = null;
        }
        BufferedWriter bufferedWriter = this.bufferedWriter;
        if (bufferedWriter != null) {
            bufferedWriter.close();
            this.bufferedWriter = null;
        }
    }

    public ObservableTransformer<Boolean, String> heartBeatChange() {
        return new AnonymousClass2();
    }

    public Observable<String> reconnection(String str, int i) {
        return connect(str, i).compose(read()).retry().compose(io_main());
    }

    public Observable<Long> reconnectionAndHeartBeat(String str, int i, final int i2) {
        return connect(str, i).flatMap(new Function<Boolean, ObservableSource<? extends Long>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.1
            @Override // io.reactivex.functions.Function
            public ObservableSource<? extends Long> apply(Boolean bool) throws Exception {
                return RxSocket.this.interval(i2);
            }
        });
    }

    public Observable<String> reconnectionAndHeartBeat(String str, int i, int i2, String str2) {
        return connect(str, i).compose(heartBeat(i2, str2)).compose(read()).retry().compose(io_main());
    }

    public Observable<Boolean> send(String str) {
        return Observable.just(str).flatMap(new Function<String, ObservableSource<? extends Boolean>>() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.RxSocket.5
            @Override // io.reactivex.functions.Function
            public ObservableSource<? extends Boolean> apply(String str2) throws Exception {
                if (!RxSocket.this.connect || RxSocket.this.bufferedWriter == null) {
                    return Observable.just(false);
                }
                RxSocket.this.bufferedWriter.write(str2.concat("\r\n"));
                RxSocket.this.bufferedWriter.flush();
                return Observable.just(true);
            }
        }).subscribeOn(Schedulers.io()).timeout(5L, TimeUnit.SECONDS, Observable.just(false));
    }
}
