/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mysql.source.assigners;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.assigners.ChunkSplitter;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSnapshotSplitAssigner
implements MySqlSplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitAssigner.class);
    private final List<TableId> alreadyProcessedTables;
    private final List<MySqlSnapshotSplit> remainingSplits;
    private final Map<String, MySqlSnapshotSplit> assignedSplits;
    private final Map<String, BinlogOffset> splitFinishedOffsets;
    private boolean assignerFinished;
    private final MySqlSourceConfig sourceConfig;
    private final int currentParallelism;
    private final LinkedList<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;
    @Nullable
    private Long checkpointIdToFinish;

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig sourceConfig, int currentParallelism, List<TableId> remainingTables, boolean isTableIdCaseSensitive) {
        this(sourceConfig, currentParallelism, new ArrayList<TableId>(), new ArrayList<MySqlSnapshotSplit>(), new HashMap<String, MySqlSnapshotSplit>(), new HashMap<String, BinlogOffset>(), false, remainingTables, isTableIdCaseSensitive, true);
    }

    public MySqlSnapshotSplitAssigner(MySqlSourceConfig sourceConfig, int currentParallelism, SnapshotPendingSplitsState checkpoint) {
        this(sourceConfig, currentParallelism, checkpoint.getAlreadyProcessedTables(), checkpoint.getRemainingSplits(), checkpoint.getAssignedSplits(), checkpoint.getSplitFinishedOffsets(), checkpoint.isAssignerFinished(), checkpoint.getRemainingTables(), checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed());
    }

    private MySqlSnapshotSplitAssigner(MySqlSourceConfig sourceConfig, int currentParallelism, List<TableId> alreadyProcessedTables, List<MySqlSnapshotSplit> remainingSplits, Map<String, MySqlSnapshotSplit> assignedSplits, Map<String, BinlogOffset> splitFinishedOffsets, boolean assignerFinished, List<TableId> remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed) {
        this.sourceConfig = sourceConfig;
        this.currentParallelism = currentParallelism;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits;
        this.splitFinishedOffsets = splitFinishedOffsets;
        this.assignerFinished = assignerFinished;
        this.remainingTables = new LinkedList<TableId>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
    }

    @Override
    public void open() {
        this.chunkSplitter = MySqlSnapshotSplitAssigner.createChunkSplitter(this.sourceConfig, this.isTableIdCaseSensitive);
        if (!this.isRemainingTablesCheckpointed && !this.assignerFinished) {
            try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(this.sourceConfig);){
                List<TableId> discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, this.sourceConfig);
                discoverTables.removeAll(this.alreadyProcessedTables);
                this.remainingTables.addAll(discoverTables);
                this.isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Failed to discover remaining tables to capture", (Throwable)e);
            }
        }
    }

    @Override
    public Optional<MySqlSplit> getNext() {
        if (!this.remainingSplits.isEmpty()) {
            Iterator<MySqlSnapshotSplit> iterator = this.remainingSplits.iterator();
            MySqlSnapshotSplit split = iterator.next();
            iterator.remove();
            this.assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        }
        TableId nextTable = this.remainingTables.pollFirst();
        if (nextTable != null) {
            Collection<MySqlSnapshotSplit> splits = this.chunkSplitter.generateSplits(nextTable);
            this.remainingSplits.addAll(splits);
            this.alreadyProcessedTables.add(nextTable);
            return this.getNext();
        }
        return Optional.empty();
    }

    @Override
    public boolean waitingForFinishedSplits() {
        return !this.allSplitsFinished();
    }

    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        if (this.waitingForFinishedSplits()) {
            LOG.error("The assigner is not ready to offer finished split information, this should not be called");
            throw new FlinkRuntimeException("The assigner is not ready to offer finished split information, this should not be called");
        }
        List assignedSnapshotSplit = this.assignedSplits.values().stream().sorted(Comparator.comparing(MySqlSplit::splitId)).collect(Collectors.toList());
        ArrayList<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<FinishedSnapshotSplitInfo>();
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            BinlogOffset binlogOffset = this.splitFinishedOffsets.get(split.splitId());
            finishedSnapshotSplitInfos.add(new FinishedSnapshotSplitInfo(split.getTableId(), split.splitId(), split.getSplitStart(), split.getSplitEnd(), binlogOffset));
        }
        return finishedSnapshotSplitInfos;
    }

    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        this.splitFinishedOffsets.putAll(splitFinishedOffsets);
        if (this.allSplitsFinished()) {
            if (this.currentParallelism == 1) {
                this.assignerFinished = true;
                LOG.info("Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
            } else {
                LOG.info("Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
            }
        }
    }

    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        for (MySqlSplit split : splits) {
            this.remainingSplits.add(split.asSnapshotSplit());
            this.assignedSplits.remove(split.splitId());
            this.splitFinishedOffsets.remove(split.splitId());
        }
    }

    @Override
    public SnapshotPendingSplitsState snapshotState(long checkpointId) {
        SnapshotPendingSplitsState state = new SnapshotPendingSplitsState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.splitFinishedOffsets, this.assignerFinished, this.remainingTables, this.isTableIdCaseSensitive, true);
        if (this.checkpointIdToFinish == null && !this.assignerFinished && this.allSplitsFinished()) {
            this.checkpointIdToFinish = checkpointId;
        }
        return state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (this.checkpointIdToFinish != null && !this.assignerFinished && this.allSplitsFinished()) {
            this.assignerFinished = checkpointId >= this.checkpointIdToFinish;
            LOG.info("Snapshot split assigner is turn into finished status.");
        }
    }

    @Override
    public void close() {
    }

    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    public boolean isFinished() {
        return this.assignerFinished;
    }

    public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    public Map<String, BinlogOffset> getSplitFinishedOffsets() {
        return this.splitFinishedOffsets;
    }

    private boolean allSplitsFinished() {
        return this.noMoreSplits() && this.assignedSplits.size() == this.splitFinishedOffsets.size();
    }

    private static ChunkSplitter createChunkSplitter(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
        MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive);
        return new ChunkSplitter(mySqlSchema, sourceConfig);
    }
}

