/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Preconditions;

/**
 * Stream operator that stamps every incoming record with a timestamp obtained from a
 * {@link TimestampAssigner} and drives a {@link WatermarkGenerator}, both created from the
 * configured {@link WatermarkStrategy}.
 *
 * <p>Watermarks arriving from upstream are dropped, except for the {@code Long.MAX_VALUE}
 * watermark, which is forwarded (see {@link #processWatermark(Watermark)}).
 *
 * @param <T> type of the records passing through this operator
 */
public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    /** Factory for the timestamp assigner and the watermark generator; never {@code null}. */
    private final WatermarkStrategy<T> watermarkStrategy;

    /** Timestamp assigner; created in {@link #open()}. */
    private transient TimestampAssigner<T> timestampAssigner;

    /** Watermark generator; created in {@link #open()}. */
    private transient WatermarkGenerator<T> watermarkGenerator;

    /** Gateway through which the generator emits watermarks; created in {@link #open()}. */
    private transient WatermarkOutput wmOutput;

    /**
     * Auto-watermark interval read from the execution config in {@link #open()}; it is added to
     * the current processing time when (re-)registering the periodic timer.
     */
    private transient long watermarkInterval;

    /**
     * When {@code false}, a {@link NoWatermarksGenerator} is used and no periodic timer is
     * registered.
     */
    private final boolean emitProgressiveWatermarks;

    /**
     * Creates the operator.
     *
     * @param watermarkStrategy strategy supplying the timestamp assigner and watermark generator
     * @param emitProgressiveWatermarks whether the strategy's watermark generator should be used
     *     and periodic emission scheduled
     * @throws NullPointerException if {@code watermarkStrategy} is {@code null}
     */
    public TimestampsAndWatermarksOperator(WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) {
        // checkNotNull is generic, so no (raw) cast is required here.
        this.watermarkStrategy = Preconditions.checkNotNull(watermarkStrategy);
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;
        this.chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
    }

    /**
     * Instantiates the assigner, generator and watermark output, and registers the first periodic
     * timer when the configured interval is positive and progressive watermarks are enabled.
     */
    @Override
    public void open() throws Exception {
        super.open();

        this.timestampAssigner = this.watermarkStrategy.createTimestampAssigner(this::getMetricGroup);

        if (this.emitProgressiveWatermarks) {
            this.watermarkGenerator = this.watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
        } else {
            this.watermarkGenerator = new NoWatermarksGenerator<>();
        }

        this.wmOutput = new WatermarkEmitter(this.output, this.getContainingTask().getStreamStatusMaintainer());

        this.watermarkInterval = this.getExecutionConfig().getAutoWatermarkInterval();
        if (this.watermarkInterval > 0L && this.emitProgressiveWatermarks) {
            long now = this.getProcessingTimeService().getCurrentProcessingTime();
            this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
        }
    }

    /**
     * Assigns a new timestamp to the record, forwards it downstream, and then notifies the
     * watermark generator about the event.
     *
     * @param element the incoming record; its timestamp is overwritten in place
     */
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        T event = element.getValue();
        // Long.MIN_VALUE is handed to the assigner when the record carries no timestamp yet.
        long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        long newTimestamp = this.timestampAssigner.extractTimestamp(event, previousTimestamp);

        element.setTimestamp(newTimestamp);
        // The record is emitted before the generator sees it.
        this.output.collect(element);
        this.watermarkGenerator.onEvent(event, newTimestamp, this.wmOutput);
    }

    /**
     * Periodic timer callback: lets the generator emit a watermark and re-registers the timer one
     * interval after the current processing time.
     *
     * @param timestamp the time the timer fired for (unused)
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);

        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
    }

    /**
     * Ignores watermarks propagated from upstream, except for the watermark whose timestamp is
     * {@code Long.MAX_VALUE}, which is forwarded as {@code Watermark.MAX_WATERMARK}.
     *
     * @param mark the upstream watermark
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            this.wmOutput.emitWatermark(org.apache.flink.api.common.eventtime.Watermark.MAX_WATERMARK);
        }
    }

    /** Closes the base operator, then gives the generator a final chance to emit a watermark. */
    @Override
    public void close() throws Exception {
        super.close();
        this.watermarkGenerator.onPeriodicEmit(this.wmOutput);
    }

    /**
     * {@link WatermarkOutput} implementation that forwards strictly increasing watermarks to an
     * operator {@link Output} and toggles the stream status between idle and active.
     */
    public static final class WatermarkEmitter implements WatermarkOutput {

        private final Output<?> output;

        private final StreamStatusMaintainer statusMaintainer;

        /** Timestamp of the last watermark emitted; starts at {@code Long.MIN_VALUE}. */
        private long currentWatermark;

        /** Set by {@link #markIdle()}, cleared by the next watermark that is actually emitted. */
        private boolean idle;

        /**
         * @param output the output the converted watermarks are emitted to
         * @param statusMaintainer receives the IDLE/ACTIVE stream status toggles
         */
        public WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
            this.output = output;
            this.statusMaintainer = statusMaintainer;
            this.currentWatermark = Long.MIN_VALUE;
        }

        /**
         * Emits the watermark downstream if its timestamp is strictly greater than the last
         * emitted one; otherwise does nothing. If the emitter was idle, the stream status is
         * switched back to ACTIVE before the watermark is emitted.
         *
         * @param watermark the watermark produced by the generator
         */
        @Override
        public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
            long ts = watermark.getTimestamp();

            // Non-advancing watermarks are dropped and do not end an idle period.
            if (ts <= this.currentWatermark) {
                return;
            }

            this.currentWatermark = ts;

            if (this.idle) {
                this.idle = false;
                this.statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }

            this.output.emitWatermark(new Watermark(ts));
        }

        /** Marks this output as idle and switches the stream status to IDLE. */
        @Override
        public void markIdle() {
            this.idle = true;
            this.statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
        }
    }
}

