/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions;

import io.atomix.raft.RaftRoleChangeListener;
import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.HealthMetrics;
import io.camunda.zeebe.broker.system.partitions.PartitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransition;
import io.camunda.zeebe.broker.system.partitions.ZeebePartitionHealth;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.CriticalComponentsHealthMonitor;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitor;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;

public final class ZeebePartition
extends Actor
implements RaftRoleChangeListener,
HealthMonitorable,
FailureListener,
DiskSpaceUsageListener {
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private RaftServer.Role raftRole;
    private final String actorName;
    private final List<FailureListener> failureListeners;
    private final HealthMetrics healthMetrics;
    private final ZeebePartitionHealth zeebePartitionHealth;
    private final PartitionContext context;
    private final PartitionTransition transition;
    private CompletableActorFuture<Void> closeFuture;
    private ActorFuture<Void> currentTransitionFuture;

    public ZeebePartition(PartitionContext context, PartitionTransition transition) {
        this.context = context;
        this.transition = transition;
        context.setActor(this.actor);
        context.setDiskSpaceAvailable(true);
        this.actorName = ZeebePartition.buildActorName((int)context.getNodeId(), (String)"ZeebePartition", (int)context.getPartitionId());
        context.setComponentHealthMonitor((HealthMonitor)new CriticalComponentsHealthMonitor(this.actor, LOG));
        this.zeebePartitionHealth = new ZeebePartitionHealth(context.getPartitionId());
        this.healthMetrics = new HealthMetrics(context.getPartitionId());
        this.healthMetrics.setUnhealthy();
        this.failureListeners = new ArrayList<FailureListener>();
    }

    public void onNewRole(RaftServer.Role newRole, long newTerm) {
        this.actor.run(() -> this.onRoleChange(newRole, newTerm));
    }

    private void onRoleChange(RaftServer.Role newRole, long newTerm) {
        ActorFuture<Void> nextTransitionFuture = null;
        switch (newRole) {
            case LEADER: {
                if (this.raftRole == RaftServer.Role.LEADER) break;
                nextTransitionFuture = this.leaderTransition(newTerm);
                break;
            }
            case INACTIVE: {
                nextTransitionFuture = this.transitionToInactive();
                break;
            }
            default: {
                if (this.raftRole != null && this.raftRole != RaftServer.Role.LEADER) break;
                nextTransitionFuture = this.followerTransition(newTerm);
            }
        }
        if (nextTransitionFuture != null) {
            this.currentTransitionFuture = nextTransitionFuture;
        }
        LOG.debug("Partition role transitioning from {} to {} in term {}", new Object[]{this.raftRole, newRole, newTerm});
        this.raftRole = newRole;
    }

    private ActorFuture<Void> leaderTransition(long newTerm) {
        ActorFuture<Void> leaderTransitionFuture = this.transition.toLeader(newTerm);
        leaderTransitionFuture.onComplete((success, error) -> {
            if (error == null) {
                List listenerFutures = this.context.getPartitionListeners().stream().map(l -> l.onBecomingLeader(this.context.getPartitionId(), newTerm, this.context.getLogStream())).collect(Collectors.toList());
                this.actor.runOnCompletion(listenerFutures, t -> {
                    if (t != null) {
                        this.onInstallFailure((Throwable)t);
                    }
                });
                this.onRecoveredInternal();
            } else {
                LOG.error("Failed to install leader partition {}", (Object)this.context.getPartitionId(), error);
                this.onInstallFailure((Throwable)error);
            }
        });
        return leaderTransitionFuture;
    }

    private ActorFuture<Void> followerTransition(long newTerm) {
        ActorFuture<Void> followerTransitionFuture = this.transition.toFollower(newTerm);
        followerTransitionFuture.onComplete((success, error) -> {
            if (error == null) {
                List listenerFutures = this.context.getPartitionListeners().stream().map(l -> l.onBecomingFollower(this.context.getPartitionId(), newTerm)).collect(Collectors.toList());
                this.actor.runOnCompletion(listenerFutures, t -> {
                    if (t != null) {
                        this.onInstallFailure((Throwable)t);
                    }
                });
                this.onRecoveredInternal();
            } else {
                LOG.error("Failed to install follower partition {}", (Object)this.context.getPartitionId(), error);
                this.onInstallFailure((Throwable)error);
            }
        });
        return followerTransitionFuture;
    }

    private ActorFuture<Void> transitionToInactive() {
        this.zeebePartitionHealth.setServicesInstalled(false);
        ActorFuture<Void> inactiveTransitionFuture = this.transition.toInactive();
        this.currentTransitionFuture = inactiveTransitionFuture;
        return inactiveTransitionFuture;
    }

    public String getName() {
        return this.actorName;
    }

    public void onActorStarting() {
        this.context.getRaftPartition().addRoleChangeListener((RaftRoleChangeListener)this);
        this.context.getComponentHealthMonitor().addFailureListener((FailureListener)this);
        this.onRoleChange(this.context.getRaftPartition().getRole(), this.context.getRaftPartition().term());
    }

    protected void onActorStarted() {
        this.context.getComponentHealthMonitor().startMonitoring();
        this.context.getComponentHealthMonitor().registerComponent(this.context.getRaftPartition().name(), (HealthMonitorable)this.context.getRaftPartition());
        this.context.getComponentHealthMonitor().registerComponent(this.zeebePartitionHealth.getName(), (HealthMonitorable)this.zeebePartitionHealth);
    }

    protected void onActorClosing() {
        this.transitionToInactive().onComplete((nothing, err) -> {
            this.context.getRaftPartition().removeRoleChangeListener((RaftRoleChangeListener)this);
            this.context.getComponentHealthMonitor().removeComponent(this.context.getRaftPartition().name());
            this.closeFuture.complete(null);
        });
    }

    protected void onActorCloseRequested() {
        LOG.debug("Closing ZeebePartition {}", (Object)this.context.getPartitionId());
        this.context.getComponentHealthMonitor().removeComponent(this.zeebePartitionHealth.getName());
    }

    public ActorFuture<Void> closeAsync() {
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        this.closeFuture = new CompletableActorFuture();
        this.actor.run(() -> this.currentTransitionFuture.onComplete((nothing, err) -> super.closeAsync()));
        return this.closeFuture;
    }

    protected void handleFailure(Exception failure) {
        LOG.warn("Uncaught exception in {}.", (Object)this.actorName, (Object)failure);
        this.onInstallFailure(failure);
    }

    public void onFailure() {
        this.actor.run(() -> {
            this.healthMetrics.setUnhealthy();
            this.failureListeners.forEach(FailureListener::onFailure);
        });
    }

    public void onRecovered() {
        this.actor.run(() -> {
            this.healthMetrics.setHealthy();
            this.failureListeners.forEach(FailureListener::onRecovered);
        });
    }

    public void onUnrecoverableFailure() {
        this.actor.run(this::handleUnrecoverableFailure);
    }

    private void onInstallFailure(Throwable error) {
        if (error instanceof UnrecoverableException) {
            LOG.error("Failed to install partition {} (role {}, term {}) with unrecoverable failure: ", new Object[]{this.context.getPartitionId(), this.context.getCurrentRole(), this.context.getCurrentTerm(), error});
            this.handleUnrecoverableFailure();
        } else {
            this.handleRecoverableFailure();
        }
    }

    private void handleRecoverableFailure() {
        this.zeebePartitionHealth.setServicesInstalled(false);
        this.context.getPartitionListeners().forEach(l -> l.onBecomingInactive(this.context.getPartitionId(), this.context.getCurrentTerm()));
        if (this.context.getCurrentRole() == RaftServer.Role.LEADER && this.context.getCurrentTerm() == this.context.getRaftPartition().term()) {
            LOG.info("Unexpected failure occurred in partition {} (role {}, term {}), stepping down", new Object[]{this.context.getPartitionId(), this.context.getCurrentRole(), this.context.getCurrentTerm()});
            this.context.getRaftPartition().stepDown();
        } else if (this.context.getCurrentRole() == RaftServer.Role.FOLLOWER) {
            LOG.info("Unexpected failure occurred in partition {} (role {}, term {}), transitioning to inactive", new Object[]{this.context.getPartitionId(), this.context.getCurrentRole(), this.context.getCurrentTerm()});
            this.context.getRaftPartition().goInactive();
        }
    }

    private void handleUnrecoverableFailure() {
        this.healthMetrics.setDead();
        this.zeebePartitionHealth.onUnrecoverableFailure();
        this.transitionToInactive();
        this.context.getRaftPartition().goInactive();
        this.failureListeners.forEach(FailureListener::onUnrecoverableFailure);
        this.context.getPartitionListeners().forEach(l -> l.onBecomingInactive(this.context.getPartitionId(), this.context.getCurrentTerm()));
    }

    private void onRecoveredInternal() {
        this.zeebePartitionHealth.setServicesInstalled(true);
    }

    public HealthStatus getHealthStatus() {
        return this.context.getComponentHealthMonitor().getHealthStatus();
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> {
            this.failureListeners.add(failureListener);
            if (this.getHealthStatus() == HealthStatus.HEALTHY) {
                failureListener.onRecovered();
            } else {
                failureListener.onFailure();
            }
        });
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    @Override
    public void onDiskSpaceNotAvailable() {
        this.actor.call(() -> {
            this.context.setDiskSpaceAvailable(false);
            this.zeebePartitionHealth.setDiskSpaceAvailable(false);
            if (this.context.getStreamProcessor() != null) {
                LOG.warn("Disk space usage is above threshold. Pausing stream processor.");
                this.context.getStreamProcessor().pauseProcessing();
            }
        });
    }

    @Override
    public void onDiskSpaceAvailable() {
        this.actor.call(() -> {
            this.context.setDiskSpaceAvailable(true);
            this.zeebePartitionHealth.setDiskSpaceAvailable(false);
            if (this.context.getStreamProcessor() != null && this.context.shouldProcess()) {
                LOG.info("Disk space usage is below threshold. Resuming stream processor.");
                this.context.getStreamProcessor().resumeProcessing();
            }
        });
    }

    public ActorFuture<Void> pauseProcessing() {
        CompletableActorFuture completed = new CompletableActorFuture();
        this.actor.call(() -> {
            try {
                this.context.pauseProcessing();
                if (this.context.getStreamProcessor() != null && !this.context.shouldProcess()) {
                    this.context.getStreamProcessor().pauseProcessing().onComplete((BiConsumer)completed);
                } else {
                    completed.complete(null);
                }
            }
            catch (IOException e) {
                LOG.error("Could not pause processing state", (Throwable)e);
                completed.completeExceptionally((Throwable)e);
            }
        });
        return completed;
    }

    public void resumeProcessing() {
        this.actor.call(() -> {
            try {
                this.context.resumeProcessing();
                if (this.context.getStreamProcessor() != null && this.context.shouldProcess()) {
                    this.context.getStreamProcessor().resumeProcessing();
                }
            }
            catch (IOException e) {
                LOG.error("Could not resume processing", (Throwable)e);
            }
        });
    }

    public int getPartitionId() {
        return this.context.getPartitionId();
    }

    public PersistedSnapshotStore getSnapshotStore() {
        return this.context.getRaftPartition().getServer().getPersistedSnapshotStore();
    }

    public void triggerSnapshot() {
        this.actor.call(() -> {
            if (this.context.getSnapshotDirector() != null) {
                this.context.getSnapshotDirector().forceSnapshot();
            }
        });
    }

    public ActorFuture<Optional<StreamProcessor>> getStreamProcessor() {
        return this.actor.call(() -> Optional.ofNullable(this.context.getStreamProcessor()));
    }

    public ActorFuture<Optional<ExporterDirector>> getExporterDirector() {
        return this.actor.call(() -> Optional.ofNullable(this.context.getExporterDirector()));
    }

    public ActorFuture<Void> pauseExporting() {
        CompletableActorFuture completed = new CompletableActorFuture();
        this.actor.call(() -> {
            try {
                boolean pauseStatePersisted = this.context.pauseExporting();
                if (this.context.getExporterDirector() != null && pauseStatePersisted) {
                    this.context.getExporterDirector().pauseExporting().onComplete((BiConsumer)completed);
                } else {
                    completed.complete(null);
                }
            }
            catch (IOException e) {
                LOG.error("Could not pause exporting", (Throwable)e);
                completed.completeExceptionally((Throwable)e);
            }
        });
        return completed;
    }

    public void resumeExporting() {
        this.actor.call(() -> {
            try {
                this.context.resumeExporting();
                if (this.context.getExporterDirector() != null && this.context.shouldExport()) {
                    this.context.getExporterDirector().resumeExporting();
                }
            }
            catch (IOException e) {
                LOG.error("Could not resume exporting", (Throwable)e);
            }
        });
    }
}

