/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.client.rocketmq;

import com.aliyun.openservices.ons.api.ExpressionType;
import com.aliyun.openservices.ons.api.MessageSelector;
import com.aliyun.openservices.ons.api.TopicPartition;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.client.ClientAbstract;
import com.aliyun.openservices.ons.client.utils.UtilAll;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.commons.lang3.StringUtils;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.MessageModel;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer.OffsetStore;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.Properties;

/**
 * ONS push consumer that delegates to a shaded RocketMQ {@link DefaultMQPushConsumer}.
 *
 * <p>The constructor translates ONS-style {@link Properties} into settings on the delegate;
 * the remaining methods forward subscription, offset-store and lifecycle calls to it.
 */
public class PushConsumer extends ClientAbstract {
    private static final Logger log = LoggerFactory.getLogger(PushConsumer.class);
    private static final int MAX_CACHED_MESSAGES_QUANTITY = 50000;
    private static final int MIN_CACHED_MESSAGES_QUANTITY = 100;
    private static final int DEFAULT_CACHED_MESSAGES_QUANTITY = 5000;
    private static final int MIN_CACHED_MESSAGE_MEMORY_IN_MIB = 16;
    private static final int MAX_CACHED_MESSAGE_MEMORY_IN_MIB = 2048;
    private static final int DEFAULT_CACHED_MESSAGE_MEMORY_IN_MIB = 512;
    private static final int DEFAULT_CONSUMPTION_THREADS_AMOUNT = 20;
    private static final int CONSUMPTION_THREADS_MAX_AMOUNT = 1000;
    private static final long DEFAULT_CONSUMPTION_TIMEOUT_MILLIS = 900000L;
    /** Number of bytes in one MiB; kept as a long so size arithmetic cannot overflow an int. */
    private static final long BYTES_PER_MIB = 1024L * 1024L;
    protected final DefaultMQPushConsumer defaultMQPushConsumer;

