/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.engine.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.clustering.topology.TopologyPartitionListenerImpl;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequest;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentResponse;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/**
 * Pushes a deployment to the leader of another partition and reports, through a future, when that
 * partition has acknowledged it.
 *
 * <p>The push itself is sent with {@link ClusterCommunicationService#send} on the {@code
 * "deployment"} topic. The acknowledgement is received separately on a per-deployment,
 * per-partition topic (see {@link #getDeploymentResponseTopic(long, int)}) through the {@link
 * ClusterEventService}; only that acknowledgement completes the returned future.
 *
 * <p>Two independent retry paths exist:
 *
 * <ul>
 *   <li>a failed send, or an error reply other than {@code RESOURCE_EXHAUSTED}, is re-sent after
 *       {@link #RETRY_DELAY};
 *   <li>as long as no acknowledgement has arrived, the push is re-sent every {@link
 *       #PUSH_REQUEST_TIMEOUT}.
 * </ul>
 */
public final class DeploymentDistributorImpl implements DeploymentDistributor {
    /** Send timeout of a push request and interval of the "no acknowledgement yet" retry. */
    public static final Duration PUSH_REQUEST_TIMEOUT = Duration.ofSeconds(15L);
    /** Delay before a failed or rejected push request is sent again. */
    public static final Duration RETRY_DELAY = Duration.ofMillis(100L);
    private static final Logger LOG = Loggers.PROCESS_REPOSITORY_LOGGER;
    private static final String DEPLOYMENT_PUSH_TOPIC = "deployment";
    // Decoders that are re-used for every incoming message.
    // NOTE(review): they are shared between the send-completion callback and the event subscription
    // handler; confirm that both are invoked from the same thread.
    private final PushDeploymentResponse pushDeploymentResponse = new PushDeploymentResponse();
    private final ErrorResponse errorResponse = new ErrorResponse();
    private final TopologyPartitionListenerImpl partitionListener;
    private final ActorControl actor;
    private final ClusterCommunicationService communicationService;
    private final ClusterEventService eventService;

    /**
     * @param communicationService used to send the push requests
     * @param eventService used to subscribe to the acknowledgement topics
     * @param partitionListener source of the currently known partition leaders
     * @param actor used to schedule the delayed retries
     */
    public DeploymentDistributorImpl(ClusterCommunicationService communicationService, ClusterEventService eventService, TopologyPartitionListenerImpl partitionListener, ActorControl actor) {
        this.communicationService = communicationService;
        this.eventService = eventService;
        this.partitionListener = partitionListener;
        this.actor = actor;
    }

    /**
     * Starts distributing the given deployment to the given partition.
     *
     * @param key key of the deployment
     * @param partitionId partition that should receive the deployment
     * @param deploymentBuffer serialized deployment
     * @return a future that is completed once the partition acknowledged the deployment
     */
    public ActorFuture<Void> pushDeploymentToPartition(long key, int partitionId, DirectBuffer deploymentBuffer) {
        final CompletableActorFuture<Void> pushedFuture = new CompletableActorFuture<>();
        LOG.debug("Distribute deployment {} to partition {}.", key, partitionId);
        final PushDeploymentRequest pushRequest = new PushDeploymentRequest().deployment(deploymentBuffer).deploymentKey(key);
        this.scheduleRetryPushDeploymentAfterADelay(partitionId, pushedFuture, pushRequest);
        this.sendPushDeploymentRequest(partitionId, pushedFuture, pushRequest);
        return pushedFuture;
    }

    /**
     * Re-sends the push request every {@link #PUSH_REQUEST_TIMEOUT} until the future is done.
     *
     * <p>The timer re-arms itself after every unsuccessful check. A single-shot timer would leave
     * the future incomplete forever if the re-sent request is lost as well, if no leader is known
     * when the timer fires, or if the leader keeps answering {@code RESOURCE_EXHAUSTED} (which is
     * not retried by the error handling).
     */
    private void scheduleRetryPushDeploymentAfterADelay(int partitionId, CompletableActorFuture<Void> pushedFuture, PushDeploymentRequest pushRequest) {
        this.actor.runDelayed(PUSH_REQUEST_TIMEOUT, () -> {
            if (pushedFuture.isDone()) {
                // acknowledged in the meantime, stop the retry cycle
                return;
            }
            final String topic = DeploymentDistributorImpl.getDeploymentResponseTopic(pushRequest.deploymentKey(), partitionId);
            LOG.warn("Failed to receive deployment response for partition {} (on topic '{}'). Retrying", partitionId, topic);
            this.sendPushDeploymentRequest(partitionId, pushedFuture, pushRequest);
            this.scheduleRetryPushDeploymentAfterADelay(partitionId, pushedFuture, pushRequest);
        });
    }

    /**
     * Sends the push request to the currently known leader of the partition and makes sure the
     * acknowledgement subscription exists. Does nothing if no leader is known for the partition;
     * the periodic timeout retry will try again later.
     */
    private void sendPushDeploymentRequest(int partitionId, CompletableActorFuture<Void> pushedFuture, PushDeploymentRequest pushRequest) {
        final Int2IntHashMap currentPartitionLeaders = this.partitionListener.getPartitionLeaders();
        if (!currentPartitionLeaders.containsKey(partitionId)) {
            return;
        }
        final int leader = currentPartitionLeaders.get(partitionId);
        // subscribe before sending so that the acknowledgement cannot be missed
        this.createResponseSubscription(pushRequest.deploymentKey(), partitionId, pushedFuture);
        this.pushDeploymentToPartition(leader, partitionId, pushRequest);
    }

