/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions.impl;

import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.broker.system.partitions.impl.RandomDuration;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthStatus;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorCondition;
import io.camunda.zeebe.util.sched.SchedulingHints;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;

public final class AsyncSnapshotDirector
extends Actor
implements HealthMonitorable {
    public static final Duration MINIMUM_SNAPSHOT_PERIOD = Duration.ofMinutes(1L);
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private static final String LOG_MSG_WAIT_UNTIL_COMMITTED = "Finished taking temporary snapshot, need to wait until last written event position {} is committed, current commit position is {}. After that snapshot will be committed.";
    private static final String ERROR_MSG_ON_RESOLVE_PROCESSED_POS = "Unexpected error in resolving last processed position.";
    private static final String ERROR_MSG_ON_RESOLVE_WRITTEN_POS = "Unexpected error in resolving last written position.";
    private static final String ERROR_MSG_MOVE_SNAPSHOT = "Unexpected exception occurred on moving valid snapshot.";
    private final StateController stateController;
    private final LogStream logStream;
    private final Duration snapshotRate;
    private final String processorName;
    private final StreamProcessor streamProcessor;
    private final String actorName;
    private final Set<FailureListener> listeners = new HashSet<FailureListener>();
    private ActorCondition commitCondition;
    private Long lastWrittenEventPosition;
    private TransientSnapshot pendingSnapshot;
    private long lowerBoundSnapshotPosition;
    private boolean takingSnapshot;
    private boolean persistingSnapshot;
    private volatile HealthStatus healthStatus = HealthStatus.HEALTHY;

    public AsyncSnapshotDirector(int nodeId, StreamProcessor streamProcessor, StateController stateController, LogStream logStream, Duration snapshotRate) {
        this.streamProcessor = streamProcessor;
        this.stateController = stateController;
        this.logStream = logStream;
        this.processorName = streamProcessor.getName();
        this.snapshotRate = snapshotRate;
        this.actorName = AsyncSnapshotDirector.buildActorName((int)nodeId, (String)"SnapshotDirector", (int)logStream.getPartitionId());
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.actor.setSchedulingHints(SchedulingHints.ioBound());
        Duration firstSnapshotTime = RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, this.snapshotRate);
        this.actor.runDelayed(firstSnapshotTime, this::scheduleSnapshotOnRate);
        this.lastWrittenEventPosition = null;
        this.commitCondition = this.actor.onCondition(this.getConditionNameForPosition(), this::persistSnapshotIfLastWrittenPositionCommitted);
        this.logStream.registerOnCommitPositionUpdatedCondition(this.commitCondition);
    }

    protected void onActorCloseRequested() {
        this.logStream.removeOnCommitPositionUpdatedCondition(this.commitCondition);
    }

    public ActorFuture<Void> closeAsync() {
        if (this.actor.isClosed()) {
            return CompletableActorFuture.completed(null);
        }
        return super.closeAsync();
    }

    protected void handleFailure(Exception failure) {
        LOG.error("No snapshot was taken due to failure in '{}'. Will try to take snapshot after snapshot period {}. {}", new Object[]{this.actorName, this.snapshotRate, failure});
        this.resetStateOnFailure();
        this.healthStatus = HealthStatus.UNHEALTHY;
        for (FailureListener listener : this.listeners) {
            listener.onFailure();
        }
    }

    private void scheduleSnapshotOnRate() {
        this.actor.runAtFixedRate(this.snapshotRate, this::prepareTakingSnapshot);
        this.prepareTakingSnapshot();
    }

    private String getConditionNameForPosition() {
        return this.getName() + "-wait-for-endPosition-committed";
    }

    public void forceSnapshot() {
        this.actor.call(this::prepareTakingSnapshot);
    }

    public HealthStatus getHealthStatus() {
        return this.healthStatus;
    }

    public void addFailureListener(FailureListener listener) {
        this.actor.run(() -> this.listeners.add(listener));
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> this.listeners.remove(failureListener));
    }

    private void prepareTakingSnapshot() {
        if (this.takingSnapshot) {
            return;
        }
        this.takingSnapshot = true;
        ActorFuture futureLastProcessedPosition = this.streamProcessor.getLastProcessedPositionAsync();
        this.actor.runOnCompletion(futureLastProcessedPosition, (lastProcessedPosition, error) -> {
            if (error == null) {
                if (lastProcessedPosition == -1L) {
                    LOG.debug("We will skip taking this snapshot, because we haven't processed something yet.");
                    this.takingSnapshot = false;
                    return;
                }
                this.lowerBoundSnapshotPosition = lastProcessedPosition;
                this.logStream.getCommitPositionAsync().onComplete((commitPosition, errorOnRetrievingCommitPosition) -> {
                    if (errorOnRetrievingCommitPosition != null) {
                        this.takingSnapshot = false;
                        LOG.error("Unexpected error on retrieving commit position", errorOnRetrievingCommitPosition);
                        return;
                    }
                    this.takeSnapshot((long)commitPosition);
                });
            } else {
                LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, error);
                this.takingSnapshot = false;
            }
        });
    }

    private void takeSnapshot(long initialCommitPosition) {
        Optional<TransientSnapshot> optionalPendingSnapshot = this.stateController.takeTransientSnapshot(this.lowerBoundSnapshotPosition);
        if (optionalPendingSnapshot.isEmpty()) {
            this.takingSnapshot = false;
            return;
        }
        optionalPendingSnapshot.get().onSnapshotTaken((isValid, snapshotTakenError) -> this.actor.run(() -> {
            if (snapshotTakenError != null) {
                LOG.error("Could not take a snapshot for {}", (Object)this.processorName, snapshotTakenError);
                return;
            }
            LOG.trace("Created temporary snapshot for {}", (Object)this.processorName);
            this.pendingSnapshot = (TransientSnapshot)optionalPendingSnapshot.get();
            this.onRecovered();
            ActorFuture lastWrittenPosition = this.streamProcessor.getLastWrittenPositionAsync();
            this.actor.runOnCompletion(lastWrittenPosition, (endPosition, error) -> {
                if (error == null) {
                    LOG.info(LOG_MSG_WAIT_UNTIL_COMMITTED, endPosition, (Object)initialCommitPosition);
                    this.lastWrittenEventPosition = endPosition;
                    this.persistingSnapshot = false;
                    this.persistSnapshotIfLastWrittenPositionCommitted();
                } else {
                    this.resetStateOnFailure();
                    LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, error);
                }
            });
        }));
    }

    private void onRecovered() {
        if (this.healthStatus != HealthStatus.HEALTHY) {
            this.healthStatus = HealthStatus.HEALTHY;
            this.listeners.forEach(FailureListener::onRecovered);
        }
    }

    private void persistSnapshotIfLastWrittenPositionCommitted() {
        this.logStream.getCommitPositionAsync().onComplete((currentCommitPosition, error) -> {
            if (this.pendingSnapshot != null && this.lastWrittenEventPosition != null && currentCommitPosition >= this.lastWrittenEventPosition && !this.persistingSnapshot) {
                this.persistingSnapshot = true;
                LOG.debug("Current commit position {} >= {}, committing snapshot {}.", new Object[]{currentCommitPosition, this.lastWrittenEventPosition, this.pendingSnapshot});
                ActorFuture snapshotPersistFuture = this.pendingSnapshot.persist();
                snapshotPersistFuture.onComplete((snapshot, persistError) -> {
                    if (persistError != null) {
                        LOG.error(ERROR_MSG_MOVE_SNAPSHOT, persistError);
                    }
                    this.lastWrittenEventPosition = null;
                    this.takingSnapshot = false;
                    this.pendingSnapshot = null;
                    this.persistingSnapshot = false;
                });
            }
        });
    }

    private void resetStateOnFailure() {
        this.lastWrittenEventPosition = null;
        this.takingSnapshot = false;
        if (this.pendingSnapshot != null) {
            this.pendingSnapshot.abort();
            this.pendingSnapshot = null;
        }
    }
}

