package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.util.function.Supplier;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/steps/StreamProcessorTransitionStep.class */
/**
 * Partition transition step that manages the {@link StreamProcessor} held by the
 * {@link PartitionTransitionContext}.
 *
 * <p>Depending on the role change, the step pauses the current stream processor, closes it and
 * clears it from the context, and finally builds, opens and registers a new one.
 */
public final class StreamProcessorTransitionStep implements PartitionTransitionStep {
    private final Supplier<StreamProcessorBuilder> streamProcessorBuilderSupplier;

    /** Creates a step that obtains its builders via {@code StreamProcessor::builder}. */
    public StreamProcessorTransitionStep() {
        this(StreamProcessor::builder);
    }

    /**
     * Creates a step with a custom builder source.
     *
     * @param supplier invoked once for every stream processor this step creates
     */
    public StreamProcessorTransitionStep(Supplier<StreamProcessorBuilder> supplier) {
        this.streamProcessorBuilderSupplier = supplier;
    }

    /**
     * Pauses the current stream processor when the upcoming role makes this step tear it down.
     *
     * @param partitionTransitionContext context holding the current role and stream processor
     * @param role the new raft role
     */
    @Override
    public void onNewRaftRole(PartitionTransitionContext partitionTransitionContext, RaftServer.Role role) {
        final RaftServer.Role currentRole = partitionTransitionContext.getCurrentRole();
        final StreamProcessor streamProcessor = partitionTransitionContext.getStreamProcessor();
        if (streamProcessor != null && shouldCloseOnTransition(role, currentRole)) {
            streamProcessor.pauseProcessing();
        }
    }

    /**
     * Closes the current stream processor if the target role requires it to be replaced or removed.
     *
     * <p>The processor is removed from the component health monitor before it is closed. The
     * reference in the context is cleared only when closing succeeded.
     *
     * @param partitionTransitionContext context holding the current role and stream processor
     * @param j not used by this step (presumably the raft term; confirm against the interface)
     * @param role the role being transitioned to
     * @return the close future, or an already completed future when there is nothing to close
     */
    @Override
    public ActorFuture<Void> prepareTransition(PartitionTransitionContext partitionTransitionContext, long j, RaftServer.Role role) {
        final RaftServer.Role currentRole = partitionTransitionContext.getCurrentRole();
        final StreamProcessor streamProcessor = partitionTransitionContext.getStreamProcessor();
        if (streamProcessor == null || !shouldCloseOnTransition(role, currentRole)) {
            return CompletableActorFuture.completed(null);
        }

        partitionTransitionContext.getComponentHealthMonitor().removeComponent(streamProcessor.getName());
        final ActorFuture<Void> closeFuture = streamProcessor.closeAsync();
        closeFuture.onComplete((ok, error) -> {
            if (error == null) {
                partitionTransitionContext.setStreamProcessor(null);
            }
        });
        return closeFuture;
    }

    /**
     * Builds and opens a new stream processor when the target role needs one.
     *
     * <p>A processor is installed when {@link #shouldInstallOnTransition} says so, or when the
     * context holds none and the target role is not {@code INACTIVE}. The new processor is stored
     * in the context right away; it is registered with the component health monitor only after it
     * opened successfully.
     *
     * @param partitionTransitionContext context supplying the dependencies of the stream processor
     * @param j not used by this step (presumably the raft term; confirm against the interface)
     * @param role the role being transitioned to
     * @return a future completed once the processor is opened and registered, completed
     *     exceptionally if opening failed, or already completed when nothing is installed
     */
    @Override
    public ActorFuture<Void> transitionTo(PartitionTransitionContext partitionTransitionContext, long j, RaftServer.Role role) {
        final boolean install =
                shouldInstallOnTransition(role, partitionTransitionContext.getCurrentRole())
                        || (partitionTransitionContext.getStreamProcessor() == null
                                && role != RaftServer.Role.INACTIVE);
        if (!install) {
            return CompletableActorFuture.completed(null);
        }

        final StreamProcessor streamProcessor = createStreamProcessor(partitionTransitionContext, role);
        partitionTransitionContext.setStreamProcessor(streamProcessor);

        // Open in paused state when the context says processing is currently not allowed.
        final ActorFuture<Void> openFuture =
                streamProcessor.openAsync(!partitionTransitionContext.shouldProcess());
        final CompletableActorFuture<Void> installed = new CompletableActorFuture<>();
        openFuture.onComplete((ok, error) -> {
            if (error != null) {
                installed.completeExceptionally(error);
                return;
            }
            // shouldProcess() is read again once opening finished and applied to the processor
            // (presumably because the flag may change while the processor is opening).
            if (partitionTransitionContext.shouldProcess()) {
                streamProcessor.resumeProcessing();
            } else {
                streamProcessor.pauseProcessing();
            }
            partitionTransitionContext
                    .getComponentHealthMonitor()
                    .registerComponent(streamProcessor.getName(), streamProcessor);
            installed.complete(null);
        });
        return installed;
    }

    @Override
    public String getName() {
        return "StreamProcessor";
    }

    /**
     * Tells whether the existing stream processor has to be paused and closed for this role
     * change: either it will be replaced by a new one, or the partition becomes {@code INACTIVE}.
     */
    private boolean shouldCloseOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        return shouldInstallOnTransition(newRole, currentRole) || newRole == RaftServer.Role.INACTIVE;
    }

    /**
     * Tells whether the role change requires a freshly installed stream processor.
     *
     * <p>This is the case for every transition to {@code LEADER}, for a transition to
     * {@code FOLLOWER} unless coming from {@code CANDIDATE}, and for a transition to
     * {@code CANDIDATE} unless coming from {@code FOLLOWER}.
     */
    private boolean shouldInstallOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        if (newRole == RaftServer.Role.LEADER) {
            return true;
        }
        if (newRole == RaftServer.Role.FOLLOWER) {
            return currentRole != RaftServer.Role.CANDIDATE;
        }
        return newRole == RaftServer.Role.CANDIDATE && currentRole != RaftServer.Role.FOLLOWER;
    }

    /**
     * Builds a stream processor wired with the dependencies taken from the context. It runs in
     * {@code PROCESSING} mode for the {@code LEADER} role and in {@code REPLAY} mode otherwise.
     */
    private StreamProcessor createStreamProcessor(PartitionTransitionContext partitionTransitionContext, RaftServer.Role role) {
        final StreamProcessorMode mode =
                role == RaftServer.Role.LEADER ? StreamProcessorMode.PROCESSING : StreamProcessorMode.REPLAY;
        return this.streamProcessorBuilderSupplier
                .get()
                .logStream(partitionTransitionContext.getLogStream())
                .actorSchedulingService(partitionTransitionContext.getActorSchedulingService())
                .zeebeDb(partitionTransitionContext.getZeebeDb())
                .eventApplierFactory(EventAppliers::new)
                .nodeId(partitionTransitionContext.getNodeId())
                .commandResponseWriter(partitionTransitionContext.getCommandResponseWriter())
                // Kept as a lambda so the listener is looked up on the context for every record.
                .listener(typedRecord -> partitionTransitionContext.getOnProcessedListener().accept(typedRecord))
                .streamProcessorFactory(partitionTransitionContext.getStreamProcessorFactory())
                .streamProcessorMode(mode)
                .build();
    }
}
