package com.geekhalo.lego.core.msg.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.geekhalo.lego.annotation.msg.consumer.HandleTag;
import com.geekhalo.lego.annotation.msg.consumer.TagBasedDispatcherMessageConsumer;
import com.geekhalo.lego.core.support.consumer.support.AbstractConsumerContainer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.env.Environment;

/* loaded from: input_file:BOOT-INF/lib/lego-core-0.1.39.jar:com/geekhalo/lego/core/msg/consumer/TagBasedDispatcherConsumerContainer.class */
/**
 * Consumer container that subscribes to a single RocketMQ topic and dispatches every received
 * message to the method of the wrapped bean whose {@link HandleTag} value matches the message tag.
 *
 * <p>Handler methods are discovered in {@link #afterPropertiesSet()} before the superclass
 * initialisation runs; only methods with exactly one parameter are registered. The message body
 * is deserialized from JSON into the generic type of that single parameter.
 */
public class TagBasedDispatcherConsumerContainer extends AbstractConsumerContainer {
    private static final Logger log = LoggerFactory.getLogger(TagBasedDispatcherConsumerContainer.class);

    /** Separator used when building the subscription expression from the registered tags. */
    private static final String TAG_SEPARATOR = "||";

    /** Regular expression matching {@link #TAG_SEPARATOR} when splitting the tags of a message. */
    private static final String TAG_SEPARATOR_REGEX = "\\|\\|";

    private final TagBasedDispatcherMessageConsumer tagBasedDispatcherMessageConsumer;

    /** Registered handlers keyed by the {@link HandleTag} value they declare. */
    private final Map<String, MethodInvoker> tagMethods;

    /**
     * Orderly listener that routes each message to the invoker(s) registered for its tag(s).
     */
    private class DefaultMessageListenerOrderly implements MessageListenerOrderly {
        private DefaultMessageListenerOrderly() {
        }

        /**
         * Dispatches every message of the batch, in order.
         *
         * <p>When a message fails and the container is configured to skip failures, only that
         * message is skipped and the rest of the batch is still processed. Otherwise the queue is
         * suspended for the configured delay so the batch is redelivered.
         *
         * @param list                  messages delivered by the broker
         * @param consumeOrderlyContext context used to configure the suspend time on failure
         * @return {@code SUCCESS} when the batch is done (including skipped failures),
         *         {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when a failure must be retried
         */
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
            for (MessageExt messageExt : list) {
                log.debug("received msg: {}", messageExt);
                try {
                    long startMillis = System.currentTimeMillis();
                    dispatch(messageExt);
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), System.currentTimeMillis() - startMillis);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsume Times:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    if (!TagBasedDispatcherConsumerContainer.this.skipWhenException()) {
                        consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(TagBasedDispatcherConsumerContainer.this.getDelayLevelWhenNextConsume());
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // Skip only the failing message. Returning SUCCESS here would acknowledge the
                    // remaining messages of the batch without ever dispatching them.
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }

        /**
         * Invokes the handler registered for each tag carried by the message.
         *
         * <p>A message without tags, or a tag without a registered handler, is logged and
         * ignored rather than treated as a failure.
         */
        private void dispatch(MessageExt messageExt) throws Exception {
            String tags = messageExt.getTags();
            if (tags == null) {
                // Messages published without a tag can arrive when the subscription is "*";
                // there is nothing to route them to, so retrying would never succeed.
                log.warn("Failed to find Invoker for Tag {}", tags);
                return;
            }
            for (String tag : tags.split(TAG_SEPARATOR_REGEX)) {
                MethodInvoker methodInvoker = tagMethods.get(tag);
                if (methodInvoker == null) {
                    log.warn("Failed to find Invoker for Tag {}", tag);
                } else {
                    methodInvoker.invoke(messageExt.getBody());
                }
            }
        }
    }

    /**
     * Binds a single-parameter handler method of the wrapped bean and invokes it with a payload
     * deserialized from the raw message body.
     */
    public class MethodInvoker {
        private final Method method;

        /** Generic type of the handler's only parameter; target type of the deserialization. */
        private final Type paraType;

        /**
         * @param method handler method; must declare at least one parameter, the first of which
         *               determines the payload type
         */
        MethodInvoker(Method method) {
            this.method = method;
            this.paraType = method.getGenericParameterTypes()[0];
        }

