package io.zeebe.containers.exporter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apiguardian.api.API;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HTTP request handler which receives a JSON encoded list of exported {@link Record}s, forwards
 * each record to the configured consumer, and replies with the highest acknowledged position known
 * for the partition of the first record in the batch.
 *
 * <p>Responses produced by {@link #handle}:
 *
 * <ul>
 *   <li>400 when the body is missing, empty, the JSON literal {@code null}, or cannot be
 *       deserialized as a list of records
 *   <li>204 when the list is empty, or when no position has been acknowledged yet for the partition
 *   <li>200 with a JSON body {@code {"position": <long>}} otherwise
 * </ul>
 *
 * <p>NOTE(review): acknowledged positions are kept in a plain {@link HashMap} without any
 * synchronization; confirm that {@link #acknowledge(int, long)} and {@link #handle} are never
 * invoked concurrently from different threads.
 */
@API(status = API.Status.INTERNAL)
final class RecordHandler implements AsyncServerRequestHandler<Message<HttpRequest, byte[]>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordHandler.class);
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new ZeebeProtocolModule());

    /** Target type used to deserialize request bodies; created once instead of once per request. */
    private static final TypeReference<List<Record<?>>> RECORD_LIST_TYPE = new TypeReference<List<Record<?>>>() {
    };

    private final Consumer<Record<?>> recordConsumer;
    private final boolean autoAcknowledge;

    /** Highest acknowledged position, keyed by partition ID. */
    private final Map<Integer, Long> positions;

    /**
     * Creates a handler which automatically acknowledges every record it receives.
     *
     * @param consumer receives every record contained in an incoming request, must not be null
     */
    RecordHandler(Consumer<Record<?>> consumer) {
        this(consumer, true);
    }

    /**
     * Creates a handler.
     *
     * @param consumer receives every record contained in an incoming request, must not be null
     * @param autoAcknowledge if true, each record's position is acknowledged right after it was
     *     passed to the consumer; if false, positions only change via {@link #acknowledge(int, long)}
     * @throws NullPointerException if {@code consumer} is null
     */
    public RecordHandler(Consumer<Record<?>> consumer, boolean autoAcknowledge) {
        this.positions = new HashMap<>();
        this.recordConsumer = Objects.requireNonNull(consumer, "must specify a record consumer");
        this.autoAcknowledge = autoAcknowledge;
    }

    /**
     * Marks the given position as acknowledged for the given partition. The stored position never
     * decreases: if a higher position is already stored for the partition, it is kept.
     *
     * @param partitionId the partition the position belongs to
     * @param position the position to acknowledge
     */
    public void acknowledge(int partitionId, long position) {
        this.positions.merge(partitionId, position, Math::max);
    }

    /**
     * Buffers the complete request entity as a byte array before {@link #handle} is invoked.
     *
     * @return a request consumer which produces the request together with its body as bytes
     */
    @Override
    public AsyncRequestConsumer<Message<HttpRequest, byte[]>> prepare(HttpRequest httpRequest, EntityDetails entityDetails, HttpContext httpContext) {
        return new BasicRequestConsumer<>(new BasicAsyncEntityConsumer());
    }

    /**
     * Deserializes the request body as a list of records, passes each record to the consumer (and
     * acknowledges it when auto acknowledge is enabled), then submits the response.
     *
     * <p>Only the deserialization step is guarded: an {@link IOException} raised while building or
     * submitting a response is propagated to the caller rather than being reported as a
     * deserialization failure, which would also submit a second response for the same exchange.
     *
     * @param message the request and its buffered body
     * @param responseTrigger used to submit exactly one response for the request
     * @param httpContext the context of the current exchange
     * @throws HttpException if submitting the response fails with a protocol error
     * @throws IOException if building or submitting the response fails
     */
    @Override
    public void handle(Message<HttpRequest, byte[]> message, AsyncServerRequestHandler.ResponseTrigger responseTrigger, HttpContext httpContext) throws HttpException, IOException {
        final byte[] body = message.getBody();
        if (body == null || body.length == 0) {
            responseTrigger.submitResponse(createStatusResponse(400, "must send a list of records as body"), httpContext);
            return;
        }

        final List<Record<?>> records;
        try {
            records = MAPPER.readValue(body, RECORD_LIST_TYPE);
        } catch (IOException e) {
            // log first so the cause is not lost should submitting the response fail as well
            LOGGER.warn("Failed to deserialize exported records", e);
            responseTrigger.submitResponse(createStatusResponse(400, "failed to deserialize records, see receiver logs for more"), httpContext);
            return;
        }

        // a body consisting of the JSON literal `null` is deserialized to a null reference
        if (records == null) {
            responseTrigger.submitResponse(createStatusResponse(400, "must send a list of records as body"), httpContext);
            return;
        }

        if (records.isEmpty()) {
            responseTrigger.submitResponse(createStatusResponse(204, "no records given"), httpContext);
            return;
        }

        for (final Record<?> record : records) {
            this.recordConsumer.accept(record);
            if (this.autoAcknowledge) {
                acknowledge(record.getPartitionId(), record.getPosition());
            }
        }

        // the reported position is looked up using the partition of the first record only
        responseTrigger.submitResponse(createSuccessfulResponse(records.get(0).getPartitionId()), httpContext);
    }

    /** Creates a response without a body, consisting only of a status code and reason phrase. */
    private static AsyncResponseProducer createStatusResponse(int status, String reasonPhrase) {
        return new BasicResponseProducer(new BasicHttpResponse(status, reasonPhrase));
    }

    /**
     * Creates the response for a successfully handled batch.
     *
     * @param partitionId the partition whose acknowledged position should be reported
     * @return a 204 response if nothing was acknowledged yet for the partition, otherwise a 200
     *     response with the position as JSON body
     * @throws JsonProcessingException if the response body cannot be serialized
     */
    private AsyncResponseProducer createSuccessfulResponse(int partitionId) throws JsonProcessingException {
        final Long position = this.positions.get(partitionId);
        if (position == null) {
            return createStatusResponse(204, "no acknowledged position for partition " + partitionId);
        }

        final BasicHttpResponse response = new BasicHttpResponse(200);
        response.setHeader("Content-Type", "application/json");
        final byte[] payload = MAPPER.writeValueAsBytes(Collections.singletonMap("position", position));
        return new BasicResponseProducer(response, new BasicAsyncEntityProducer(payload));
    }
}