    /**
     * Sends the request to the given member and retries after {@link #RETRY_DELAY} when the send
     * fails or the reply is a retryable error response.
     */
    private void pushDeploymentToPartition(int partitionLeaderId, int partition, PushDeploymentRequest pushRequest) {
        pushRequest.partitionId(partition);
        final byte[] bytes = pushRequest.toBytes();
        final MemberId memberId = new MemberId(Integer.toString(partitionLeaderId));
        final CompletableFuture<byte[]> pushDeploymentFuture = this.communicationService.send(DEPLOYMENT_PUSH_TOPIC, bytes, memberId, PUSH_REQUEST_TIMEOUT);
        // NOTE(review): this callback runs wherever the messaging future is completed; confirm that
        // scheduling on the actor from there is allowed.
        pushDeploymentFuture.whenComplete((response, throwable) -> {
            if (throwable != null) {
                // the trailing throwable is logged as the exception, not as a placeholder value
                LOG.warn("Failed to push deployment to node {} for partition {}", partitionLeaderId, partition, throwable);
                this.handleRetry(partitionLeaderId, partition, pushRequest);
                return;
            }
            final UnsafeBuffer responseBuffer = new UnsafeBuffer(response);
            if (this.errorResponse.tryWrap(responseBuffer)) {
                this.handleErrorResponseOnPushDeploymentRequest(partitionLeaderId, partition, pushRequest, responseBuffer);
            }
            // any other reply needs no action here: the future is completed by the subscription on
            // the response topic
        });
    }

    /**
     * Logs the error reply of a push request and schedules a retry, except for {@code
     * RESOURCE_EXHAUSTED}, which is left to the periodic timeout retry.
     */
    private void handleErrorResponseOnPushDeploymentRequest(int partitionLeaderId, int partition, PushDeploymentRequest pushRequest, DirectBuffer responseBuffer) {
        this.errorResponse.wrap(responseBuffer, 0, responseBuffer.capacity());
        final ErrorCode errorCode = this.errorResponse.getErrorCode();
        if (errorCode == ErrorCode.PARTITION_LEADER_MISMATCH) {
            // the error data is read as a little-endian int and logged as the answering partition
            final int responsePartition = this.errorResponse.getErrorData().getInt(0, ByteOrder.LITTLE_ENDIAN);
            LOG.debug("Received partition leader mismatch error from partition {} for deployment {}. Retrying.", responsePartition, pushRequest.deploymentKey());
        } else if (errorCode == ErrorCode.RESOURCE_EXHAUSTED) {
            LOG.warn("Received rejected deployment push due to error of type {}: '{}'. Will be retried after a delay", errorCode.name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
            // no quick retry here; the push is sent again by the periodic timeout retry
            return;
        } else {
            LOG.warn("Received rejected deployment push due to error of type {}: '{}'", errorCode.name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
        }
        this.handleRetry(partitionLeaderId, partition, pushRequest);
    }

    /**
     * Subscribes to the acknowledgement topic of the deployment/partition pair, unless the event
     * service already reports a subscription for that topic. A valid push response completes the
     * given future; error and unknown responses are only logged.
     *
     * <p>The subscription is not closed anywhere in this class.
     */
    private void createResponseSubscription(long deploymentKey, int partitionId, CompletableActorFuture<Void> distributedFuture) {
        final String topic = DeploymentDistributorImpl.getDeploymentResponseTopic(deploymentKey, partitionId);
        if (!this.eventService.getSubscriptions(topic).isEmpty()) {
            // already subscribed by an earlier attempt
            return;
        }
        LOG.trace("Setting up deployment subscription for topic {}", topic);
        this.eventService.subscribe(topic, (byte[] response) -> {
            LOG.trace("Receiving deployment response on topic {}", topic);
            final UnsafeBuffer responseBuffer = new UnsafeBuffer(response);
            if (this.pushDeploymentResponse.tryWrap(responseBuffer)) {
                // may already be completed by the acknowledgement of an earlier attempt
                if (!distributedFuture.isDone()) {
                    distributedFuture.complete(null);
                }
            } else if (this.errorResponse.tryWrap(responseBuffer)) {
                this.errorResponse.wrap(responseBuffer, 0, responseBuffer.capacity());
                LOG.warn("Received rejected deployment push due to error of type {}: '{}'", this.errorResponse.getErrorCode().name(), BufferUtil.bufferAsString(this.errorResponse.getErrorData()));
            } else {
                LOG.warn("Received unknown deployment response on topic {}", topic);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Sends the push request again after {@link #RETRY_DELAY}. The request goes to the leader known
     * at that time, or to the previously addressed member if no leader is known for the partition.
     */
    private void handleRetry(int partitionLeaderId, int partition, PushDeploymentRequest pushRequest) {
        LOG.trace("Retry deployment push to partition {} after {}", partition, RETRY_DELAY);
        this.actor.runDelayed(RETRY_DELAY, () -> {
            final Int2IntHashMap partitionLeaders = this.partitionListener.getPartitionLeaders();
            final int target = partitionLeaders.containsKey(partition) ? partitionLeaders.get(partition) : partitionLeaderId;
            this.pushDeploymentToPartition(target, partition, pushRequest);
        });
    }

    /**
     * Builds the name of the topic on which the acknowledgement for the given deployment and
     * partition is expected.
     *
     * @param deploymentKey key of the deployment
     * @param partitionId partition that receives the deployment
     * @return {@code "deployment-response-<deploymentKey>-<partitionId>"}
     */
    public static String getDeploymentResponseTopic(long deploymentKey, int partitionId) {
        return String.format("deployment-response-%d-%d", deploymentKey, partitionId);
    }
}

