/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingStateMachine;
import io.camunda.zeebe.engine.processing.streamprocessor.ReProcessingStateMachine;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorCondition;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;

public class StreamProcessor
extends Actor
implements HealthMonitorable {
    public static final long UNSET_POSITION = -1L;
    public static final Duration HEALTH_CHECK_TICK_DURATION = Duration.ofSeconds(5L);
    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final ActorScheduler actorScheduler;
    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
    private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
    private final Set<FailureListener> failureListeners = new HashSet<FailureListener>();
    private final LogStream logStream;
    private final int partitionId;
    private final ZeebeDb zeebeDb;
    private final ProcessingContext processingContext;
    private final TypedRecordProcessorFactory typedRecordProcessorFactory;
    private final String actorName;
    private LogStreamReader logStreamReader;
    private ActorCondition onCommitPositionUpdatedCondition;
    private long snapshotPosition = -1L;
    private ProcessingStateMachine processingStateMachine;
    private volatile Phase phase = Phase.REPROCESSING;
    private CompletableActorFuture<Void> openFuture;
    private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed(null);
    private volatile long lastTickTime;
    private boolean shouldProcess = true;
    private ActorFuture<Long> recoverFuture;

    protected StreamProcessor(StreamProcessorBuilder processorBuilder) {
        this.actorScheduler = processorBuilder.getActorScheduler();
        this.lifecycleAwareListeners = processorBuilder.getLifecycleListeners();
        this.typedRecordProcessorFactory = processorBuilder.getTypedRecordProcessorFactory();
        this.zeebeDb = processorBuilder.getZeebeDb();
        this.eventApplierFactory = processorBuilder.getEventApplierFactory();
        this.processingContext = processorBuilder.getProcessingContext().eventCache(new RecordValues()).actor(this.actor).abortCondition(this::isClosed);
        this.logStream = this.processingContext.getLogStream();
        this.partitionId = this.logStream.getPartitionId();
        this.actorName = StreamProcessor.buildActorName((int)processorBuilder.getNodeId(), (String)"StreamProcessor", (int)this.partitionId);
    }

    public static StreamProcessorBuilder builder() {
        return new StreamProcessorBuilder();
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.actor.runOnCompletionBlockingCurrentPhase(this.logStream.newLogStreamBatchWriter(), this::onRetrievingWriter);
    }

    protected void onActorStarted() {
        try {
            LOG.debug("Recovering state of partition {} from snapshot", (Object)this.partitionId);
            long startTime = System.currentTimeMillis();
            this.snapshotPosition = this.recoverFromSnapshot();
            this.initProcessors();
            this.processingStateMachine = new ProcessingStateMachine(this.processingContext, this::shouldProcessNext);
            this.healthCheckTick();
            this.openFuture.complete(null);
            ReProcessingStateMachine reProcessingStateMachine = new ReProcessingStateMachine(this.processingContext);
            this.processingContext.disableLogStreamWriter();
            this.processingContext.enableReprocessingStreamWriter();
            this.recoverFuture = reProcessingStateMachine.startRecover(this.snapshotPosition);
            this.actor.runOnCompletion(this.recoverFuture, (lastReprocessedPosition, throwable) -> {
                if (throwable != null) {
                    LOG.error("Unexpected error on recovery happens.", throwable);
                    this.onFailure((Throwable)throwable);
                } else {
                    this.onRecovered((long)lastReprocessedPosition);
                    new StreamProcessorMetrics(this.partitionId).recoveryTime(System.currentTimeMillis() - startTime);
                }
            });
        }
        catch (RuntimeException e) {
            this.onFailure(e);
        }
    }

    protected void onActorClosing() {
        this.tearDown();
    }

    protected void onActorClosed() {
        this.closeFuture.complete(null);
        LOG.debug("Closed stream processor controller {}.", (Object)this.getName());
    }

    protected void onActorCloseRequested() {
        if (!this.isFailed()) {
            this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onClose);
        }
    }

    public ActorFuture<Void> closeAsync() {
        if (this.isOpened.compareAndSet(true, false)) {
            this.closeFuture = new CompletableActorFuture();
            this.actor.close();
        }
        return this.closeFuture;
    }

    protected void handleFailure(Exception failure) {
        this.onFailure(failure);
    }

    public void onActorFailed() {
        this.phase = Phase.FAILED;
        this.closeFuture = CompletableActorFuture.completed(null);
        this.isOpened.set(false);
        this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onFailed);
        this.tearDown();
    }

    private boolean shouldProcessNext() {
        return this.isOpened() && this.shouldProcess;
    }

    private void tearDown() {
        this.processingContext.getLogStreamReader().close();
        if (this.onCommitPositionUpdatedCondition != null) {
            this.logStream.removeOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
            this.onCommitPositionUpdatedCondition = null;
        }
    }

    private void healthCheckTick() {
        this.lastTickTime = ActorClock.currentTimeMillis();
        this.actor.runDelayed(HEALTH_CHECK_TICK_DURATION, this::healthCheckTick);
    }

    private void onRetrievingWriter(LogStreamBatchWriter batchWriter, Throwable errorOnReceivingWriter) {
        if (errorOnReceivingWriter == null) {
            this.processingContext.maxFragmentSize(batchWriter.getMaxFragmentLength()).logStreamWriter(new TypedStreamWriterImpl(batchWriter));
            this.actor.runOnCompletionBlockingCurrentPhase(this.logStream.newLogStreamReader(), this::onRetrievingReader);
        } else {
            LOG.error("Unexpected error on retrieving batch writer from log stream.", errorOnReceivingWriter);
            this.actor.close();
        }
    }

    private void onRetrievingReader(LogStreamReader reader, Throwable errorOnReceivingReader) {
        if (errorOnReceivingReader == null) {
            this.logStreamReader = reader;
            this.processingContext.logStreamReader(reader);
        } else {
            LOG.error("Unexpected error on retrieving reader from log stream.", errorOnReceivingReader);
            this.actor.close();
        }
    }

    public ActorFuture<Void> openAsync(boolean pauseOnStart) {
        if (this.isOpened.compareAndSet(false, true)) {
            this.shouldProcess = !pauseOnStart;
            this.openFuture = new CompletableActorFuture();
            this.actorScheduler.submitActor((Actor)this);
        }
        return this.openFuture;
    }

    private void initProcessors() {
        TypedRecordProcessors typedRecordProcessors = this.typedRecordProcessorFactory.createProcessors(this.processingContext);
        this.lifecycleAwareListeners.addAll(typedRecordProcessors.getLifecycleListeners());
        RecordProcessorMap recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();
        recordProcessorMap.values().forEachRemaining(this.lifecycleAwareListeners::add);
        this.processingContext.recordProcessorMap(recordProcessorMap);
    }

    private long recoverFromSnapshot() {
        boolean failedToRecoverReader;
        ZeebeDbState zeebeState = this.recoverState();
        long snapshotPosition = zeebeState.getLastProcessedPositionState().getLastSuccessfulProcessedRecordPosition();
        boolean bl = failedToRecoverReader = !this.logStreamReader.seekToNextEvent(snapshotPosition);
        if (failedToRecoverReader) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, snapshotPosition, this.getName()));
        }
        LOG.info("Recovered state of partition {} from snapshot at position {}", (Object)this.partitionId, (Object)snapshotPosition);
        return snapshotPosition;
    }

    private ZeebeDbState recoverState() {
        TransactionContext transactionContext = this.zeebeDb.createContext();
        ZeebeDbState zeebeState = new ZeebeDbState(this.partitionId, (ZeebeDb<ZbColumnFamilies>)this.zeebeDb, transactionContext);
        this.processingContext.transactionContext(transactionContext);
        this.processingContext.zeebeState(zeebeState);
        this.processingContext.eventApplier(this.eventApplierFactory.apply(zeebeState));
        return zeebeState;
    }

    private void onRecovered(long lastReprocessedPosition) {
        this.phase = Phase.PROCESSING;
        this.processingContext.enableLogStreamWriter();
        this.onCommitPositionUpdatedCondition = this.actor.onCondition(this.getName() + "-on-commit-position-updated", this.processingStateMachine::readNextEvent);
        this.logStream.registerOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
        this.lifecycleAwareListeners.forEach(l -> l.onRecovered(this.processingContext));
        this.processingStateMachine.startProcessing(lastReprocessedPosition);
        if (!this.shouldProcess) {
            this.setStateToPausedAndNotifyListeners();
        }
    }

    private void onFailure(Throwable throwable) {
        LOG.error("Actor {} failed in phase {}.", new Object[]{this.actorName, this.actor.getLifecyclePhase(), throwable});
        this.actor.fail();
        if (!this.openFuture.isDone()) {
            this.openFuture.completeExceptionally(throwable);
        }
        if (throwable instanceof UnrecoverableException) {
            this.failureListeners.forEach(FailureListener::onUnrecoverableFailure);
        } else {
            this.failureListeners.forEach(FailureListener::onFailure);
        }
    }

    public boolean isOpened() {
        return this.isOpened.get();
    }

    public boolean isClosed() {
        return !this.isOpened.get();
    }

    public boolean isFailed() {
        return this.phase == Phase.FAILED;
    }

    public ActorFuture<Long> getLastProcessedPositionAsync() {
        return this.actor.call(this.processingStateMachine::getLastSuccessfulProcessedEventPosition);
    }

    public ActorFuture<Long> getLastWrittenPositionAsync() {
        return this.actor.call(this.processingStateMachine::getLastWrittenEventPosition);
    }

    public HealthStatus getHealthStatus() {
        if (this.actor.isClosed()) {
            return HealthStatus.UNHEALTHY;
        }
        if (!this.processingStateMachine.isMakingProgress()) {
            return HealthStatus.UNHEALTHY;
        }
        if (ActorClock.currentTimeMillis() - this.lastTickTime > HEALTH_CHECK_TICK_DURATION.toMillis() * 2L) {
            return HealthStatus.UNHEALTHY;
        }
        if (this.phase == Phase.FAILED) {
            return HealthStatus.UNHEALTHY;
        }
        return HealthStatus.HEALTHY;
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.add(failureListener));
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.failureListeners.remove(failureListener));
    }

    public ActorFuture<Phase> getCurrentPhase() {
        return this.actor.call(() -> this.phase);
    }

    public ActorFuture<Void> pauseProcessing() {
        return this.actor.call(() -> this.recoverFuture.onComplete((v, t) -> {
            if (this.shouldProcess) {
                this.setStateToPausedAndNotifyListeners();
            }
        }));
    }

    private void setStateToPausedAndNotifyListeners() {
        this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onPaused);
        this.shouldProcess = false;
        this.phase = Phase.PAUSED;
        LOG.debug("Paused processing for partition {}", (Object)this.partitionId);
    }

    public void resumeProcessing() {
        this.actor.call(() -> this.recoverFuture.onComplete((v, t) -> {
            if (!this.shouldProcess) {
                this.lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
                this.shouldProcess = true;
                this.phase = Phase.PROCESSING;
                this.actor.submit(this.processingStateMachine::readNextEvent);
                LOG.debug("Resumed processing for partition {}", (Object)this.partitionId);
            }
        }));
    }

    public static enum Phase {
        REPROCESSING,
        PROCESSING,
        FAILED,
        PAUSED;

    }
}

