package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.consumer.ConsumerZooKeeper;
import com.taobao.metamorphosis.client.consumer.SimpleFetchManager;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.cluster.Broker;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.consumer.ConsumerMessageFilter;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.MetaOpeartionTimeoutException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.DataCommand;
import com.taobao.metamorphosis.network.GetCommand;
import com.taobao.metamorphosis.network.OffsetCommand;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleMessageConsumer.class */
public class SimpleMessageConsumer implements MessageConsumer, InnerConsumer {
    private static final int DEFAULT_OP_TIMEOUT = 10000;
    static final Log log = LogFactory.getLog(SimpleFetchManager.FetchRequestRunner.class);
    private final RemotingClientWrapper remotingClient;
    private final ConsumerConfig consumerConfig;
    private final ConsumerZooKeeper consumerZooKeeper;
    private final MetaMessageSessionFactory messageSessionFactory;
    private final OffsetStorage offsetStorage;
    private final LoadBalanceStrategy loadBalanceStrategy;
    private final ProducerZooKeeper producerZooKeeper;
    private final SubscribeInfoManager subscribeInfoManager;
    private final RecoverManager recoverStorageManager;
    private FetchManager fetchManager;
    private RejectConsumptionHandler rejectConsumptionHandler;
    private final ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry = new ConcurrentHashMap<>();
    private final ConcurrentHashSet<String> publishedTopics = new ConcurrentHashSet<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleMessageConsumer$DropPolicy.class */
    public static class DropPolicy implements RejectConsumptionHandler {
        @Override // com.taobao.metamorphosis.client.consumer.RejectConsumptionHandler
        public void rejectConsumption(Message message, MessageConsumer messageConsumer) {
        }
    }

    /* loaded from: input_file:com/taobao/metamorphosis/client/consumer/SimpleMessageConsumer$LocalRecoverPolicy.class */
    public static class LocalRecoverPolicy implements RejectConsumptionHandler {
        private final RecoverManager recoverManager;
        static final Log log = LogFactory.getLog(LocalRecoverPolicy.class);

        public LocalRecoverPolicy(RecoverManager recoverManager) {
            this.recoverManager = recoverManager;
        }

        @Override // com.taobao.metamorphosis.client.consumer.RejectConsumptionHandler
        public void rejectConsumption(Message message, MessageConsumer messageConsumer) {
            try {
                this.recoverManager.append(messageConsumer.getConsumerConfig().getGroup(), message);
            } catch (IOException e) {
                log.error("Append message to local recover manager failed", e);
            }
        }
    }

