package io.camunda.zeebe.broker.system.partitions.impl;

import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.AtomixRecordEntrySupplier;
import io.camunda.zeebe.broker.system.partitions.SnapshotReplication;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToLongFunction;
import org.agrona.collections.Object2NullableObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/StateControllerImpl.class */
public class StateControllerImpl implements StateController, PersistedSnapshotListener {
    private static final ReplicationContext INVALID_SNAPSHOT = new ReplicationContext(null, -1, null);
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private final SnapshotReplication replication;
    private final Map<String, ReplicationContext> receivedSnapshots = new Object2NullableObjectHashMap();
    private final Path runtimeDirectory;
    private final ZeebeDbFactory zeebeDbFactory;
    private final ToLongFunction<ZeebeDb> exporterPositionSupplier;
    private final AtomixRecordEntrySupplier entrySupplier;
    private final SnapshotReplicationMetrics metrics;
    private ZeebeDb db;
    private final ConstructableSnapshotStore constructableSnapshotStore;
    private final ReceivableSnapshotStore receivableSnapshotStore;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/StateControllerImpl$ReplicationContext.class */
    public static final class ReplicationContext {
        private final long startTimestamp;
        private final ReceivedSnapshot receivedSnapshot;
        private final SnapshotReplicationMetrics metrics;
        private long chunkCount;

        ReplicationContext(SnapshotReplicationMetrics snapshotReplicationMetrics, long j, ReceivedSnapshot receivedSnapshot) {
            this.metrics = snapshotReplicationMetrics;
            if (snapshotReplicationMetrics != null) {
                snapshotReplicationMetrics.incrementCount();
            }
            this.startTimestamp = j;
            this.chunkCount = 0L;
            this.receivedSnapshot = receivedSnapshot;
        }

