/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.channel;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;

public class FluxMessageChannel
extends AbstractMessageChannel
implements Publisher<Message<?>>,
ReactiveStreamsSubscribableChannel {
    private final EmitterProcessor<Message<?>> processor;
    private final FluxSink<Message<?>> sink;
    private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create((int)1);
    private final Disposable.Composite upstreamSubscriptions = Disposables.composite();

    public FluxMessageChannel() {
        this.processor = EmitterProcessor.create((int)1, (boolean)false);
        this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        Assert.state((boolean)this.processor.hasDownstreams(), () -> "The [" + this + "] doesn't have subscribers to accept messages");
        this.sink.next(message);
        return true;
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        this.processor.doFinally(s -> this.subscribedSignal.onNext((Object)this.processor.hasDownstreams())).subscribe(subscriber);
        this.subscribedSignal.onNext((Object)this.processor.hasDownstreams());
    }

    @Override
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        this.upstreamSubscriptions.add(Flux.from(publisher).delaySubscription((Publisher)this.subscribedSignal.filter(Boolean::booleanValue).next()).publishOn(Schedulers.boundedElastic()).doOnNext(message -> {
            try {
                this.send((Message<?>)message);
            }
            catch (Exception ex) {
                this.logger.warn((Object)("Error during processing event: " + message), (Throwable)ex);
            }
        }).subscribe());
    }

    @Override
    public void destroy() {
        this.subscribedSignal.onNext((Object)false);
        this.upstreamSubscriptions.dispose();
        this.processor.onComplete();
        super.destroy();
    }
}

