/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.clogproxy.client.connection;

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.Connection;
import com.oceanbase.clogproxy.client.connection.ConnectionFactory;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import io.netty.handler.ssl.SslContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientStream {
    private static final Logger logger = LoggerFactory.getLogger(ClientStream.class);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Thread thread = null;
    private StreamContext context = null;
    private String checkpointString;
    private int retryTimes = 0;
    private Connection connection = null;
    private final AtomicBoolean reconnecting = new AtomicBoolean(true);
    private final AtomicBoolean reconnect = new AtomicBoolean(true);
    private final List<RecordListener> listeners = new ArrayList<RecordListener>();
    private final List<StatusListener> statusListeners = new ArrayList<StatusListener>();

    public ClientStream(ConnectionParams connectionParams, SslContext sslContext) {
        this.context = new StreamContext(this, connectionParams, sslContext);
    }

    public void stop() {
        if (!this.started.compareAndSet(true, false)) {
            logger.info("stopping LogProxy Client....");
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
            this.join();
            this.thread = null;
        }
        logger.info("stopped LogProxy Client");
    }

    public void join() {
        if (this.thread != null) {
            try {
                this.thread.join();
            }
            catch (InterruptedException e) {
                logger.warn("ClientStream thread is interrupted: " + e.getMessage());
                this.stop();
            }
        }
    }

    public void triggerStop() {
        new Thread(this::stop).start();
    }

    public void triggerException(LogProxyClientException e) {
        new Thread(() -> {
            for (RecordListener listener : this.listeners) {
                listener.onException(e);
            }
        }).start();
    }

    public void start() {
        this.context.params.setEnableMonitor(!this.statusListeners.isEmpty());
        if (this.started.compareAndSet(false, true)) {
            this.thread = new Thread(() -> {
                block11: while (this.isRunning()) {
                    StreamContext.TransferPacket packet;
                    ReconnectState state = this.reconnect();
                    if (state == ReconnectState.EXIT) {
                        logger.error("read thread to exit");
                        this.triggerException(new LogProxyClientException(ErrorCode.E_MAX_RECONNECT, "exceed max reconnect retry"));
                        break;
                    }
                    if (state == ReconnectState.RETRY) {
                        try {
                            TimeUnit.SECONDS.sleep(ClientConf.RETRY_INTERVAL_S);
                        }
                        catch (InterruptedException interruptedException) {}
                        continue;
                    }
                    while (true) {
                        try {
                            packet = this.context.recordQueue().poll(ClientConf.READ_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
                        }
                        catch (InterruptedException interruptedException) {
                            continue;
                        }
                        break;
                    }
                    if (packet == null) continue;
                    try {
                        switch (packet.getType()) {
                            case DATA_CLIENT: {
                                for (RecordListener recordListener : this.listeners) {
                                    recordListener.notify(packet.getRecord());
                                }
                                continue block11;
                            }
                            case STATUS: {
                                for (StatusListener statusListener : this.statusListeners) {
                                    statusListener.notify(packet.getStatus());
                                }
                                continue block11;
                            }
                            default: {
                                throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "Unsupported Packet Type: " + packet.getType());
                            }
                        }
                    }
                    catch (LogProxyClientException e) {
                        this.triggerStop();
                        this.triggerException(e);
                        return;
                    }
                    catch (Exception e) {
                        this.triggerStop();
                        this.triggerException(new LogProxyClientException(ErrorCode.E_USER, e));
                        return;
                    }
                }
                this.started.set(false);
                if (this.connection != null) {
                    this.connection.close();
                }
                this.thread = null;
                logger.warn("!!! read thread exit !!!");
            });
            this.thread.setDaemon(false);
            this.thread.start();
        }
    }

    public boolean isRunning() {
        return this.started.get();
    }

    private ReconnectState reconnect() {
        if (this.reconnect.compareAndSet(true, false)) {
            logger.warn("start to reconnect...");
            try {
                if (ClientConf.MAX_RECONNECT_TIMES != -1 && this.retryTimes >= ClientConf.MAX_RECONNECT_TIMES) {
                    logger.error("failed to reconnect, exceed max reconnect retry time: {}", (Object)ClientConf.MAX_RECONNECT_TIMES);
                    this.reconnect.set(true);
                    ReconnectState reconnectState = ReconnectState.EXIT;
                    return reconnectState;
                }
                if (this.connection != null) {
                    this.connection.close();
                    this.connection = null;
                }
                if (StringUtils.isNotEmpty((CharSequence)this.checkpointString)) {
                    logger.warn("reconnect set checkpoint: {}", (Object)this.checkpointString);
                    this.context.getParams().updateCheckpoint(this.checkpointString);
                }
                this.connection = ConnectionFactory.instance().createConnection(this.context);
                if (this.connection != null) {
                    logger.warn("reconnect SUCC");
                    this.retryTimes = 0;
                    this.reconnect.compareAndSet(true, false);
                    ReconnectState reconnectState = ReconnectState.SUCCESS;
                    return reconnectState;
                }
                logger.error("failed to reconnect, retry count: {}, max: {}", (Object)(++this.retryTimes), (Object)ClientConf.MAX_RECONNECT_TIMES);
                this.reconnect.set(true);
                ReconnectState reconnectState = ReconnectState.RETRY;
                return reconnectState;
            }
            catch (Exception e) {
                logger.error("failed to reconnect, retry count: {}, max: {}, message: {}", new Object[]{++this.retryTimes, ClientConf.MAX_RECONNECT_TIMES, e});
                this.reconnect.set(true);
                ReconnectState reconnectState = ReconnectState.RETRY;
                return reconnectState;
            }
            finally {
                this.reconnecting.set(false);
            }
        }
        return ReconnectState.SUCCESS;
    }

    public void triggerReconnect() {
        if (this.reconnecting.compareAndSet(false, true)) {
            this.reconnect.compareAndSet(false, true);
        }
    }

    public synchronized void addListener(RecordListener recordListener) {
        this.listeners.add(recordListener);
    }

    public synchronized void addStatusListener(StatusListener statusListener) {
        this.statusListeners.add(statusListener);
    }

    private static enum ReconnectState {
        SUCCESS,
        RETRY,
        EXIT;

    }
}

