/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.streaming.runtime.io.CachedBufferStorage;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.EmptyBufferStorage;
import org.apache.flink.streaming.runtime.io.LinkedBufferStorage;
import org.apache.flink.util.Preconditions;

@Internal
public class InputProcessorUtil {
    public static CheckpointedInputGate createCheckpointedInputGate(AbstractInvokable toNotifyOnCheckpoint, CheckpointingMode checkpointMode, IOManager ioManager, InputGate inputGate, Configuration taskManagerConfig, String taskName) throws IOException {
        int pageSize = ConfigurationParserUtils.getPageSize((Configuration)taskManagerConfig);
        BufferStorage bufferStorage = InputProcessorUtil.createBufferStorage(checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
        CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(checkpointMode, inputGate.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
        return new CheckpointedInputGate(inputGate, bufferStorage, barrierHandler);
    }

    public static CheckpointedInputGate[] createCheckpointedInputGatePair(AbstractInvokable toNotifyOnCheckpoint, CheckpointingMode checkpointMode, IOManager ioManager, InputGate inputGate1, InputGate inputGate2, Configuration taskManagerConfig, TaskIOMetricGroup taskIOMetricGroup, String taskName) throws IOException {
        int pageSize = ConfigurationParserUtils.getPageSize((Configuration)taskManagerConfig);
        BufferStorage mainBufferStorage1 = InputProcessorUtil.createBufferStorage(checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
        BufferStorage mainBufferStorage2 = InputProcessorUtil.createBufferStorage(checkpointMode, ioManager, pageSize, taskManagerConfig, taskName);
        Preconditions.checkState((mainBufferStorage1.getMaxBufferedBytes() == mainBufferStorage2.getMaxBufferedBytes() ? 1 : 0) != 0);
        LinkedBufferStorage linkedBufferStorage1 = new LinkedBufferStorage(mainBufferStorage1, mainBufferStorage2, mainBufferStorage1.getMaxBufferedBytes());
        LinkedBufferStorage linkedBufferStorage2 = new LinkedBufferStorage(mainBufferStorage2, mainBufferStorage1, mainBufferStorage1.getMaxBufferedBytes());
        CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(checkpointMode, inputGate1.getNumberOfInputChannels() + inputGate2.getNumberOfInputChannels(), taskName, toNotifyOnCheckpoint);
        taskIOMetricGroup.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);
        return new CheckpointedInputGate[]{new CheckpointedInputGate(inputGate1, linkedBufferStorage1, barrierHandler), new CheckpointedInputGate(inputGate2, (BufferStorage)linkedBufferStorage2, barrierHandler, inputGate1.getNumberOfInputChannels())};
    }

    private static CheckpointBarrierHandler createCheckpointBarrierHandler(CheckpointingMode checkpointMode, int numberOfInputChannels, String taskName, AbstractInvokable toNotifyOnCheckpoint) {
        switch (checkpointMode) {
            case EXACTLY_ONCE: {
                return new CheckpointBarrierAligner(numberOfInputChannels, taskName, toNotifyOnCheckpoint);
            }
            case AT_LEAST_ONCE: {
                return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
            }
        }
        throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + (Object)((Object)checkpointMode));
    }

    private static BufferStorage createBufferStorage(CheckpointingMode checkpointMode, IOManager ioManager, int pageSize, Configuration taskManagerConfig, String taskName) {
        switch (checkpointMode) {
            case EXACTLY_ONCE: {
                long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
                if (maxAlign != -1L && maxAlign <= 0L) {
                    throw new IllegalConfigurationException(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key() + " must be positive or -1 (infinite)");
                }
                return new CachedBufferStorage(pageSize, maxAlign, taskName);
            }
            case AT_LEAST_ONCE: {
                return new EmptyBufferStorage();
            }
        }
        throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + (Object)((Object)checkpointMode));
    }
}

