/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql.legacy;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.legacy.BinlogReader;
import io.debezium.connector.mysql.legacy.BlockingReader;
import io.debezium.connector.mysql.legacy.ChainedReader;
import io.debezium.connector.mysql.legacy.Filters;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext;
import io.debezium.connector.mysql.legacy.MySqlTaskContext;
import io.debezium.connector.mysql.legacy.ParallelSnapshotReader;
import io.debezium.connector.mysql.legacy.ReconcilingBinlogReader;
import io.debezium.connector.mysql.legacy.SnapshotReader;
import io.debezium.connector.mysql.legacy.SourceInfo;
import io.debezium.connector.mysql.legacy.TimedBlockingReader;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class MySqlConnectorTask
extends BaseSourceTask {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private volatile MySqlTaskContext taskContext;
    private volatile MySqlJdbcContext connectionContext;
    private volatile ChainedReader readers;

    public String version() {
        return Module.version();
    }

    public ChangeEventSourceCoordinator start(Configuration config) {
        String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
        LoggingContext.PreviousContext prevLoggingContext = LoggingContext.forConnector((String)Module.contextName(), (String)serverName, (String)"task");
        try {
            SourceInfo source;
            boolean startWithSnapshot = false;
            Map partition = Collect.hashMapOf((Object)"server", (Object)serverName);
            Map<String, ?> offsets = this.getRestartOffset(this.context.offsetStorageReader().offset(partition));
            if (offsets != null) {
                Filters filters = SourceInfo.offsetsHaveFilterInfo(offsets) ? MySqlConnectorTask.getOldFilters(offsets, config) : MySqlConnectorTask.getAllFilters(config);
                this.taskContext = MySqlConnectorTask.createAndStartTaskContext(config, filters);
                this.connectionContext = this.taskContext.getConnectionContext();
                source = this.taskContext.source();
                source.setOffset(offsets);
                this.logger.info("Found existing offset: {}", offsets);
                if (!this.taskContext.historyExists()) {
                    if (this.taskContext.isSchemaOnlyRecoverySnapshot()) {
                        startWithSnapshot = true;
                        if (!this.isBinlogAvailable()) {
                            String msg = "The connector is trying to read binlog starting at " + (Object)((Object)source) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.";
                            throw new ConnectException(msg);
                        }
                    } else {
                        String msg = "The db history topic is missing. You may attempt to recover it by reconfiguring the connector to " + (Object)((Object)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                        throw new ConnectException(msg);
                    }
                    this.logger.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.", (Object)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
                    this.taskContext.initializeHistoryStorage();
                } else {
                    this.taskContext.loadHistory(source);
                    if (source.isSnapshotInEffect()) {
                        if (this.taskContext.isSnapshotNeverAllowed()) {
                            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
                            throw new ConnectException(msg);
                        }
                        startWithSnapshot = true;
                        this.logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
                    } else {
                        startWithSnapshot = false;
                        if (!this.isBinlogAvailable()) {
                            if (!this.taskContext.isSnapshotAllowedWhenNeeded()) {
                                String msg = "The connector is trying to read binlog starting at " + (Object)((Object)source) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.";
                                throw new ConnectException(msg);
                            }
                            startWithSnapshot = true;
                        }
                    }
                }
            } else {
                this.taskContext = MySqlConnectorTask.createAndStartTaskContext(config, MySqlConnectorTask.getAllFilters(config));
                this.taskContext.initializeHistoryStorage();
                this.connectionContext = this.taskContext.getConnectionContext();
                source = this.taskContext.source();
                if (this.taskContext.isSnapshotNeverAllowed()) {
                    this.logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
                    source.setBinlogStartPoint("", 0L);
                    this.taskContext.initializeHistory();
                    String earliestBinlogFilename = this.earliestBinlogFilename();
                    if (earliestBinlogFilename == null) {
                        this.logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
                    } else if (!earliestBinlogFilename.endsWith("00001")) {
                        this.logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
                    }
                } else {
                    startWithSnapshot = true;
                    this.logger.info("Found no existing offset, so preparing to perform a snapshot");
                }
            }
            if (!startWithSnapshot && source.gtidSet() == null && this.connectionContext.isGtidModeEnabled()) {
                source.setCompletedGtidSet("");
            }
            boolean binlogFormatRow = this.isBinlogFormatRow();
            boolean binlogRowImageFull = this.isBinlogRowImageFull();
            boolean rowBinlogEnabled = binlogFormatRow && binlogRowImageFull;
            ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
            if (startWithSnapshot) {
                SnapshotReader snapshotReader = new SnapshotReader("snapshot", this.taskContext);
                snapshotReader.generateReadEvents();
                if (!this.taskContext.getConnectorConfig().getSnapshotDelay().isZero()) {
                    chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", this.taskContext.getConnectorConfig().getSnapshotDelay()));
                }
                chainedReaderBuilder.addReader(snapshotReader);
                if (this.taskContext.isInitialSnapshotOnly()) {
                    this.logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
                    chainedReaderBuilder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
                    chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
                } else {
                    if (!rowBinlogEnabled) {
                        if (!binlogFormatRow) {
                            throw new ConnectException("The MySQL server is not configured to use a ROW binlog_format, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.");
                        }
                        throw new ConnectException("The MySQL server is not configured to use a FULL binlog_row_image, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_row_image=FULL and restart the connector.");
                    }
                    BinlogReader binlogReader = new BinlogReader("binlog", this.taskContext, null);
                    chainedReaderBuilder.addReader(binlogReader);
                }
            } else {
                source.maybeSetFilterDataFromConfig(config);
                if (!rowBinlogEnabled) {
                    throw new ConnectException("The MySQL server does not appear to be using a full row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
                }
                if (this.newTablesInConfig()) {
                    if (this.taskContext.getConnectorConfig().getSnapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
                        ServerIdGenerator serverIdGenerator = new ServerIdGenerator(config.getLong(MySqlConnectorConfig.SERVER_ID), config.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET));
                        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(config, this.taskContext, MySqlConnectorTask.getNewFilters(offsets, config), serverIdGenerator);
                        MySqlTaskContext unifiedTaskContext = MySqlConnectorTask.createAndStartTaskContext(config, MySqlConnectorTask.getAllFilters(config));
                        unifiedTaskContext.source().completeSnapshot();
                        BinlogReader unifiedBinlogReader = new BinlogReader("binlog", unifiedTaskContext, null, serverIdGenerator.getConfiguredServerId());
                        ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);
                        chainedReaderBuilder.addReader(parallelSnapshotReader);
                        chainedReaderBuilder.addReader(reconcilingBinlogReader);
                        chainedReaderBuilder.addReader(unifiedBinlogReader);
                        unifiedBinlogReader.uponCompletion(unifiedTaskContext::shutdown);
                    }
                } else {
                    BinlogReader binlogReader = new BinlogReader("binlog", this.taskContext, null);
                    chainedReaderBuilder.addReader(binlogReader);
                }
            }
            this.readers = chainedReaderBuilder.build();
            this.readers.uponCompletion(this::completeReaders);
            this.readers.initialize();
            this.readers.start();
        }
        catch (Throwable e) {
            try {
                this.stop();
            }
            catch (Throwable s) {
                this.logger.error("Failed to start the connector (see other exception), but got this error while cleaning up", s);
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while starting the connector", e);
            }
            if (e instanceof ConnectException) {
                throw (ConnectException)e;
            }
            throw new ConnectException(e);
        }
        finally {
            prevLoggingContext.restore();
        }
        return null;
    }

    private Map<String, ?> getRestartOffset(Map<String, ?> storedOffset) {
        HashMap restartOffset = new HashMap();
        if (storedOffset != null) {
            for (Map.Entry<String, ?> entry : storedOffset.entrySet()) {
                if (!entry.getKey().startsWith("RESTART_")) continue;
                String newKey = entry.getKey().substring("RESTART_".length());
                restartOffset.put(newKey, entry.getValue());
            }
        }
        return restartOffset.isEmpty() ? storedOffset : restartOffset;
    }

    private static MySqlTaskContext createAndStartTaskContext(Configuration config, Filters filters) {
        MySqlTaskContext taskContext = new MySqlTaskContext(config, filters);
        taskContext.start();
        return taskContext;
    }

    private boolean newTablesInConfig() {
        String elementSep = "/s*,/s*";
        BiFunction<String, String, Boolean> hasExclusiveElements = (a, b) -> {
            if (a == null || a.isEmpty()) {
                return false;
            }
            if (b == null || b.isEmpty()) {
                return true;
            }
            Set bSet = Stream.of(b.split("/s*,/s*")).collect(Collectors.toSet());
            return !Stream.of(a.split("/s*,/s*")).filter(x -> !bSet.contains(x)).collect(Collectors.toSet()).isEmpty();
        };
        SourceInfo sourceInfo = this.taskContext.source();
        Configuration config = this.taskContext.config();
        if (!sourceInfo.hasFilterInfo()) {
            return false;
        }
        if (hasExclusiveElements.apply(config.getFallbackStringProperty(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, MySqlConnectorConfig.DATABASE_WHITELIST), sourceInfo.getDatabaseIncludeList()).booleanValue()) {
            return true;
        }
        if (hasExclusiveElements.apply(config.getFallbackStringProperty(MySqlConnectorConfig.TABLE_INCLUDE_LIST, MySqlConnectorConfig.TABLE_WHITELIST), sourceInfo.getTableIncludeList()).booleanValue()) {
            return true;
        }
        if (hasExclusiveElements.apply(sourceInfo.getDatabaseExcludeList(), config.getFallbackStringProperty(MySqlConnectorConfig.DATABASE_EXCLUDE_LIST, MySqlConnectorConfig.DATABASE_BLACKLIST)).booleanValue()) {
            return true;
        }
        return hasExclusiveElements.apply(sourceInfo.getTableExcludeList(), config.getFallbackStringProperty(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, MySqlConnectorConfig.TABLE_BLACKLIST)) != false;
    }

    private static Filters getNewFilters(Map<String, ?> offsets, Configuration config) {
        Filters oldFilters = MySqlConnectorTask.getOldFilters(offsets, config);
        return new Filters.Builder(config).excludeAllTables(oldFilters).build();
    }

    private static Filters getOldFilters(Map<String, ?> offsets, Configuration config) {
        return new Filters.Builder(config).setFiltersFromOffsets(offsets).build();
    }

    private static Filters getAllFilters(Configuration config) {
        return new Filters.Builder(config).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> doPoll() throws InterruptedException {
        ChainedReader currentReader = this.readers;
        if (currentReader == null) {
            return null;
        }
        LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
        try {
            this.logger.trace("Polling for events");
            List<SourceRecord> list = currentReader.poll();
            return list;
        }
        finally {
            prevLoggingContext.restore();
        }
    }

    protected void doStop() {
        if (this.context != null) {
            LoggingContext.PreviousContext prevLoggingContext = null;
            if (this.taskContext != null) {
                prevLoggingContext = this.taskContext.configureLoggingContext("task");
            }
            try {
                this.logger.info("Stopping MySQL connector task");
                if (this.readers != null) {
                    this.readers.stop();
                    this.readers.destroy();
                }
            }
            finally {
                if (prevLoggingContext != null) {
                    prevLoggingContext.restore();
                }
            }
        }
    }

    protected Iterable<Field> getAllConfigurationFields() {
        return MySqlConnectorConfig.ALL_FIELDS;
    }

    protected void completeReaders() {
        LoggingContext.PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
        try {
            if (this.taskContext != null) {
                this.taskContext.shutdown();
            }
        }
        catch (Throwable e) {
            this.logger.error("Unexpected error shutting down the database history and/or closing JDBC connections", e);
        }
        finally {
            this.context = null;
            this.logger.info("Connector task finished all work and is now shutdown");
            prevLoggingContext.restore();
        }
    }

    protected boolean isBinlogAvailable() {
        String gtidStr = this.taskContext.source().gtidSet();
        if (gtidStr != null) {
            GtidSet availableGtidSet;
            if (gtidStr.trim().isEmpty()) {
                return true;
            }
            String availableGtidStr = this.connectionContext.knownGtidSet();
            if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
                this.logger.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
                return false;
            }
            GtidSet gtidSet = new GtidSet(gtidStr).retainAll(this.taskContext.gtidSourceFilter());
            if (gtidSet.isContainedWithin(availableGtidSet = new GtidSet(availableGtidStr))) {
                this.logger.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", (Object)availableGtidSet, (Object)gtidSet);
                GtidSet knownServerSet = availableGtidSet.retainAll(this.taskContext.gtidSourceFilter());
                GtidSet gtidSetToReplicate = this.connectionContext.subtractGtidSet(knownServerSet, gtidSet);
                GtidSet purgedGtidSet = this.connectionContext.purgedGtidSet();
                GtidSet nonPurgedGtidSetToReplicate = this.connectionContext.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
                this.logger.info("GTIDs known by the server but not processed yet {}, for replication are available only {}", (Object)gtidSetToReplicate, (Object)nonPurgedGtidSetToReplicate);
                if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
                    this.logger.info("Some of the GTIDs needed to replicate have been already purged");
                    return false;
                }
                return true;
            }
            this.logger.info("Connector last known GTIDs are {}, but MySQL has {}", (Object)gtidSet, (Object)availableGtidSet);
            return false;
        }
        String binlogFilename = this.taskContext.source().binlogFilename();
        if (binlogFilename == null) {
            return true;
        }
        if (binlogFilename.equals("")) {
            return true;
        }
        ArrayList logNames = new ArrayList();
        try {
            this.logger.info("Step 0: Get all known binlogs from MySQL");
            this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                while (rs.next()) {
                    logNames.add(rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", (Throwable)e);
        }
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (!found) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Connector requires binlog file '{}', but MySQL only has {}", (Object)binlogFilename, (Object)String.join((CharSequence)", ", logNames));
            }
        } else {
            this.logger.info("MySQL has the binlog file '{}' required by the connector", (Object)binlogFilename);
        }
        return found;
    }

    protected String earliestBinlogFilename() {
        ArrayList logNames = new ArrayList();
        try {
            this.logger.info("Checking all known binlogs from MySQL");
            this.connectionContext.jdbc().query("SHOW BINARY LOGS", rs -> {
                while (rs.next()) {
                    logNames.add(rs.getString(1));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", (Throwable)e);
        }
        if (logNames.isEmpty()) {
            return null;
        }
        return (String)logNames.get(0);
    }

    protected boolean isBinlogRowImageFull() {
        AtomicReference<String> rowImage = new AtomicReference<String>("");
        try {
            this.connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'", rs -> {
                if (rs.next()) {
                    rowImage.set(rs.getString(2));
                } else {
                    rowImage.set("FULL");
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_ROW_IMAGE mode: ", (Throwable)e);
        }
        this.logger.debug("binlog_row_image={}", (Object)rowImage.get());
        return "FULL".equalsIgnoreCase(rowImage.get());
    }

    protected boolean isBinlogFormatRow() {
        AtomicReference<String> mode = new AtomicReference<String>("");
        try {
            this.connectionContext.jdbc().query("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", rs -> {
                if (rs.next()) {
                    mode.set(rs.getString(2));
                }
            });
        }
        catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: ", (Throwable)e);
        }
        this.logger.debug("binlog_format={}", (Object)mode.get());
        return "ROW".equalsIgnoreCase(mode.get());
    }

    public class ServerIdGenerator {
        private final long configuredServerId;
        private final long offset;
        private int counter;

        private ServerIdGenerator(long configuredServerId, long configuredOffset) {
            this.configuredServerId = configuredServerId;
            this.offset = configuredOffset;
            this.counter = 0;
        }

        public long getNextServerId() {
            ++this.counter;
            return this.configuredServerId + (long)this.counter * this.offset;
        }

        public long getConfiguredServerId() {
            return this.configuredServerId;
        }
    }
}

