/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.management;

import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

public final class LeaderManagementRequestHandler
extends Actor
implements PartitionListener,
DiskSpaceUsageListener {
    private static final String DEPLOYMENT_TOPIC = "deployment";
    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private final Int2ObjectHashMap<LogStreamRecordWriter> leaderForPartitions = new Int2ObjectHashMap();
    private final String actorName;
    private PushDeploymentRequestHandler pushDeploymentRequestHandler;
    private final ErrorResponse outOfDiskSpaceError;
    private final ClusterCommunicationService communicationService;
    private final ClusterEventService eventService;

    public LeaderManagementRequestHandler(BrokerInfo localBroker, ClusterCommunicationService communicationService, ClusterEventService eventService) {
        this.communicationService = communicationService;
        this.eventService = eventService;
        this.actorName = LeaderManagementRequestHandler.buildActorName((int)localBroker.getNodeId(), (String)"ManagementRequestHandler");
        this.outOfDiskSpaceError = new ErrorResponse();
        this.outOfDiskSpaceError.setErrorCode(ErrorCode.RESOURCE_EXHAUSTED).setErrorData(BufferUtil.wrapString((String)String.format("Broker %d is out of disk space. Rejecting deployment request.", localBroker.getNodeId())));
    }

    @Override
    public ActorFuture<Void> onBecomingFollower(int partitionId, long term) {
        return this.actor.call(() -> {
            this.leaderForPartitions.remove(partitionId);
            return null;
        });
    }

    @Override
    public ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream) {
        CompletableActorFuture future = new CompletableActorFuture();
        this.actor.submit(() -> logStream.newLogStreamRecordWriter().onComplete((recordWriter, error) -> {
            if (error == null) {
                this.leaderForPartitions.put(partitionId, recordWriter);
                future.complete(null);
            } else {
                LOG.error("Unexpected error on retrieving write buffer for partition {}", (Object)partitionId, error);
                future.completeExceptionally(error);
            }
        }));
        return future;
    }

    @Override
    public ActorFuture<Void> onBecomingInactive(int partitionId, long term) {
        return this.actor.call(() -> {
            this.leaderForPartitions.remove(partitionId);
            return null;
        });
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.pushDeploymentRequestHandler = new PushDeploymentRequestHandler(this.leaderForPartitions, this.actor, this.eventService);
        this.communicationService.subscribe(DEPLOYMENT_TOPIC, (Function)this.pushDeploymentRequestHandler);
    }

    public PushDeploymentRequestHandler getPushDeploymentRequestHandler() {
        return this.pushDeploymentRequestHandler;
    }

    @Override
    public void onDiskSpaceNotAvailable() {
        this.actor.call(() -> {
            LOG.debug("Broker is out of disk space. All requests with topic {} will be rejected.", (Object)DEPLOYMENT_TOPIC);
            this.communicationService.unsubscribe(DEPLOYMENT_TOPIC);
            this.communicationService.subscribe(DEPLOYMENT_TOPIC, b -> CompletableFuture.completedFuture(this.outOfDiskSpaceError.toBytes()));
        });
    }

    @Override
    public void onDiskSpaceAvailable() {
        this.actor.call(() -> {
            LOG.debug("Broker has disk space available again. All requests with topic {} will be accepted.", (Object)DEPLOYMENT_TOPIC);
            this.communicationService.unsubscribe(DEPLOYMENT_TOPIC);
            this.communicationService.subscribe(DEPLOYMENT_TOPIC, (Function)this.pushDeploymentRequestHandler);
        });
    }
}

