package io.zeebe.exporters.kafka.producer;

import io.zeebe.exporters.kafka.config.ProducerConfig;
import io.zeebe.exporters.kafka.record.FullRecordBatchException;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Objects;
import java.util.UUID;
import java.util.function.LongConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/zeebe/exporters/kafka/producer/BoundedTransactionalRecordBatch.class */
public final class BoundedTransactionalRecordBatch implements RecordBatch {
    private final LinkedList<ProducerRecord<RecordId, byte[]>> records;
    private final KafkaProducerFactory producerFactory;
    private final ProducerConfig config;
    private final String producerId;
    private final int maxBatchSize;
    private final LongConsumer onFlushCallback;
    private final Logger logger;
    private Producer<RecordId, byte[]> producer;
    private boolean producerInitialized;
    private boolean transactionBegan;
    private int nextSendIndex;

    public BoundedTransactionalRecordBatch(ProducerConfig producerConfig, int i, LongConsumer longConsumer, Logger logger, KafkaProducerFactory kafkaProducerFactory) {
        this(producerConfig, i, longConsumer, logger, kafkaProducerFactory, UUID.randomUUID().toString());
    }

    public BoundedTransactionalRecordBatch(ProducerConfig producerConfig, int i, LongConsumer longConsumer, Logger logger, KafkaProducerFactory kafkaProducerFactory, String str) {
        this.records = new LinkedList<>();
        this.producerInitialized = false;
        this.transactionBegan = false;
        this.nextSendIndex = 0;
        this.config = (ProducerConfig) Objects.requireNonNull(producerConfig);
        this.maxBatchSize = i;
        this.onFlushCallback = (LongConsumer) Objects.requireNonNull(longConsumer);
        this.logger = (Logger) Objects.requireNonNull(logger);
        this.producerFactory = (KafkaProducerFactory) Objects.requireNonNull(kafkaProducerFactory);
        this.producerId = (String) Objects.requireNonNull(str);
    }

    @Override // io.zeebe.exporters.kafka.producer.RecordBatch
    public void add(ProducerRecord<RecordId, byte[]> producerRecord) throws FullRecordBatchException {
        if (this.records.size() >= this.maxBatchSize) {
            try {
                flushBatch();
            } catch (TimeoutException | InterruptException e) {
                throw new FullRecordBatchException(this.maxBatchSize, e);
            } catch (Exception e2) {
                close();
                throw new FullRecordBatchException(this.maxBatchSize, e2);
            }
        }
        this.records.add(producerRecord);
        try {
            sendUnsentRecords();
        } catch (TimeoutException | InterruptException e3) {
            this.logger.debug("Timed out or interrupted while sending unsent records, will be retried later", e3);
        } catch (Exception e4) {
            this.logger.warn("Failed to send unsent record, will be retried later with a new producer", e4);
            close();
        }
    }

    @Override // io.zeebe.exporters.kafka.producer.RecordBatch
    public void flush() {
        if (this.records.isEmpty()) {
            this.logger.trace("Skipping batch commit as there are no records in the batch");
            return;
        }
        this.logger.trace("Committing {} from the current batch, up to position {}", Integer.valueOf(this.records.size()), Long.valueOf(((RecordId) this.records.getLast().key()).getPosition()));
        try {
            flushBatch();
        } catch (TimeoutException | InterruptException e) {
            this.logger.debug("Timed out or interrupted while committing, will be retried later", e);
        } catch (Exception e2) {
            this.logger.warn("Non-recoverable error occurred while committing, retrying with new producer", e2);
            close();
        }
    }

    @Override // io.zeebe.exporters.kafka.producer.RecordBatch, java.lang.AutoCloseable
    public void close() {
        if (this.producer == null) {
            return;
        }
        Duration closeTimeout = this.config.getCloseTimeout();
        this.logger.debug("Closing producer with timeout {}", closeTimeout);
        try {
            this.producer.close(closeTimeout);
        } catch (Exception e) {
            this.logger.warn("Failed to gracefully close Kafka exporter; this is most likely fine, but may cause resource to leaks. Investigate if it keeps repeating itself.", e);
        }
        this.producer = null;
        this.producerInitialized = false;
        this.transactionBegan = false;
        this.nextSendIndex = 0;
    }

    private void flushBatch() throws KafkaException, IllegalStateException {
        sendUnsentRecords();
        long position = ((RecordId) this.records.getLast().key()).getPosition();
        commitTransaction();
        this.onFlushCallback.accept(position);
    }

    private void commitTransaction() {
        if (!this.transactionBegan) {
            throw new IllegalStateException("Expected to be in transaction, but no transaction is in flight");
        }
        this.producer.commitTransaction();
        this.transactionBegan = false;
        this.records.clear();
        this.nextSendIndex = 0;
    }

    private void sendUnsentRecords() {
        this.logger.trace("Sending {} remaining unsent records from the current batch", Integer.valueOf(Math.max(0, this.records.size() - this.nextSendIndex)));
        ensureWithinTransaction();
        while (this.nextSendIndex < this.records.size()) {
            ProducerRecord<RecordId, byte[]> producerRecord = this.records.get(this.nextSendIndex);
            this.producer.send(producerRecord);
            this.logger.trace("Sent record {}", producerRecord);
            this.nextSendIndex++;
        }
    }

    private void ensureProducer() {
        if (this.producer != null) {
            return;
        }
        this.producer = this.producerFactory.newProducer(this.config, this.producerId);
        this.logger.trace("Created new producer");
    }

    private void ensureProducerInitialized() {
        ensureProducer();
        if (this.producerInitialized) {
            return;
        }
        this.producer.initTransactions();
        this.producerInitialized = true;
        this.logger.trace("Initialized producer for transactions");
    }

    private void ensureWithinTransaction() {
        ensureProducerInitialized();
        if (this.transactionBegan) {
            return;
        }
        this.producer.beginTransaction();
        this.transactionBegan = true;
        this.logger.trace("Began new producer transaction");
    }
}
