/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.exporters.kafka.producer;

import io.zeebe.exporters.kafka.config.ProducerConfig;
import io.zeebe.exporters.kafka.producer.KafkaProducerFactory;
import io.zeebe.exporters.kafka.producer.RecordBatch;
import io.zeebe.exporters.kafka.record.FullRecordBatchException;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Objects;
import java.util.UUID;
import java.util.function.LongConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;

final class BoundedTransactionalRecordBatch
implements RecordBatch {
    private final LinkedList<ProducerRecord<RecordId, byte[]>> records = new LinkedList();
    private final KafkaProducerFactory producerFactory;
    private final ProducerConfig config;
    private final String producerId;
    private final int maxBatchSize;
    private final LongConsumer onFlushCallback;
    private final Logger logger;
    private Producer<RecordId, byte[]> producer;
    private boolean producerInitialized = false;
    private boolean transactionBegan = false;
    private int nextSendIndex = 0;

    public BoundedTransactionalRecordBatch(ProducerConfig config, int maxBatchSize, LongConsumer onFlushCallback, Logger logger, KafkaProducerFactory producerFactory) {
        this(config, maxBatchSize, onFlushCallback, logger, producerFactory, UUID.randomUUID().toString());
    }

    public BoundedTransactionalRecordBatch(ProducerConfig config, int maxBatchSize, LongConsumer onFlushCallback, Logger logger, KafkaProducerFactory producerFactory, String producerId) {
        this.config = Objects.requireNonNull(config);
        this.maxBatchSize = maxBatchSize;
        this.onFlushCallback = Objects.requireNonNull(onFlushCallback);
        this.logger = Objects.requireNonNull(logger);
        this.producerFactory = Objects.requireNonNull(producerFactory);
        this.producerId = Objects.requireNonNull(producerId);
    }

    @Override
    public void add(ProducerRecord<RecordId, byte[]> record) throws FullRecordBatchException {
        if (this.records.size() >= this.maxBatchSize) {
            try {
                this.flushBatch();
            }
            catch (InterruptException | TimeoutException e) {
                throw new FullRecordBatchException(this.maxBatchSize, (Throwable)e);
            }
            catch (Exception e) {
                this.close();
                throw new FullRecordBatchException(this.maxBatchSize, (Throwable)e);
            }
        }
        this.records.add(record);
        try {
            this.sendUnsentRecords();
        }
        catch (InterruptException | TimeoutException e) {
            this.logger.debug("Timed out or interrupted while sending unsent records, will be retried later", (Throwable)e);
        }
        catch (Exception e) {
            this.logger.warn("Failed to send unsent record, will be retried later with a new producer", (Throwable)e);
            this.close();
        }
    }

    @Override
    public void flush() {
        if (this.records.isEmpty()) {
            this.logger.trace("Skipping batch commit as there are no records in the batch");
            return;
        }
        this.logger.trace("Committing {} from the current batch, up to position {}", (Object)this.records.size(), (Object)this.records.getLast().key().getPosition());
        try {
            this.flushBatch();
        }
        catch (InterruptException | TimeoutException e) {
            this.logger.debug("Timed out or interrupted while committing, will be retried later", (Throwable)e);
        }
        catch (Exception e) {
            this.logger.warn("Non-recoverable error occurred while committing, retrying with new producer", (Throwable)e);
            this.close();
        }
    }

    @Override
    public void close() {
        if (this.producer == null) {
            return;
        }
        Duration closeTimeout = this.config.getCloseTimeout();
        this.logger.debug("Closing producer with timeout {}", (Object)closeTimeout);
        try {
            this.producer.close(closeTimeout);
        }
        catch (Exception e) {
            this.logger.warn("Failed to gracefully close Kafka exporter; this is most likely fine, but may cause resource to leaks. Investigate if it keeps repeating itself.", (Throwable)e);
        }
        this.producer = null;
        this.producerInitialized = false;
        this.transactionBegan = false;
        this.nextSendIndex = 0;
    }

    private void flushBatch() throws KafkaException, IllegalStateException {
        this.sendUnsentRecords();
        long commitPosition = this.records.getLast().key().getPosition();
        this.commitTransaction();
        this.onFlushCallback.accept(commitPosition);
    }

    private void commitTransaction() {
        if (!this.transactionBegan) {
            throw new IllegalStateException("Expected to be in transaction, but no transaction is in flight");
        }
        this.producer.commitTransaction();
        this.transactionBegan = false;
        this.records.clear();
        this.nextSendIndex = 0;
    }

    private void sendUnsentRecords() {
        int unsentRecords = Math.max(0, this.records.size() - this.nextSendIndex);
        this.logger.trace("Sending {} remaining unsent records from the current batch", (Object)unsentRecords);
        this.ensureWithinTransaction();
        while (this.nextSendIndex < this.records.size()) {
            ProducerRecord<RecordId, byte[]> record = this.records.get(this.nextSendIndex);
            this.producer.send(record);
            this.logger.trace("Sent record {}", record);
            ++this.nextSendIndex;
        }
    }

    private void ensureProducer() {
        if (this.producer != null) {
            return;
        }
        this.producer = this.producerFactory.newProducer(this.config, this.producerId);
        this.logger.trace("Created new producer");
    }

    private void ensureProducerInitialized() {
        this.ensureProducer();
        if (!this.producerInitialized) {
            this.producer.initTransactions();
            this.producerInitialized = true;
            this.logger.trace("Initialized producer for transactions");
        }
    }

    private void ensureWithinTransaction() {
        this.ensureProducerInitialized();
        if (!this.transactionBegan) {
            this.producer.beginTransaction();
            this.transactionBegan = true;
            this.logger.trace("Began new producer transaction");
        }
    }
}

