/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.TransactionOperation;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataEventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MigratedStreamProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingException;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProtocolVersionFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.ReprocessingStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.mutable.MutableLastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.retry.EndlessRetryStrategy;
import io.camunda.zeebe.util.retry.RetryStrategy;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.slf4j.Logger;

public final class ReProcessingStateMachine {
    private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{} {}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_FOLLOW_UP_EVENT = "Expected to find last follow-up event position '%d', but last position was '%d'. Failed to reprocess on processor";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT = "Expected to find last follow-up event position '%d', but found no next event. Failed to reprocess on processor";
    private static final String LOG_STMT_REPROCESSING_FINISHED = "Processor finished reprocessing at event position {}";
    private static final String LOG_STMT_FAILED_ON_PROCESSING = "Event {} failed on processing last time, will call #onError to update process instance blacklist.";
    private static final String ERROR_INCONSISTENT_LOG = "Expected that position '%d' of current event is higher then position '%d' of last event, but was not. Inconsistent log detected!";
    private static final Consumer<SideEffectProducer> NOOP_SIDE_EFFECT_CONSUMER = sideEffect -> {};
    private static final Consumer<Long> NOOP_LONG_CONSUMER = instanceKey -> {};
    private final RecordMetadata metadata = new RecordMetadata();
    private final MutableZeebeState zeebeState;
    private final KeyGeneratorControls keyGeneratorControls;
    private final MutableLastProcessedPositionState lastProcessedPositionState;
    private final ActorControl actor;
    private final ErrorRecord errorRecord = new ErrorRecord();
    private final TypedEventImpl typedEvent;
    private final RecordValues recordValues;
    private final RecordProcessorMap recordProcessorMap;
    private final EventFilter eventFilter = new MetadataEventFilter(new RecordProtocolVersionFilter());
    private final LogStreamReader logStreamReader;
    private final ReprocessingStreamWriter reprocessingStreamWriter;
    private final TypedResponseWriter noopResponseWriter = new NoopResponseWriter();
    private final EventApplier eventApplier;
    private final TransactionContext transactionContext;
    private final RetryStrategy updateStateRetryStrategy;
    private final RetryStrategy processRetryStrategy;
    private final BooleanSupplier abortCondition;
    private final Set<Long> failedEventPositions = new HashSet<Long>();
    private long lastSourceEventPosition;
    private long lastFollowUpEventPosition;
    private long snapshotPosition;
    private long highestRecordKey = -1L;
    private final Map<Long, Long> lastGeneratedKeyBySourceCommandPosition = new HashMap<Long, Long>();
    private ActorFuture<Long> recoveryFuture;
    private LoggedEvent currentEvent;
    private TypedRecordProcessor eventProcessor;
    private ZeebeDbTransaction zeebeDbTransaction;

    public ReProcessingStateMachine(ProcessingContext context) {
        this.actor = context.getActor();
        this.logStreamReader = context.getLogStreamReader();
        this.recordValues = context.getRecordValues();
        this.recordProcessorMap = context.getRecordProcessorMap();
        this.transactionContext = context.getTransactionContext();
        this.zeebeState = context.getZeebeState();
        this.abortCondition = context.getAbortCondition();
        this.eventApplier = context.getEventApplier();
        this.keyGeneratorControls = context.getKeyGeneratorControls();
        this.lastProcessedPositionState = context.getLastProcessedPositionState();
        this.typedEvent = new TypedEventImpl(context.getLogStream().getPartitionId());
        this.updateStateRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.processRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.reprocessingStreamWriter = context.getReprocessingStreamWriter();
    }

    ActorFuture<Long> startRecover(long snapshotPosition) {
        this.recoveryFuture = new CompletableActorFuture();
        this.snapshotPosition = snapshotPosition;
        LOG.trace("Start scanning the log for error events.");
        this.lastSourceEventPosition = this.scanLog(snapshotPosition);
        LOG.trace("Finished scanning the log for error events.");
        if (this.lastSourceEventPosition > snapshotPosition) {
            LOG.info("Processor starts reprocessing, until last source event position {}", (Object)this.lastSourceEventPosition);
            this.logStreamReader.seekToNextEvent(snapshotPosition);
            this.reprocessNextEvent();
        } else if (snapshotPosition > 0L) {
            this.recoveryFuture.complete((Object)snapshotPosition);
        } else {
            this.recoveryFuture.complete((Object)-1L);
        }
        return this.recoveryFuture;
    }

