/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceReaderOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;

/**
 * A {@link StreamTask} whose head operator is a {@link SourceReaderOperator}.
 *
 * <p>During {@link #init()} the task wraps the head operator in a {@link StreamTaskSourceInput},
 * builds an output that forwards into the operator chain's entry point, and connects the two
 * through a {@link StreamOneInputProcessor}.
 *
 * @param <T> type of the elements handled by the source operator and forwarded into the chain
 */
@Internal
public class SourceReaderStreamTask<T>
extends StreamTask<T, SourceReaderOperator<T>> {
    /**
     * Creates the task for the given runtime environment.
     *
     * @param env the environment handed to the {@link StreamTask} base class
     */
    public SourceReaderStreamTask(Environment env) {
        super(env);
    }

    /**
     * Builds the input processor for this task.
     *
     * <p>The head operator becomes the input; the output forwards to the chain entry point and
     * is given the checkpoint lock, which is also passed to the processor itself.
     */
    @Override
    public void init() {
        // Keep everything parameterized on T: raw types here would silently drop the
        // compile-time link between the input, the output and the processor.
        StreamTaskSourceInput<T> input = new StreamTaskSourceInput<>(this.headOperator);
        StreamTaskSourceOutput<T> output = new StreamTaskSourceOutput<>(
                this.operatorChain.getChainEntryPoint(),
                this.getStreamStatusMaintainer(),
                this.getCheckpointLock());
        this.inputProcessor = new StreamOneInputProcessor<>(
                input,
                output,
                this.getCheckpointLock(),
                this.operatorChain);
    }

    /**
     * Output that forwards records, latency markers and watermarks to a wrapped {@link Output}.
     *
     * <p>Every forwarding call is made while holding the {@code lock} inherited from
     * {@link AbstractDataOutput} (presumably the lock object passed to the constructor —
     * confirm against {@code AbstractDataOutput}).
     *
     * @param <T> type of the record values being forwarded
     */
    private static class StreamTaskSourceOutput<T>
    extends AbstractDataOutput<T> {
        /** Downstream output that receives every emitted element; never {@code null}. */
        private final Output<StreamRecord<T>> output;

        /**
         * @param output the downstream output to forward to; must not be {@code null}
         * @param streamStatusMaintainer passed through to {@link AbstractDataOutput}
         * @param lock passed through to {@link AbstractDataOutput}
         */
        StreamTaskSourceOutput(Output<StreamRecord<T>> output, StreamStatusMaintainer streamStatusMaintainer, Object lock) {
            super(streamStatusMaintainer, lock);
            // checkNotNull returns its argument with the same static type, so no cast is needed.
            this.output = Preconditions.checkNotNull(output);
        }

        /**
         * Forwards the record to the wrapped output while holding the lock.
         *
         * @param streamRecord the record to collect downstream
         */
        @Override
        public void emitRecord(StreamRecord<T> streamRecord) {
            synchronized (this.lock) {
                this.output.collect(streamRecord);
            }
        }

        /**
         * Forwards the latency marker to the wrapped output while holding the lock.
         *
         * @param latencyMarker the marker to emit downstream
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            synchronized (this.lock) {
                this.output.emitLatencyMarker(latencyMarker);
            }
        }

        /**
         * Forwards the watermark to the wrapped output while holding the lock.
         *
         * @param watermark the watermark to emit downstream
         */
        @Override
        public void emitWatermark(Watermark watermark) {
            synchronized (this.lock) {
                this.output.emitWatermark(watermark);
            }
        }
    }
}

