/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.exporters.kafka;

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.exporters.kafka.config.Config;
import io.zeebe.exporters.kafka.config.parser.ConfigParser;
import io.zeebe.exporters.kafka.config.parser.RawConfigParser;
import io.zeebe.exporters.kafka.config.raw.RawConfig;
import io.zeebe.exporters.kafka.producer.RecordBatch;
import io.zeebe.exporters.kafka.producer.RecordBatchFactory;
import io.zeebe.exporters.kafka.record.KafkaRecordFilter;
import io.zeebe.exporters.kafka.record.RecordHandler;
import io.zeebe.exporters.kafka.record.RecordSerializer;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;

public final class KafkaExporter
implements Exporter {
    private final RecordBatchFactory recordBatchFactory;
    private final ConfigParser<RawConfig, Config> configParser;
    private Controller controller;
    private Logger logger;
    private Config config;
    private RecordHandler recordHandler;
    private ScheduledTask flushTask;
    private RecordBatch recordBatch;

    public KafkaExporter() {
        this(RecordBatchFactory.defaultFactory(), new RawConfigParser());
    }

    public KafkaExporter(RecordBatchFactory recordBatchFactory, ConfigParser<RawConfig, Config> configParser) {
        this.recordBatchFactory = Objects.requireNonNull(recordBatchFactory);
        this.configParser = Objects.requireNonNull(configParser);
    }

    public void configure(Context context) {
        this.logger = Objects.requireNonNull(context.getLogger());
        RawConfig rawConfig = Objects.requireNonNull((RawConfig)context.getConfiguration().instantiate(RawConfig.class));
        this.config = this.configParser.parse(rawConfig);
        RecordSerializer serializer = new RecordSerializer();
        serializer.configure(this.config.getProducer().getConfig(), false);
        this.recordHandler = new RecordHandler(this.config.getRecords(), serializer);
        context.setFilter((Context.RecordFilter)new KafkaRecordFilter(this.config.getRecords()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Configured Kafka exporter: {}", (Object)this.config);
        } else {
            this.logger.info("Configured Kafka exporter");
        }
    }

    public void open(Controller controller) {
        this.controller = controller;
        this.recordBatch = this.recordBatchFactory.newRecordBatch(this.config.getProducer(), this.config.getMaxBatchSize(), this::updatePosition, this.logger);
        this.scheduleFlushBatchTask();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Opened Kafka exporter with configuration: {}", (Object)this.config);
        } else {
            this.logger.info("Opened Kafka exporter");
        }
    }

    public void close() {
        if (this.flushTask != null) {
            this.flushTask.cancel();
        }
        if (this.recordBatch != null) {
            this.recordBatch.flush();
            this.recordBatch.close();
        }
        if (this.logger != null) {
            this.logger.info("Closed Kafka exporter");
        }
    }

    public void export(Record record) {
        if (!this.recordHandler.isAllowed(record)) {
            this.logger.trace("Ignoring record {}", (Object)record);
            return;
        }
        ProducerRecord<RecordId, byte[]> producerRecord = this.recordHandler.transform(record);
        this.recordBatch.add(producerRecord);
        this.logger.trace("Added {} to the batch", producerRecord);
    }

    private void scheduleFlushBatchTask() {
        this.logger.trace("Rescheduling flush task in {}", (Object)this.config.getFlushInterval());
        this.flushTask = this.controller.scheduleCancellableTask(this.config.getFlushInterval(), this::flushBatchTask);
    }

    private void flushBatchTask() {
        try {
            this.recordBatch.flush();
        }
        finally {
            this.scheduleFlushBatchTask();
        }
    }

    private void updatePosition(long position) {
        this.controller.updateLastExportedRecordPosition(position);
        this.logger.trace("Flushed batch and updated last exported record position to {}", (Object)position);
    }
}