    private long scanLog(long snapshotPosition) {
        long lastSourceEventPosition = -1L;
        if (this.logStreamReader.hasNext()) {
            lastSourceEventPosition = snapshotPosition;
            long lastPosition = snapshotPosition;
            while (this.logStreamReader.hasNext()) {
                long recordKey;
                long sourceEventPosition;
                LoggedEvent newEvent = (LoggedEvent)this.logStreamReader.next();
                long currentPosition = newEvent.getPosition();
                if (lastPosition >= currentPosition) {
                    throw new IllegalStateException(String.format(ERROR_INCONSISTENT_LOG, currentPosition, lastPosition));
                }
                lastPosition = currentPosition;
                this.metadata.reset();
                newEvent.readMetadata((BufferReader)this.metadata);
                long errorPosition = -1L;
                if (this.metadata.getValueType() == ValueType.ERROR) {
                    newEvent.readValue((BufferReader)this.errorRecord);
                    errorPosition = this.errorRecord.getErrorEventPosition();
                }
                if (errorPosition >= 0L) {
                    LOG.debug("Found error-prone event {} on reprocessing, will add position {} to the blacklist.", (Object)newEvent, (Object)errorPosition);
                    this.failedEventPositions.add(errorPosition);
                }
                if ((sourceEventPosition = newEvent.getSourceEventPosition()) > 0L) {
                    if (sourceEventPosition > lastSourceEventPosition) {
                        lastSourceEventPosition = sourceEventPosition;
                    }
                    if (currentPosition > this.lastFollowUpEventPosition) {
                        this.lastFollowUpEventPosition = currentPosition;
                    }
                }
                UnifiedRecordValue recordValue = this.recordValues.readRecordValue(newEvent, this.metadata.getValueType());
                this.typedEvent.wrap(newEvent, this.metadata, recordValue);
                if (MigratedStreamProcessors.isMigrated(this.typedEvent) && this.typedEvent.getRecordType() == RecordType.COMMAND) {
                    this.lastGeneratedKeyBySourceCommandPosition.put(currentPosition, -1L);
                }
                if (Protocol.decodePartitionId((long)(recordKey = newEvent.getKey())) != this.zeebeState.getPartitionId()) continue;
                this.lastGeneratedKeyBySourceCommandPosition.computeIfPresent(sourceEventPosition, (position, key) -> Math.max(recordKey, key));
                this.highestRecordKey = Math.max(recordKey, this.highestRecordKey);
            }
            this.logStreamReader.seek(snapshotPosition + 1L);
        }
        return lastSourceEventPosition;
    }