        /**
         * Deserializes the body and calls the handler on the wrapped bean.
         *
         * @param bArr raw message body
         * @throws Exception any reflection failure, including exceptions thrown by the handler
         *                   (wrapped by the reflection API)
         */
        public void invoke(byte[] bArr) throws Exception {
            Object payload = deserialize(bArr, this.paraType);
            this.method.invoke(TagBasedDispatcherConsumerContainer.this.getBean(), payload);
        }

        /** Parses the JSON body into an instance of the given type using default features. */
        private Object deserialize(byte[] bArr, Type type) {
            return JSON.parseObject(bArr, type, new Feature[0]);
        }
    }

    /**
     * @param environment                       environment handed to the superclass
     * @param obj                               bean whose {@link HandleTag} methods handle messages
     * @param tagBasedDispatcherMessageConsumer annotation carrying name server, consumer group
     *                                          and topic settings; must not be {@code null}
     * @throws IllegalArgumentException if {@code tagBasedDispatcherMessageConsumer} is {@code null}
     */
    public TagBasedDispatcherConsumerContainer(Environment environment, Object obj, TagBasedDispatcherMessageConsumer tagBasedDispatcherMessageConsumer) {
        super(environment, obj);
        Preconditions.checkArgument(tagBasedDispatcherMessageConsumer != null, "tagBasedDispatcherMessageConsumer must not be null");
        this.tagMethods = Maps.newHashMap();
        this.tagBasedDispatcherMessageConsumer = tagBasedDispatcherMessageConsumer;
    }

    /**
     * Builds a push consumer configured from the annotation: name server, consumer group, topic
     * subscription restricted to the registered tags, and the orderly dispatching listener.
     *
     * @return the configured consumer (this method does not start it)
     * @throws Exception if the subscription cannot be registered
     */
    @Override // com.geekhalo.lego.core.support.consumer.support.AbstractConsumerContainer
    protected DefaultMQPushConsumer createConsumer() throws Exception {
        String nameServer = resolve(this.tagBasedDispatcherMessageConsumer.nameServer());
        String group = resolve(this.tagBasedDispatcherMessageConsumer.consumer());
        String topic = resolve(this.tagBasedDispatcherMessageConsumer.topic());
        String tagExpression = findTag();

        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        defaultMQPushConsumer.setNamesrvAddr(nameServer);
        defaultMQPushConsumer.setConsumerGroup(group);
        defaultMQPushConsumer.subscribe(topic, tagExpression);
        defaultMQPushConsumer.setMessageListener(new DefaultMessageListenerOrderly());
        log.info("success to subscribe  {}, topic {}, tag {}, group {}", nameServer, topic, tagExpression, group);
        return defaultMQPushConsumer;
    }

    /**
     * Registers the tag handlers first, so that the tag expression is complete by the time the
     * superclass initialisation runs.
     */
    @Override // com.geekhalo.lego.core.support.consumer.support.AbstractConsumerContainer, org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() throws Exception {
        initMethods();
        super.afterPropertiesSet();
    }

    /**
     * Scans the wrapped bean for {@link HandleTag} methods and registers one invoker per tag.
     *
     * <p>Methods that do not take exactly one parameter are logged and ignored.
     *
     * @throws IllegalStateException if two methods declare the same tag
     */
    private void initMethods() {
        for (Method method : MethodUtils.getMethodsListWithAnnotation(this.bean.getClass(), HandleTag.class)) {
            if (method.getParameterCount() != 1) {
                log.warn("Method {} must have only one param", method);
                continue;
            }
            String tag = AnnotatedElementUtils.findMergedAnnotation(method, HandleTag.class).value();
            if (this.tagMethods.containsKey(tag)) {
                throw new IllegalStateException(String.format("Tag %s On %s is duplicate", tag, this.bean.getClass().getName()));
            }
            this.tagMethods.put(tag, new MethodInvoker(method));
        }
    }

    /**
     * Builds the subscription expression: all registered tags joined with {@code ||}, or
     * {@code *} when no handler is registered.
     */
    private String findTag() {
        if (this.tagMethods.isEmpty()) {
            return "*";
        }
        return String.join(TAG_SEPARATOR, this.tagMethods.keySet());
    }
}
