/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import apache.rocketmq.v1.AckMessageRequest;
import apache.rocketmq.v1.AckMessageResponse;
import apache.rocketmq.v1.ConsumeMessageType;
import apache.rocketmq.v1.ConsumeModel;
import apache.rocketmq.v1.ConsumePolicy;
import apache.rocketmq.v1.ConsumerData;
import apache.rocketmq.v1.DeadLetterPolicy;
import apache.rocketmq.v1.FilterExpression;
import apache.rocketmq.v1.FilterType;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v1.HeartbeatRequest;
import apache.rocketmq.v1.HeartbeatResponse;
import apache.rocketmq.v1.Message;
import apache.rocketmq.v1.NackMessageRequest;
import apache.rocketmq.v1.NackMessageResponse;
import apache.rocketmq.v1.PollCommandRequest;
import apache.rocketmq.v1.QueryAssignmentRequest;
import apache.rocketmq.v1.ReportMessageConsumptionResultRequest;
import apache.rocketmq.v1.ReportMessageConsumptionResultResponse;
import apache.rocketmq.v1.Resource;
import apache.rocketmq.v1.SubscriptionEntry;
import apache.rocketmq.v1.VerifyMessageConsumptionCommand;
import com.aliyun.openservices.ons.shaded.com.google.common.annotations.VisibleForTesting;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Preconditions;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Stopwatch;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.RateLimiter;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.io.grpc.Metadata;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeFromWhere;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.MessageModel;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.FilterExpression;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListener;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListenerType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ErrorCode;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.Assignment;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeConcurrentlyService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeOrderlyService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumerImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.OffsetStore;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ProcessQueueImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.TopicAssignments;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPointStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImplAccessor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.TopicRouteData;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.SimpleCallable;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PushConsumerImpl
extends ConsumerImpl {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerImpl.class);
    private final AtomicLong receptionTimes;
    private final AtomicLong receivedMessagesQuantity;
    private final AtomicLong pullTimes;
    private final AtomicLong pulledMessagesQuantity;
    private final AtomicLong consumptionOkQuantity;
    private final AtomicLong consumptionErrorQuantity;
    private int maxTotalCachedMessagesQuantityThreshold = -1;
    private int maxCachedMessagesQuantityThresholdPerQueue = 1024;
    private int maxTotalCachedMessagesBytesThreshold = -1;
    private int maxCachedMessagesBytesThresholdPerQueue = 0x400000;
    private long fifoConsumptionSuspendTimeMillis = 1000L;
    private int consumeMessageBatchMaxSize = 1;
    private int consumptionThreadsAmount = 32;
    private int maxDeliveryAttempts = 17;
    private MessageModel messageModel = MessageModel.CLUSTERING;
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    private long consumeFromTimeMillis = System.currentTimeMillis();
    private long consumptionTimeoutMillis = 900000L;
    private long maxAwaitTimeMillisPerQueue = 0L;
    private int maxAwaitBatchSizePerQueue = 32;
    private OffsetStore offsetStore = null;
    private MessageListener messageListener = null;
    private volatile ConsumeService consumeService = null;
    private final ThreadPoolExecutor consumptionExecutor;
    private final ConcurrentMap<String, FilterExpression> filterExpressionTable = new ConcurrentHashMap<String, FilterExpression>();
    private final ConcurrentMap<String, TopicAssignments> cachedTopicAssignmentTable = new ConcurrentHashMap<String, TopicAssignments>();
    private final ConcurrentMap<String, RateLimiter> rateLimiterTable = new ConcurrentHashMap<String, RateLimiter>();
    private final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>();
    private volatile ScheduledFuture<?> scanAssignmentsFuture;

    public PushConsumerImpl(String group) throws ClientException {
        super(group);
        this.receptionTimes = new AtomicLong(0L);
        this.receivedMessagesQuantity = new AtomicLong(0L);
        this.pullTimes = new AtomicLong(0L);
        this.pulledMessagesQuantity = new AtomicLong(0L);
        this.consumptionOkQuantity = new AtomicLong(0L);
        this.consumptionErrorQuantity = new AtomicLong(0L);
        this.consumptionExecutor = new ThreadPoolExecutor(this.consumptionThreadsAmount, this.consumptionThreadsAmount, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MessageConsumption"));
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = Preconditions.checkNotNull(offsetStore, "offsetStore");
    }

    private void generateConsumeService() throws ClientException {
        switch (this.messageListener.getListenerType()) {
            case CONCURRENTLY: {
                this.consumeService = new ConsumeConcurrentlyService(this.messageListener, this, this.consumptionExecutor, this.getScheduler(), this.processQueueTable, this.consumeMessageBatchMaxSize);
                break;
            }
            case ORDERLY: {
                this.consumeService = new ConsumeOrderlyService(this.messageListener, this, this.consumptionExecutor, this.getScheduler(), this.processQueueTable);
                break;
            }
            default: {
                throw new ClientException(ErrorCode.NO_LISTENER_REGISTERED);
            }
        }
    }

    int cachedMessagesQuantityThresholdPerQueue() {
        if (this.maxTotalCachedMessagesQuantityThreshold <= 0) {
            return this.maxCachedMessagesQuantityThresholdPerQueue;
        }
        int size = this.processQueueTable.size();
        if (size <= 0) {
            return 0;
        }
        return Math.max(1, this.maxTotalCachedMessagesQuantityThreshold / size);
    }

    int cachedMessagesBytesThresholdPerQueue() {
        if (this.maxTotalCachedMessagesBytesThreshold <= 0) {
            return this.maxCachedMessagesBytesThresholdPerQueue;
        }
        int size = this.processQueueTable.size();
        if (size <= 0) {
            return 0;
        }
        return Math.max(1, this.maxTotalCachedMessagesBytesThreshold / size);
    }

    boolean isOffsetRecorded() {
        return MessageModel.BROADCASTING.equals((Object)this.messageModel) && null != this.offsetStore;
    }

    Optional<Long> readOffset(MessageQueue mq) {
        return this.offsetStore.readOffset(mq);
    }

    void updateOffset(MessageQueue mq, long offset) {
        try {
            this.offsetStore.updateOffset(mq, offset);
        }
        catch (Throwable t2) {
            log.error("Exception raises while update offset, namespace={}, mq={}, offset={}", this.namespace, mq, offset);
        }
    }

    private void promptAssignmentsScan() {
        ArrayList<ListenableFuture<TopicRouteData>> futureList = new ArrayList<ListenableFuture<TopicRouteData>>();
        for (String topic : this.filterExpressionTable.keySet()) {
            futureList.add(this.getRouteData(topic));
        }
        Futures.whenAllComplete(futureList).call(new SimpleCallable(){

            @Override
            public void run() {
                PushConsumerImpl.this.promptAssignmentsScan0();
            }
        }, MoreExecutors.directExecutor());
    }

    private void promptAssignmentsScan0() {
        ArrayList<ListenableFuture<HeartbeatResponse>> futureList = new ArrayList<ListenableFuture<HeartbeatResponse>>();
        HeartbeatRequest request = this.wrapHeartbeatRequest();
        for (Endpoints endpoints : this.getRouteEndpointsSet()) {
            futureList.add(this.doHeartbeat(request, endpoints));
        }
        Futures.whenAllComplete(futureList).call(new SimpleCallable(){

            @Override
            public void run() {
                PushConsumerImpl.this.scanAssignmentsFuture = PushConsumerImpl.this.clientManager.getScheduler().scheduleWithFixedDelay(() -> {
                    try {
                        PushConsumerImpl.this.scanAssignments();
                    }
                    catch (Throwable t2) {
                        log.error("Exception raised while scanning the load assignments, clientId={}", (Object)PushConsumerImpl.this.id, (Object)t2);
                    }
                }, 1L, 5L, TimeUnit.SECONDS);
            }
        }, MoreExecutors.directExecutor());
    }

    @Override
    public void setUp() throws ClientException {
        log.info("Begin to start the rocketmq push consumer.");
        if (null == this.messageListener) {
            throw new ClientException(ErrorCode.NO_LISTENER_REGISTERED);
        }
        super.setUp();
        this.generateConsumeService();
        this.consumeService.startAsync().awaitRunning();
        this.promptAssignmentsScan();
        log.info("The rocketmq push consumer starts successfully.");
    }

    @Override
    public void tearDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq push consumer, clientId={}", (Object)this.id);
        if (null != this.scanAssignmentsFuture) {
            this.scanAssignmentsFuture.cancel(false);
        }
        super.tearDown();
        if (null != this.consumeService) {
            this.consumeService.stopAsync().awaitTerminated();
        }
        this.consumptionExecutor.shutdown();
        if (!ExecutorServices.awaitTerminated(this.consumptionExecutor)) {
            log.error("[Bug] Failed to shutdown the consumption executor, clientId={}", (Object)this.id);
        }
        log.info("Shutdown the rocketmq push consumer successfully, clientId={}", (Object)this.id);
    }

    public void start() {
        this.clientService.startAsync().awaitRunning();
    }

    public void shutdown() {
        this.clientService.stopAsync().awaitTerminated();
    }

    @Override
    public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
    }

    RateLimiter rateLimiter(String topic) {
        return (RateLimiter)this.rateLimiterTable.get(topic);
    }

    public void rateLimit(String topic, double permitsPerSecond) {
        RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
        this.rateLimiterTable.put(topic, rateLimiter);
    }

    private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic, Endpoints endpoints) {
        Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(topic).build();
        return QueryAssignmentRequest.newBuilder().setTopic(topicResource).setEndpoints(endpoints.toPbEndpoints()).setGroup(this.getPbGroup()).setClientId(this.id).build();
    }

    @VisibleForTesting
    public void scanAssignments() {
        try {
            log.debug("Start to scan assignments periodically");
            for (Map.Entry entry : this.filterExpressionTable.entrySet()) {
                final String topic = (String)entry.getKey();
                final FilterExpression filterExpression = (FilterExpression)entry.getValue();
                final TopicAssignments local = (TopicAssignments)this.cachedTopicAssignmentTable.get(topic);
                ListenableFuture<TopicAssignments> future = this.queryAssignment(topic);
                Futures.addCallback(future, new FutureCallback<TopicAssignments>(){

                    @Override
                    public void onSuccess(TopicAssignments remote) {
                        if (remote.getAssignmentList().isEmpty()) {
                            if (null == local || local.getAssignmentList().isEmpty()) {
                                log.info("Acquired empty assignments from remote, would scan later, namespace={}, topic={}, clientId={}", PushConsumerImpl.this.namespace, topic, PushConsumerImpl.this.id);
                                return;
                            }
                            log.info("Attention!!! acquired empty assignments from remote, but local assignments is not empty, namespace={}, topic={}, clientId={}", PushConsumerImpl.this.namespace, topic, PushConsumerImpl.this.id);
                        }
                        if (!remote.equals(local)) {
                            log.info("Assignments of topic={}[namespace={}] has changed, {} => {}, clientId={}", topic, PushConsumerImpl.this.namespace, local, remote, PushConsumerImpl.this.id);
                            PushConsumerImpl.this.synchronizeProcessQueue(topic, remote, filterExpression);
                            PushConsumerImpl.this.cachedTopicAssignmentTable.put(topic, remote);
                            return;
                        }
                        PushConsumerImpl.this.synchronizeProcessQueue(topic, remote, filterExpression);
                    }

                    @Override
                    public void onFailure(Throwable t2) {
                        log.error("Exception raised while scanning the assignments, namespace={}, topic={}, clientId={}", PushConsumerImpl.this.namespace, topic, PushConsumerImpl.this.id, t2);
                    }
                }, MoreExecutors.directExecutor());
            }
        }
        catch (Throwable t2) {
            log.error("Exception raised while scanning the assignments for all topics, clientId={}", (Object)this.id, (Object)t2);
        }
    }

    @Override
    public void doStats() {
        long receiveTimes = this.receptionTimes.getAndSet(0L);
        long receivedMessagesQuantity = this.receivedMessagesQuantity.getAndSet(0L);
        long pullTimes = this.pullTimes.getAndSet(0L);
        long pulledMessagesQuantity = this.pulledMessagesQuantity.getAndSet(0L);
        long consumptionOkQuantity = this.consumptionOkQuantity.getAndSet(0L);
        long consumptionErrorQuantity = this.consumptionErrorQuantity.getAndSet(0L);
        log.info("clientId={}, namespace={}, group={}, receiveTimes={}, receivedMessagesQuantity={}, pullTimes={}, pulledMessagesQuantity={}, consumptionOkQuantity={}, consumptionErrorQuantity={}", this.id, this.namespace, this.group, receiveTimes, receivedMessagesQuantity, pullTimes, pulledMessagesQuantity, consumptionOkQuantity, consumptionErrorQuantity);
        for (ProcessQueue pq : this.processQueueTable.values()) {
            pq.doStats();
        }
    }

    void dropProcessQueue(MessageQueue mq) {
        ProcessQueue pq = (ProcessQueue)this.processQueueTable.remove(mq);
        if (null != pq) {
            pq.drop();
        }
    }

    private Optional<ProcessQueue> createProcessQueue(MessageQueue mq, FilterExpression filterExpression) {
        ProcessQueueImpl processQueue = new ProcessQueueImpl(this, mq, filterExpression);
        ProcessQueue previous = this.processQueueTable.putIfAbsent(mq, processQueue);
        return null == previous ? Optional.of(processQueue) : Optional.absent();
    }

    private void synchronizeProcessQueue(String topic, TopicAssignments topicAssignments, FilterExpression filterExpression) {
        ProcessQueue pq;
        HashSet<MessageQueue> latestMqs = new HashSet<MessageQueue>();
        List<Assignment> assignments = topicAssignments.getAssignmentList();
        for (Assignment assignment : assignments) {
            latestMqs.add(assignment.getMessageQueue());
        }
        HashSet<MessageQueue> activeMqs = new HashSet<MessageQueue>();
        for (Map.Entry entry : this.processQueueTable.entrySet()) {
            MessageQueue mq = (MessageQueue)entry.getKey();
            pq = (ProcessQueue)entry.getValue();
            if (!topic.equals(mq.getTopic())) continue;
            if (!latestMqs.contains(mq)) {
                log.info("Drop message queue according to the latest assignments, namespace={}, mq={}, clientId={}", this.namespace, mq, this.id);
                this.dropProcessQueue(mq);
                continue;
            }
            if (pq.expired()) {
                log.warn("Drop message queue because it is expired, namespace={}, mq={}, clientId={}", this.namespace, mq, this.id);
                this.dropProcessQueue(mq);
                continue;
            }
            activeMqs.add(mq);
        }
        for (MessageQueue mq : latestMqs) {
            if (activeMqs.contains(mq)) continue;
            Optional<ProcessQueue> optionalProcessQueue = this.createProcessQueue(mq, filterExpression);
            if (!optionalProcessQueue.isPresent()) {
                log.info("Process queue already exists, namespace={}, mq={}, clientId={}", this.namespace, mq, this.id);
                continue;
            }
            log.info("Start to fetch message from remote, namespace={}, mq={}, clientId={}", this.namespace, mq, this.id);
            pq = optionalProcessQueue.get();
            pq.fetchMessageImmediately();
        }
    }

    public void subscribe(String topic, String expression, ExpressionType expressionType) {
        FilterExpression filterExpression = new FilterExpression(expression, expressionType);
        this.filterExpressionTable.put(topic, filterExpression);
    }

    public void unsubscribe(String topic) {
        this.filterExpressionTable.remove(topic);
    }

    public void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently) {
        this.messageListener = Preconditions.checkNotNull(messageListenerConcurrently, "messageListenerConcurrently");
    }

    public void registerMessageListener(MessageListenerOrderly messageListenerOrderly) {
        this.messageListener = Preconditions.checkNotNull(messageListenerOrderly, "messageListenerOrderly");
    }

    private ListenableFuture<Endpoints> pickRouteEndpointsToQueryAssignments(String topic) {
        ListenableFuture<TopicRouteData> future = this.getRouteData(topic);
        return Futures.transformAsync(future, topicRouteData -> {
            SettableFuture<Endpoints> future0 = SettableFuture.create();
            Endpoints endpoints = topicRouteData.pickEndpointsToQueryAssignments();
            future0.set(endpoints);
            return future0;
        }, MoreExecutors.directExecutor());
    }

    private ListenableFuture<TopicAssignments> queryAssignment(String topic) {
        if (MessageModel.BROADCASTING.equals((Object)this.messageModel)) {
            ListenableFuture<TopicRouteData> future = this.getRouteData(topic);
            return Futures.transform(future, TopicAssignments::new, MoreExecutors.directExecutor());
        }
        ListenableFuture<Endpoints> future = this.pickRouteEndpointsToQueryAssignments(topic);
        ListenableFuture responseFuture = Futures.transformAsync(future, endpoints -> {
            Metadata metadata = this.sign();
            QueryAssignmentRequest request = this.wrapQueryAssignmentRequest(topic, (Endpoints)endpoints);
            return this.clientManager.queryAssignment((Endpoints)endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        }, MoreExecutors.directExecutor());
        return Futures.transformAsync(responseFuture, response -> {
            SettableFuture<TopicAssignments> future0 = SettableFuture.create();
            Status status = response.getCommon().getStatus();
            Code code = Code.forNumber(status.getCode());
            if (!Code.OK.equals(code)) {
                String statusMessage = status.getMessage();
                log.error("Failed to query assignment, namespace={}, topic={}, clientId={}, code={}, status message=[{}]", this.namespace, topic, this.id, code, statusMessage);
                throw new ClientException(ErrorCode.NO_ASSIGNMENT, statusMessage);
            }
            TopicAssignments topicAssignments = new TopicAssignments(response.getAssignmentsList());
            future0.set(topicAssignments);
            return future0;
        }, MoreExecutors.directExecutor());
    }

    @Override
    public HeartbeatRequest wrapHeartbeatRequest() {
        ArrayList<SubscriptionEntry> subscriptionEntries = new ArrayList<SubscriptionEntry>();
        for (Map.Entry entry : this.filterExpressionTable.entrySet()) {
            String topic = (String)entry.getKey();
            FilterExpression filterExpression = (FilterExpression)entry.getValue();
            Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(topic).build();
            FilterExpression.Builder builder = apache.rocketmq.v1.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
            switch (filterExpression.getExpressionType()) {
                case TAG: {
                    builder.setType(FilterType.TAG);
                    break;
                }
                default: {
                    builder.setType(FilterType.SQL);
                }
            }
            apache.rocketmq.v1.FilterExpression expression = builder.build();
            SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topicResource).setExpression(expression).build();
            subscriptionEntries.add(subscriptionEntry);
        }
        DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder().setMaxDeliveryAttempts(this.maxDeliveryAttempts).build();
        ConsumerData.Builder builder = ConsumerData.newBuilder().setGroup(this.getPbGroup()).addAllSubscriptions(subscriptionEntries).setDeadLetterPolicy(deadLetterPolicy).setConsumeType(ConsumeMessageType.PASSIVE);
        switch (this.messageModel) {
            case BROADCASTING: {
                builder.setConsumeModel(ConsumeModel.BROADCASTING);
                break;
            }
            default: {
                builder.setConsumeModel(ConsumeModel.CLUSTERING);
            }
        }
        switch (this.consumeFromWhere) {
            case CONSUME_FROM_FIRST_OFFSET: {
                builder.setConsumePolicy(ConsumePolicy.PLAYBACK);
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                builder.setConsumePolicy(ConsumePolicy.TARGET_TIMESTAMP);
                break;
            }
            case CONSUME_FROM_MAX_OFFSET: {
                builder.setConsumePolicy(ConsumePolicy.DISCARD);
                break;
            }
            default: {
                builder.setConsumePolicy(ConsumePolicy.RESUME);
            }
        }
        ConsumerData consumerData = builder.build();
        return HeartbeatRequest.newBuilder().setClientId(this.id).setConsumerData(consumerData).setFifoFlag(this.messageListener.getListenerType().equals((Object)MessageListenerType.ORDERLY)).build();
    }

    @Override
    public void verifyMessageConsumption(Endpoints endpoints, VerifyMessageConsumptionCommand command) {
        ListenableFuture future;
        final String messageId = command.getMessage().getSystemAttribute().getMessageId();
        final String commandId = command.getCommandId();
        try {
            ListenableFuture<Status> statusFuture = this.verifyMessageConsumption0(command);
            future = Futures.transformAsync(statusFuture, status -> {
                ReportMessageConsumptionResultRequest request = ReportMessageConsumptionResultRequest.newBuilder().setStatus((Status)status).setCommandId(commandId).build();
                Metadata metadata = this.sign();
                return this.clientManager.reportMessageConsumption(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            }, MoreExecutors.directExecutor());
        }
        catch (Throwable t2) {
            log.error("[Bug] Exception raised while verifying message consumption, messageId={}, clientId={}, commandId={}", messageId, this.id, commandId, t2);
            SettableFuture future0 = SettableFuture.create();
            future0.setException(t2);
            future = future0;
        }
        Futures.addCallback(future, new FutureCallback<ReportMessageConsumptionResultResponse>(){

            @Override
            public void onSuccess(ReportMessageConsumptionResultResponse response) {
                Status status = response.getCommon().getStatus();
                Code code = Code.forNumber(status.getCode());
                if (!Code.OK.equals(code)) {
                    log.error("Failed to report message consumption result, clientId={}, messageId={}, commandId={}, code={}, status message=[{}]", PushConsumerImpl.this.id, messageId, commandId, code, status.getMessage());
                    return;
                }
                log.info("Report message consumption result, clientId={}, messageId={}, commandId={}, code={}", PushConsumerImpl.this.id, messageId, commandId, code);
            }

            @Override
            public void onFailure(Throwable t2) {
                log.error("Exception raised while reporting message consumption, clientId={}, messageId={}, commandId={}", PushConsumerImpl.this.id, messageId, commandId, t2);
            }
        }, MoreExecutors.directExecutor());
    }

    public ListenableFuture<Status> verifyMessageConsumption0(VerifyMessageConsumptionCommand command) {
        Message message = command.getMessage();
        MessageImpl messageImpl = MessageImplAccessor.wrapMessageImpl(message);
        if (messageImpl.isCorrupted()) {
            log.error("Message is corrupted, ignore it for consumption verification, messageId={}, clientId={}", (Object)messageImpl.getSystemAttribute().getMessageId(), (Object)this.id);
            SettableFuture<Status> future0 = SettableFuture.create();
            future0.set(Status.newBuilder().setCode(Code.INVALID_ARGUMENT.getNumber()).build());
            return future0;
        }
        MessageExt messageExt = new MessageExt(messageImpl);
        ListenableFuture<ConsumeStatus> future = this.consumeService.consume(messageExt);
        return Futures.transform(future, consumeStatus -> {
            Code code = Code.UNKNOWN;
            switch (consumeStatus) {
                case OK: {
                    code = Code.OK;
                    break;
                }
                case ERROR: {
                    code = Code.INTERNAL;
                    break;
                }
            }
            return Status.newBuilder().setCode(code.getNumber()).build();
        }, MoreExecutors.directExecutor());
    }

    private AckMessageRequest wrapAckMessageRequest(MessageExt messageExt) {
        Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(messageExt.getTopic()).build();
        return AckMessageRequest.newBuilder().setGroup(this.getPbGroup()).setTopic(topicResource).setMessageId(messageExt.getMsgId()).setClientId(this.id).setReceiptHandle(messageExt.getReceiptHandle()).build();
    }

    public ListenableFuture<AckMessageResponse> ackMessage(MessageExt messageExt) {
        return this.ackMessage(messageExt, 1);
    }

    public ListenableFuture<AckMessageResponse> ackMessage(final MessageExt messageExt, int attempt) {
        SettableFuture<AckMessageResponse> future;
        final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setTopic(messageExt.getTopic()).setAttempt(attempt).build();
        this.intercept(MessageHookPoint.PRE_ACK_MESSAGE, messageExt, preContext);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        Endpoints endpoints = messageExt.getEndpoints();
        try {
            AckMessageRequest request = this.wrapAckMessageRequest(messageExt);
            Metadata metadata = this.sign();
            future = this.clientManager.ackMessage(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t2) {
            SettableFuture future0 = SettableFuture.create();
            future0.setException(t2);
            future = future0;
        }
        Futures.addCallback(future, new FutureCallback<AckMessageResponse>(){

            @Override
            public void onSuccess(AckMessageResponse response) {
                Code code = Code.forNumber(response.getCommon().getStatus().getCode());
                MessageHookPointStatus hookPointStatus = Code.OK.equals(code) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(hookPointStatus).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_ACK_MESSAGE, messageExt, postContext);
            }

            @Override
            public void onFailure(Throwable t2) {
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(MessageHookPointStatus.ERROR).setThrowable(t2).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_ACK_MESSAGE, messageExt, postContext);
            }
        }, MoreExecutors.directExecutor());
        return future;
    }

    private NackMessageRequest wrapNackMessageRequest(MessageExt messageExt) {
        Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(messageExt.getTopic()).build();
        return NackMessageRequest.newBuilder().setGroup(this.getPbGroup()).setTopic(topicResource).setClientId(this.id).setReceiptHandle(messageExt.getReceiptHandle()).setMessageId(messageExt.getMsgId()).setDeliveryAttempt(messageExt.getDeliveryAttempt()).setMaxDeliveryAttempts(this.maxDeliveryAttempts).build();
    }

    public ListenableFuture<NackMessageResponse> nackMessage(final MessageExt messageExt) {
        SettableFuture<NackMessageResponse> future;
        final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setTopic(messageExt.getTopic()).build();
        this.intercept(MessageHookPoint.PRE_NACK_MESSAGE, messageExt, preContext);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final String messageId = messageExt.getMsgId();
        final Endpoints endpoints = messageExt.getEndpoints();
        try {
            NackMessageRequest request = this.wrapNackMessageRequest(messageExt);
            Metadata metadata = this.sign();
            future = this.clientManager.nackMessage(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t2) {
            SettableFuture future0 = SettableFuture.create();
            future0.setException(t2);
            future = future0;
        }
        Futures.addCallback(future, new FutureCallback<NackMessageResponse>(){

            @Override
            public void onSuccess(NackMessageResponse response) {
                Status status = response.getCommon().getStatus();
                Code code = Code.forNumber(status.getCode());
                MessageHookPointStatus hookPointStatus = Code.OK.equals(code) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(hookPointStatus).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_NACK_MESSAGE, messageExt, postContext);
                if (Code.OK.equals(code)) {
                    return;
                }
                log.error("Failed to nack, messageId={}, endpoints={}, code={}, status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), PushConsumerImpl.this.id);
            }

            @Override
            public void onFailure(Throwable t2) {
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(MessageHookPointStatus.ERROR).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_NACK_MESSAGE, messageExt, postContext);
                log.error("Exception raised while nack, messageId={}, endpoints={}, clientId={}", messageId, endpoints, PushConsumerImpl.this.id, t2);
            }
        }, MoreExecutors.directExecutor());
        return future;
    }

    private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(MessageExt messageExt) {
        Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(messageExt.getTopic()).build();
        return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(this.getPbGroup()).setTopic(topicResource).setClientId(this.id).setReceiptHandle(messageExt.getReceiptHandle()).setMessageId(messageExt.getMsgId()).setDeliveryAttempt(messageExt.getDeliveryAttempt()).setMaxDeliveryAttempts(this.maxDeliveryAttempts).build();
    }

    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(final MessageExt messageExt, int attempt) {
        SettableFuture<ForwardMessageToDeadLetterQueueResponse> future;
        final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setTopic(messageExt.getTopic()).setAttempt(attempt).build();
        this.intercept(MessageHookPoint.PRE_FORWARD_MESSAGE_TO_DLQ, messageExt, preContext);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        Endpoints endpoints = messageExt.getEndpoints();
        try {
            ForwardMessageToDeadLetterQueueRequest request = this.wrapForwardMessageToDeadLetterQueueRequest(messageExt);
            Metadata metadata = this.sign();
            future = this.clientManager.forwardMessageToDeadLetterQueue(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t2) {
            SettableFuture future0 = SettableFuture.create();
            future0.setException(t2);
            future = future0;
        }
        Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>(){

            @Override
            public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
                Code code = Code.forNumber(response.getCommon().getStatus().getCode());
                MessageHookPointStatus hookPointStatus = Code.OK.equals(code) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(hookPointStatus).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_FORWARD_MESSAGE_TO_DLQ, messageExt, postContext);
            }

            @Override
            public void onFailure(Throwable t2) {
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setStatus(MessageHookPointStatus.ERROR).setThrowable(t2).setDuration(duration).build();
                PushConsumerImpl.this.intercept(MessageHookPoint.POST_FORWARD_MESSAGE_TO_DLQ, messageExt, postContext);
            }
        }, MoreExecutors.directExecutor());
        return future;
    }

    @Override
    public PollCommandRequest wrapPollCommandRequest() {
        PollCommandRequest.Builder builder = PollCommandRequest.newBuilder().setClientId(this.id).setProducerGroup(this.getPbGroup());
        for (String topic : this.filterExpressionTable.keySet()) {
            Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(topic).build();
            builder.addTopics(topicResource);
        }
        return builder.build();
    }

    public void setConsumptionThreadsAmount(int threadsAmount) {
        Preconditions.checkArgument(threadsAmount > 0, "Must be positive");
        if (threadsAmount >= this.consumptionThreadsAmount) {
            this.consumptionExecutor.setMaximumPoolSize(threadsAmount);
            this.consumptionExecutor.setCorePoolSize(threadsAmount);
        } else {
            this.consumptionExecutor.setCorePoolSize(threadsAmount);
            this.consumptionExecutor.setMaximumPoolSize(threadsAmount);
        }
        this.consumptionThreadsAmount = threadsAmount;
    }

    public AtomicLong getReceptionTimes() {
        return this.receptionTimes;
    }

    public AtomicLong getReceivedMessagesQuantity() {
        return this.receivedMessagesQuantity;
    }

    public AtomicLong getPullTimes() {
        return this.pullTimes;
    }

    public AtomicLong getPulledMessagesQuantity() {
        return this.pulledMessagesQuantity;
    }

    public AtomicLong getConsumptionOkQuantity() {
        return this.consumptionOkQuantity;
    }

    public AtomicLong getConsumptionErrorQuantity() {
        return this.consumptionErrorQuantity;
    }

    public int getMaxTotalCachedMessagesQuantityThreshold() {
        return this.maxTotalCachedMessagesQuantityThreshold;
    }

    public int getMaxCachedMessagesQuantityThresholdPerQueue() {
        return this.maxCachedMessagesQuantityThresholdPerQueue;
    }

    public int getMaxTotalCachedMessagesBytesThreshold() {
        return this.maxTotalCachedMessagesBytesThreshold;
    }

    public int getMaxCachedMessagesBytesThresholdPerQueue() {
        return this.maxCachedMessagesBytesThresholdPerQueue;
    }

    public long getFifoConsumptionSuspendTimeMillis() {
        return this.fifoConsumptionSuspendTimeMillis;
    }

    public int getConsumeMessageBatchMaxSize() {
        return this.consumeMessageBatchMaxSize;
    }

    public int getConsumptionThreadsAmount() {
        return this.consumptionThreadsAmount;
    }

    public int getMaxDeliveryAttempts() {
        return this.maxDeliveryAttempts;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public ConsumeFromWhere getConsumeFromWhere() {
        return this.consumeFromWhere;
    }

    public long getConsumeFromTimeMillis() {
        return this.consumeFromTimeMillis;
    }

    public long getConsumptionTimeoutMillis() {
        return this.consumptionTimeoutMillis;
    }

    public long getMaxAwaitTimeMillisPerQueue() {
        return this.maxAwaitTimeMillisPerQueue;
    }

    public int getMaxAwaitBatchSizePerQueue() {
        return this.maxAwaitBatchSizePerQueue;
    }

    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    public MessageListener getMessageListener() {
        return this.messageListener;
    }

    public ConsumeService getConsumeService() {
        return this.consumeService;
    }

    public ThreadPoolExecutor getConsumptionExecutor() {
        return this.consumptionExecutor;
    }

    public void setMaxTotalCachedMessagesQuantityThreshold(int maxTotalCachedMessagesQuantityThreshold) {
        Preconditions.checkArgument(maxTotalCachedMessagesQuantityThreshold > 0, "Must be positive");
        this.maxTotalCachedMessagesQuantityThreshold = maxTotalCachedMessagesQuantityThreshold;
    }

    public void setMaxCachedMessagesQuantityThresholdPerQueue(int maxCachedMessagesQuantityThresholdPerQueue) {
        Preconditions.checkArgument(maxCachedMessagesQuantityThresholdPerQueue > 0, "Must be positive");
        this.maxCachedMessagesQuantityThresholdPerQueue = maxCachedMessagesQuantityThresholdPerQueue;
    }

    public void setMaxTotalCachedMessagesBytesThreshold(int maxTotalCachedMessagesBytesThreshold) {
        Preconditions.checkArgument(maxTotalCachedMessagesBytesThreshold > 0, "Must be positive");
        this.maxTotalCachedMessagesBytesThreshold = maxTotalCachedMessagesBytesThreshold;
    }

    public void setMaxCachedMessagesBytesThresholdPerQueue(int maxCachedMessagesBytesThresholdPerQueue) {
        Preconditions.checkArgument(maxCachedMessagesBytesThresholdPerQueue > 0, "Must be positive");
        this.maxCachedMessagesBytesThresholdPerQueue = maxCachedMessagesBytesThresholdPerQueue;
    }

    public void setFifoConsumptionSuspendTimeMillis(long fifoConsumptionSuspendTimeMillis) {
        Preconditions.checkArgument(fifoConsumptionSuspendTimeMillis > 0L, "Must be positive");
        this.fifoConsumptionSuspendTimeMillis = fifoConsumptionSuspendTimeMillis;
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        Preconditions.checkArgument(consumeMessageBatchMaxSize > 0, "Must be positive");
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    public void setMaxDeliveryAttempts(int maxDeliveryAttempts) {
        Preconditions.checkArgument(maxDeliveryAttempts > 0, "Must be positive");
        this.maxDeliveryAttempts = maxDeliveryAttempts;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = Preconditions.checkNotNull(messageModel, "messageModel");
    }

    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.consumeFromWhere = Preconditions.checkNotNull(consumeFromWhere, "consumeFromWhere");
    }

    public void setConsumeFromTimeMillis(long consumeFromTimeMillis) {
        Preconditions.checkArgument(consumeFromTimeMillis > 0L, "Must be positive");
        this.consumeFromTimeMillis = consumeFromTimeMillis;
    }

    public void setConsumptionTimeoutMillis(long consumptionTimeoutMillis) {
        Preconditions.checkArgument(consumptionTimeoutMillis > 0L, "Must be positive");
        this.consumptionTimeoutMillis = consumptionTimeoutMillis;
    }

    public void setMaxAwaitTimeMillisPerQueue(long maxAwaitTimeMillisPerQueue) {
        this.maxAwaitTimeMillisPerQueue = maxAwaitTimeMillisPerQueue;
    }

    public void setMaxAwaitBatchSizePerQueue(int maxAwaitBatchSizePerQueue) {
        this.maxAwaitBatchSizePerQueue = maxAwaitBatchSizePerQueue;
    }
}