    private void readNextEvent() {
        if (!this.logStreamReader.hasNext()) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT, this.lastFollowUpEventPosition));
        }
        this.currentEvent = (LoggedEvent)this.logStreamReader.next();
        if (this.currentEvent.getPosition() > this.lastFollowUpEventPosition) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_FOLLOW_UP_EVENT, this.lastFollowUpEventPosition, this.currentEvent.getPosition()));
        }
    }

    private void reprocessNextEvent() {
        try {
            this.readNextEvent();
            if (this.eventFilter.applies(this.currentEvent)) {
                this.reprocessEvent(this.currentEvent);
            } else {
                this.onRecordReprocessed(this.currentEvent);
            }
        }
        catch (RuntimeException e) {
            ProcessingException processingException = new ProcessingException("Unable to reprocess record", this.currentEvent, this.metadata, e);
            this.recoveryFuture.completeExceptionally((Throwable)processingException);
        }
    }

    private void reprocessEvent(LoggedEvent currentEvent) {
        try {
            this.metadata.reset();
            currentEvent.readMetadata((BufferReader)this.metadata);
            this.eventProcessor = this.recordProcessorMap.get(this.metadata.getRecordType(), this.metadata.getValueType(), this.metadata.getIntent().value());
        }
        catch (Exception e) {
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, new Object[]{currentEvent, this.metadata, e});
        }
        UnifiedRecordValue value = this.recordValues.readRecordValue(currentEvent, this.metadata.getValueType());
        this.typedEvent.wrap(currentEvent, this.metadata, value);
        this.processUntilDone(currentEvent.getPosition(), this.typedEvent);
    }

    private void processUntilDone(long position, TypedRecord<?> currentEvent) {
        TransactionOperation operationOnProcessing = this.chooseOperationForEvent(position, currentEvent);
        ActorFuture resultFuture = this.processRetryStrategy.runWithRetry(() -> {
            boolean onRetry;
            boolean bl = onRetry = this.zeebeDbTransaction != null;
            if (onRetry) {
                this.zeebeDbTransaction.rollback();
            }
            this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
            this.zeebeDbTransaction.run(operationOnProcessing);
            return true;
        }, this.abortCondition);
        this.actor.runOnCompletion(resultFuture, (v, t) -> {
            assert (t == null) : "On reprocessing there shouldn't be any exception thrown.";
            this.updateStateUntilDone();
        });
    }

    private TransactionOperation chooseOperationForEvent(long position, TypedRecord<?> currentEvent) {
        TransactionOperation operationOnProcessing;
        if (this.failedEventPositions.contains(position)) {
            LOG.info(LOG_STMT_FAILED_ON_PROCESSING, currentEvent);
            operationOnProcessing = () -> this.zeebeState.getBlackListState().tryToBlacklist(currentEvent, NOOP_LONG_CONSUMER);
        } else {
            operationOnProcessing = () -> {
                boolean isNotOnBlacklist;
                boolean bl = isNotOnBlacklist = !this.zeebeState.getBlackListState().isOnBlacklist(this.typedEvent);
                if (isNotOnBlacklist) {
                    this.reprocessRecord(currentEvent);
                }
                this.lastProcessedPositionState.markAsProcessed(position);
            };
        }
        return operationOnProcessing;
    }

    private void reprocessRecord(TypedRecord<?> currentEvent) {
        long recordPosition = currentEvent.getPosition();
        if (MigratedStreamProcessors.isMigrated(currentEvent)) {
            if (currentEvent.getRecordType() == RecordType.EVENT && currentEvent.getSourceRecordPosition() > this.snapshotPosition) {
                this.eventApplier.applyState(currentEvent.getKey(), currentEvent.getIntent(), (RecordValue)currentEvent.getValue());
            } else if (currentEvent.getRecordType() == RecordType.COMMAND) {
                Optional.ofNullable(this.lastGeneratedKeyBySourceCommandPosition.get(currentEvent.getPosition())).ifPresent(this.keyGeneratorControls::setKeyIfHigher);
            }
        } else if (recordPosition <= this.lastSourceEventPosition && this.eventProcessor != null) {
            this.reprocessingStreamWriter.configureSourceContext(recordPosition);
            this.eventProcessor.processRecord(recordPosition, this.typedEvent, this.noopResponseWriter, this.reprocessingStreamWriter, NOOP_SIDE_EFFECT_CONSUMER);
        }
    }

    private void updateStateUntilDone() {
        ActorFuture retryFuture = this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.commit();
            this.zeebeDbTransaction = null;
            return true;
        }, this.abortCondition);
        this.actor.runOnCompletion(retryFuture, (bool, throwable) -> {
            assert (throwable == null) : "On reprocessing there shouldn't be any exception thrown.";
            this.onRecordReprocessed(this.currentEvent);
        });
    }

    private void onRecordReprocessed(LoggedEvent currentEvent) {
        this.reprocessingStreamWriter.removeRecord(currentEvent.getKey(), currentEvent.getSourceEventPosition());
        if (currentEvent.getPosition() >= this.lastFollowUpEventPosition) {
            LOG.info(LOG_STMT_REPROCESSING_FINISHED, (Object)currentEvent.getPosition());
            this.logStreamReader.seekToNextEvent(this.lastSourceEventPosition);
            this.onRecovered(this.lastSourceEventPosition);
        } else {
            this.actor.submit(this::reprocessNextEvent);
        }
    }

    private void onRecovered(long lastProcessedPosition) {
        this.keyGeneratorControls.setKeyIfHigher(this.highestRecordKey);
        this.failedEventPositions.clear();
        this.lastGeneratedKeyBySourceCommandPosition.clear();
        this.recoveryFuture.complete((Object)lastProcessedPosition);
    }
}

