/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.VirtualEnvUtil;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.alibaba.rocketmq.client.impl.consumer.PullResultExt;
import com.alibaba.rocketmq.client.impl.factory.MQClientFactory;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Thin wrapper around the client's pull API.
 *
 * <p>It remembers, per {@link MessageQueue}, the broker id that the last pull
 * response suggested, uses that id to choose the broker address for the next
 * pull, and post-processes pull results (decoding, tag filtering and topic
 * clean-up) before they are handed to the caller.
 */
public class PullAPIWrapper {
    /** Suggested broker id for the next pull, keyed by message queue. */
    private final ConcurrentHashMap<MessageQueue, AtomicLong> pullFromWhichNodeTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private final MQClientFactory mQClientFactory;
    private final String consumerGroup;

    /**
     * @param mQClientFactory factory used to look up broker addresses and to reach the client API
     * @param consumerGroup   consumer group name placed into every pull request header
     */
    public PullAPIWrapper(MQClientFactory mQClientFactory, String consumerGroup) {
        this.mQClientFactory = mQClientFactory;
        this.consumerGroup = consumerGroup;
    }

    /**
     * Records the broker id that should be used for the next pull of {@code mq}.
     *
     * @param mq       queue the suggestion applies to
     * @param brokerId broker id to remember
     */
    public void updatePullFromWhichNode(MessageQueue mq, long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest == null) {
            // putIfAbsent keeps the insert atomic: a plain get-then-put could replace a
            // counter that another thread registered in between and drop its update.
            suggest = this.pullFromWhichNodeTable.putIfAbsent(mq, new AtomicLong(brokerId));
            if (suggest == null) {
                // Our counter was installed and already carries brokerId.
                return;
            }
        }
        suggest.set(brokerId);
    }

    /**
     * Post-processes a pull result in place and returns it.
     *
     * <p>The suggested broker id carried by the result is always recorded. When the
     * status is {@link PullStatus#FOUND} the binary payload is decoded, messages are
     * filtered against the subscription's tag set (if that set is non-empty), the
     * project-group prefix (if any) is stripped from the topic of
     * {@code subscriptionData}, {@code mq} and every retained message, and each
     * retained message is stamped with the {@code MIN_OFFSET} / {@code MAX_OFFSET}
     * properties. The binary payload is cleared in all cases.
     *
     * @param mq               queue the result was pulled from; its topic may be rewritten
     * @param pullResult       result to process; must be a {@link PullResultExt}
     * @param subscriptionData subscription whose tag set drives filtering; its topic may be rewritten
     * @return the same {@code pullResult} instance
     */
    public PullResult processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData) {
        String projectGroupPrefix = this.mQClientFactory.getMQClientAPIImpl().getProjectGroupPrefix();
        PullResultExt pullResultExt = (PullResultExt) pullResult;
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            ArrayList<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            // Filter again on the client side when the subscription names specific tags.
            ArrayList<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null && subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }

            boolean hasProjectGroup = !UtilAll.isBlank(projectGroupPrefix);
            if (hasProjectGroup) {
                subscriptionData.setTopic(VirtualEnvUtil.clearProjectGroup(subscriptionData.getTopic(), projectGroupPrefix));
                mq.setTopic(VirtualEnvUtil.clearProjectGroup(mq.getTopic(), projectGroupPrefix));
            }

            // The offsets are identical for every message of this result, so format them once.
            String minOffset = Long.toString(pullResult.getMinOffset());
            String maxOffset = Long.toString(pullResult.getMaxOffset());
            for (MessageExt msg : msgListFilterAgain) {
                if (hasProjectGroup) {
                    msg.setTopic(VirtualEnvUtil.clearProjectGroup(msg.getTopic(), projectGroupPrefix));
                }
                msg.putProperty("MIN_OFFSET", minOffset);
                msg.putProperty("MAX_OFFSET", maxOffset);
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        // Drop the raw payload now that it has been decoded (or is not needed).
        pullResultExt.setMessageBinary(null);
        return pullResult;
    }

    /**
     * Returns the broker id most recently recorded for {@code mq}.
     *
     * @param mq queue to look up
     * @return the recorded broker id, or {@code 0} when nothing has been recorded for the queue
     */
    public long recalculatePullFromWhichNode(MessageQueue mq) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        return suggest != null ? suggest.get() : 0L;
    }

    /**
     * Builds a pull request for {@code mq} and sends it to the broker selected by
     * {@link #recalculatePullFromWhichNode(MessageQueue)}.
     *
     * <p>If the broker address cannot be found, the topic route is refreshed from the
     * name server once and the lookup is retried. When the selected broker reports
     * itself as a slave, the commit-offset flag is cleared from {@code sysFlag}
     * before the request is sent.
     *
     * @param mq                         queue to pull from
     * @param subExpression              subscription expression placed in the request
     * @param subVersion                 subscription version placed in the request
     * @param offset                     queue offset to start pulling at
     * @param maxNums                    maximum number of messages requested
     * @param sysFlag                    pull system flags
     * @param commitOffset               commit offset placed in the request
     * @param brokerSuspendMaxTimeMillis suspend timeout placed in the request
     * @param timeoutMillis              timeout passed to the underlying pull call
     * @param communicationMode          communication mode passed to the underlying pull call
     * @param pullCallback               callback passed to the underlying pull call
     * @return whatever the underlying {@code pullMessage} call returns
     * @throws MQClientException    if no broker address can be found for the queue's broker name
     * @throws RemotingException    propagated from the underlying pull call
     * @throws MQBrokerException    propagated from the underlying pull call
     * @throws InterruptedException propagated from the underlying pull call
     */
    public PullResult pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        if (findBrokerResult == null) {
            // Route information may be stale: refresh it once and retry the lookup.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        }
        if (findBrokerResult == null) {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }

        int sysFlagInner = sysFlag;
        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);

        return this.mQClientFactory.getMQClientAPIImpl().pullMessage(findBrokerResult.getBrokerAddr(), requestHeader, timeoutMillis, communicationMode, pullCallback);
    }
}

