package jd.dd.network.quic;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import jd.dd.network.quic.QuicExecutor;
import jd.dd.network.tcp.MessageProducer;
import jd.dd.network.tcp.protocol.BaseMessage;
import jd.dd.waiter.util.LogUtils;
import jd.dd.waiter.util.concurrent.DDThreadFactory;

/* loaded from: classes9.dex */
public class DataReader implements MessageProducer.IListener, QuicExecutor.OnRecvDataListener {
    private static final Charset CHAT_SET = Charset.forName("utf-8");
    private static final String TAG = "DataReader";
    private CharBuffer mCharBuffer = CharBuffer.allocate(524288);
    private volatile boolean mDone;
    private QuicExecutor mExecutor;
    private OnReadListener mListener;
    private MessageProducer mMessageDispatcher;
    private BlockingQueue<ByteBuffer> mQueue;
    private Thread mReadThread;

    /* loaded from: classes9.dex */
    public interface OnReadListener {
        void onMessageReceive(BaseMessage baseMessage);

        void onReadError(Exception exc);
    }

    public DataReader(QuicExecutor quicExecutor, OnReadListener onReadListener) {
        this.mExecutor = quicExecutor;
        this.mListener = onReadListener;
        quicExecutor.setOnRecvDataListener(this);
        this.mDone = false;
        this.mQueue = new LinkedBlockingQueue();
        Thread newThreadInstance = DDThreadFactory.newThreadInstance(new Runnable() { // from class: jd.dd.network.quic.DataReader.1
            @Override // java.lang.Runnable
            public void run() {
                LogUtils.d(DataReader.TAG, "Data Reader running ...");
                DataReader.this.parsePacket();
            }
        });
        this.mReadThread = newThreadInstance;
        newThreadInstance.setDaemon(true);
        this.mReadThread.setName("Data Reader");
        this.mMessageDispatcher = new MessageProducer(this);
    }

    private void expandBuffer() {
        CharBuffer allocate = CharBuffer.allocate(this.mCharBuffer.capacity() + 524288);
        this.mCharBuffer.position(0);
        this.mCharBuffer.mark();
        for (char c : this.mCharBuffer.array()) {
            allocate.put(c);
        }
        this.mCharBuffer = allocate;
    }

    private void flushPacket() {
        try {
            this.mCharBuffer.flip();
            char[] cArr = new char[this.mCharBuffer.limit()];
            this.mCharBuffer.get(cArr);
            this.mMessageDispatcher.add(new String(cArr));
            this.mCharBuffer.clear();
        } catch (Exception e10) {
            LogUtils.e(TAG, "Data Reader decode buffer error->" + e10.getMessage());
        }
    }

    private void notifyError(Exception exc) {
        OnReadListener onReadListener = this.mListener;
        if (onReadListener != null) {
            onReadListener.onReadError(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void parsePacket() {
        Exception e10 = null;
        while (!this.mDone) {
            try {
                writeBuffer(this.mQueue.take());
            } catch (InterruptedException e11) {
                e10 = e11;
                LogUtils.d(TAG, "The reader thread is interrupted.");
            } catch (Exception e12) {
                e10 = e12;
                LogUtils.e(TAG, "The reader (" + this + ") parse data error->" + e10.getMessage());
            }
        }
        if (!this.mDone) {
            notifyError(e10);
            LogUtils.e(TAG, "notifyReaderError.");
        }
        LogUtils.i(TAG, "read thread finished, mDone:" + this.mDone);
    }

    private void writeBuffer(ByteBuffer byteBuffer) {
        CharBuffer decode;
        if (byteBuffer == null || (decode = CHAT_SET.decode(byteBuffer)) == null) {
            return;
        }
        while (decode.hasRemaining()) {
            char c = decode.get();
            if ('\n' == c) {
                flushPacket();
            } else {
                if (!this.mCharBuffer.hasRemaining()) {
                    expandBuffer();
                }
                this.mCharBuffer.put(c);
            }
        }
    }

    public void destroy() {
        LogUtils.i(TAG, "shutting down the reader ...");
        this.mDone = true;
        MessageProducer messageProducer = this.mMessageDispatcher;
        if (messageProducer != null) {
            messageProducer.shutdown();
        }
        Thread thread = this.mReadThread;
        if (thread != null) {
            try {
                thread.interrupt();
                this.mReadThread.join();
            } catch (Exception e10) {
                LogUtils.e(TAG, "The reader (" + this + ") interrupt error->" + e10.getMessage());
            }
        }
        LogUtils.i(TAG, "The reader (" + this + ") is destroyed.");
    }

    @Override // jd.dd.network.tcp.MessageProducer.IListener
    public void onMessageReady(BaseMessage baseMessage) {
        this.mListener.onMessageReceive(baseMessage);
    }

    @Override // jd.dd.network.quic.QuicExecutor.OnRecvDataListener
    public void onRecvData(ByteBuffer byteBuffer) {
        try {
            this.mQueue.put(byteBuffer);
        } catch (Exception e10) {
            LogUtils.e(TAG, "The reader (" + this + ") recv data error->" + e10.getMessage());
        }
    }

    public void startup() {
        LogUtils.d(TAG, "startup the reader ...");
        this.mMessageDispatcher.start();
        this.mReadThread.start();
    }
}
