package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.CMResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-client-5.2.0.jar:org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.class */
public class ConsumeMessagePopConcurrentlyService implements ConsumeMessageService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumeMessagePopConcurrentlyService.class);
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final MessageListenerConcurrently messageListener;
    private final ThreadPoolExecutor consumeExecutor;
    private final String consumerGroup;
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-client-5.2.0.jar:org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService$ConsumeRequest.class */
    public class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final PopProcessQueue processQueue;
        private final MessageQueue messageQueue;
        private long popTime;
        private long invisibleTime;

        public ConsumeRequest(List<MessageExt> list, PopProcessQueue popProcessQueue, MessageQueue messageQueue) {
            this.popTime = 0L;
            this.invisibleTime = 0L;
            this.msgs = list;
            this.processQueue = popProcessQueue;
            this.messageQueue = messageQueue;
            try {
                String[] split = ExtraInfoUtil.split(list.get(0).getProperty(MessageConst.PROPERTY_POP_CK));
                this.popTime = ExtraInfoUtil.getPopTime(split).longValue();
                this.invisibleTime = ExtraInfoUtil.getInvisibleTime(split).longValue();
            } catch (Throwable th) {
                ConsumeMessagePopConcurrentlyService.log.error("parse extra info error. msg:" + list.get(0), th);
            }
        }

        public boolean isPopTimeout() {
            return this.msgs.size() == 0 || this.popTime <= 0 || this.invisibleTime <= 0 || System.currentTimeMillis() - this.popTime >= this.invisibleTime;
        }

        public List<MessageExt> getMsgs() {
            return this.msgs;
        }

        public PopProcessQueue getPopProcessQueue() {
            return this.processQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.processQueue.isDropped()) {
                ConsumeMessagePopConcurrentlyService.log.info("the message queue not be able to consume, because it's dropped(pop). group={} {}", ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            if (isPopTimeout()) {
                ConsumeMessagePopConcurrentlyService.log.info("the pop message time out so abort consume. popTime={} invisibleTime={}, group={} {}", Long.valueOf(this.popTime), Long.valueOf(this.invisibleTime), ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
                this.processQueue.decFoundMsg(-this.msgs.size());
                return;
            }
            MessageListenerConcurrently messageListenerConcurrently = ConsumeMessagePopConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext consumeConcurrentlyContext = new ConsumeConcurrentlyContext(this.messageQueue);
            ConsumeConcurrentlyStatus consumeConcurrentlyStatus = null;
            ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(this.msgs, ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap());
                consumeMessageContext.setMq(this.messageQueue);
                consumeMessageContext.setMsgList(this.msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long currentTimeMillis = System.currentTimeMillis();
            boolean z = false;
            ConsumeReturnType consumeReturnType = ConsumeReturnType.SUCCESS;
            try {
                if (this.msgs != null && !this.msgs.isEmpty()) {
                    Iterator<MessageExt> it = this.msgs.iterator();
                    while (it.hasNext()) {
                        MessageAccessor.setConsumeStartTimeStamp(it.next(), String.valueOf(System.currentTimeMillis()));
                    }
                }
                consumeConcurrentlyStatus = messageListenerConcurrently.consumeMessage(Collections.unmodifiableList(this.msgs), consumeConcurrentlyContext);
            } catch (Throwable th) {
                ConsumeMessagePopConcurrentlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", UtilAll.exceptionSimpleDesc(th), ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                z = true;
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (null == consumeConcurrentlyStatus) {
                consumeReturnType = z ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
            } else if (currentTimeMillis2 >= this.invisibleTime * 1000) {
                consumeReturnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == consumeConcurrentlyStatus) {
                consumeReturnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus) {
                consumeReturnType = ConsumeReturnType.SUCCESS;
            }
            if (null == consumeConcurrentlyStatus) {
                ConsumeMessagePopConcurrentlyService.log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue);
                consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, consumeReturnType.name());
                consumeMessageContext.setStatus(consumeConcurrentlyStatus.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == consumeConcurrentlyStatus);
                consumeMessageContext.setAccessChannel(ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumer.getAccessChannel());
                ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessagePopConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue.getTopic(), currentTimeMillis2);
            if (!this.processQueue.isDropped() && !isPopTimeout()) {
                ConsumeMessagePopConcurrentlyService.this.processConsumeResult(consumeConcurrentlyStatus, consumeConcurrentlyContext, this);
                return;
            }
            if (this.msgs != null) {
                this.processQueue.decFoundMsg(-this.msgs.size());
            }
            ConsumeMessagePopConcurrentlyService.log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}", Boolean.valueOf(this.processQueue.isDropped()), Boolean.valueOf(isPopTimeout()), this.messageQueue, this.msgs);
        }

        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }
    }

    public ConsumeMessagePopConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListenerConcurrently) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListenerConcurrently;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void start() {
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void shutdown(long j) {
        this.scheduledExecutorService.shutdown();
        ThreadUtils.shutdownGracefully(this.consumeExecutor, j, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void updateCorePoolSize(int i) {
        if (i <= 0 || i > 32767 || i >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
            return;
        }
        this.consumeExecutor.setCorePoolSize(i);
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void incCorePoolSize() {
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void decCorePoolSize() {
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public int getCorePoolSize() {
        return this.consumeExecutor.getCorePoolSize();
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt messageExt, String str) {
        ConsumeMessageDirectlyResult consumeMessageDirectlyResult = new ConsumeMessageDirectlyResult();
        consumeMessageDirectlyResult.setOrder(false);
        consumeMessageDirectlyResult.setAutoCommit(true);
        ArrayList arrayList = new ArrayList();
        arrayList.add(messageExt);
        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setBrokerName(str);
        messageQueue.setTopic(messageExt.getTopic());
        messageQueue.setQueueId(messageExt.getQueueId());
        ConsumeConcurrentlyContext consumeConcurrentlyContext = new ConsumeConcurrentlyContext(messageQueue);
        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(arrayList, this.consumerGroup);
        long currentTimeMillis = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", messageExt);
        try {
            ConsumeConcurrentlyStatus consumeMessage = this.messageListener.consumeMessage(arrayList, consumeConcurrentlyContext);
            if (consumeMessage != null) {
                switch (consumeMessage) {
                    case CONSUME_SUCCESS:
                        consumeMessageDirectlyResult.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    case RECONSUME_LATER:
                        consumeMessageDirectlyResult.setConsumeResult(CMResult.CR_LATER);
                        break;
                }
            } else {
                consumeMessageDirectlyResult.setConsumeResult(CMResult.CR_RETURN_NULL);
            }
        } catch (Throwable th) {
            consumeMessageDirectlyResult.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            consumeMessageDirectlyResult.setRemark(UtilAll.exceptionSimpleDesc(th));
            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", UtilAll.exceptionSimpleDesc(th), this.consumerGroup, arrayList, messageQueue), th);
        }
        consumeMessageDirectlyResult.setSpentTimeMills(System.currentTimeMillis() - currentTimeMillis);
        log.info("consumeMessageDirectly Result: {}", consumeMessageDirectlyResult);
        return consumeMessageDirectlyResult;
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void submitConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, MessageQueue messageQueue, boolean z) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.rocketmq.client.impl.consumer.ConsumeMessageService
    public void submitPopConsumeRequest(List<MessageExt> list, PopProcessQueue popProcessQueue, MessageQueue messageQueue) {
        int consumeMessageBatchMaxSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (list.size() <= consumeMessageBatchMaxSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(list, popProcessQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
                return;
            } catch (RejectedExecutionException e) {
                submitConsumeRequestLater(consumeRequest);
                return;
            }
        }
        int i = 0;
        while (i < list.size()) {
            ArrayList arrayList = new ArrayList(consumeMessageBatchMaxSize);
            int i2 = 0;
            while (i2 < consumeMessageBatchMaxSize && i < list.size()) {
                arrayList.add(list.get(i));
                i2++;
                i++;
            }
            ConsumeRequest consumeRequest2 = new ConsumeRequest(arrayList, popProcessQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest2);
            } catch (RejectedExecutionException e2) {
                while (i < list.size()) {
                    arrayList.add(list.get(i));
                    i++;
                }
                submitConsumeRequestLater(consumeRequest2);
            }
        }
    }

    public void processConsumeResult(ConsumeConcurrentlyStatus consumeConcurrentlyStatus, ConsumeConcurrentlyContext consumeConcurrentlyContext, ConsumeRequest consumeRequest) {
        if (consumeRequest.getMsgs().isEmpty()) {
            return;
        }
        int ackIndex = consumeConcurrentlyContext.getAckIndex();
        String topic = consumeRequest.getMessageQueue().getTopic();
        switch (consumeConcurrentlyStatus) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int i = ackIndex + 1;
                int size = consumeRequest.getMsgs().size() - i;
                getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, topic, i);
                getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, topic, size);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, topic, consumeRequest.getMsgs().size());
                break;
        }
        for (int i2 = 0; i2 <= ackIndex; i2++) {
            this.defaultMQPushConsumerImpl.ackAsync(consumeRequest.getMsgs().get(i2), this.consumerGroup);
            consumeRequest.getPopProcessQueue().ack();
        }
        for (int i3 = ackIndex + 1; i3 < consumeRequest.getMsgs().size(); i3++) {
            MessageExt messageExt = consumeRequest.getMsgs().get(i3);
            consumeRequest.getPopProcessQueue().ack();
            if (messageExt.getReconsumeTimes() >= this.defaultMQPushConsumerImpl.getMaxReconsumeTimes()) {
                checkNeedAckOrDelay(messageExt);
            } else {
                changePopInvisibleTime(consumeRequest.getMsgs().get(i3), this.consumerGroup, consumeConcurrentlyContext.getDelayLevelWhenNextConsume());
            }
        }
    }

    private void checkNeedAckOrDelay(MessageExt messageExt) {
        int[] popDelayLevel = this.defaultMQPushConsumerImpl.getPopDelayLevel();
        long currentTimeMillis = System.currentTimeMillis() - messageExt.getBornTimestamp();
        if (currentTimeMillis > popDelayLevel[popDelayLevel.length - 1] * 1000 * 2) {
            log.warn("Consume too many times, ack message async. message {}", messageExt.toString());
            this.defaultMQPushConsumerImpl.ackAsync(messageExt, this.consumerGroup);
            return;
        }
        int length = popDelayLevel.length - 1;
        while (true) {
            if (length < 0) {
                break;
            }
            if (currentTimeMillis >= popDelayLevel[length] * 1000) {
                length++;
                break;
            }
            length--;
        }
        changePopInvisibleTime(messageExt, this.consumerGroup, length);
        log.warn("Consume too many times, but delay time {} not enough. changePopInvisibleTime to delayLevel {} . message key:{}", Long.valueOf(currentTimeMillis), Integer.valueOf(length), messageExt.getKeys());
    }

    private void changePopInvisibleTime(final MessageExt messageExt, String str, int i) {
        if (0 == i) {
            i = messageExt.getReconsumeTimes();
        }
        try {
            this.defaultMQPushConsumerImpl.changePopInvisibleTimeAsync(messageExt.getTopic(), str, messageExt.getProperty(MessageConst.PROPERTY_POP_CK), (i >= this.defaultMQPushConsumerImpl.getPopDelayLevel().length ? r0[r0.length - 1] : r0[i]) * 1000, new AckCallback() { // from class: org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService.1
                @Override // org.apache.rocketmq.client.consumer.AckCallback
                public void onSuccess(AckResult ackResult) {
                }

                @Override // org.apache.rocketmq.client.consumer.AckCallback
                public void onException(Throwable th) {
                    ConsumeMessagePopConcurrentlyService.log.error("changePopInvisibleTimeAsync fail. msg:{} error info: {}", messageExt.toString(), th.toString());
                }
            });
        } catch (Throwable th) {
            log.error("changePopInvisibleTimeAsync fail, group:{} msg:{} errorInfo:{}", str, messageExt.toString(), th.toString());
        }
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
    }

    private void submitConsumeRequestLater(final List<MessageExt> list, final PopProcessQueue popProcessQueue, final MessageQueue messageQueue) {
        this.scheduledExecutorService.schedule(new Runnable() { // from class: org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService.2
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessagePopConcurrentlyService.this.submitPopConsumeRequest(list, popProcessQueue, messageQueue);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {
        this.scheduledExecutorService.schedule(new Runnable() { // from class: org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService.3
            @Override // java.lang.Runnable
            public void run() {
                ConsumeMessagePopConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000L, TimeUnit.MILLISECONDS);
    }
}
