package com.efuture.ocm.smbus.msg;

import com.alibaba.fastjson.JSONObject;
import com.efuture.ocm.smbus.comm.SendMessage;
import com.efuture.ocm.smbus.comm.SmbApplicationContext;
import com.efuture.ocm.smbus.comm.StringUtils;
import com.efuture.ocm.smbus.service.SmbCommService;
import com.efuture.ocm.smbus.service.SmbusMsgService;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/ocm-smbus-core-1.0.0.jar:com/efuture/ocm/smbus/msg/PullRocketMqMessage.class */
public class PullRocketMqMessage implements IPullMessage {
    private BlockingQueue<SendMessage> cache;
    private String config;
    private String fromChannel;
    private String ctype;
    private String groupId;
    private String nameSrv;
    private String topics;
    private Logger logger = LoggerFactory.getLogger(PullRocketMqMessage.class);
    private int maxThread = 2;
    private int minThread = 2;
    private int consumeMessageBatchMaxSize = 32;
    private List<String> topicArray = new ArrayList();
    private DefaultMQPushConsumer consumer = null;

    @Override // com.efuture.ocm.smbus.msg.IPullMessage
    public boolean init(String str, String str2, String str3, BlockingQueue<SendMessage> blockingQueue) {
        JSONObject parseObject = JSONObject.parseObject(str3);
        this.cache = blockingQueue;
        this.config = str3;
        this.fromChannel = str;
        this.ctype = str2;
        this.nameSrv = StringUtils.trim(parseObject.get("nameServer"));
        this.groupId = StringUtils.trim(parseObject.get("groupId"));
        this.topics = StringUtils.trim(parseObject.get("topic"));
        for (String str4 : this.topics.split(",|\\|")) {
            this.topicArray.add(str4.trim());
        }
        if (this.groupId.equals("")) {
            this.groupId = "smbus_group";
        }
        String trim = StringUtils.trim(parseObject.get("consumeThreadMin"));
        if (!"".equals(trim)) {
            this.minThread = Integer.valueOf(trim).intValue();
        }
        String trim2 = StringUtils.trim(parseObject.get("consumeThreadMax"));
        if (!"".equals(trim2)) {
            this.maxThread = Integer.valueOf(trim2).intValue();
        }
        String trim3 = StringUtils.trim(parseObject.get("consumeMessageBatchMaxSize"));
        if (!"".equals(trim3)) {
            this.consumeMessageBatchMaxSize = Integer.valueOf(trim3).intValue();
        }
        this.consumer = new DefaultMQPushConsumer(this.groupId);
        StringUtils.copyMapToBean(this.consumer, parseObject);
        this.consumer.setNamesrvAddr(this.nameSrv);
        this.consumer.setConsumeThreadMax(this.maxThread);
        this.consumer.setConsumeThreadMin(this.minThread);
        this.consumer.setConsumeMessageBatchMaxSize(this.consumeMessageBatchMaxSize);
        try {
            Iterator<String> it = this.topicArray.iterator();
            while (it.hasNext()) {
                this.consumer.subscribe(it.next(), "*");
            }
            this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            return true;
        } catch (Exception e) {
            this.logger.error("初始化拉取数据通道失败:{},{},{}", new Object[]{str, str3, StringUtils.getTrace(e)});
            return false;
        }
    }

    @Override // com.efuture.ocm.smbus.msg.IPullMessage
    public boolean start() {
        this.logger.info("启动拉取消息服务:{},{}", this.fromChannel, this.config);
        try {
            this.consumer.registerMessageListener(new MessageListenerConcurrently() { // from class: com.efuture.ocm.smbus.msg.PullRocketMqMessage.1
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    String str = null;
                    try {
                        SmbusMsgService smbusMsgService = (SmbusMsgService) SmbApplicationContext.getInstance().getBean(SmbCommService.MsgServiceName);
                        Iterator<MessageExt> it = list.iterator();
                        while (it.hasNext()) {
                            str = new String(it.next().getBody(), "UTF-8");
                            List<SendMessage> translateMessage = smbusMsgService.translateMessage(PullRocketMqMessage.this.fromChannel, PullRocketMqMessage.this.ctype, str);
                            if (translateMessage.isEmpty()) {
                                PullRocketMqMessage.this.logger.warn("忽略发送:{}", str);
                            }
                            Iterator<SendMessage> it2 = translateMessage.iterator();
                            while (it2.hasNext()) {
                                PullRocketMqMessage.this.cache.put(it2.next());
                            }
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        PullRocketMqMessage.this.logger.info("消费数据失败:{},{}", str, StringUtils.getTrace(e));
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            this.consumer.start();
            return true;
        } catch (MQClientException e) {
            e.printStackTrace();
            return true;
        }
    }

    @Override // com.efuture.ocm.smbus.msg.IPullMessage
    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }
}