    /**
     * Creates the underlying consumer and configures it from {@code properties}.
     *
     * <p>Property keys read here: {@code GROUP_ID} (required), {@code maxReconsumeTimes},
     * {@code MessageModel} (defaults to {@code CLUSTERING}), {@code maxCachedMessageAmount}
     * (clamped to [100, 50000], default 5000), {@code maxCachedMessageSizeInMiB} (clamped to
     * [16, 2048], default 512), {@code consumeTimeout} (minutes, default 15) and
     * {@code ConsumeThreadNums} (must be within [1, 1000], default 20).
     *
     * @param properties client configuration
     * @throws ONSClientException if the group id is blank, {@code maxReconsumeTimes} is not an
     *     integer, the delegate cannot be created or given its name server address, or the
     *     thread amount is out of range
     * @throws NumberFormatException if one of the other numeric properties is not a valid number
     * @throws IllegalArgumentException if {@code MessageModel} names no {@link MessageModel} constant
     */
    public PushConsumer(Properties properties) {
        super(properties);
        String groupId = properties.getProperty("GROUP_ID");
        if (StringUtils.isBlank(groupId)) {
            throw new ONSClientException("Group id is blank, please set it.");
        }
        try {
            this.defaultMQPushConsumer = new DefaultMQPushConsumer(groupId);
        } catch (ClientException e) {
            throw new ONSClientException(e);
        }
        this.defaultMQPushConsumer.setCredentialsProvider(this.provider);

        String maxReconsumeTimes = properties.getProperty("maxReconsumeTimes");
        if (StringUtils.isNoneBlank(maxReconsumeTimes)) {
            int maxDeliveryAttempts;
            try {
                // Delivery attempts = the first delivery plus the configured number of retries.
                maxDeliveryAttempts = 1 + Integer.parseInt(maxReconsumeTimes);
            } catch (NumberFormatException e) {
                // Keep the parse failure as the cause so the offending value is diagnosable.
                throw new ONSClientException("Illegal format of maxReconsumeTimes", e);
            }
            this.defaultMQPushConsumer.setMaxDeliveryAttempts(maxDeliveryAttempts);
        }

        try {
            this.defaultMQPushConsumer.setNamesrvAddr(this.nameServerAddr);
        } catch (Throwable t) {
            throw new ONSClientException(t);
        }
        if (null != this.namespace) {
            this.defaultMQPushConsumer.setNamespace(this.namespace);
        }
        this.defaultMQPushConsumer.setMessageTracingEnabled(this.messageTracingEnabled);

        String messageModel = properties.getProperty("MessageModel", "CLUSTERING");
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));

        int maxCachedMessagesQuantity = parseClamped(
            properties.getProperty("maxCachedMessageAmount"),
            DEFAULT_CACHED_MESSAGES_QUANTITY,
            MIN_CACHED_MESSAGES_QUANTITY,
            MAX_CACHED_MESSAGES_QUANTITY);
        this.defaultMQPushConsumer.setMaxTotalCachedMessagesQuantityThreshold(maxCachedMessagesQuantity);

        int maxCachedMessageSizeInMib = parseClamped(
            properties.getProperty("maxCachedMessageSizeInMiB"),
            DEFAULT_CACHED_MESSAGE_MEMORY_IN_MIB,
            MIN_CACHED_MESSAGE_MEMORY_IN_MIB,
            MAX_CACHED_MESSAGE_MEMORY_IN_MIB);
        // 2048 MiB is exactly 2^31 bytes, which wraps to a negative number in int arithmetic.
        // Multiply as long and saturate at Integer.MAX_VALUE; the argument is still passed as an
        // int, exactly as the call was written before.
        long cachedBytes = BYTES_PER_MIB * maxCachedMessageSizeInMib;
        int cachedBytesThreshold = (int) Math.min((long) Integer.MAX_VALUE, cachedBytes);
        this.defaultMQPushConsumer.setMaxTotalCachedMessagesBytesThreshold(cachedBytesThreshold);

        long consumptionTimeoutMillis = DEFAULT_CONSUMPTION_TIMEOUT_MILLIS;
        String consumptionTimeoutMinutesProp = properties.getProperty("consumeTimeout");
        if (StringUtils.isNoneBlank(consumptionTimeoutMinutesProp)) {
            // The property is expressed in minutes; the delegate expects milliseconds.
            consumptionTimeoutMillis = Long.parseLong(consumptionTimeoutMinutesProp) * 60L * 1000L;
        }
        this.defaultMQPushConsumer.setConsumptionTimeoutMillis(consumptionTimeoutMillis);

        int consumptionThreadsAmount = DEFAULT_CONSUMPTION_THREADS_AMOUNT;
        String consumptionThreadsAmountProp = properties.getProperty("ConsumeThreadNums");
        if (StringUtils.isNoneBlank(consumptionThreadsAmountProp)) {
            consumptionThreadsAmount = Integer.parseInt(consumptionThreadsAmountProp);
        }
        // Unlike the cache limits, an out-of-range thread amount is rejected rather than clamped.
        if (consumptionThreadsAmount < 1 || consumptionThreadsAmount > CONSUMPTION_THREADS_MAX_AMOUNT) {
            throw new ONSClientException("Consumption thread amount is out of range [1, 1000]");
        }
        this.defaultMQPushConsumer.setConsumptionThreadsAmount(consumptionThreadsAmount);
    }

    /**
     * Parses an optional integer property and clamps it into {@code [min, max]}.
     *
     * @param raw the raw property value; may be {@code null} or blank
     * @param defaultValue value returned as-is when {@code raw} is blank
     * @param min lower bound applied to a parsed value
     * @param max upper bound applied to a parsed value
     * @return {@code defaultValue} when {@code raw} is blank, otherwise the clamped parsed value
     * @throws NumberFormatException if {@code raw} is non-blank but not a valid integer
     */
    private static int parseClamped(String raw, int defaultValue, int min, int max) {
        if (!StringUtils.isNoneBlank(raw)) {
            return defaultValue;
        }
        int parsed = Integer.parseInt(raw);
        return Math.min(max, Math.max(min, parsed));
    }

    /**
     * Subscribes the delegate to {@code topic} using the selector's expression.
     *
     * <p>A {@code TAG} selector maps to the RocketMQ {@code TAG} expression type; every other
     * selector type maps to {@code SQL92}.
     *
     * @param topic topic to subscribe to
     * @param selector message selector supplying the expression type and sub-expression
     */
    protected void subscribe(String topic, MessageSelector selector) {
        com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType rocketmqType;
        ExpressionType type = selector.getType();
        switch (type) {
            case TAG: {
                rocketmqType = com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType.TAG;
                break;
            }
            default: {
                rocketmqType = com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType.SQL92;
            }
        }
        this.defaultMQPushConsumer.subscribe(topic, selector.getSubExpression(), rocketmqType);
    }

    /**
     * Removes the delegate's subscription to {@code topic}.
     *
     * @param topic topic to unsubscribe from
     */
    public void unsubscribe(String topic) {
        this.defaultMQPushConsumer.unsubscribe(topic);
    }

    /**
     * Installs a user-supplied offset store on the delegate.
     *
     * <p>The ONS store is wrapped in an adapter that converts each RocketMQ {@link MessageQueue}
     * to a {@link TopicPartition} via {@link UtilAll#convertToPartition} before forwarding.
     *
     * @param offsetStore the ONS offset store; must not be {@code null}
     * @throws ONSClientException if {@code offsetStore} is {@code null}
     */
    public void setOffsetStore(final com.aliyun.openservices.ons.api.OffsetStore offsetStore) {
        if (null == offsetStore) {
            throw new ONSClientException("OffsetStore is null, please set it.");
        }
        this.defaultMQPushConsumer.setOffsetStore(new OffsetStore() {

            @Override
            public void start() {
                offsetStore.start();
            }

            @Override
            public void shutdown() {
                offsetStore.shutdown();
            }

            @Override
            public void updateOffset(MessageQueue mq, long offset) {
                TopicPartition partition = UtilAll.convertToPartition(mq);
                offsetStore.updateOffset(partition, offset);
            }

            @Override
            public Optional<Long> readOffset(MessageQueue mq) {
                TopicPartition partition = UtilAll.convertToPartition(mq);
                return offsetStore.readOffset(partition);
            }
        });
    }

    /**
     * Starts the delegate if this client has not been started yet; otherwise only logs a warning.
     *
     * @throws ONSClientException if starting fails; the original exception is kept as the cause
     */
    @Override
    public void start() {
        try {
            if (!this.started.compareAndSet(false, true)) {
                log.warn("ONS consumer has been started before.");
                return;
            }
            log.info("Begin to start the ONS consumer.");
            this.defaultMQPushConsumer.start();
            log.info("Start the ONS consumer successfully.");
        } catch (Exception e) {
            // Log and chain the failure so the root cause and stack trace are not lost.
            log.error("Failed to start the ONS consumer.", e);
            throw new ONSClientException(e.getMessage(), e);
        }
    }

    /**
     * Shuts the delegate down if this client is currently marked as started; otherwise only logs
     * a warning.
     *
     * @throws ONSClientException if the delegate's shutdown throws
     */
    @Override
    public void shutdown() {
        if (!this.started.compareAndSet(true, false)) {
            // Reached when the started flag was not set, i.e. there is nothing to shut down.
            log.warn("Failed to shutdown the ONS consumer.");
            return;
        }
        log.info("Begin to shutdown the ONS consumer.");
        try {
            this.defaultMQPushConsumer.shutdown();
        } catch (Throwable t) {
            throw new ONSClientException(t);
        }
        log.info("Shutdown the ONS consumer successfully.");
    }
}

