/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.exporters.kafka.record;

import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.exporters.kafka.config.RecordConfig;
import io.zeebe.exporters.kafka.config.RecordsConfig;
import io.zeebe.exporters.kafka.record.RecordSerializer;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Turns Zeebe {@link Record}s into Kafka {@link ProducerRecord}s and decides whether a record may
 * be exported, based on the per-value-type {@link RecordConfig} looked up from a
 * {@link RecordsConfig}.
 */
public final class RecordHandler {
    private final RecordsConfig configuration;
    private final Serializer<Record<?>> serializer;

    /**
     * Creates a handler which serializes records with a default {@link RecordSerializer}.
     *
     * @param configuration the records configuration used to resolve per-type settings
     * @throws NullPointerException if {@code configuration} is null
     */
    public RecordHandler(RecordsConfig configuration) {
        this(configuration, new RecordSerializer());
    }

    /**
     * Creates a handler which serializes records with the given serializer.
     *
     * @param configuration the records configuration used to resolve per-type settings
     * @param serializer the serializer used to produce the Kafka record value
     * @throws NullPointerException if any argument is null
     */
    public RecordHandler(RecordsConfig configuration, Serializer<Record<?>> serializer) {
        this.configuration = Objects.requireNonNull(configuration, "configuration");
        this.serializer = Objects.requireNonNull(serializer, "serializer");
    }

    /**
     * Builds the Kafka producer record for the given Zeebe record.
     *
     * <p>The topic comes from the configuration resolved for the record's value type, the key is a
     * {@link RecordId} made of the record's partition ID and position, and the value is the output
     * of the configured serializer.
     *
     * @param record the record to transform
     * @return a producer record targeting the configured topic
     * @throws NullPointerException if {@code record} is null
     */
    public ProducerRecord<RecordId, byte[]> transform(Record<?> record) {
        RecordConfig config = this.getRecordConfig(record);
        // Read the topic once so the serializer and the producer record are given the same value.
        String topic = config.getTopic();
        byte[] serializedRecord = this.serializer.serialize(topic, record);
        RecordId recordId = new RecordId(record.getPartitionId(), record.getPosition());
        return new ProducerRecord<>(topic, recordId, serializedRecord);
    }

    /**
     * Tests whether the record's type is among the allowed types configured for its value type.
     *
     * @param record the record to test
     * @return {@code true} if the allowed types of the resolved configuration contain the record's
     *     record type, {@code false} otherwise
     * @throws NullPointerException if {@code record} is null
     */
    public boolean isAllowed(Record<?> record) {
        RecordConfig config = this.getRecordConfig(record);
        return config.getAllowedTypes().contains(record.getRecordType());
    }

    /**
     * Looks up the configuration registered for the value type of the given record.
     *
     * @throws NullPointerException if {@code record} is null
     */
    private RecordConfig getRecordConfig(Record<?> record) {
        Objects.requireNonNull(record, "record");
        return this.configuration.forType(record.getValueType());
    }
}