        /*  JADX ERROR: Failed to decode insn: 0x0007: MOVE_MULTI, method: io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl.ReplicationContext.incrementCount():long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        long incrementCount() {
            /*
                r6 = this;
                r0 = r6
                r1 = r0
                long r1 = r1.chunkCount
                r2 = 1
                long r1 = r1 + r2
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.chunkCount = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl.ReplicationContext.incrementCount():long");
        }

        long getChunkCount() {
            return this.chunkCount;
        }

        void abort() {
            try {
                this.receivedSnapshot.abort();
            } finally {
                this.metrics.decrementCount();
            }
        }

        void persist() {
            try {
                this.receivedSnapshot.persist();
                long currentTimeMillis = System.currentTimeMillis();
                this.metrics.decrementCount();
                this.metrics.observeDuration(currentTimeMillis - this.startTimestamp);
            } catch (Throwable th) {
                long currentTimeMillis2 = System.currentTimeMillis();
                this.metrics.decrementCount();
                this.metrics.observeDuration(currentTimeMillis2 - this.startTimestamp);
                throw th;
            }
        }

        public void apply(SnapshotChunk snapshotChunk) {
            this.receivedSnapshot.apply(snapshotChunk).join();
        }
    }

    public StateControllerImpl(int i, ZeebeDbFactory zeebeDbFactory, ConstructableSnapshotStore constructableSnapshotStore, ReceivableSnapshotStore receivableSnapshotStore, Path path, SnapshotReplication snapshotReplication, AtomixRecordEntrySupplier atomixRecordEntrySupplier, ToLongFunction<ZeebeDb> toLongFunction) {
        this.constructableSnapshotStore = constructableSnapshotStore;
        this.receivableSnapshotStore = receivableSnapshotStore;
        this.runtimeDirectory = path;
        this.zeebeDbFactory = zeebeDbFactory;
        this.exporterPositionSupplier = toLongFunction;
        this.entrySupplier = atomixRecordEntrySupplier;
        this.replication = snapshotReplication;
        this.metrics = new SnapshotReplicationMetrics(Integer.toString(i));
    }

    @Override // io.camunda.zeebe.broker.system.partitions.StateController
    public Optional<TransientSnapshot> takeTransientSnapshot(long j) {
        if (!isDbOpened()) {
            LOG.warn("Expected to take snapshot for last processed position {}, but database was closed.", Long.valueOf(j));
            return Optional.empty();
        }
        long applyAsLong = this.exporterPositionSupplier.applyAsLong(openDb());
        long determineSnapshotPosition = determineSnapshotPosition(j, applyAsLong);
        Optional<IndexedRaftLogEntry> previousIndexedEntry = this.entrySupplier.getPreviousIndexedEntry(determineSnapshotPosition);
        if (previousIndexedEntry.isEmpty()) {
            LOG.warn("Failed to take snapshot. Expected to find an indexed entry for determined snapshot position {} (processedPosition = {}, exportedPosition={}), but found no matching indexed entry which contains this position.", new Object[]{Long.valueOf(determineSnapshotPosition), Long.valueOf(j), Long.valueOf(applyAsLong)});
            return Optional.empty();
        }
        IndexedRaftLogEntry indexedRaftLogEntry = previousIndexedEntry.get();
        Optional<TransientSnapshot> newTransientSnapshot = this.constructableSnapshotStore.newTransientSnapshot(indexedRaftLogEntry.index(), indexedRaftLogEntry.term(), j, applyAsLong);
        newTransientSnapshot.ifPresent(this::takeSnapshot);
        return newTransientSnapshot;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.StateController
    public void consumeReplicatedSnapshots() {
        this.replication.consume(this::consumeSnapshotChunk);
    }

    @Override // io.camunda.zeebe.broker.system.partitions.StateController
    public void recover() throws Exception {
        FileUtil.deleteFolderIfExists(this.runtimeDirectory);
        Optional latestSnapshot = this.constructableSnapshotStore.getLatestSnapshot();
        if (latestSnapshot.isPresent()) {
            PersistedSnapshot persistedSnapshot = (PersistedSnapshot) latestSnapshot.get();
            LOG.debug("Available snapshot: {}", persistedSnapshot);
            FileUtil.copySnapshot(this.runtimeDirectory, persistedSnapshot.getPath());
            try {
                openDb();
                LOG.debug("Recovered state from snapshot '{}'", persistedSnapshot);
            } catch (Exception e) {
                LOG.error("Failed to open snapshot '{}'. No snapshots available to recover from. Manual action is required.", persistedSnapshot, e);
                FileUtil.deleteFolder(this.runtimeDirectory);
                throw new IllegalStateException("Failed to recover from snapshots", e);
            }
        }
    }

    @Override // io.camunda.zeebe.broker.system.partitions.StateController
    public ZeebeDb openDb() {
        if (this.db == null) {
            this.db = this.zeebeDbFactory.createDb(this.runtimeDirectory.toFile());
            LOG.debug("Opened database from '{}'.", this.runtimeDirectory);
        }
        return this.db;
    }

    @Override // io.camunda.zeebe.broker.system.partitions.StateController
    public int getValidSnapshotsCount() {
        return this.constructableSnapshotStore.getLatestSnapshot().isPresent() ? 1 : 0;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.db != null) {
            this.db.close();
            this.db = null;
            LOG.debug("Closed database from '{}'.", this.runtimeDirectory);
        }
        FileUtil.deleteFolderIfExists(this.runtimeDirectory);
    }

    boolean isDbOpened() {
        return this.db != null;
    }

    private ActorFuture<Boolean> takeSnapshot(TransientSnapshot transientSnapshot) {
        return transientSnapshot.take(path -> {
            if (this.db == null) {
                LOG.error("Expected to take a snapshot, but no database was opened");
                return false;
            }
            LOG.debug("Taking temporary snapshot into {}.", path);
            try {
                this.db.createSnapshot(path.toFile());
                return true;
            } catch (Exception e) {
                LOG.error("Failed to create snapshot of runtime database", e);
                return false;
            }
        });
    }

    public void onNewSnapshot(PersistedSnapshot persistedSnapshot) {
        LOG.debug("Start replicating new snapshot {}", persistedSnapshot.getId());
        SnapshotChunkReader newChunkReader = persistedSnapshot.newChunkReader();
        while (newChunkReader.hasNext()) {
            try {
                this.replication.replicate((SnapshotChunk) newChunkReader.next());
            } catch (Throwable th) {
                if (newChunkReader != null) {
                    try {
                        newChunkReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (newChunkReader != null) {
            newChunkReader.close();
        }
    }

    private void consumeSnapshotChunk(SnapshotChunk snapshotChunk) {
        String snapshotId = snapshotChunk.getSnapshotId();
        String chunkName = snapshotChunk.getChunkName();
        ReplicationContext computeIfAbsent = this.receivedSnapshots.computeIfAbsent(snapshotId, str -> {
            long currentTimeMillis = System.currentTimeMillis();
            ReceivedSnapshot newReceivedSnapshot = this.receivableSnapshotStore.newReceivedSnapshot(snapshotChunk.getSnapshotId());
            LOG.debug("Started receiving new snapshot {} with {} chunks.", newReceivedSnapshot.snapshotId(), Integer.valueOf(snapshotChunk.getTotalCount()));
            return newReplication(currentTimeMillis, newReceivedSnapshot);
        });
        if (computeIfAbsent == INVALID_SNAPSHOT) {
            LOG.trace("Ignore snapshot chunk {}, because snapshot {} is marked as invalid.", chunkName, snapshotId);
            return;
        }
        try {
            computeIfAbsent.apply(snapshotChunk);
            validateWhenReceivedAllChunks(snapshotChunk, computeIfAbsent);
        } catch (Exception e) {
            LOG.warn("Unexpected error on writing the received snapshot chunk {}, marking snapshot {} as invalid", new Object[]{snapshotChunk, snapshotId, e});
            markSnapshotAsInvalid(computeIfAbsent, snapshotChunk);
        }
    }

    private void markSnapshotAsInvalid(ReplicationContext replicationContext, SnapshotChunk snapshotChunk) {
        LOG.debug("Abort snapshot {} and mark it as invalid.", snapshotChunk.getSnapshotId());
        replicationContext.abort();
        this.receivedSnapshots.put(snapshotChunk.getSnapshotId(), INVALID_SNAPSHOT);
    }

    private void validateWhenReceivedAllChunks(SnapshotChunk snapshotChunk, ReplicationContext replicationContext) {
        int totalCount = snapshotChunk.getTotalCount();
        if (replicationContext.incrementCount() != totalCount) {
            LOG.trace("Waiting for more snapshot chunks of snapshot {}, currently have {}/{}", new Object[]{snapshotChunk.getSnapshotId(), Long.valueOf(replicationContext.getChunkCount()), Integer.valueOf(totalCount)});
            return;
        }
        LOG.debug("Received all snapshot chunks ({}/{}) of snapshot {}. Committing snapshot.", new Object[]{Long.valueOf(replicationContext.getChunkCount()), Integer.valueOf(totalCount), snapshotChunk.getSnapshotId()});
        if (tryToMarkSnapshotAsValid(snapshotChunk, replicationContext)) {
            return;
        }
        LOG.debug("Failed to commit snapshot {}", snapshotChunk.getSnapshotId());
    }

    private boolean tryToMarkSnapshotAsValid(SnapshotChunk snapshotChunk, ReplicationContext replicationContext) {
        try {
            try {
                replicationContext.persist();
                this.receivedSnapshots.remove(snapshotChunk.getSnapshotId());
                return true;
            } catch (Exception e) {
                markSnapshotAsInvalid(replicationContext, snapshotChunk);
                LOG.warn("Unexpected error on persisting received snapshot.", e);
                this.receivedSnapshots.remove(snapshotChunk.getSnapshotId());
                return false;
            }
        } catch (Throwable th) {
            this.receivedSnapshots.remove(snapshotChunk.getSnapshotId());
            throw th;
        }
    }

    private ReplicationContext newReplication(long j, ReceivedSnapshot receivedSnapshot) {
        return new ReplicationContext(this.metrics, j, receivedSnapshot);
    }

    private long determineSnapshotPosition(long j, long j2) {
        long min = Math.min(j2, j);
        LOG.trace("Based on lowest exporter position '{}' and last processed position '{}', determined '{}' as snapshot position.", new Object[]{Long.valueOf(j2), Long.valueOf(j), Long.valueOf(min)});
        return min;
    }
}
