package io.zeebe.exporters.kafka;

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.exporters.kafka.config.Config;
import io.zeebe.exporters.kafka.config.parser.ConfigParser;
import io.zeebe.exporters.kafka.config.parser.RawConfigParser;
import io.zeebe.exporters.kafka.config.raw.RawConfig;
import io.zeebe.exporters.kafka.producer.RecordBatch;
import io.zeebe.exporters.kafka.producer.RecordBatchFactory;
import io.zeebe.exporters.kafka.record.KafkaRecordFilter;
import io.zeebe.exporters.kafka.record.RecordHandler;
import io.zeebe.exporters.kafka.record.RecordSerializer;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/exporters/kafka/KafkaExporter.class */
public final class KafkaExporter implements Exporter {
    private final RecordBatchFactory recordBatchFactory;
    private final ConfigParser<RawConfig, Config> configParser;
    private Controller controller;
    private Logger logger;
    private Config config;
    private RecordHandler recordHandler;
    private ScheduledTask flushTask;
    private RecordBatch recordBatch;

    public KafkaExporter() {
        this(RecordBatchFactory.defaultFactory(), new RawConfigParser());
    }

    public KafkaExporter(RecordBatchFactory recordBatchFactory, ConfigParser<RawConfig, Config> configParser) {
        this.recordBatchFactory = (RecordBatchFactory) Objects.requireNonNull(recordBatchFactory);
        this.configParser = (ConfigParser) Objects.requireNonNull(configParser);
    }

    public void configure(Context context) {
        this.logger = (Logger) Objects.requireNonNull(context.getLogger());
        this.config = this.configParser.parse((RawConfig) Objects.requireNonNull((RawConfig) context.getConfiguration().instantiate(RawConfig.class)));
        RecordSerializer recordSerializer = new RecordSerializer();
        recordSerializer.configure(this.config.getProducer().getConfig(), false);
        this.recordHandler = new RecordHandler(this.config.getRecords(), recordSerializer);
        context.setFilter(new KafkaRecordFilter(this.config.getRecords()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Configured Kafka exporter: {}", this.config);
        } else {
            this.logger.info("Configured Kafka exporter");
        }
    }

    public void open(Controller controller) {
        this.controller = controller;
        this.recordBatch = this.recordBatchFactory.newRecordBatch(this.config.getProducer(), this.config.getMaxBatchSize(), this::updatePosition, this.logger);
        scheduleFlushBatchTask();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Opened Kafka exporter with configuration: {}", this.config);
        } else {
            this.logger.info("Opened Kafka exporter");
        }
    }

    public void close() {
        if (this.flushTask != null) {
            this.flushTask.cancel();
        }
        if (this.recordBatch != null) {
            this.recordBatch.flush();
            this.recordBatch.close();
        }
        if (this.logger != null) {
            this.logger.info("Closed Kafka exporter");
        }
    }

    public void export(Record record) {
        if (!this.recordHandler.isAllowed(record)) {
            this.logger.trace("Ignoring record {}", record);
            return;
        }
        ProducerRecord<RecordId, byte[]> transform = this.recordHandler.transform(record);
        this.recordBatch.add(transform);
        this.logger.trace("Added {} to the batch", transform);
    }

    private void scheduleFlushBatchTask() {
        this.logger.trace("Rescheduling flush task in {}", this.config.getFlushInterval());
        this.flushTask = this.controller.scheduleCancellableTask(this.config.getFlushInterval(), this::flushBatchTask);
    }

    private void flushBatchTask() {
        try {
            this.recordBatch.flush();
        } finally {
            scheduleFlushBatchTask();
        }
    }

    private void updatePosition(long j) {
        this.controller.updateLastExportedRecordPosition(j);
        this.logger.trace("Flushed batch and updated last exported record position to {}", Long.valueOf(j));
    }
}
