package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.system.partitions.PartitionStartupAndTransitionContextImpl;
import io.camunda.zeebe.broker.system.partitions.PartitionStep;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/steps/StreamProcessorPartitionStep.class */
public class StreamProcessorPartitionStep implements PartitionStep {
    @Override // io.camunda.zeebe.broker.system.partitions.PartitionStep
    public ActorFuture<Void> open(PartitionStartupAndTransitionContextImpl partitionStartupAndTransitionContextImpl) {
        StreamProcessor createStreamProcessor = createStreamProcessor(partitionStartupAndTransitionContextImpl);
        ActorFuture openAsync = createStreamProcessor.openAsync(!partitionStartupAndTransitionContextImpl.shouldProcess());
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        openAsync.onComplete((r7, th) -> {
            if (th != null) {
                completableActorFuture.completeExceptionally(th);
                return;
            }
            partitionStartupAndTransitionContextImpl.setStreamProcessor(createStreamProcessor);
            if (partitionStartupAndTransitionContextImpl.shouldProcess()) {
                createStreamProcessor.resumeProcessing();
            } else {
                createStreamProcessor.pauseProcessing();
            }
            partitionStartupAndTransitionContextImpl.getComponentHealthMonitor().registerComponent(createStreamProcessor.getName(), createStreamProcessor);
            completableActorFuture.complete((Object) null);
        });
        return completableActorFuture;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionStep
    public ActorFuture<Void> close(PartitionStartupAndTransitionContextImpl partitionStartupAndTransitionContextImpl) {
        partitionStartupAndTransitionContextImpl.getComponentHealthMonitor().removeComponent(partitionStartupAndTransitionContextImpl.getStreamProcessor().getName());
        ActorFuture<Void> closeAsync = partitionStartupAndTransitionContextImpl.getStreamProcessor().closeAsync();
        partitionStartupAndTransitionContextImpl.setStreamProcessor(null);
        return closeAsync;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.PartitionStep
    public String getName() {
        return "StreamProcessor";
    }

    private StreamProcessor createStreamProcessor(PartitionStartupAndTransitionContextImpl partitionStartupAndTransitionContextImpl) {
        return StreamProcessor.builder().logStream(partitionStartupAndTransitionContextImpl.getLogStream()).actorSchedulingService(partitionStartupAndTransitionContextImpl.getActorSchedulingService()).zeebeDb(partitionStartupAndTransitionContextImpl.getZeebeDb()).eventApplierFactory(EventAppliers::new).nodeId(partitionStartupAndTransitionContextImpl.getNodeId()).commandResponseWriter(partitionStartupAndTransitionContextImpl.getCommandResponseWriter()).listener(typedRecord -> {
            partitionStartupAndTransitionContextImpl.getOnProcessedListener().accept(typedRecord);
        }).streamProcessorFactory(partitionStartupAndTransitionContextImpl.getStreamProcessorFactory()).streamProcessorMode(partitionStartupAndTransitionContextImpl.getCurrentRole() == RaftServer.Role.LEADER ? StreamProcessorMode.PROCESSING : StreamProcessorMode.REPLAY).build();
    }
}
