/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.relational.history;

import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class KafkaDatabaseHistory
extends AbstractDatabaseHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseHistory.class);
    private static final String CLEANUP_POLICY_NAME = "cleanup.policy";
    private static final String CLEANUP_POLICY_VALUE = "delete";
    private static final String RETENTION_MS_NAME = "retention.ms";
    private static final long RETENTION_MS_MAX = Long.MAX_VALUE;
    private static final long RETENTION_MS_MIN = Duration.of(1825L, ChronoUnit.DAYS).toMillis();
    private static final String RETENTION_BYTES_NAME = "retention.bytes";
    private static final int UNLIMITED_VALUE = -1;
    private static final int PARTITION_COUNT = 1;
    private static final String DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME = "default.replication.factor";
    private static final short DEFAULT_TOPIC_REPLICATION_FACTOR = 1;
    public static final Field TOPIC = Field.create("database.history.kafka.topic").withDisplayName("Database history topic name").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 32)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the database schema history").withValidation(KafkaDatabaseHistory.forKafka(Field::isRequired));
    public static final Field BOOTSTRAP_SERVERS = Field.create("database.history.kafka.bootstrap.servers").withDisplayName("Kafka broker addresses").withType(ConfigDef.Type.STRING).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 31)).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(KafkaDatabaseHistory.forKafka(Field::isRequired));
    public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create("database.history.kafka.recovery.poll.interval.ms").withDisplayName("Poll interval during database history recovery (ms)").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling for persisted data during recovery.").withDefault(100).withValidation(Field::isNonNegativeInteger);
    public static final Field RECOVERY_POLL_ATTEMPTS = Field.create("database.history.kafka.recovery.attempts").withDisplayName("Max attempts to recovery database history").withType(ConfigDef.Type.INT).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).").withDefault(100).withValidation(Field::isInteger);
    public static final Field INTERNAL_CONNECTOR_CLASS = Field.create("database.history.connector.class").withDisplayName("Debezium connector class").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The class of the Debezium database connector").withNoValidation();
    public static final Field INTERNAL_CONNECTOR_ID = Field.create("database.history.connector.id").withDisplayName("Debezium connector identifier").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.HIGH).withDescription("The unique identifier of the Debezium connector").withNoValidation();
    public static final Field KAFKA_QUERY_TIMEOUT_MS = Field.create("database.history.kafka.query.timeout.ms").withDisplayName("Kafka admin client query timeout (ms)").withType(ConfigDef.Type.LONG).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 33)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while fetching cluster information using Kafka admin client.").withDefault(Duration.ofSeconds(3L).toMillis()).withValidation(Field::isPositiveInteger);
    public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME, RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID, KAFKA_QUERY_TIMEOUT_MS);
    private static final String CONSUMER_PREFIX = "database.history.consumer.";
    private static final String PRODUCER_PREFIX = "database.history.producer.";
    private static final Integer PARTITION = 0;
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String topicName;
    private Configuration consumerConfig;
    private Configuration producerConfig;
    private volatile KafkaProducer<String, String> producer;
    private int maxRecoveryAttempts;
    private Duration pollInterval;
    private ExecutorService checkTopicSettingsExecutor;
    private Duration kafkaQueryTimeout;
    private static final boolean USE_KAFKA_24_NEW_TOPIC_CONSTRUCTOR = KafkaDatabaseHistory.hasNewTopicConstructorWithOptionals();

    private static boolean hasNewTopicConstructorWithOptionals() {
        try {
            NewTopic.class.getConstructor(String.class, Optional.class, Optional.class);
            return true;
        }
        catch (NoSuchMethodException nsme) {
            return false;
        }
    }

    @Override
    public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord(ALL_FIELDS, arg_0 -> ((Logger)LOGGER).error(arg_0))) {
            throw new ConnectException("Error configuring an instance of " + this.getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = config.getString(TOPIC);
        this.pollInterval = Duration.ofMillis(config.getInteger(RECOVERY_POLL_INTERVAL_MS));
        this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
        this.kafkaQueryTimeout = Duration.ofMillis(config.getLong(KAFKA_QUERY_TIMEOUT_MS));
        String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);
        String dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
        this.consumerConfig = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)config.subset(CONSUMER_PREFIX, true).edit().withDefault("bootstrap.servers", bootstrapServers).withDefault("client.id", dbHistoryName).withDefault("group.id", dbHistoryName).withDefault("fetch.min.bytes", 1)).withDefault("enable.auto.commit", false)).withDefault("session.timeout.ms", 10000)).withDefault("auto.offset.reset", OffsetResetStrategy.EARLIEST.toString().toLowerCase()).withDefault("key.deserializer", StringDeserializer.class)).withDefault("value.deserializer", StringDeserializer.class)).build();
        this.producerConfig = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)config.subset(PRODUCER_PREFIX, true).edit().withDefault("bootstrap.servers", bootstrapServers).withDefault("client.id", dbHistoryName).withDefault("acks", 1)).withDefault("retries", 1)).withDefault("batch.size", 32768)).withDefault("linger.ms", 0)).withDefault("buffer.memory", 0x100000)).withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).withDefault("max.block.ms", 10000)).build();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("KafkaDatabaseHistory Consumer config: {}", (Object)this.consumerConfig.withMaskedPasswords());
            LOGGER.info("KafkaDatabaseHistory Producer config: {}", (Object)this.producerConfig.withMaskedPasswords());
        }
        try {
            String connectorClassname = config.getString(INTERNAL_CONNECTOR_CLASS);
            if (connectorClassname != null) {
                this.checkTopicSettingsExecutor = Threads.newSingleThreadExecutor(Class.forName(connectorClassname), config.getString(INTERNAL_CONNECTOR_ID), "db-history-config-check", true);
            }
        }
        catch (ClassNotFoundException e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    @Override
    public synchronized void start() {
        super.start();
        if (this.producer == null) {
            this.producer = new KafkaProducer(this.producerConfig.asProperties());
        }
    }

    @Override
    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
        }
        LOGGER.trace("Storing record into database history: {}", (Object)record);
        try {
            ProducerRecord produced = new ProducerRecord(this.topicName, PARTITION, null, (Object)record.toString());
            Future future = this.producer.send(produced);
            this.producer.flush();
            RecordMetadata metadata = (RecordMetadata)future.get();
            if (metadata != null) {
                LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ", new Object[]{metadata.topic(), metadata.partition(), metadata.offset()});
            }
        }
        catch (InterruptedException e) {
            LOGGER.trace("Interrupted before record was written into database history: {}", (Object)record);
            Thread.currentThread().interrupt();
            throw new DatabaseHistoryException(e);
        }
        catch (ExecutionException e) {
            throw new DatabaseHistoryException(e);
        }
    }

    @Override
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        try (KafkaConsumer historyConsumer = new KafkaConsumer(this.consumerConfig.asProperties());){
            LOGGER.debug("Subscribing to database history topic '{}'", (Object)this.topicName);
            historyConsumer.subscribe(Collect.arrayListOf(this.topicName, new String[0]));
            long lastProcessedOffset = -1L;
            Long endOffset = null;
            int recoveryAttempts = 0;
            do {
                if (recoveryAttempts > this.maxRecoveryAttempts) {
                    throw new IllegalStateException("The database history couldn't be recovered. Consider to increase the value for " + RECOVERY_POLL_INTERVAL_MS.name());
                }
                endOffset = this.getEndOffsetOfDbHistoryTopic(endOffset, (KafkaConsumer<String, String>)historyConsumer);
                LOGGER.debug("End offset of database history topic is {}", (Object)endOffset);
                ConsumerRecords recoveredRecords = historyConsumer.poll(this.pollInterval.toMillis());
                int numRecordsProcessed = 0;
                for (ConsumerRecord record : recoveredRecords) {
                    try {
                        if (lastProcessedOffset >= record.offset()) continue;
                        if (record.value() == null) {
                            LOGGER.warn("Skipping null database history record. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", (Object)this.topicName);
                        } else {
                            HistoryRecord recordObj = new HistoryRecord(this.reader.read((String)record.value()));
                            LOGGER.trace("Recovering database history: {}", (Object)recordObj);
                            if (recordObj == null || !recordObj.isValid()) {
                                LOGGER.warn("Skipping invalid database history record '{}'. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", (Object)recordObj, (Object)this.topicName);
                            } else {
                                records.accept(recordObj);
                                LOGGER.trace("Recovered database history: {}", (Object)recordObj);
                            }
                        }
                        lastProcessedOffset = record.offset();
                        ++numRecordsProcessed;
                    }
                    catch (IOException e) {
                        LOGGER.error("Error while deserializing history record '{}'", (Object)record, (Object)e);
                    }
                    catch (Exception e) {
                        LOGGER.error("Unexpected exception while processing record '{}'", (Object)record, (Object)e);
                        throw e;
                    }
                }
                if (numRecordsProcessed == 0) {
                    LOGGER.debug("No new records found in the database history; will retry");
                    ++recoveryAttempts;
                    continue;
                }
                LOGGER.debug("Processed {} records from database history", (Object)numRecordsProcessed);
            } while (lastProcessedOffset < endOffset - 1L);
        }
    }

    private Long getEndOffsetOfDbHistoryTopic(Long previousEndOffset, KafkaConsumer<String, String> historyConsumer) {
        Map offsets = historyConsumer.endOffsets(Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue())));
        Long endOffset = (Long)offsets.entrySet().iterator().next().getValue();
        if (previousEndOffset != null && !previousEndOffset.equals(endOffset)) {
            throw new IllegalStateException("Detected changed end offset of database history topic (previous: " + previousEndOffset + ", current: " + endOffset + "). Make sure that the same history topic isn't shared by multiple connector instances.");
        }
        return endOffset;
    }

    @Override
    public boolean storageExists() {
        try (KafkaConsumer checkTopicConsumer = new KafkaConsumer(this.consumerConfig.asProperties());){
            boolean bl = checkTopicConsumer.listTopics().containsKey(this.topicName);
            return bl;
        }
    }

    @Override
    public boolean exists() {
        boolean exists = false;
        if (this.storageExists()) {
            try (KafkaConsumer historyConsumer = new KafkaConsumer(this.consumerConfig.asProperties());){
                this.checkTopicSettings(this.topicName);
                Set<TopicPartition> historyTopic = Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue()));
                Map beginningOffsets = historyConsumer.beginningOffsets(historyTopic);
                Map endOffsets = historyConsumer.endOffsets(historyTopic);
                Long beginOffset = (Long)beginningOffsets.entrySet().iterator().next().getValue();
                Long endOffset = (Long)endOffsets.entrySet().iterator().next().getValue();
                exists = endOffset > beginOffset;
            }
        }
        return exists;
    }

    private void checkTopicSettings(String topicName) {
        if (this.checkTopicSettingsExecutor == null || this.checkTopicSettingsExecutor.isShutdown()) {
            return;
        }
        this.checkTopicSettingsExecutor.execute(() -> {
            String clientId = this.producerConfig.getString("client.id") + "-topic-check";
            Properties clientConfig = this.producerConfig.asProperties();
            clientConfig.put("client.id", clientId);
            try (AdminClient admin = AdminClient.create((Properties)clientConfig);){
                Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
                Map configs = (Map)admin.describeConfigs(resources).all().get(this.kafkaQueryTimeout.toMillis(), TimeUnit.MILLISECONDS);
                if (configs.size() != 1) {
                    LOGGER.info("Expected one topic '{}' to match the query but got {}", (Object)topicName, (Object)configs.values().size());
                    return;
                }
                Config topic = (Config)configs.values().iterator().next();
                if (topic == null) {
                    LOGGER.info("Could not get config for topic '{}'", (Object)topic);
                    return;
                }
                String cleanupPolicy = topic.get(CLEANUP_POLICY_NAME).value();
                if (!CLEANUP_POLICY_VALUE.equals(cleanupPolicy)) {
                    LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", new Object[]{topicName, CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE, cleanupPolicy});
                    return;
                }
                String retentionBytes = topic.get(RETENTION_BYTES_NAME).value();
                if (retentionBytes != null && Long.parseLong(retentionBytes) != -1L) {
                    LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", new Object[]{topicName, RETENTION_BYTES_NAME, -1, retentionBytes});
                    return;
                }
                String retentionMs = topic.get(RETENTION_MS_NAME).value();
                if (retentionMs != null && Long.parseLong(retentionMs) != -1L && Long.parseLong(retentionMs) < RETENTION_MS_MIN) {
                    LOGGER.warn("Database history topic '{}' option '{}' should be '{}' or greater than '{}' (5 years) but is '{}'", new Object[]{topicName, RETENTION_MS_NAME, -1, RETENTION_MS_MIN, retentionMs});
                    return;
                }
                DescribeTopicsResult result = admin.describeTopics(Collections.singleton(topicName));
                if (result.values().size() != 1) {
                    LOGGER.info("Expected one topic '{}' to match the query but got {}", (Object)topicName, (Object)result.values().size());
                    return;
                }
                TopicDescription topicDesc = (TopicDescription)((KafkaFuture)result.values().values().iterator().next()).get();
                if (topicDesc == null) {
                    LOGGER.info("Could not get description for topic '{}'", (Object)topicName);
                    return;
                }
                int partitions = topicDesc.partitions().size();
                if (partitions != 1) {
                    LOGGER.warn("Database history topic '{}' should have one partiton but has '{}'", (Object)topicName, (Object)partitions);
                    return;
                }
                LOGGER.info("Database history topic '{}' has correct settings", (Object)topicName);
            }
            catch (Throwable e) {
                LOGGER.info("Attempted to validate database history topic but failed", e);
            }
            this.stopCheckTopicSettingsExecutor();
        });
    }

    @Override
    public synchronized void stop() {
        block6: {
            this.stopCheckTopicSettingsExecutor();
            try {
                if (this.producer == null) break block6;
                try {
                    this.producer.flush();
                }
                finally {
                    this.producer.close(Duration.ofSeconds(30L));
                }
            }
            finally {
                this.producer = null;
                super.stop();
            }
        }
    }

    private void stopCheckTopicSettingsExecutor() {
        if (this.checkTopicSettingsExecutor != null) {
            this.checkTopicSettingsExecutor.shutdown();
        }
    }

    public String toString() {
        if (this.topicName != null) {
            return "Kafka topic " + this.topicName + ":" + PARTITION + " using brokers at " + this.producerConfig.getString(BOOTSTRAP_SERVERS);
        }
        return "Kafka topic";
    }

    protected static String consumerConfigPropertyName(String kafkaConsumerPropertyName) {
        return CONSUMER_PREFIX + kafkaConsumerPropertyName;
    }

    @Override
    public void initializeStorage() {
        super.initializeStorage();
        try (AdminClient admin = AdminClient.create((Properties)this.producerConfig.asProperties());){
            NewTopic topic;
            block12: {
                topic = null;
                try {
                    if (USE_KAFKA_24_NEW_TOPIC_CONSTRUCTOR) {
                        topic = new NewTopic(this.topicName, Optional.of(1), Optional.empty());
                    }
                }
                catch (Exception ex) {
                    if (ex.getCause() instanceof UnsupportedVersionException) break block12;
                    throw ex;
                }
            }
            if (topic == null) {
                short replicationFactor = this.getDefaultTopicReplicationFactor(admin);
                topic = new NewTopic(this.topicName, 1, replicationFactor);
            }
            topic.configs(Collect.hashMapOf(CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE, RETENTION_MS_NAME, Long.toString(Long.MAX_VALUE), RETENTION_BYTES_NAME, Long.toString(-1L)));
            admin.createTopics(Collections.singleton(topic));
            LOGGER.info("Database history topic '{}' created", (Object)topic);
        }
        catch (Exception e) {
            throw new ConnectException("Creation of database history topic failed, please create the topic manually", (Throwable)e);
        }
    }

    private short getDefaultTopicReplicationFactor(AdminClient admin) throws Exception {
        block3: {
            try {
                Config brokerConfig = this.getKafkaBrokerConfig(admin);
                String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
                if (defaultReplicationFactorValue != null) {
                    return Short.parseShort(defaultReplicationFactorValue);
                }
            }
            catch (ExecutionException ex) {
                if (ex.getCause() instanceof UnsupportedVersionException) break block3;
                throw ex;
            }
        }
        LOGGER.warn("Unable to obtain the default replication factor from the brokers at {}. Setting value to {} instead.", (Object)this.producerConfig.getString(BOOTSTRAP_SERVERS), (Object)1);
        return 1;
    }

    private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
        Collection nodes = (Collection)admin.describeCluster().nodes().get(this.kafkaQueryTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (nodes.isEmpty()) {
            throw new ConnectException("No brokers available to obtain default settings");
        }
        String nodeId = ((Node)nodes.iterator().next()).idString();
        Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId));
        Map configs = (Map)admin.describeConfigs(resources).all().get(this.kafkaQueryTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (configs.isEmpty()) {
            throw new ConnectException("No configs have been received");
        }
        return (Config)configs.values().iterator().next();
    }

    private static Field.Validator forKafka(Field.Validator validator) {
        return (config, field, problems) -> {
            String history = config.getString(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY);
            return KafkaDatabaseHistory.class.getName().equals(history) ? validator.validate(config, field, problems) : 0;
        };
    }
}

