package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.consumer.ConsumerMessageFilter;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleFetchManager.class */
public class SimpleFetchManager implements FetchManager {
    private volatile boolean shutdown = false;
    private Thread[] fetchThreads;
    private FetchRequestRunner[] requestRunners;
    private volatile int fetchRequestCount;
    private FetchRequestQueue requestQueue;
    private final ConsumerConfig consumerConfig;
    private final InnerConsumer consumer;
    private static final ThreadLocal<TopicPartitionRegInfo> currentTopicRegInfo = new ThreadLocal<>();
    public static final Byte PROCESSED = (byte) 1;
    private static final int CACAHE_SIZE = Integer.parseInt(System.getProperty("metaq.consumer.message_ids.lru_cache.size", "4096"));
    private static MessageIdCache messageIdCache = new ConcurrentLRUHashMap(CACAHE_SIZE);
    static final Log log = LogFactory.getLog(SimpleFetchManager.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleFetchManager$FetchRequestRunner.class */
    public class FetchRequestRunner implements Runnable {
        private static final int DELAY_NPARTS = 10;
        private long lastLogNoConnectionTime;
        private volatile boolean stopped = false;
        private final ConcurrentHashSet<Thread> executorThreads = new ConcurrentHashSet<>();

        FetchRequestRunner() {
        }

        void shutdown() {
            this.stopped = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                try {
                    processRequest(SimpleFetchManager.this.requestQueue.take());
                } catch (InterruptedException e) {
                }
            }
        }

        void processRequest(FetchRequest fetchRequest) {
            try {
                notifyListener(fetchRequest, SimpleFetchManager.this.consumer.fetch(fetchRequest, -1L, null), SimpleFetchManager.this.consumer.getMessageListener(fetchRequest.getTopic()), SimpleFetchManager.this.consumer.getMessageFilter(fetchRequest.getTopic()), SimpleFetchManager.this.consumer.getConsumerConfig().getGroup());
            } catch (MetaClientException e) {
                updateDelay(fetchRequest);
                LogAddRequest(fetchRequest, e);
            } catch (InterruptedException e2) {
                reAddFetchRequest2Queue(fetchRequest);
            } catch (Throwable th) {
                updateDelay(fetchRequest);
                LogAddRequest(fetchRequest, th);
            }
        }

        private void LogAddRequest(FetchRequest fetchRequest, Throwable th) {
            if ((th instanceof MetaClientException) && (th.getCause() instanceof NotifyRemotingException) && th.getMessage().contains("无可用连接")) {
                long currentTimeMillis = System.currentTimeMillis();
                if (this.lastLogNoConnectionTime <= 0 || currentTimeMillis - this.lastLogNoConnectionTime > 30000) {
                    SimpleFetchManager.log.error("获取消息失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), th);
                    this.lastLogNoConnectionTime = currentTimeMillis;
                }
            } else {
                SimpleFetchManager.log.error("获取消息失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), th);
            }
            reAddFetchRequest2Queue(fetchRequest);
        }

        private void getOffsetAddRequest(FetchRequest fetchRequest, InvalidMessageException invalidMessageException) {
            try {
                try {
                    long offset = SimpleFetchManager.this.consumer.offset(fetchRequest);
                    fetchRequest.resetRetries();
                    if (!this.stopped) {
                        fetchRequest.setOffset(offset, fetchRequest.getLastMessageId(), fetchRequest.getPartitionObject().isAutoAck());
                    }
                    reAddFetchRequest2Queue(fetchRequest);
                } catch (MetaClientException e) {
                    SimpleFetchManager.log.error("查询offset失败,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), invalidMessageException);
                    reAddFetchRequest2Queue(fetchRequest);
                }
            } catch (Throwable th) {
                reAddFetchRequest2Queue(fetchRequest);
                throw th;
            }
        }

        public void interruptExecutor() {
            Iterator it = this.executorThreads.iterator();
            while (it.hasNext()) {
                Thread thread = (Thread) it.next();
                if (!thread.isInterrupted()) {
                    thread.interrupt();
                }
            }
        }

        private void notifyListener(final FetchRequest fetchRequest, final com.taobao.metamorphosis.consumer.MessageIterator messageIterator, final MessageListener messageListener, final ConsumerMessageFilter consumerMessageFilter, final String str) {
            if (messageListener != null) {
                if (messageListener.getExecutor() == null) {
                    receiveMessages(fetchRequest, messageIterator, messageListener, consumerMessageFilter, str);
                    return;
                }
                try {
                    messageListener.getExecutor().execute(new Runnable() { // from class: com.taobao.metamorphosis.client.consumer.SimpleFetchManager.FetchRequestRunner.1
                        @Override // java.lang.Runnable
                        public void run() {
                            Thread currentThread = Thread.currentThread();
                            FetchRequestRunner.this.executorThreads.add(currentThread);
                            try {
                                FetchRequestRunner.this.receiveMessages(fetchRequest, messageIterator, messageListener, consumerMessageFilter, str);
                                FetchRequestRunner.this.executorThreads.remove(currentThread);
                            } catch (Throwable th) {
                                FetchRequestRunner.this.executorThreads.remove(currentThread);
                                throw th;
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    SimpleFetchManager.log.error("MessageListener线程池繁忙，无法处理消息,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), e);
                    reAddFetchRequest2Queue(fetchRequest);
                }
            }
        }

        private void reAddFetchRequest2Queue(FetchRequest fetchRequest) {
            SimpleFetchManager.this.addFetchRequest(fetchRequest);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void receiveMessages(FetchRequest fetchRequest, com.taobao.metamorphosis.consumer.MessageIterator messageIterator, MessageListener messageListener, ConsumerMessageFilter consumerMessageFilter, String str) {
            if (messageIterator != null && messageIterator.hasNext()) {
                if (processWhenRetryTooMany(fetchRequest, messageIterator)) {
                    return;
                }
                Partition partitionObject = fetchRequest.getPartitionObject();
                if (processReceiveMessage(fetchRequest, messageIterator, messageListener, consumerMessageFilter, partitionObject, str)) {
                    return;
                }
                postReceiveMessage(fetchRequest, messageIterator, partitionObject);
                return;
            }
            if (SimpleFetchManager.this.isRetryTooManyForIncrease(fetchRequest) && messageIterator != null && messageIterator.getDataLength() > 0) {
                fetchRequest.increaseMaxSize();
                SimpleFetchManager.log.warn("警告，第" + fetchRequest.getRetries() + "次无法拉取topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartitionObject() + "的消息，递增maxSize=" + fetchRequest.getMaxSize() + " Bytes");
            }
            if (messageIterator != null) {
                fetchRequest.incrementRetriesAndGet();
            }
            updateDelay(fetchRequest);
            reAddFetchRequest2Queue(fetchRequest);
        }

        private boolean processReceiveMessage(FetchRequest fetchRequest, com.taobao.metamorphosis.consumer.MessageIterator messageIterator, MessageListener messageListener, ConsumerMessageFilter consumerMessageFilter, Partition partition, String str) {
            int i = 0;
            ArrayList arrayList = new ArrayList();
            while (true) {
                if (!messageIterator.hasNext()) {
                    break;
                }
                int offset = messageIterator.getOffset();
                try {
                    Message next = messageIterator.next();
                    if (!isProcessed(Long.valueOf(next.getId()), str)) {
                        MessageAccessor.setPartition(next, partition);
                        if (isAcceptable(fetchRequest, consumerMessageFilter, str, next)) {
                            SimpleFetchManager.currentTopicRegInfo.set(fetchRequest.getTopicPartitionRegInfo().clone(messageIterator));
                            try {
                                messageListener.recieveMessages(next);
                                SimpleFetchManager.currentTopicRegInfo.remove();
                            } catch (Throwable th) {
                                SimpleFetchManager.currentTopicRegInfo.remove();
                                throw th;
                            }
                        }
                        if (MessageAccessor.isRollbackOnly(next)) {
                            messageIterator.setOffset(offset);
                            break;
                        }
                        if (partition.isAutoAck()) {
                            i++;
                            markProcessed(Long.valueOf(next.getId()), str);
                        } else if (partition.isAcked()) {
                            i++;
                            Iterator it = arrayList.iterator();
                            while (it.hasNext()) {
                                markProcessed((Long) it.next(), str);
                            }
                            markProcessed(Long.valueOf(next.getId()), str);
                        } else {
                            if (partition.isRollback()) {
                                break;
                            }
                            arrayList.add(Long.valueOf(next.getId()));
                            i++;
                        }
                    }
                } catch (InvalidMessageException e) {
                    MetaStatLog.addStat((String) null, "cli_invalid_message", fetchRequest.getTopic());
                    getOffsetAddRequest(fetchRequest, e);
                    return true;
                } catch (InterruptedException e2) {
                    messageIterator.setOffset(offset);
                    SimpleFetchManager.log.error("Process messages thread was interrupted,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), e2);
                } catch (Throwable th2) {
                    messageIterator.setOffset(offset);
                    SimpleFetchManager.log.error("Process messages failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition(), th2);
                }
            }
            MetaStatLog.addStatValue2((String) null, "cli_get_msg_count", fetchRequest.getTopic(), i);
            return false;
        }

        private boolean isProcessed(Long l, String str) {
            return (SimpleFetchManager.messageIdCache == null || SimpleFetchManager.messageIdCache.get(cacheKey(l, str)) == null) ? false : true;
        }

        private String cacheKey(Long l, String str) {
            return str + l;
        }

        private void markProcessed(Long l, String str) {
            if (SimpleFetchManager.messageIdCache != null) {
                SimpleFetchManager.messageIdCache.put(cacheKey(l, str), SimpleFetchManager.PROCESSED);
            }
        }

        private boolean isAcceptable(FetchRequest fetchRequest, ConsumerMessageFilter consumerMessageFilter, String str, Message message) {
            if (consumerMessageFilter == null) {
                return true;
            }
            try {
                return consumerMessageFilter.accept(str, message);
            } catch (Exception e) {
                SimpleFetchManager.log.error("Filter message failed,topic=" + fetchRequest.getTopic() + ",group=" + str + ",filterClass=" + consumerMessageFilter.getClass().getCanonicalName());
                return false;
            }
        }

        private boolean processWhenRetryTooMany(FetchRequest fetchRequest, com.taobao.metamorphosis.consumer.MessageIterator messageIterator) {
            if (!SimpleFetchManager.this.isRetryTooMany(fetchRequest)) {
                return false;
            }
            try {
                Message next = messageIterator.next();
                MessageAccessor.setPartition(next, fetchRequest.getPartitionObject());
                MetaStatLog.addStat((String) null, "cli_skip_msg_count", next.getTopic());
                SimpleFetchManager.this.consumer.appendCouldNotProcessMessage(next);
                fetchRequest.resetRetries();
                if (!this.stopped) {
                    fetchRequest.setOffset(fetchRequest.getOffset() + messageIterator.getOffset(), messageIterator.getPrevMessage().getId(), true);
                }
                fetchRequest.setDelay(0L);
                reAddFetchRequest2Queue(fetchRequest);
                return true;
            } catch (InvalidMessageException e) {
                MetaStatLog.addStat((String) null, "cli_invalid_message", fetchRequest.getTopic());
                getOffsetAddRequest(fetchRequest, e);
                return true;
            } catch (Throwable th) {
                LogAddRequest(fetchRequest, th);
                return true;
            }
        }

        private void postReceiveMessage(FetchRequest fetchRequest, com.taobao.metamorphosis.consumer.MessageIterator messageIterator, Partition partition) {
            if (messageIterator.getOffset() == 0) {
                fetchRequest.incrementRetriesAndGet();
            } else {
                fetchRequest.resetRetries();
            }
            if (partition.isAutoAck()) {
                ackRequest(fetchRequest, messageIterator, true);
                return;
            }
            if (partition.isRollback()) {
                fetchRequest.rollbackOffset();
                partition.reset();
                addRequst(fetchRequest);
            } else if (!partition.isAcked()) {
                ackRequest(fetchRequest, messageIterator, false);
            } else {
                partition.reset();
                ackRequest(fetchRequest, messageIterator, true);
            }
        }

        private void ackRequest(FetchRequest fetchRequest, com.taobao.metamorphosis.consumer.MessageIterator messageIterator, boolean z) {
            fetchRequest.setOffset(fetchRequest.getOffset() + messageIterator.getOffset(), messageIterator.getPrevMessage() != null ? messageIterator.getPrevMessage().getId() : -1L, z);
            addRequst(fetchRequest);
        }

        private void addRequst(FetchRequest fetchRequest) {
            fetchRequest.setDelay(getRetryDelay(fetchRequest));
            reAddFetchRequest2Queue(fetchRequest);
        }

        private long getRetryDelay(FetchRequest fetchRequest) {
            long maxDelayFetchTimeInMills = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            long retries = (maxDelayFetchTimeInMills / 10) * fetchRequest.getRetries();
            if (retries > maxDelayFetchTimeInMills) {
                retries = maxDelayFetchTimeInMills;
            }
            return retries;
        }

        private void updateDelay(FetchRequest fetchRequest) {
            fetchRequest.setDelay(getNextDelay(fetchRequest));
        }

        private long getNextDelay(FetchRequest fetchRequest) {
            long maxDelayFetchTimeInMills = SimpleFetchManager.this.getMaxDelayFetchTimeInMills();
            long delay = fetchRequest.getDelay() + (maxDelayFetchTimeInMills / 10);
            if (delay > maxDelayFetchTimeInMills) {
                delay = maxDelayFetchTimeInMills;
            }
            return delay;
        }
    }

    public static void setMessageIdCache(MessageIdCache messageIdCache2) {
        messageIdCache = messageIdCache2;
    }

    MessageIdCache getMessageIdCache() {
        return messageIdCache;
    }

    public SimpleFetchManager(ConsumerConfig consumerConfig, InnerConsumer innerConsumer) {
        this.consumerConfig = consumerConfig;
        this.consumer = innerConsumer;
    }

    public static TopicPartitionRegInfo currentTopicRegInfo() {
        return currentTopicRegInfo.get();
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public int getFetchRequestCount() {
        return this.fetchRequestCount;
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public boolean isShutdown() {
        return this.shutdown;
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public void stopFetchRunner() throws InterruptedException {
        this.shutdown = true;
        interruptRunners();
        if (this.requestQueue != null) {
            while (this.requestQueue.size() < this.fetchRequestCount) {
                interruptRunners();
            }
        }
        this.fetchRequestCount = 0;
    }

    private void interruptRunners() {
        if (this.fetchThreads != null) {
            for (int i = 0; i < this.fetchThreads.length; i++) {
                Thread thread = this.fetchThreads[i];
                FetchRequestRunner fetchRequestRunner = this.requestRunners[i];
                if (thread != null) {
                    fetchRequestRunner.shutdown();
                    fetchRequestRunner.interruptExecutor();
                    thread.interrupt();
                    try {
                        thread.join(100L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public void resetFetchState() {
        this.fetchRequestCount = 0;
        this.requestQueue = new FetchRequestQueue();
        this.fetchThreads = new Thread[this.consumerConfig.getFetchRunnerCount()];
        this.requestRunners = new FetchRequestRunner[this.consumerConfig.getFetchRunnerCount()];
        for (int i = 0; i < this.fetchThreads.length; i++) {
            FetchRequestRunner fetchRequestRunner = new FetchRequestRunner();
            this.requestRunners[i] = fetchRequestRunner;
            this.fetchThreads[i] = new Thread(fetchRequestRunner);
            this.fetchThreads[i].setName(this.consumerConfig.getGroup() + "-fetch-Runner-" + i);
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public void startFetchRunner() {
        this.fetchRequestCount = this.requestQueue.size();
        this.shutdown = false;
        for (Thread thread : this.fetchThreads) {
            thread.start();
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.FetchManager
    public void addFetchRequest(FetchRequest fetchRequest) {
        this.requestQueue.offer(fetchRequest);
    }

    FetchRequest takeFetchRequest() throws InterruptedException {
        return this.requestQueue.take();
    }

    boolean isRetryTooMany(FetchRequest fetchRequest) {
        return fetchRequest.getRetries() > this.consumerConfig.getMaxFetchRetries();
    }

    boolean isRetryTooManyForIncrease(FetchRequest fetchRequest) {
        return fetchRequest.getRetries() > this.consumerConfig.getMaxIncreaseFetchDataRetries();
    }

    long getMaxDelayFetchTimeInMills() {
        return this.consumerConfig.getMaxDelayFetchTimeInMills();
    }
}
