/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ScheduledTimer;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;

public final class PendingProcessMessageSubscriptionChecker
implements StreamProcessorLifecycleAware {
    private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10L);
    private static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30L);
    private final SubscriptionCommandSender commandSender;
    private final MutablePendingProcessMessageSubscriptionState pendingState;
    private final long subscriptionTimeoutInMillis;
    private ActorControl actor;
    private ScheduledTimer timer;

    public PendingProcessMessageSubscriptionChecker(SubscriptionCommandSender commandSender, MutablePendingProcessMessageSubscriptionState pendingState) {
        this.commandSender = commandSender;
        this.pendingState = pendingState;
        this.subscriptionTimeoutInMillis = SUBSCRIPTION_TIMEOUT.toMillis();
    }

    @Override
    public void onRecovered(ReadonlyProcessingContext context) {
        this.actor = context.getActor();
        this.scheduleTimer();
    }

    @Override
    public void onClose() {
        this.cancelTimer();
    }

    @Override
    public void onFailed() {
        this.cancelTimer();
    }

    @Override
    public void onPaused() {
        this.cancelTimer();
    }

    @Override
    public void onResumed() {
        this.scheduleTimer();
    }

    private void scheduleTimer() {
        if (this.timer == null) {
            this.timer = this.actor.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, this::checkPendingSubscriptions);
        }
    }

    private void cancelTimer() {
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
    }

    private void checkPendingSubscriptions() {
        this.pendingState.visitSubscriptionBefore(ActorClock.currentTimeMillis() - this.subscriptionTimeoutInMillis, this::sendPendingCommand);
    }

    private boolean sendPendingCommand(ProcessMessageSubscription subscription) {
        boolean success = subscription.isOpening() ? this.sendOpenCommand(subscription) : this.sendCloseCommand(subscription);
        if (success) {
            long sentTime = ActorClock.currentTimeMillis();
            this.pendingState.updateSentTime(subscription.getRecord(), sentTime);
        }
        return success;
    }

    private boolean sendOpenCommand(ProcessMessageSubscription subscription) {
        return this.commandSender.openMessageSubscription(subscription.getRecord().getSubscriptionPartitionId(), subscription.getRecord().getProcessInstanceKey(), subscription.getRecord().getElementInstanceKey(), subscription.getRecord().getBpmnProcessIdBuffer(), subscription.getRecord().getMessageNameBuffer(), subscription.getRecord().getCorrelationKeyBuffer(), subscription.getRecord().isInterrupting());
    }

    private boolean sendCloseCommand(ProcessMessageSubscription subscription) {
        return this.commandSender.closeMessageSubscription(subscription.getRecord().getSubscriptionPartitionId(), subscription.getRecord().getProcessInstanceKey(), subscription.getRecord().getElementInstanceKey(), subscription.getRecord().getMessageNameBuffer());
    }
}

