/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.client.consumer.ConcurrentLRUHashMap;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.FetchManager;
import com.taobao.metamorphosis.client.consumer.FetchRequest;
import com.taobao.metamorphosis.client.consumer.FetchRequestQueue;
import com.taobao.metamorphosis.client.consumer.InnerConsumer;
import com.taobao.metamorphosis.client.consumer.MessageIdCache;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.client.consumer.TopicPartitionRegInfo;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.consumer.ConsumerMessageFilter;
import com.taobao.metamorphosis.consumer.MessageIterator;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.ArrayList;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SimpleFetchManager
implements FetchManager {
    /** Raised by stopFetchRunner() and lowered again by startFetchRunner(); exposed via isShutdown(). */
    private volatile boolean shutdown = false;
    /** Fetch threads; rebuilt by resetFetchState(). */
    private Thread[] fetchThreads;
    /** Runners executed by the fetch threads; index i pairs with fetchThreads[i]. */
    private FetchRequestRunner[] requestRunners;
    /** Queue size captured in startFetchRunner(); stopFetchRunner() waits until the queue holds that many requests again. */
    private volatile int fetchRequestCount;
    /** Pending fetch requests shared by all runners. */
    private FetchRequestQueue requestQueue;
    private final ConsumerConfig consumerConfig;
    /** Per-thread registration info; only populated while a listener callback is executing. */
    private static final ThreadLocal<TopicPartitionRegInfo> currentTopicRegInfo = new ThreadLocal<TopicPartitionRegInfo>();
    private final InnerConsumer consumer;
    /** Marker value stored in the message-id cache for messages that have been handled. */
    public static final Byte PROCESSED = 1;
    /** Capacity of the default message-id cache, read once from a system property (default 4096). */
    private static final int CACHE_SIZE = Integer.parseInt(System.getProperty("metaq.consumer.message_ids.lru_cache.size", "4096"));
    /** Process-wide cache of handled message ids; may be swapped (or set to null) via setMessageIdCache(). */
    private static MessageIdCache messageIdCache = new ConcurrentLRUHashMap(CACHE_SIZE);
    static final Log log = LogFactory.getLog(SimpleFetchManager.class);

    /**
     * Installs the message-id cache shared by every fetch manager in this class loader.
     *
     * @param newCache the cache to use from now on
     */
    public static void setMessageIdCache(MessageIdCache newCache) {
        SimpleFetchManager.messageIdCache = newCache;
    }

    /**
     * @return the message-id cache currently installed (a static, class-wide instance)
     */
    MessageIdCache getMessageIdCache() {
        return SimpleFetchManager.messageIdCache;
    }

    /**
     * Creates a fetch manager; no threads or queue exist until resetFetchState() is called.
     *
     * @param consumerConfig configuration supplying runner count, group name, retry limits and delays
     * @param consumer       consumer used to fetch messages and look up listeners and filters
     */
    public SimpleFetchManager(ConsumerConfig consumerConfig, InnerConsumer consumer) {
        this.consumer = consumer;
        this.consumerConfig = consumerConfig;
    }

    /**
     * @return the registration info bound to the calling thread, or {@code null} when none is bound
     */
    public static TopicPartitionRegInfo currentTopicRegInfo() {
        final TopicPartitionRegInfo boundInfo = SimpleFetchManager.currentTopicRegInfo.get();
        return boundInfo;
    }

    /**
     * @return the request count recorded when the runners were last started (0 after a stop or reset)
     */
    @Override
    public int getFetchRequestCount() {
        return fetchRequestCount;
    }

    /**
     * @return {@code true} once stopFetchRunner() has been called and until startFetchRunner() runs again
     */
    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Stops all runners and waits until the queue holds at least as many requests as were
     * counted at start, re-interrupting the runners on every pass of the wait loop.
     */
    @Override
    public void stopFetchRunner() throws InterruptedException {
        this.shutdown = true;
        this.interruptRunners();
        // Without a queue there is nothing to wait for.
        if (this.requestQueue != null) {
            boolean requestsOutstanding = this.requestQueue.size() < this.fetchRequestCount;
            while (requestsOutstanding) {
                this.interruptRunners();
                requestsOutstanding = this.requestQueue.size() < this.fetchRequestCount;
            }
        }
        this.fetchRequestCount = 0;
    }

    /**
     * Flags every runner as stopped, interrupts its executor threads and its fetch thread,
     * then waits up to 100 ms for that fetch thread to finish.
     */
    private void interruptRunners() {
        if (this.fetchThreads == null) {
            return;
        }
        for (int idx = 0; idx < this.fetchThreads.length; ++idx) {
            final Thread fetchThread = this.fetchThreads[idx];
            final FetchRequestRunner fetchRunner = this.requestRunners[idx];
            if (fetchThread != null) {
                fetchRunner.shutdown();
                fetchRunner.interruptExecutor();
                fetchThread.interrupt();
                try {
                    fetchThread.join(100L);
                }
                catch (InterruptedException e) {
                    // Keep the caller's interrupt status and move on to the next runner.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Discards the previous queue, threads and runners and builds fresh (not yet started) ones,
     * sized by the configured fetch runner count.
     */
    @Override
    public void resetFetchState() {
        this.fetchRequestCount = 0;
        this.requestQueue = new FetchRequestQueue();
        this.fetchThreads = new Thread[this.consumerConfig.getFetchRunnerCount()];
        this.requestRunners = new FetchRequestRunner[this.consumerConfig.getFetchRunnerCount()];
        int slot = 0;
        while (slot < this.fetchThreads.length) {
            final FetchRequestRunner created = new FetchRequestRunner();
            this.requestRunners[slot] = created;
            this.fetchThreads[slot] = new Thread(created);
            // Thread name: <group>-fetch-Runner-<index>
            this.fetchThreads[slot].setName(this.consumerConfig.getGroup() + "-fetch-Runner-" + slot);
            ++slot;
        }
    }

    /**
     * Records the current queue size as the request count, clears the shutdown flag and
     * starts every fetch thread. Requires resetFetchState() to have been called first.
     */
    @Override
    public void startFetchRunner() {
        this.fetchRequestCount = this.requestQueue.size();
        this.shutdown = false;
        final Thread[] workers = this.fetchThreads;
        for (int idx = 0; idx < workers.length; ++idx) {
            workers[idx].start();
        }
    }

    /**
     * Offers a request to the shared queue.
     *
     * @param request the request to enqueue
     */
    @Override
    public void addFetchRequest(FetchRequest request) {
        requestQueue.offer(request);
    }

    /**
     * Takes the next request from the shared queue.
     *
     * @return the request handed out by the queue
     * @throws InterruptedException if the queue's take() is interrupted
     */
    FetchRequest takeFetchRequest() throws InterruptedException {
        final FetchRequest next = this.requestQueue.take();
        return next;
    }

    /**
     * @param request the request whose retry counter is inspected
     * @return {@code true} when the request's retries exceed the configured maximum fetch retries
     */
    boolean isRetryTooMany(FetchRequest request) {
        if (request.getRetries() > this.consumerConfig.getMaxFetchRetries()) {
            return true;
        }
        return false;
    }

    /**
     * @param request the request whose retry counter is inspected
     * @return {@code true} when the request's retries exceed the configured limit for
     *         increasing the fetch size
     */
    boolean isRetryTooManyForIncrease(FetchRequest request) {
        if (request.getRetries() > this.consumerConfig.getMaxIncreaseFetchDataRetries()) {
            return true;
        }
        return false;
    }

    /**
     * @return the configured upper bound for the delay applied to a fetch request, in milliseconds
     */
    long getMaxDelayFetchTimeInMills() {
        final long configuredMax = this.consumerConfig.getMaxDelayFetchTimeInMills();
        return configuredMax;
    }

    class FetchRequestRunner
    implements Runnable {
        // NOTE(review): unused in this decompiled form; getRetryDelay()/getNextDelay() divide by a
        // literal 10, so the constant was presumably inlined by the compiler — confirm.
        private static final int DELAY_NPARTS = 10;
        /** Set by shutdown(); checked by the run() loop and before offsets are advanced. */
        private volatile boolean stopped = false;
        /** Wall-clock millis of the last "no connection" log line; 0 until the first one is written. */
        private long lastLogNoConnectionTime;
        /** Listener-executor threads currently running a callback on behalf of this runner. */
        private final ConcurrentHashSet<Thread> executorThreads = new ConcurrentHashSet<Thread>();

        /** Creates a runner that is not stopped and tracks no executor threads yet. */
        FetchRequestRunner() {
        }

        /** Marks this runner as stopped so that run() exits after its current iteration. */
        void shutdown() {
            stopped = true;
        }

        /**
         * Main loop: takes requests from the shared queue and processes them until the runner
         * is marked as stopped.
         */
        @Override
        public void run() {
            while (!this.stopped) {
                final FetchRequest next;
                try {
                    next = SimpleFetchManager.this.requestQueue.take();
                }
                catch (InterruptedException ignored) {
                    // Interruption only wakes the loop up; the stopped flag decides whether to exit.
                    continue;
                }
                this.processRequest(next);
            }
        }

        /**
         * Fetches messages for one request and passes them to the topic's listener.
         * An interrupted fetch re-queues the request unchanged; any other failure increases the
         * request's delay, logs the error and re-queues it.
         *
         * @param request the fetch request to execute
         */
        void processRequest(FetchRequest request) {
            try {
                final MessageIterator fetched = SimpleFetchManager.this.consumer.fetch(request, -1L, null);
                final String topic = request.getTopic();
                final MessageListener topicListener = SimpleFetchManager.this.consumer.getMessageListener(topic);
                final ConsumerMessageFilter topicFilter = SimpleFetchManager.this.consumer.getMessageFilter(request.getTopic());
                final String consumerGroup = SimpleFetchManager.this.consumer.getConsumerConfig().getGroup();
                this.notifyListener(request, fetched, topicListener, topicFilter, consumerGroup);
            }
            catch (InterruptedException e) {
                this.reAddFetchRequest2Queue(request);
            }
            catch (Throwable e) {
                // MetaClientException and every other failure are treated the same way.
                this.updateDelay(request);
                this.LogAddRequest(request, e);
            }
        }

        /**
         * Logs a fetch failure and puts the request back on the queue.
         * Failures that look like "no available connection" (a MetaClientException caused by a
         * NotifyRemotingException whose message contains that phrase) are logged at most once
         * every 30 seconds; everything else is always logged.
         *
         * @param request the request that failed; always re-queued
         * @param e       the failure; its message may be {@code null}
         */
        private void LogAddRequest(FetchRequest request, Throwable e) {
            // Throwable.getMessage() may legitimately be null; an NPE here would escape the
            // caller's catch block, kill the fetch thread and drop the request.
            final String detail = e.getMessage();
            // "\u65e0\u53ef\u7528\u8fde\u63a5" is the remoting layer's "no available connection" text.
            final boolean noConnection = e instanceof MetaClientException
                    && e.getCause() instanceof NotifyRemotingException
                    && detail != null
                    && detail.contains("\u65e0\u53ef\u7528\u8fde\u63a5");
            boolean shouldLog = true;
            if (noConnection) {
                final long now = System.currentTimeMillis();
                shouldLog = this.lastLogNoConnectionTime <= 0L || now - this.lastLogNoConnectionTime > 30000L;
                if (shouldLog) {
                    this.lastLogNoConnectionTime = now;
                }
            }
            if (shouldLog) {
                // Message text means "failed to fetch messages".
                log.error("\u83b7\u53d6\u6d88\u606f\u5931\u8d25,topic=" + request.getTopic() + ",partition=" + request.getPartition(), e);
            }
            this.reAddFetchRequest2Queue(request);
        }

        /**
         * Recovers from an invalid message: asks the consumer for a fresh offset, resets the
         * retry counter, applies the new offset (unless this runner is stopped) and re-queues
         * the request whether or not the offset query succeeded.
         * <p>
         * NOTE: the decompiler (CFR) flagged this method with "Removed try catching itself -
         * possible behaviour change", so the try/catch/finally shape is reconstructed.
         *
         * @param request the request whose offset is re-queried; always re-queued
         * @param e       the invalid-message failure that triggered the recovery
         */
        private void getOffsetAddRequest(FetchRequest request, InvalidMessageException e) {
            try {
                final long newOffset = SimpleFetchManager.this.consumer.offset(request);
                request.resetRetries();
                if (!this.stopped) {
                    request.setOffset(newOffset, request.getLastMessageId(), request.getPartitionObject().isAutoAck());
                }
            }
            catch (MetaClientException ex) {
                // Report the offset-query failure itself (ex); the triggering invalid-message
                // error is appended to the text so it is not lost. Text means "query offset failed".
                log.error("\u67e5\u8be2offset\u5931\u8d25,topic=" + request.getTopic() + ",partition=" + request.getPartition() + ",invalidMessage=" + e, ex);
            }
            finally {
                this.reAddFetchRequest2Queue(request);
            }
        }

        /** Interrupts every tracked executor thread that does not already have its interrupt flag set. */
        public void interruptExecutor() {
            for (Thread worker : this.executorThreads) {
                if (!worker.isInterrupted()) {
                    worker.interrupt();
                }
            }
        }

        /**
         * Delivers fetched messages to the listener, on the listener's executor when it has one
         * and on the calling thread otherwise. While a task runs on the executor its thread is
         * tracked in executorThreads so that interruptExecutor() can reach it.
         * <p>
         * When {@code listener} is {@code null} this method does nothing with the request.
         * When the executor rejects the task, the error is logged and the request is re-queued.
         * <p>
         * NOTE: the decompiler (CFR) flagged the inner run() with "Removed try catching itself -
         * possible behaviour change".
         *
         * @param request  the request the messages were fetched for
         * @param it       iterator over the fetched messages, may be {@code null}
         * @param listener listener of the request's topic, may be {@code null}
         * @param filter   optional message filter, may be {@code null}
         * @param group    consumer group name
         */
        private void notifyListener(final FetchRequest request, final MessageIterator it, final MessageListener listener, final ConsumerMessageFilter filter, final String group) {
            if (listener == null) {
                return;
            }
            if (listener.getExecutor() == null) {
                this.receiveMessages(request, it, listener, filter, group);
                return;
            }
            try {
                listener.getExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        final Thread currentThread = Thread.currentThread();
                        // Typed add: the set holds Thread, so no Object cast is allowed here.
                        FetchRequestRunner.this.executorThreads.add(currentThread);
                        try {
                            FetchRequestRunner.this.receiveMessages(request, it, listener, filter, group);
                        }
                        finally {
                            FetchRequestRunner.this.executorThreads.remove(currentThread);
                        }
                    }
                });
            }
            catch (RejectedExecutionException e) {
                // Text means "MessageListener thread pool is busy, cannot process messages".
                log.error("MessageListener\u7ebf\u7a0b\u6c60\u7e41\u5fd9\uff0c\u65e0\u6cd5\u5904\u7406\u6d88\u606f,topic=" + request.getTopic() + ",partition=" + request.getPartition(), e);
                this.reAddFetchRequest2Queue(request);
            }
        }

        /**
         * Hands the request back to the enclosing manager's queue.
         *
         * @param request the request to enqueue again
         */
        private void reAddFetchRequest2Queue(FetchRequest request) {
            final SimpleFetchManager owner = SimpleFetchManager.this;
            owner.addFetchRequest(request);
        }

        /**
         * Handles the result of one fetch.
         * With no deliverable message the request is delayed and re-queued; if data was returned
         * and the retry limit for growing is exceeded, its max size is increased first.
         * Otherwise the messages go through the retry-limit check, delivery and post-processing.
         *
         * @param request  the request the data belongs to
         * @param it       fetched messages, may be {@code null}
         * @param listener listener receiving the messages
         * @param filter   optional message filter
         * @param group    consumer group name
         */
        private void receiveMessages(FetchRequest request, MessageIterator it, MessageListener listener, ConsumerMessageFilter filter, String group) {
            if (it == null || !it.hasNext()) {
                final boolean growFetchSize = SimpleFetchManager.this.isRetryTooManyForIncrease(request) && it != null && it.getDataLength() > 0;
                if (growFetchSize) {
                    request.increaseMaxSize();
                    // Text means "warning, pull #<retries> of topic/partition got no message, increasing maxSize".
                    log.warn("\u8b66\u544a\uff0c\u7b2c" + request.getRetries() + "\u6b21\u65e0\u6cd5\u62c9\u53d6topic=" + request.getTopic() + ",partition=" + request.getPartitionObject() + "\u7684\u6d88\u606f\uff0c\u9012\u589emaxSize=" + request.getMaxSize() + " Bytes");
                }
                if (it != null) {
                    request.incrementRetriesAndGet();
                }
                this.updateDelay(request);
                this.reAddFetchRequest2Queue(request);
                return;
            }
            if (this.processWhenRetryTooMany(request, it)) {
                return;
            }
            final Partition target = request.getPartitionObject();
            final boolean alreadyRequeued = this.processReceiveMessage(request, it, listener, filter, target, group);
            if (!alreadyRequeued) {
                this.postReceiveMessage(request, it, target);
            }
        }

        /**
         * Feeds the iterator's messages to the listener one at a time, honouring the
         * partition's ack/rollback state after each message.
         * <p>
         * NOTE: the decompiler (CFR) flagged this method with "Removed try catching itself -
         * possible behaviour change".
         *
         * @param request   the request the messages were fetched for
         * @param it        iterator over the fetched messages; its offset is rewound to the
         *                  start of the current message on rollback-only, interrupt or failure
         * @param listener  listener that receives accepted messages
         * @param filter    optional message filter, may be {@code null}
         * @param partition partition set on each message and queried for ack/rollback state
         * @param group     consumer group, used for filtering and for message-id cache keys
         * @return {@code true} when an invalid message was hit and the request has already been
         *         re-queued via getOffsetAddRequest(); {@code false} when the caller still has
         *         to post-process the request
         */
        private boolean processReceiveMessage(FetchRequest request, MessageIterator it, MessageListener listener, ConsumerMessageFilter filter, Partition partition, String group) {
            // Number of messages counted for the "cli_get_msg_count" statistic.
            int count = 0;
            // Ids seen while the partition was neither acked nor rolled back; marked processed on ack.
            ArrayList<Long> inTransactionMsgIds = new ArrayList<Long>();
            while (it.hasNext()) {
                // Remember where this message starts so the iterator can be rewound to it.
                int prevOffset = it.getOffset();
                try {
                    Message msg = it.next();
                    // Skip messages whose id is already in the message-id cache for this group.
                    if (this.isProcessed(msg.getId(), group)) continue;
                    MessageAccessor.setPartition((Message)msg, (Partition)partition);
                    boolean accept = this.isAcceptable(request, filter, group, msg);
                    if (accept) {
                        // Expose the registration info to the callback thread for the duration of the call only.
                        currentTopicRegInfo.set(request.getTopicPartitionRegInfo().clone(it));
                        try {
                            listener.recieveMessages(msg);
                        }
                        finally {
                            currentTopicRegInfo.remove();
                        }
                    }
                    // The checks below run for filtered-out messages as well.
                    if (MessageAccessor.isRollbackOnly((Message)msg)) {
                        // Rewind to the start of this message and stop.
                        it.setOffset(prevOffset);
                        break;
                    }
                    if (partition.isAutoAck()) {
                        ++count;
                        this.markProcessed(msg.getId(), group);
                        continue;
                    }
                    if (partition.isAcked()) {
                        // Ack: everything collected so far plus this message counts as processed.
                        ++count;
                        for (Long msgId : inTransactionMsgIds) {
                            this.markProcessed(msgId, group);
                        }
                        this.markProcessed(msg.getId(), group);
                        break;
                    }
                    if (partition.isRollback()) break;
                    // Neither acked nor rolled back yet: remember the id and keep going.
                    inTransactionMsgIds.add(msg.getId());
                    ++count;
                }
                catch (InterruptedException e) {
                    it.setOffset(prevOffset);
                    log.error((Object)("Process messages thread was interrupted,topic=" + request.getTopic() + ",partition=" + request.getPartition()), (Throwable)e);
                    break;
                }
                catch (InvalidMessageException e) {
                    MetaStatLog.addStat(null, (String)"cli_invalid_message", (String)request.getTopic());
                    // Re-queries the offset and re-queues the request, so the caller must not touch it again.
                    this.getOffsetAddRequest(request, e);
                    return true;
                }
                catch (Throwable e) {
                    it.setOffset(prevOffset);
                    log.error((Object)("Process messages failed,topic=" + request.getTopic() + ",partition=" + request.getPartition()), e);
                    break;
                }
            }
            MetaStatLog.addStatValue2(null, (String)"cli_get_msg_count", (String)request.getTopic(), (long)count);
            return false;
        }

        /**
         * Tells whether the message id is already recorded in the message-id cache for the group.
         *
         * @param id    message id
         * @param group consumer group
         * @return {@code true} when the cache holds an entry for the group/id key; {@code false}
         *         when it does not or when no cache is installed
         */
        private boolean isProcessed(Long id, String group) {
            // Read the static field once: setMessageIdCache(null) may run concurrently, and a
            // second read after the null check could otherwise dereference null.
            final MessageIdCache cache = messageIdCache;
            if (cache == null) {
                return false;
            }
            return cache.get(this.cacheKey(id, group)) != null;
        }

        /**
         * Builds the message-id cache key: the group name immediately followed by the id.
         *
         * @param id    message id
         * @param group consumer group
         * @return the concatenation of {@code group} and {@code id}
         */
        private String cacheKey(Long id, String group) {
            final StringBuilder key = new StringBuilder();
            key.append(group).append(id);
            return key.toString();
        }

        /**
         * Records the message id as processed for the group; a no-op when no cache is installed.
         *
         * @param msgId message id
         * @param group consumer group
         */
        private void markProcessed(Long msgId, String group) {
            // Read the static field once so a concurrent setMessageIdCache(null) cannot slip in
            // between the null check and the put.
            final MessageIdCache cache = messageIdCache;
            if (cache != null) {
                cache.put(this.cacheKey(msgId, group), PROCESSED);
            }
        }

        /**
         * Asks the filter whether the message should be delivered to the listener.
         *
         * @param request the request the message belongs to (used for logging only)
         * @param filter  the filter to consult; {@code null} accepts every message
         * @param group   consumer group passed to the filter
         * @param msg     the message to test
         * @return the filter's verdict, {@code true} without a filter, {@code false} when the
         *         filter throws
         */
        private boolean isAcceptable(FetchRequest request, ConsumerMessageFilter filter, String group, Message msg) {
            if (filter == null) {
                return true;
            }
            try {
                return filter.accept(group, msg);
            }
            catch (Exception e) {
                // Pass the exception to the logger so the stack trace of the filter failure is kept.
                log.error("Filter message failed,topic=" + request.getTopic() + ",group=" + group + ",filterClass=" + filter.getClass().getCanonicalName(), e);
                return false;
            }
        }

        /**
         * When the request has exceeded the maximum fetch retries, takes the next message,
         * hands it to the consumer as "could not process", moves the offset past it (unless
         * this runner is stopped) and re-queues the request with no delay.
         *
         * @param request the request being processed
         * @param it      iterator positioned at the message to skip
         * @return {@code true} when the retry limit was exceeded and the request has been
         *         re-queued here; {@code false} when the limit was not exceeded
         */
        private boolean processWhenRetryTooMany(FetchRequest request, MessageIterator it) {
            if (!SimpleFetchManager.this.isRetryTooMany(request)) {
                return false;
            }
            try {
                final Message skipped = it.next();
                MessageAccessor.setPartition(skipped, request.getPartitionObject());
                MetaStatLog.addStat(null, "cli_skip_msg_count", skipped.getTopic());
                SimpleFetchManager.this.consumer.appendCouldNotProcessMessage(skipped);
            }
            catch (InvalidMessageException e) {
                MetaStatLog.addStat(null, "cli_invalid_message", request.getTopic());
                this.getOffsetAddRequest(request, e);
                return true;
            }
            catch (Throwable t) {
                this.LogAddRequest(request, t);
                return true;
            }
            request.resetRetries();
            if (!this.stopped) {
                final long advancedOffset = request.getOffset() + (long)it.getOffset();
                request.setOffset(advancedOffset, it.getPrevMessage().getId(), true);
            }
            request.setDelay(0L);
            this.reAddFetchRequest2Queue(request);
            return true;
        }

        /**
         * Updates the retry counter and offset after a delivery pass and re-queues the request.
         * The counter grows when the iterator made no progress and is reset otherwise.
         * An auto-ack partition acks immediately. A rolled-back partition rolls the request's
         * offset back. An acked partition is reset and acked. Anything else advances the offset
         * without acking.
         *
         * @param request   the request to update and re-queue
         * @param it        iterator whose offset tells how far delivery got
         * @param partition partition carrying the ack/rollback state
         */
        private void postReceiveMessage(FetchRequest request, MessageIterator it, Partition partition) {
            if (it.getOffset() == 0) {
                request.incrementRetriesAndGet();
            } else {
                request.resetRetries();
            }
            if (partition.isAutoAck()) {
                this.ackRequest(request, it, true);
                return;
            }
            if (partition.isRollback()) {
                request.rollbackOffset();
                partition.reset();
                this.addRequst(request);
                return;
            }
            if (partition.isAcked()) {
                partition.reset();
                this.ackRequest(request, it, true);
                return;
            }
            this.ackRequest(request, it, false);
        }

        /**
         * Advances the request's offset by the iterator's offset, recording the id of the last
         * message the iterator returned (-1 when there is none), then re-queues the request.
         *
         * @param request the request to advance
         * @param it      iterator supplying the consumed length and the previous message
         * @param ack     flag forwarded to {@code request.setOffset}
         */
        private void ackRequest(FetchRequest request, MessageIterator it, boolean ack) {
            final long lastMessageId;
            if (it.getPrevMessage() != null) {
                lastMessageId = it.getPrevMessage().getId();
            } else {
                lastMessageId = -1L;
            }
            final long advancedOffset = request.getOffset() + (long)it.getOffset();
            request.setOffset(advancedOffset, lastMessageId, ack);
            this.addRequst(request);
        }

        /**
         * Sets the request's delay from its retry count and re-queues it.
         *
         * @param request the request to delay and enqueue
         */
        private void addRequst(FetchRequest request) {
            request.setDelay(this.getRetryDelay(request));
            this.reAddFetchRequest2Queue(request);
        }

        /**
         * Computes a delay proportional to the request's retry count: one tenth of the maximum
         * delay per retry, capped at the maximum.
         *
         * @param request the request whose retry count is used
         * @return the delay in milliseconds
         */
        private long getRetryDelay(FetchRequest request) {
            final long ceiling = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            final long step = ceiling / 10L;
            final long proportional = step * (long)request.getRetries();
            return Math.min(proportional, ceiling);
        }

        /**
         * Raises the request's delay by one step (see getNextDelay).
         *
         * @param request the request whose delay is updated
         */
        private void updateDelay(FetchRequest request) {
            request.setDelay(this.getNextDelay(request));
        }

        /**
         * Computes the request's current delay plus one tenth of the maximum delay, capped at
         * the maximum.
         *
         * @param request the request whose current delay is the starting point
         * @return the next delay in milliseconds
         */
        private long getNextDelay(FetchRequest request) {
            final long ceiling = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            final long step = ceiling / 10L;
            final long increased = request.getDelay() + step;
            return Math.min(increased, ceiling);
        }
    }
}

