/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListener;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ConsumeService;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * {@link ConsumeService} implementation that hands messages to the consumer in batches of
 * at most {@code batchMaxSize}, drawing them from the known process queues in random order.
 */
public class ConsumeConcurrentlyService
extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(ConsumeConcurrentlyService.class);

    /** Upper bound on the number of messages gathered for a single dispatch. */
    private final int batchMaxSize;

    /**
     * Creates the service.
     *
     * @param messageListener     forwarded unchanged to {@link ConsumeService}
     * @param interceptor         forwarded unchanged to {@link ConsumeService}
     * @param consumptionExecutor forwarded unchanged to {@link ConsumeService}
     * @param scheduler           forwarded unchanged to {@link ConsumeService}
     * @param processQueueTable   forwarded unchanged to {@link ConsumeService}; read by
     *                            {@link #dispatch0()} to locate the queues to drain
     * @param batchMaxSize        maximum number of messages taken per dispatch
     */
    public ConsumeConcurrentlyService(MessageListener messageListener, MessageInterceptor interceptor, ThreadPoolExecutor consumptionExecutor, ScheduledExecutorService scheduler, ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable, int batchMaxSize) {
        super(messageListener, interceptor, consumptionExecutor, scheduler, processQueueTable);
        this.batchMaxSize = batchMaxSize;
    }

    /**
     * Gathers up to {@code batchMaxSize} messages across the process queues, submits them for
     * consumption as one batch and, once consumption completes successfully, erases each
     * message from the queue it was taken from together with the resulting status.
     *
     * @return {@code false} if no message could be taken from any queue, {@code true} if a
     *         batch was submitted for consumption
     */
    @Override
    public boolean dispatch0() {
        // Work on a shuffled snapshot so no queue is systematically favoured when the batch
        // fills up before every queue has been visited.
        List<ProcessQueue> processQueues = new ArrayList<ProcessQueue>(this.processQueueTable.values());
        Collections.shuffle(processQueues);

        int accumulativeSize = 0;
        // Remembers which messages came from which queue so they can be erased from the
        // right ProcessQueue once consumption has finished.
        final Map<MessageQueue, List<MessageExt>> takenByQueue = new HashMap<MessageQueue, List<MessageExt>>();
        for (ProcessQueue pq : processQueues) {
            // Only ask for the remaining capacity of the batch.
            List<MessageExt> taken = pq.tryTakeMessages(this.batchMaxSize - accumulativeSize);
            if (taken.isEmpty()) {
                continue;
            }
            takenByQueue.put(pq.getMessageQueue(), taken);
            accumulativeSize += taken.size();
            if (accumulativeSize >= this.batchMaxSize) {
                break;
            }
        }

        // Flatten the per-queue lists into the single batch handed to the consumer.
        List<MessageExt> batch = new ArrayList<MessageExt>();
        for (List<MessageExt> taken : takenByQueue.values()) {
            batch.addAll(taken);
        }
        if (batch.isEmpty()) {
            return false;
        }

        ListenableFuture<ConsumeStatus> future = this.consume(batch);
        Futures.addCallback(future, new FutureCallback<ConsumeStatus>(){

            @Override
            public void onSuccess(ConsumeStatus status) {
                for (Map.Entry<MessageQueue, List<MessageExt>> entry : takenByQueue.entrySet()) {
                    ProcessQueue pq = ConsumeConcurrentlyService.this.processQueueTable.get(entry.getKey());
                    // The queue is no longer in the table; there is nothing to erase from.
                    if (null == pq) {
                        continue;
                    }
                    pq.eraseMessages(entry.getValue(), status);
                }
            }

            @Override
            public void onFailure(Throwable t) {
                // NOTE(review): the taken messages are not erased on this path; confirm that
                // consume() never fails its future in practice, as the "[Bug]" tag suggests.
                log.error("[Bug] Exception raised in consumption callback.", t);
            }
        }, MoreExecutors.directExecutor());
        return true;
    }
}

