/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.util.Assert;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;
import rx.observables.MathObservable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class WindowingOffsetManager
implements OffsetManager,
InitializingBean,
DisposableBean {
    private final CreatePartitionAndOffsetFunction createPartitionAndOffsetFunction = new CreatePartitionAndOffsetFunction();
    private final GetOffsetFunction getOffsetFunction = new GetOffsetFunction();
    private final ComputeMaximumOffsetByPartitionFunction findHighestOffsetInPartitionGroup = new ComputeMaximumOffsetByPartitionFunction();
    private final GetPartitionFunction getPartition = new GetPartitionFunction();
    private final FindHighestOffsetsByPartitionFunction findHighestOffsetsByPartition = new FindHighestOffsetsByPartitionFunction();
    private final DelegateUpdateOffsetAction delegateUpdateOffsetAction = new DelegateUpdateOffsetAction();
    private final NotifyObservableClosedAction notifyObservableClosed = new NotifyObservableClosedAction();
    private final OffsetManager delegate;
    private long timespan = 10000L;
    private int count;
    private Subject<PartitionAndOffset, PartitionAndOffset> offsets;
    private Subscription subscription;
    private int shutdownTimeout = 2000;
    private CountDownLatch shutdownLatch;

    public WindowingOffsetManager(OffsetManager offsetManager) {
        this.delegate = offsetManager;
    }

    public void setTimespan(long timespan) {
        Assert.isTrue((timespan >= 0L ? 1 : 0) != 0, (String)"Timespan must be a positive value");
        this.timespan = timespan;
    }

    public void setCount(int count) {
        Assert.isTrue((count >= 0 ? 1 : 0) != 0, (String)"Count must be a positive value");
        this.count = count;
    }

    public void setShutdownTimeout(int shutdownTimeout) {
        this.shutdownTimeout = shutdownTimeout;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.isTrue((boolean)(this.timespan > 0L ^ this.count > 0), (String)"Only one of the timespan or count must be set");
        if (this.timespan > 0L || this.count > 1) {
            this.offsets = new SerializedSubject((Subject)PublishSubject.create());
            Observable window = this.timespan > 0L ? this.offsets.window(this.timespan, TimeUnit.MILLISECONDS) : this.offsets.window(this.count);
            Observable maximumOffsetsByWindow = window.flatMap((Func1)this.findHighestOffsetsByPartition).doOnCompleted((Action0)this.notifyObservableClosed);
            this.subscription = maximumOffsetsByWindow.subscribe((Action1)this.delegateUpdateOffsetAction);
        } else {
            this.offsets = null;
        }
    }

    public void destroy() throws Exception {
        this.flush();
        this.close();
        if (this.delegate instanceof DisposableBean) {
            ((DisposableBean)this.delegate).destroy();
        }
    }

    public void updateOffset(Partition partition, long offset) {
        if (this.offsets != null) {
            this.offsets.onNext((Object)new PartitionAndOffset(partition, offset));
        } else {
            this.delegate.updateOffset(partition, offset);
        }
    }

    public long getOffset(Partition partition) {
        return this.delegate.getOffset(partition);
    }

    public void deleteOffset(Partition partition) {
        this.delegate.deleteOffset(partition);
    }

    public void resetOffsets(Collection<Partition> partition) {
        this.delegate.resetOffsets(partition);
    }

    public void close() throws IOException {
        if (this.offsets != null) {
            this.shutdownLatch = new CountDownLatch(1);
            this.offsets.onCompleted();
            try {
                this.shutdownLatch.await(this.shutdownTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.subscription.unsubscribe();
        }
        this.delegate.close();
    }

    public void flush() throws IOException {
        this.delegate.flush();
    }

    private class FindHighestOffsetsByPartitionFunction
    implements Func1<Observable<PartitionAndOffset>, Observable<PartitionAndOffset>> {
        private FindHighestOffsetsByPartitionFunction() {
        }

        public Observable<PartitionAndOffset> call(Observable<PartitionAndOffset> windowBuffer) {
            return windowBuffer.groupBy((Func1)WindowingOffsetManager.this.getPartition).flatMap((Func1)WindowingOffsetManager.this.findHighestOffsetInPartitionGroup);
        }
    }

    private class GetPartitionFunction
    implements Func1<PartitionAndOffset, Partition> {
        private GetPartitionFunction() {
        }

        public Partition call(PartitionAndOffset partitionAndOffset) {
            return partitionAndOffset.getPartition();
        }
    }

    private class ComputeMaximumOffsetByPartitionFunction
    implements Func1<GroupedObservable<Partition, PartitionAndOffset>, Observable<PartitionAndOffset>> {
        private ComputeMaximumOffsetByPartitionFunction() {
        }

        public Observable<PartitionAndOffset> call(GroupedObservable<Partition, PartitionAndOffset> group) {
            return Observable.zip((Observable)Observable.just((Object)group.getKey()), (Observable)MathObservable.max((Observable)group.map((Func1)WindowingOffsetManager.this.getOffsetFunction)), (Func2)WindowingOffsetManager.this.createPartitionAndOffsetFunction);
        }
    }

    private class GetOffsetFunction
    implements Func1<PartitionAndOffset, Long> {
        private GetOffsetFunction() {
        }

        public Long call(PartitionAndOffset partitionAndOffset) {
            return partitionAndOffset.getOffset();
        }
    }

    private class CreatePartitionAndOffsetFunction
    implements Func2<Partition, Long, PartitionAndOffset> {
        private CreatePartitionAndOffsetFunction() {
        }

        public PartitionAndOffset call(Partition partition, Long offset) {
            return new PartitionAndOffset(partition, offset);
        }
    }

    private class NotifyObservableClosedAction
    implements Action0 {
        private NotifyObservableClosedAction() {
        }

        public void call() {
            if (WindowingOffsetManager.this.shutdownLatch != null) {
                WindowingOffsetManager.this.shutdownLatch.countDown();
            }
        }
    }

    private class DelegateUpdateOffsetAction
    implements Action1<PartitionAndOffset> {
        private DelegateUpdateOffsetAction() {
        }

        public void call(PartitionAndOffset partitionAndOffset) {
            WindowingOffsetManager.this.delegate.updateOffset(partitionAndOffset.getPartition(), partitionAndOffset.getOffset().longValue());
        }
    }

    private final class PartitionAndOffset {
        private final Partition partition;
        private final Long offset;

        private PartitionAndOffset(Partition partition, Long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        public Partition getPartition() {
            return this.partition;
        }

        public Long getOffset() {
            return this.offset;
        }
    }
}

