/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mysql.source.split;

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils;
import com.ververica.cdc.connectors.mysql.source.utils.StatementUtils;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;

public final class MySqlSplitSerializer
implements SimpleVersionedSerializer<MySqlSplit> {
    public static final MySqlSplitSerializer INSTANCE = new MySqlSplitSerializer();
    private static final int VERSION = 4;
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
    private static final int SNAPSHOT_SPLIT_FLAG = 1;
    private static final int BINLOG_SPLIT_FLAG = 2;

    public int getVersion() {
        return 4;
    }

    public byte[] serialize(MySqlSplit split) throws IOException {
        if (split.isSnapshotSplit()) {
            MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.serializedFormCache != null) {
                return snapshotSplit.serializedFormCache;
            }
            DataOutputSerializer out = SERIALIZER_CACHE.get();
            out.writeInt(1);
            out.writeUTF(StatementUtils.quote(snapshotSplit.getTableId()));
            out.writeUTF(snapshotSplit.splitId());
            out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString());
            Object[] splitStart = snapshotSplit.getSplitStart();
            Object[] splitEnd = snapshotSplit.getSplitEnd();
            out.writeUTF(SerializerUtils.rowToSerializedString(splitStart));
            out.writeUTF(SerializerUtils.rowToSerializedString(splitEnd));
            SerializerUtils.writeBinlogPosition(snapshotSplit.getHighWatermark(), out);
            MySqlSplitSerializer.writeTableSchemas(snapshotSplit.getTableSchemas(), out);
            byte[] result = out.getCopyOfBuffer();
            out.clear();
            snapshotSplit.serializedFormCache = result;
            return result;
        }
        MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
        if (binlogSplit.serializedFormCache != null) {
            return binlogSplit.serializedFormCache;
        }
        DataOutputSerializer out = SERIALIZER_CACHE.get();
        out.writeInt(2);
        out.writeUTF(binlogSplit.splitId());
        out.writeUTF("");
        SerializerUtils.writeBinlogPosition(binlogSplit.getStartingOffset(), out);
        SerializerUtils.writeBinlogPosition(binlogSplit.getEndingOffset(), out);
        MySqlSplitSerializer.writeFinishedSplitsInfo(binlogSplit.getFinishedSnapshotSplitInfos(), out);
        MySqlSplitSerializer.writeTableSchemas(binlogSplit.getTableSchemas(), out);
        out.writeInt(binlogSplit.getTotalFinishedSplitSize());
        out.writeBoolean(binlogSplit.isSuspended());
        byte[] result = out.getCopyOfBuffer();
        out.clear();
        binlogSplit.serializedFormCache = result;
        return result;
    }

    public MySqlSplit deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 1: 
            case 2: 
            case 3: 
            case 4: {
                return this.deserializeSplit(version, serialized);
            }
        }
        throw new IOException("Unknown version: " + version);
    }

    public MySqlSplit deserializeSplit(int version, byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        int splitKind = in.readInt();
        if (splitKind == 1) {
            TableId tableId = TableId.parse((String)in.readUTF());
            String splitId = in.readUTF();
            RowType splitKeyType = (RowType)LogicalTypeParser.parse((String)in.readUTF());
            Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
            Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF());
            BinlogOffset highWatermark = SerializerUtils.readBinlogPosition(version, in);
            Map<TableId, TableChanges.TableChange> tableSchemas = MySqlSplitSerializer.readTableSchemas(version, in);
            return new MySqlSnapshotSplit(tableId, splitId, splitKeyType, splitBoundaryStart, splitBoundaryEnd, highWatermark, tableSchemas);
        }
        if (splitKind == 2) {
            String splitId = in.readUTF();
            in.readUTF();
            BinlogOffset startingOffset = SerializerUtils.readBinlogPosition(version, in);
            BinlogOffset endingOffset = SerializerUtils.readBinlogPosition(version, in);
            List<FinishedSnapshotSplitInfo> finishedSplitsInfo = MySqlSplitSerializer.readFinishedSplitsInfo(version, in);
            Map<TableId, TableChanges.TableChange> tableChangeMap = MySqlSplitSerializer.readTableSchemas(version, in);
            int totalFinishedSplitSize = finishedSplitsInfo.size();
            boolean isSuspended = false;
            if (version >= 3) {
                totalFinishedSplitSize = in.readInt();
                if (version > 3) {
                    isSuspended = in.readBoolean();
                }
            }
            in.releaseArrays();
            return new MySqlBinlogSplit(splitId, startingOffset, endingOffset, finishedSplitsInfo, tableChangeMap, totalFinishedSplitSize, isSuspended);
        }
        throw new IOException("Unknown split kind: " + splitKind);
    }

    public static void writeTableSchemas(Map<TableId, TableChanges.TableChange> tableSchemas, DataOutputSerializer out) throws IOException {
        FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer();
        DocumentWriter documentWriter = DocumentWriter.defaultWriter();
        int size = tableSchemas.size();
        out.writeInt(size);
        for (Map.Entry<TableId, TableChanges.TableChange> entry : tableSchemas.entrySet()) {
            out.writeUTF(entry.getKey().toString());
            String tableChangeStr = documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
            byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8);
            out.writeInt(tableChangeBytes.length);
            out.write(tableChangeBytes);
        }
    }

    public static Map<TableId, TableChanges.TableChange> readTableSchemas(int version, DataInputDeserializer in) throws IOException {
        DocumentReader documentReader = DocumentReader.defaultReader();
        HashMap<TableId, TableChanges.TableChange> tableSchemas = new HashMap<TableId, TableChanges.TableChange>();
        int size = in.readInt();
        for (int i = 0; i < size; ++i) {
            String tableChangeStr;
            TableId tableId = TableId.parse((String)in.readUTF());
            switch (version) {
                case 1: {
                    tableChangeStr = in.readUTF();
                    break;
                }
                case 2: 
                case 3: 
                case 4: {
                    int len = in.readInt();
                    byte[] bytes = new byte[len];
                    in.read(bytes);
                    tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
                    break;
                }
                default: {
                    throw new IOException("Unknown version: " + version);
                }
            }
            Document document = documentReader.read(tableChangeStr);
            TableChanges.TableChange tableChange = FlinkJsonTableChangeSerializer.fromDocument((Document)document, (boolean)true);
            tableSchemas.put(tableId, tableChange);
        }
        return tableSchemas;
    }

    private static void writeFinishedSplitsInfo(List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out) throws IOException {
        int size = finishedSplitsInfo.size();
        out.writeInt(size);
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
            out.writeUTF(splitInfo.getTableId().toString());
            out.writeUTF(splitInfo.getSplitId());
            out.writeUTF(SerializerUtils.rowToSerializedString(splitInfo.getSplitStart()));
            out.writeUTF(SerializerUtils.rowToSerializedString(splitInfo.getSplitEnd()));
            SerializerUtils.writeBinlogPosition(splitInfo.getHighWatermark(), out);
        }
    }

    private static List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(int version, DataInputDeserializer in) throws IOException {
        ArrayList<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<FinishedSnapshotSplitInfo>();
        int size = in.readInt();
        for (int i = 0; i < size; ++i) {
            TableId tableId = TableId.parse((String)in.readUTF());
            String splitId = in.readUTF();
            Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
            Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
            BinlogOffset highWatermark = SerializerUtils.readBinlogPosition(version, in);
            finishedSplitsInfo.add(new FinishedSnapshotSplitInfo(tableId, splitId, splitStart, splitEnd, highWatermark));
        }
        return finishedSplitsInfo;
    }
}