    public SimpleMessageConsumer(MetaMessageSessionFactory metaMessageSessionFactory, RemotingClientWrapper remotingClientWrapper, ConsumerConfig consumerConfig, ConsumerZooKeeper consumerZooKeeper, ProducerZooKeeper producerZooKeeper, SubscribeInfoManager subscribeInfoManager, RecoverManager recoverManager, OffsetStorage offsetStorage, LoadBalanceStrategy loadBalanceStrategy) {
        this.messageSessionFactory = metaMessageSessionFactory;
        this.remotingClient = remotingClientWrapper;
        this.consumerConfig = consumerConfig;
        this.producerZooKeeper = producerZooKeeper;
        this.consumerZooKeeper = consumerZooKeeper;
        this.offsetStorage = offsetStorage;
        this.subscribeInfoManager = subscribeInfoManager;
        this.recoverStorageManager = recoverManager;
        this.fetchManager = new SimpleFetchManager(consumerConfig, this);
        this.loadBalanceStrategy = loadBalanceStrategy;
        this.rejectConsumptionHandler = new LocalRecoverPolicy(this.recoverStorageManager);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: com.taobao.metamorphosis.client.consumer.SimpleMessageConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                SimpleMessageConsumer.this.consumerZooKeeper.commitOffsets(SimpleMessageConsumer.this.fetchManager);
            }
        }, consumerConfig.getCommitOffsetPeriodInMills(), consumerConfig.getCommitOffsetPeriodInMills(), TimeUnit.MILLISECONDS);
    }

    public MetaMessageSessionFactory getParent() {
        return this.messageSessionFactory;
    }

    public FetchManager getFetchManager() {
        return this.fetchManager;
    }

    void setFetchManager(FetchManager fetchManager) {
        this.fetchManager = fetchManager;
    }

    ConcurrentHashMap<String, SubscriberInfo> getTopicSubcriberRegistry() {
        return this.topicSubcriberRegistry;
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public OffsetStorage getOffsetStorage() {
        return this.offsetStorage;
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer, com.taobao.metamorphosis.client.Shutdownable
    public synchronized void shutdown() throws MetaClientException {
        if (this.fetchManager.isShutdown()) {
            return;
        }
        try {
            try {
                this.fetchManager.stopFetchRunner();
                this.consumerZooKeeper.unRegisterConsumer(this.fetchManager);
                Iterator it = this.publishedTopics.iterator();
                while (it.hasNext()) {
                    this.producerZooKeeper.unPublishTopic((String) it.next(), this);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.scheduledExecutorService.shutdownNow();
                this.offsetStorage.close();
                this.subscribeInfoManager.removeGroup(this.consumerConfig.getGroup());
                this.messageSessionFactory.removeChild(this);
            }
        } finally {
            this.scheduledExecutorService.shutdownNow();
            this.offsetStorage.close();
            this.subscribeInfoManager.removeGroup(this.consumerConfig.getGroup());
            this.messageSessionFactory.removeChild(this);
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public MessageConsumer subscribe(String str, int i, MessageListener messageListener) throws MetaClientException {
        return subscribe(str, i, messageListener, null);
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public MessageConsumer subscribe(String str, int i, MessageListener messageListener, ConsumerMessageFilter consumerMessageFilter) throws MetaClientException {
        checkState();
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("Blank topic");
        }
        if (messageListener == null) {
            throw new IllegalArgumentException("Null messageListener");
        }
        this.subscribeInfoManager.subscribe(str, this.consumerConfig.getGroup(), i, messageListener, consumerMessageFilter);
        if (this.topicSubcriberRegistry.get(str) != null) {
            throw new MetaClientException("Topic=" + str + " has been subscribered");
        }
        if (this.topicSubcriberRegistry.putIfAbsent(str, new SubscriberInfo(messageListener, consumerMessageFilter, i)) != null) {
            throw new MetaClientException("Topic=" + str + " has been subscribered");
        }
        return this;
    }

    @Override // com.taobao.metamorphosis.client.consumer.InnerConsumer
    public void appendCouldNotProcessMessage(Message message) throws IOException {
        if (log.isInfoEnabled()) {
            log.info("Message could not process,save to local.MessageId=" + message.getId() + ",Topic=" + message.getTopic() + ",Partition=" + message.getPartition());
        }
        if (this.rejectConsumptionHandler != null) {
            this.rejectConsumptionHandler.rejectConsumption(message, this);
        }
    }

    private void checkState() {
        if (this.fetchManager.isShutdown()) {
            throw new IllegalStateException("Consumer has been shutdown");
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public void completeSubscribe() throws MetaClientException {
        checkState();
        try {
            this.consumerZooKeeper.registerConsumer(this.consumerConfig, this.fetchManager, this.topicSubcriberRegistry, this.offsetStorage, this.loadBalanceStrategy);
        } catch (Exception e) {
            throw new MetaClientException("注册订阅者失败", e);
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.InnerConsumer
    public MessageListener getMessageListener(String str) {
        SubscriberInfo subscriberInfo = this.topicSubcriberRegistry.get(str);
        if (subscriberInfo == null) {
            return null;
        }
        return subscriberInfo.getMessageListener();
    }

    @Override // com.taobao.metamorphosis.client.consumer.InnerConsumer
    public ConsumerMessageFilter getMessageFilter(String str) {
        SubscriberInfo subscriberInfo = this.topicSubcriberRegistry.get(str);
        if (subscriberInfo == null) {
            return null;
        }
        return subscriberInfo.getConsumerMessageFilter();
    }

    @Override // com.taobao.metamorphosis.client.consumer.InnerConsumer
    public long offset(FetchRequest fetchRequest) throws MetaClientException {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                try {
                    BooleanCommand invokeToGroup = this.remotingClient.invokeToGroup(fetchRequest.getBroker().getZKString(), new OffsetCommand(fetchRequest.getTopic(), this.consumerConfig.getGroup(), fetchRequest.getPartition(), fetchRequest.getOffset(), Integer.valueOf(OpaqueGenerator.getNextOpaque())), this.consumerConfig.getFetchTimeoutInMills(), TimeUnit.MILLISECONDS);
                    switch (invokeToGroup.getCode()) {
                        case 200:
                            long parseLong = Long.parseLong(invokeToGroup.getErrorMsg());
                            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis2 > 200) {
                                MetaStatLog.addStatValue2((String) null, "cli_offset_timeout", fetchRequest.getTopic(), currentTimeMillis2);
                            }
                            if (1 == 0) {
                                MetaStatLog.addStat((String) null, "cli_offset_failed", fetchRequest.getTopic());
                            }
                            return parseLong;
                        default:
                            throw new MetaClientException(invokeToGroup.getErrorMsg());
                    }
                } catch (MetaClientException e) {
                    throw e;
                }
            } catch (TimeoutException e2) {
                throw new MetaOpeartionTimeoutException("Send message timeout in " + this.consumerConfig.getFetchTimeoutInMills() + " mills");
            } catch (Exception e3) {
                throw new MetaClientException("get offset failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition() + ",current offset=" + fetchRequest.getOffset(), e3);
            }
        } catch (Throwable th) {
            long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
            if (currentTimeMillis3 > 200) {
                MetaStatLog.addStatValue2((String) null, "cli_offset_timeout", fetchRequest.getTopic(), currentTimeMillis3);
            }
            if (0 == 0) {
                MetaStatLog.addStat((String) null, "cli_offset_failed", fetchRequest.getTopic());
            }
            throw th;
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.InnerConsumer
    public com.taobao.metamorphosis.consumer.MessageIterator fetch(FetchRequest fetchRequest, long j, TimeUnit timeUnit) throws MetaClientException, InterruptedException {
        if (j <= 0 || timeUnit == null) {
            j = this.consumerConfig.getFetchTimeoutInMills();
            timeUnit = TimeUnit.MILLISECONDS;
        }
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                try {
                    long offset = fetchRequest.getOffset();
                    String zKString = fetchRequest.getBroker().getZKString();
                    if (!this.remotingClient.isConnected(zKString)) {
                        ConsumerZooKeeper.ZKLoadRebalanceListener brokerConnectionListener = this.consumerZooKeeper.getBrokerConnectionListener(this.fetchManager);
                        if (brokerConnectionListener.oldBrokerSet.contains(fetchRequest.getBroker())) {
                            this.remotingClient.connectWithRef(zKString, brokerConnectionListener);
                        }
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis2 > 200) {
                            MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis2);
                        }
                        if (0 == 0) {
                            MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
                        }
                        return null;
                    }
                    DataCommand invokeToGroup = this.remotingClient.invokeToGroup(zKString, new GetCommand(fetchRequest.getTopic(), this.consumerConfig.getGroup(), fetchRequest.getPartition(), offset, fetchRequest.getMaxSize(), Integer.valueOf(OpaqueGenerator.getNextOpaque())), j, timeUnit);
                    if (invokeToGroup instanceof DataCommand) {
                        byte[] data = invokeToGroup.getData();
                        if (data.length < fetchRequest.getMaxSize() / 2) {
                            fetchRequest.decreaseMaxSize();
                        }
                        com.taobao.metamorphosis.consumer.MessageIterator messageIterator = new com.taobao.metamorphosis.consumer.MessageIterator(fetchRequest.getTopic(), data);
                        long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                        if (currentTimeMillis3 > 200) {
                            MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis3);
                        }
                        if (1 == 0) {
                            MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
                        }
                        return messageIterator;
                    }
                    BooleanCommand booleanCommand = (BooleanCommand) invokeToGroup;
                    switch (booleanCommand.getCode()) {
                        case 301:
                            fetchRequest.resetRetries();
                            fetchRequest.setOffset(Long.parseLong(booleanCommand.getErrorMsg()), -1L, true);
                            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis4 > 200) {
                                MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis4);
                            }
                            if (1 == 0) {
                                MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
                            }
                            return null;
                        case 403:
                            long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis5 > 200) {
                                MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis5);
                            }
                            if (1 == 0) {
                                MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
                            }
                            return null;
                        case 404:
                            long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis6 > 200) {
                                MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis6);
                            }
                            if (1 == 0) {
                                MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
                            }
                            return null;
                        default:
                            throw new MetaClientException("Status:" + booleanCommand.getCode() + ",Error message:" + ((BooleanCommand) invokeToGroup).getErrorMsg());
                    }
                } catch (MetaClientException e) {
                    throw e;
                } catch (InterruptedException e2) {
                    throw e2;
                }
            } catch (TimeoutException e3) {
                throw new MetaOpeartionTimeoutException("Send message timeout in " + this.consumerConfig.getFetchTimeoutInMills() + " mills");
            } catch (Exception e4) {
                throw new MetaClientException("get message failed,topic=" + fetchRequest.getTopic() + ",partition=" + fetchRequest.getPartition() + ",offset=" + fetchRequest.getOffset(), e4);
            }
        } catch (Throwable th) {
            long currentTimeMillis7 = System.currentTimeMillis() - currentTimeMillis;
            if (currentTimeMillis7 > 200) {
                MetaStatLog.addStatValue2((String) null, "cli_get_timeout", fetchRequest.getTopic(), currentTimeMillis7);
            }
            if (0 == 0) {
                MetaStatLog.addStat((String) null, "cli_get_failed", fetchRequest.getTopic());
            }
            throw th;
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public void setSubscriptions(Collection<Subscription> collection) throws MetaClientException {
        if (collection == null) {
            return;
        }
        for (Subscription subscription : collection) {
            subscribe(subscription.getTopic(), subscription.getMaxSize(), subscription.getMessageListener());
        }
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public com.taobao.metamorphosis.consumer.MessageIterator get(String str, Partition partition, long j, int i, long j2, TimeUnit timeUnit) throws MetaClientException, InterruptedException {
        if (!this.publishedTopics.contains(str)) {
            this.producerZooKeeper.publishTopic(str, this);
            this.publishedTopics.add(str);
        }
        return fetch(new FetchRequest(new Broker(partition.getBrokerId(), this.producerZooKeeper.selectBroker(str, partition)), 0L, new TopicPartitionRegInfo(str, partition, j), i), j2, timeUnit);
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public RejectConsumptionHandler getRejectConsumptionHandler() {
        return this.rejectConsumptionHandler;
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public void setRejectConsumptionHandler(RejectConsumptionHandler rejectConsumptionHandler) {
        if (rejectConsumptionHandler == null) {
            throw new NullPointerException("Null rejectConsumptionHandler");
        }
        this.rejectConsumptionHandler = rejectConsumptionHandler;
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer, com.taobao.metamorphosis.client.consumer.InnerConsumer
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    @Override // com.taobao.metamorphosis.client.consumer.MessageConsumer
    public com.taobao.metamorphosis.consumer.MessageIterator get(String str, Partition partition, long j, int i) throws MetaClientException, InterruptedException {
        return get(str, partition, j, i, 10000L, TimeUnit.MILLISECONDS);
    }
}
