/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.engine.impl;

import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandMessageHandler;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

public final class SubscriptionApiCommandMessageHandlerService
extends Actor
implements PartitionListener,
DiskSpaceUsageListener {
    private static final String SUBSCRIPTION_TOPIC = "subscription";
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private final Int2ObjectHashMap<LogStreamRecordWriter> leaderPartitions = new Int2ObjectHashMap();
    private final ClusterCommunicationService communicationService;
    private final String actorName;
    private SubscriptionCommandMessageHandler messageHandler;

    public SubscriptionApiCommandMessageHandlerService(BrokerInfo localBroker, ClusterCommunicationService communicationService) {
        this.communicationService = communicationService;
        this.actorName = SubscriptionApiCommandMessageHandlerService.buildActorName((int)localBroker.getNodeId(), (String)"SubscriptionApi");
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.messageHandler = new SubscriptionCommandMessageHandler(arg_0 -> ((ActorControl)this.actor).call(arg_0), arg_0 -> this.leaderPartitions.get(arg_0));
        this.communicationService.subscribe(SUBSCRIPTION_TOPIC, (Function)this.messageHandler);
    }

    @Override
    public ActorFuture<Void> onBecomingFollower(int partitionId, long term) {
        return this.actor.call(() -> {
            this.leaderPartitions.remove(partitionId);
            return null;
        });
    }

    @Override
    public ActorFuture<Void> onBecomingLeader(int partitionId, long term, LogStream logStream) {
        CompletableActorFuture future = new CompletableActorFuture();
        this.actor.submit(() -> logStream.newLogStreamRecordWriter().onComplete((recordWriter, error) -> {
            if (error == null) {
                this.leaderPartitions.put(partitionId, recordWriter);
                future.complete(null);
            } else {
                LOG.error("Unexpected error on retrieving write buffer for partition {}", (Object)partitionId, error);
                future.completeExceptionally(error);
            }
        }));
        return future;
    }

    @Override
    public ActorFuture<Void> onBecomingInactive(int partitionId, long term) {
        return this.actor.call(() -> {
            this.leaderPartitions.remove(partitionId);
            return null;
        });
    }

    @Override
    public void onDiskSpaceNotAvailable() {
        this.actor.call(() -> {
            LOG.debug("Broker is out of disk space. All requests with topic {} will be rejected.", (Object)SUBSCRIPTION_TOPIC);
            this.communicationService.unsubscribe(SUBSCRIPTION_TOPIC);
            this.communicationService.subscribe(SUBSCRIPTION_TOPIC, b -> CompletableFuture.completedFuture(null));
        });
    }

    @Override
    public void onDiskSpaceAvailable() {
        this.actor.call(() -> {
            LOG.debug("Broker has disk space available again. All requests with topic {} will be accepted.", (Object)SUBSCRIPTION_TOPIC);
            this.communicationService.unsubscribe(SUBSCRIPTION_TOPIC);
            this.communicationService.subscribe(SUBSCRIPTION_TOPIC, (Function)this.messageHandler);
        });
    }
}

