package com.efuture.ocm.smbus.service;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.efuture.ocm.smbus.comm.ExtThreadFactory;
import com.efuture.ocm.smbus.comm.MsgCode;
import com.efuture.ocm.smbus.comm.SendMessage;
import com.efuture.ocm.smbus.comm.SmbApplicationContext;
import com.efuture.ocm.smbus.comm.StringUtils;
import com.efuture.ocm.smbus.dao.d.SmbusMapper;
import com.efuture.ocm.smbus.entity.n.SmbSendchannelWithBLOBs;
import com.efuture.ocm.smbus.entity.n.SmbSourcechannelWithBLOBs;
import com.efuture.ocm.smbus.msg.IPullMessage;
import com.efuture.ocm.smbus.msg.ISendMessage;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor.class */
public class SmbusMsgProcessor {

    // Injection target. NOTE(review): getInstance() hands out an object built with `new` in
    // LazyHolder, so this stays null unless something else autowires that instance - confirm.
    @Autowired
    private SmbusMapper smbusMapper;
    // RocketMQ name-server addresses and topic list; assigned only through the setters.
    private String consumeNameSrv;
    private String consumeTopics;
    private String producerNameSrv;
    // Worker pools, created in init().
    private ExecutorService sendExcutor;
    private ExecutorService finishExcutor;
    private ExecutorService retryExcutor;
    private ExecutorService loadExcutor;
    // Started source channels keyed by channel id; callers synchronize on the map itself.
    private static final Map<String, IPullMessage> pullExcutor = new HashMap<>();
    // Monitor the load worker waits on; refresh() notifies it.
    private static final Object obj = new Object();
    // Cache of initialised send channels; callers synchronize on the map itself.
    private static final Map<String, ISendMessage> channelProcessor = new HashMap<>();
    // Capacity of each of the three in-memory queues below.
    private int cache_queue_size = 20480;
    // Base of the exponential retry back-off (seconds); overwritten from SmbCommService in init().
    private int retry_delay_step = 5;
    // Upper bound used by the finish worker when it collects a batch.
    private int finish_batch = 32;
    // Pause flag: written by tryStop() and read by every worker thread, hence volatile.
    private volatile boolean isStop = false;
    private Date startTime = new Date();
    // Counters reported by getStatus().
    private final AtomicLong sendSucess = new AtomicLong(0);
    private final AtomicLong sendError = new AtomicLong(0);
    private final AtomicLong sendRetry = new AtomicLong(0);
    // Consumer settings applied to DefaultMQPushConsumer in init().
    private String consumeGroupId = "smbus_apps_consume";
    private int consumeMessageBatchMaxSize = 32;
    private int consumeThreadMax = 2;
    private int consumeThreadMin = 2;
    // Topics parsed from consumeTopics in init().
    private final List<String> topicArray = new ArrayList<>();
    private DefaultMQPushConsumer consumer = null;
    // Producer settings applied to DefaultMQProducer in init().
    private String producerGroupId = "smbus_apps_consume";
    private String producerTopics = "smbus";
    private DefaultMQProducer producer = null;
    // sendCache: messages waiting for a first send; endCache: messages whose final status must
    // be reported; retryCache: messages waiting for another attempt.
    private final BlockingQueue<SendMessage> sendCache = new ArrayBlockingQueue<>(this.cache_queue_size);
    private final BlockingQueue<SendMessage> endCache = new ArrayBlockingQueue<>(this.cache_queue_size);
    private final BlockingQueue<SendMessage> retryCache = new ArrayBlockingQueue<>(this.cache_queue_size);
    private final Logger logger = LoggerFactory.getLogger(SmbusMsgProcessor.class);
    // Worker instances submitted to the pools above, kept so they remain reachable.
    private final List<SendMsgProcessor> _sendExcutor = new ArrayList<>();
    private final List<FinishMsgProcessor> _finishExcutor = new ArrayList<>();
    private final List<RetryMsgProcessor> _retryExcutor = new ArrayList<>();
    private final List<LoadMessageProcessor> _loadExcutor = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor$FinishMsgProcessor.class */
    public class FinishMsgProcessor implements Runnable {
        boolean isRunning;

        private FinishMsgProcessor() {
            this.isRunning = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            SmbusMsgProcessor.this.logger.info("启动发送消息完成收集处理服务");
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                ArrayList arrayList = new ArrayList();
                try {
                    arrayList.add((SendMessage) SmbusMsgProcessor.this.endCache.take());
                    do {
                        SendMessage sendMessage = (SendMessage) SmbusMsgProcessor.this.endCache.poll();
                        if (sendMessage == null) {
                            break;
                        } else {
                            arrayList.add(sendMessage);
                        }
                    } while (arrayList.size() <= SmbusMsgProcessor.this.finish_batch);
                    ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvInfoName)).finishDelivery(arrayList);
                } catch (InterruptedException e) {
                    SmbusMsgProcessor.this.logger.warn("线程出现异常,中止执行,可能是结束命令.....{}", e.getMessage());
                    this.isRunning = false;
                    e.printStackTrace();
                } catch (Exception e2) {
                    SmbusMsgProcessor.this.logger.error("修改消息发送状态错误:{},{}", arrayList.toString(), StringUtils.getTrace(e2));
                    e2.printStackTrace();
                }
            }
        }

        public void stop() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor$LazyHolder.class */
    /**
     * Initialization-on-demand holder: the singleton is created and {@code init()} is run
     * the first time this nested class is loaded, i.e. on the first {@code getInstance()} call.
     */
    private static class LazyHolder {
        private static final SmbusMsgProcessor INSTANCE;

        static {
            SmbusMsgProcessor processor = new SmbusMsgProcessor();
            // Publish the reference before init(), matching the original field-then-init order.
            INSTANCE = processor;
            processor.init();
        }

        private LazyHolder() {
        }
    }

    /* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor$LoadMessageProcessor.class */
    /**
     * Background worker that sleeps on the shared {@code obj} monitor and wakes up every
     * 10 seconds or when {@code refresh()} notifies it.
     *
     * <p>The wake-up currently performs no work: the previous body only assembled a hard-coded
     * sample JSON payload (containing phone numbers and WeChat ids) and discarded the result,
     * so that dead code has been removed.
     */
    public class LoadMessageProcessor implements Runnable {
        volatile boolean isRunning = true;

        public LoadMessageProcessor() {
        }

        /** Waits in a loop until the thread is interrupted or {@code isRunning} is cleared. */
        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                try {
                    synchronized (SmbusMsgProcessor.obj) {
                        SmbusMsgProcessor.obj.wait(10000L);
                    }
                    // NOTE(review): no reload logic exists here yet; add it after this wait.
                } catch (InterruptedException e) {
                    SmbusMsgProcessor.this.logger.warn("线程出现异常,中止执行,可能是结束命令.....");
                    // Keep the interrupt visible to the executor that owns this thread.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e2) {
                    SmbusMsgProcessor.this.logger.error("数据处理线程出现异常:" + StringUtils.getTrace(e2));
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor$RetryMsgProcessor.class */
    public class RetryMsgProcessor implements Runnable {
        boolean isRunning;
        Object waitObj;

        private RetryMsgProcessor() {
            this.isRunning = true;
            this.waitObj = new Object();
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis;
            SendMessage sendMessage;
            long sendtime;
            int retry;
            long currentTimeMillis2;
            SmbusMsgProcessor.this.logger.info("启动重试消息服务");
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                try {
                    currentTimeMillis = System.currentTimeMillis();
                    sendMessage = (SendMessage) SmbusMsgProcessor.this.retryCache.take();
                    sendtime = sendMessage.getSendtime();
                    retry = sendMessage.getRetry();
                    currentTimeMillis2 = System.currentTimeMillis();
                    if (retry == 0) {
                        retry = 1;
                    }
                } catch (InterruptedException e) {
                    SmbusMsgProcessor.this.logger.warn("重试消息线程出现异常,中止执行,可能是结束命令.....");
                    this.isRunning = false;
                    e.printStackTrace();
                } catch (Exception e2) {
                    SmbusMsgProcessor.this.logger.error("重试消息发送出现错误:{},{}", StringUtils.getTrace(e2));
                    e2.printStackTrace();
                }
                if ((currentTimeMillis2 - sendtime) / 1000 > Math.pow(SmbusMsgProcessor.this.retry_delay_step, retry) || SmbusMsgProcessor.this.retryCache.size() > SmbusMsgProcessor.this.cache_queue_size / 2) {
                    SmbusMsgProcessor.this.logger.warn("重试消息:{}", sendMessage);
                    ISendMessage sendProcessor = SmbusMsgProcessor.this.getSendProcessor(sendMessage);
                    if (sendProcessor == null) {
                        SmbusMsgProcessor.this.logger.error("消息没有找到处理器:", sendMessage);
                        sendMessage.setStatus(MsgCode.FAIL_NOCHANNEL);
                        SmbusMsgProcessor.this.endCache.put(sendMessage);
                    } else if (!SmbusMsgProcessor.this.isStop()) {
                        sendMessage.setRetry(sendMessage.getRetry() + 1);
                        boolean sendMessage2 = sendProcessor.sendMessage(sendMessage);
                        if (sendMessage2 || sendMessage.getStatus() != 100) {
                            SmbusMsgProcessor.this.endCache.put(sendMessage);
                        } else {
                            try {
                                ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvInfoName)).retryDelivery(sendMessage);
                            } catch (Exception e3) {
                                SmbusMsgProcessor.this.logger.error("修改消息重试状态错误:{},{}", sendMessage, StringUtils.getTrace(e3));
                            }
                            SmbusMsgProcessor.this.retryCache.put(sendMessage);
                        }
                        SmbusMsgProcessor.this.retryStatus(sendMessage2);
                    }
                } else {
                    SmbusMsgProcessor.this.retryCache.put(sendMessage);
                }
                if ((currentTimeMillis2 - currentTimeMillis) / 1000 < 2) {
                    synchronized (this.waitObj) {
                        this.waitObj.wait(2000L);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/classes/com/efuture/ocm/smbus/service/SmbusMsgProcessor$SendMsgProcessor.class */
    public class SendMsgProcessor implements Runnable {
        boolean isRunning;

        private SendMsgProcessor() {
            this.isRunning = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            SendMessage sendMessage;
            ISendMessage sendProcessor;
            SmbusMsgProcessor.this.logger.info("启动发送消息服务");
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                try {
                    sendMessage = (SendMessage) SmbusMsgProcessor.this.sendCache.take();
                    try {
                        sendProcessor = SmbusMsgProcessor.this.getSendProcessor(sendMessage);
                    } catch (Exception e) {
                        SmbusMsgProcessor.this.logger.error("消息发送出现错误:{},{}", sendMessage.toString(), StringUtils.getTrace(e));
                    }
                } catch (InterruptedException e2) {
                    SmbusMsgProcessor.this.logger.warn("线程出现异常,中止执行,可能是结束命令.....");
                    this.isRunning = false;
                    e2.printStackTrace();
                } catch (Exception e3) {
                    SmbusMsgProcessor.this.logger.error("regular数据处理线程出现异常:" + StringUtils.getTrace(e3));
                    e3.printStackTrace();
                    this.isRunning = false;
                }
                if (sendProcessor == null) {
                    SmbusMsgProcessor.this.logger.error("消息没有找到处理器:", sendMessage);
                    sendMessage.setStatus(MsgCode.FAIL_NOCHANNEL);
                    SmbusMsgProcessor.this.endCache.put(sendMessage);
                } else if (!SmbusMsgProcessor.this.isStop()) {
                    sendMessage.setRetry(sendMessage.getRetry() + 1);
                    boolean sendMessage2 = sendProcessor.sendMessage(sendMessage);
                    if (!sendMessage2 && sendMessage.getStatus() == 100) {
                        SmbusMsgProcessor.this.retryCache.put(sendMessage);
                    } else if (sendMessage2) {
                        SmbusMsgProcessor.this.endCache.put(sendMessage);
                    } else {
                        JSONObject parseObject = JSONObject.parseObject(sendMessage.getContent());
                        if (parseObject.containsKey("reserve")) {
                            JSONArray jSONArray = parseObject.getJSONArray("reserve");
                            if (!jSONArray.isEmpty()) {
                                JSONObject jSONObject = jSONArray.getJSONObject(0);
                                jSONArray.remove(0);
                                sendMessage.setRetry(0);
                                JSONObject jSONObject2 = new JSONObject();
                                jSONObject2.put("send", jSONObject.getString("content"));
                                sendMessage.setContent(jSONObject2.toString());
                                sendMessage.setCtype(jSONObject.getString("ctype"));
                                sendMessage.setSendChannel(jSONObject.getString("sendChannel"));
                                SmbusMsgProcessor.this.sendCache.add(sendMessage);
                            }
                        }
                    }
                    SmbusMsgProcessor.this.sendStatus(sendMessage2);
                }
            }
        }
    }

    /**
     * Returns the process-wide processor. The first call loads {@code LazyHolder}, which
     * constructs the instance and runs {@code init()} on it.
     *
     * @return the shared {@code SmbusMsgProcessor}
     */
    public static SmbusMsgProcessor getInstance() {
        final SmbusMsgProcessor shared = LazyHolder.INSTANCE;
        return shared;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the worker pools, starts every configured source channel, and prepares the
     * RocketMQ consumer and producer.
     *
     * @return {@code true} when the consumer/producer setup and {@code start()} succeeded,
     *         {@code false} when that setup threw
     */
    public Boolean init() {
        SmbusInfoService infoService = (SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvInfoName);
        SmbCommService commService = (SmbCommService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvCommName);
        int sendWorkers = commService.getSend_message_worker_num();
        int finishWorkers = commService.getFinish_message_worker_num();
        int retryWorkers = commService.getRetry_message_worker_num();
        // Exactly one load worker is submitted below, so its pool needs a single thread.
        int loadWorkers = 1;

        this.sendExcutor = Executors.newFixedThreadPool(sendWorkers, new ExtThreadFactory("send-processor"));
        this.finishExcutor = Executors.newFixedThreadPool(finishWorkers, new ExtThreadFactory("finish-processor"));
        this.retryExcutor = Executors.newFixedThreadPool(retryWorkers, new ExtThreadFactory("retry-processor"));
        this.loadExcutor = Executors.newFixedThreadPool(loadWorkers, new ExtThreadFactory("load-processor"));
        this.retry_delay_step = commService.getRetry_delay_step();

        for (int i = 0; i < sendWorkers; i++) {
            SendMsgProcessor worker = new SendMsgProcessor();
            this._sendExcutor.add(worker);
            this.sendExcutor.execute(worker);
        }
        for (int i = 0; i < finishWorkers; i++) {
            FinishMsgProcessor worker = new FinishMsgProcessor();
            this._finishExcutor.add(worker);
            this.finishExcutor.execute(worker);
        }
        for (int i = 0; i < retryWorkers; i++) {
            RetryMsgProcessor worker = new RetryMsgProcessor();
            this._retryExcutor.add(worker);
            this.retryExcutor.execute(worker);
        }
        for (int i = 0; i < loadWorkers; i++) {
            LoadMessageProcessor worker = new LoadMessageProcessor();
            this._loadExcutor.add(worker);
            this.loadExcutor.execute(worker);
        }

        for (SmbSourcechannelWithBLOBs source : infoService.listSrcChannel()) {
            IPullMessage channel = getSrcChannel(source.getBh(), source.getLx(), source.getPz());
            if (channel == null) {
                continue;
            }
            // The map is a plain HashMap, so it is only ever read while holding its lock.
            synchronized (pullExcutor) {
                if (!pullExcutor.containsKey(source.getBh())) {
                    pullExcutor.put(source.getBh(), channel);
                    channel.start();
                }
            }
        }

        // consumeTopics may never have been set; treat that as "no topics" instead of failing.
        if (this.consumeTopics != null) {
            this.consumeTopics = StringUtils.trim(this.consumeTopics);
        }
        String topics = this.consumeTopics == null ? "" : this.consumeTopics;
        // Topics are separated by ',' or '|'; blank entries are skipped.
        for (String token : topics.split(",|\\|")) {
            String topic = token.trim();
            if (!topic.isEmpty()) {
                this.topicArray.add(topic);
            }
        }

        this.consumer = new DefaultMQPushConsumer(this.consumeGroupId);
        this.consumer.setNamesrvAddr(this.consumeNameSrv);
        this.consumer.setConsumeThreadMax(this.consumeThreadMax);
        this.consumer.setConsumeThreadMin(this.consumeThreadMin);
        this.consumer.setConsumeMessageBatchMaxSize(this.consumeMessageBatchMaxSize);
        try {
            for (String topic : this.topicArray) {
                this.consumer.subscribe(topic, "*");
            }
            this.producer = new DefaultMQProducer(this.producerGroupId);
            this.producer.setNamesrvAddr(this.producerNameSrv);
            // Report start()'s own result rather than an unconditional true.
            return start();
        } catch (Exception e) {
            this.logger.error("初始化获取数据任务失败:{}", StringUtils.getTrace(e));
            return false;
        }
    }

    /**
     * Stops the processor: sets the stop flag, interrupts every worker pool, shuts down all
     * started source channels, then the RocketMQ consumer and producer.
     */
    public void shutdown() {
        tryStop(true);
        // The workers loop forever and block in take()/wait(); a plain shutdown() would never
        // end them. shutdownNow() delivers the interrupt they treat as their stop command.
        ExecutorService[] pools = {this.sendExcutor, this.finishExcutor, this.retryExcutor, this.loadExcutor};
        for (ExecutorService pool : pools) {
            if (pool != null) {
                pool.shutdownNow();
            }
        }
        // Snapshot under the map's lock, then call out to the channels without holding it.
        List<IPullMessage> channels;
        synchronized (pullExcutor) {
            channels = new ArrayList<>(pullExcutor.values());
        }
        for (IPullMessage channel : channels) {
            channel.shutdown();
        }
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.producer != null) {
            this.producer.shutdown();
        }
    }

    /**
     * Sets the stop flag that {@code isStop()} reports to the workers and to
     * {@code sendMessage(SendMessage)}.
     *
     * @param z {@code true} to mark the processor stopped, {@code false} to clear the flag
     */
    public void tryStop(boolean z) {
        isStop = z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports the stop flag. When the flag is set, the calling thread is first held for
     * 60 seconds, which throttles callers that poll while the processor is stopped.
     *
     * @return the flag value read after the optional pause
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    public boolean isStop() throws InterruptedException {
        if (this.isStop) {
            // A plain sleep replaces wait() on a throwaway monitor: same pause, no spurious
            // wake-ups, and no lock object that nothing could ever notify.
            Thread.sleep(60000L);
        }
        return this.isStop;
    }

    /**
     * Builds a snapshot of the stop flag, the three queue depths and the send counters.
     *
     * @return a new JSON object with keys {@code isStop}, {@code sendCache},
     *         {@code retryCache}, {@code endCache}, {@code sendSucess}, {@code sendError}
     *         and {@code sendRetry}
     */
    public JSONObject getStatus() {
        JSONObject snapshot = new JSONObject();
        snapshot.put("isStop", isStop);
        snapshot.put("sendCache", sendCache.size());
        snapshot.put("retryCache", retryCache.size());
        snapshot.put("endCache", endCache.size());
        snapshot.put("sendSucess", sendSucess.get());
        snapshot.put("sendError", sendError.get());
        snapshot.put("sendRetry", sendRetry.get());
        return snapshot;
    }

    /**
     * Looks up one source channel and starts it if it is not already registered.
     *
     * @param str  first lookup argument passed to {@code SmbusInfoService.getSrcChannel}
     * @param str2 second lookup argument passed to {@code SmbusInfoService.getSrcChannel}
     * @return {@code true} only if a channel was registered and started by this call
     */
    public boolean statPull(String str, String str2) {
        SmbSourcechannelWithBLOBs source = ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvInfoName)).getSrcChannel(str, str2);
        if (source == null) {
            return false;
        }
        IPullMessage channel = getSrcChannel(source.getBh(), source.getLx(), source.getPz());
        if (channel == null) {
            return false;
        }
        // pullExcutor is a plain HashMap mutated under this lock, so it is read under it too.
        synchronized (pullExcutor) {
            if (pullExcutor.containsKey(source.getBh())) {
                return false;
            }
            pullExcutor.put(source.getBh(), channel);
            channel.start();
            return true;
        }
    }

    /**
     * Creates and initialises the channel implementation for a message's send channel.
     *
     * <p>The implementation class name comes from the {@code "channel"} key of the channel's
     * JSON configuration; when that is blank, WeChat and tech-SMS channel types fall back to
     * their default classes.
     *
     * @param sendMessage message whose enterprise id and send channel select the configuration
     * @return an initialised channel, or {@code null} when the channel is unknown, has no
     *         implementation class, or could not be loaded
     */
    private ISendMessage getSendChannel(SendMessage sendMessage) {
        SmbSendchannelWithBLOBs channel = ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvInfoName)).getSendChannel(sendMessage.getEntId(), sendMessage.getSendChannel());
        if (channel == null) {
            return null;
        }
        String className;
        try {
            // parseObject returns null for a null/empty config; treat that as "no class set".
            JSONObject config = JSONObject.parseObject(channel.getPz());
            className = config == null ? null : config.getString("channel");
        } catch (RuntimeException e) {
            this.logger.error("加载通道实现异常:{},{}", sendMessage.getSendChannel(), StringUtils.getTrace(e));
            return null;
        }
        className = className == null ? "" : className.trim();
        if (className.isEmpty()) {
            if (channel.getLx().equals(SmbCommService.SENDCHANNEL_WECHAT)) {
                className = SmbCommService.DEFAULT_WECHAT_SENDCHANNEL_CLASS;
            } else if (channel.getLx().equals(SmbCommService.SENDCHANNEL_TECHSMS)) {
                className = SmbCommService.DEFAULT_TECHSMS_SENDCHANNEL_CLASS;
            } else if (!channel.getLx().equals(SmbCommService.SENDCHANNEL_AILSMS) && !channel.getLx().equals(SmbCommService.SENDCHANNEL_AILPAY)) {
                // The two excluded types have no default class and are skipped without a warning.
                this.logger.warn("没有找到发送通道实现:{}", channel.toString());
            }
        }
        if ("".equals(className)) {
            return null;
        }
        try {
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance().
            ISendMessage sender = (ISendMessage) Thread.currentThread().getContextClassLoader().loadClass(className).getDeclaredConstructor().newInstance();
            sender.init(sendMessage.getSendChannel(), channel.getLx(), channel.getPz());
            return sender;
        } catch (Exception e) {
            this.logger.error("加载通道实现异常:{},{}", sendMessage.getSendChannel(), StringUtils.getTrace(e));
            return null;
        }
    }

    /**
     * Returns the cached channel implementation for a message, creating and caching it on
     * first use.
     *
     * @param sendMessage message whose enterprise id and send channel form the cache key
     * @return the channel implementation, or {@code null} if none could be created (a
     *         {@code null} result is not cached, so the next call tries again)
     */
    public ISendMessage getSendProcessor(SendMessage sendMessage) {
        // The separator keeps ("1", "23") and ("12", "3") from sharing one cache entry.
        String cacheKey = sendMessage.getEntId() + "|" + sendMessage.getSendChannel();
        // channelProcessor is a plain HashMap shared by all worker threads, so every access,
        // including the lookup, happens while holding its lock.
        synchronized (channelProcessor) {
            ISendMessage sender = channelProcessor.get(cacheKey);
            if (sender == null) {
                sender = getSendChannel(sendMessage);
                if (sender != null) {
                    channelProcessor.put(cacheKey, sender);
                }
            }
            return sender;
        }
    }

    /**
     * Creates and initialises a source (pull) channel from its JSON configuration.
     *
     * @param str  channel id, passed through to {@code IPullMessage.init}
     * @param str2 channel type, passed through to {@code IPullMessage.init}
     * @param str3 JSON configuration; its {@code "channel"} key names the implementation class
     * @return the initialised channel, or {@code null} when the configuration is missing or
     *         invalid, names no class, or the class could not be loaded
     */
    private IPullMessage getSrcChannel(String str, String str2, String str3) {
        try {
            // Parsing sits inside the try so that one bad configuration is logged and skipped
            // instead of escaping to the caller (init() runs from a static initialiser).
            JSONObject config = JSONObject.parseObject(str3);
            String className = config == null ? null : config.getString("channel");
            if (className == null || className.trim().isEmpty()) {
                return null;
            }
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance().
            IPullMessage channel = (IPullMessage) Thread.currentThread().getContextClassLoader().loadClass(className.trim()).getDeclaredConstructor().newInstance();
            channel.init(str, str2, str3, this.sendCache);
            return channel;
        } catch (Exception e) {
            this.logger.error("加载获取消息通道实现异常:{},{}", str, StringUtils.getTrace(e));
            return null;
        }
    }

    /**
     * Queues a message for sending, blocking while the send queue is full.
     *
     * <p>When the processor is stopped, {@code isStop()} holds the caller for its pause
     * before this method returns {@code false}.
     *
     * @param sendMessage message to queue
     * @return {@code true} if queued; {@code false} if the processor is stopped or the
     *         calling thread was interrupted
     */
    public boolean sendMessage(SendMessage sendMessage) {
        try {
            if (isStop()) {
                return false;
            }
            this.sendCache.put(sendMessage);
            return true;
        } catch (InterruptedException e) {
            this.logger.error("异常:{},{}", sendMessage, StringUtils.getTrace(e));
            // Re-assert the interrupt so the caller's thread can still react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Translates a raw payload into messages and queues each one for sending.
     *
     * <p>The translation is called with the fixed arguments {@code "test"}, {@code "test"}
     * and {@code ""}. NOTE(review): this looks like a test entry point - confirm before use.
     *
     * @param str raw payload handed to {@code SmbusMsgService.translateMessage}
     * @return {@code true} if every translated message was queued (also when there were
     *         none); {@code false} if the calling thread was interrupted while queueing
     * @throws Exception declared by the original signature; kept for compatibility
     */
    public boolean sendMessage(String str) throws Exception {
        List<SendMessage> messages = ((SmbusMsgService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvMsgName)).translateMessage("test", "test", str, "");
        if (messages == null || messages.isEmpty()) {
            this.logger.warn("忽略发送:{}", str);
            return true;
        }
        try {
            for (SendMessage message : messages) {
                this.sendCache.put(message);
            }
            return true;
        } catch (InterruptedException e) {
            this.logger.error("异常:{},{}", str, StringUtils.getTrace(e));
            // Re-assert the interrupt so the caller's thread can still react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Records the outcome of one retry attempt: always bumps the retry counter, then the
     * success or error counter.
     *
     * @param z {@code true} if the retry succeeded
     */
    public void retryStatus(boolean z) {
        sendRetry.incrementAndGet();
        AtomicLong outcomeCounter = z ? sendSucess : sendError;
        outcomeCounter.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the outcome of one first-time send in the success or error counter.
     *
     * @param z {@code true} if the send succeeded
     */
    public void sendStatus(boolean z) {
        AtomicLong outcomeCounter = z ? sendSucess : sendError;
        outcomeCounter.incrementAndGet();
    }

    /**
     * Wakes every thread waiting on the shared {@code obj} monitor, i.e. the load worker's
     * timed wait.
     */
    public void refresh() {
        final Object monitor = obj;
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /**
     * Starts the RocketMQ producer and registers the message listener on the consumer.
     *
     * <p>The listener forwards the {@code "data"} field of each message on topic
     * {@code SENDMSG} to {@code SmbusMsgService.doSendCommon}; other messages are ignored.
     * NOTE(review): this method never calls {@code consumer.start()}, so the listener only
     * becomes active if the consumer is started elsewhere - confirm.
     *
     * @return {@code true} if the producer started and the listener was registered,
     *         {@code false} if the producer failed to start
     */
    public boolean start() {
        this.logger.info("启动smbus代理服务");
        try {
            this.producer.start();
            this.consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    String body = null;
                    try {
                        SmbusMsgService msgService = (SmbusMsgService) SmbApplicationContext.getInstance().getBean(SmbCommService.SrvMsgName);
                        for (MessageExt messageExt : list) {
                            body = new String(messageExt.getBody(), "UTF-8");
                            JSONObject parsed = JSONObject.parseObject(body);
                            // A body that parses to null can never succeed; ignore it rather
                            // than fail with an NPE and have the broker redeliver it forever.
                            String data = parsed == null ? null : parsed.getString("data");
                            if (data == null) {
                                SmbusMsgProcessor.this.logger.warn("忽略消息:{}", body);
                            } else if (messageExt.getTopic().equalsIgnoreCase("SENDMSG")) {
                                msgService.doSendCommon(msgService, SmbusMsgProcessor.this.sendCache, data);
                            }
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // Any failure asks the broker to redeliver the whole batch later.
                        SmbusMsgProcessor.this.logger.error("消费数据失败:{},{}", body, StringUtils.getTrace(e));
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            return true;
        } catch (MQClientException e) {
            // A producer that failed to start is reported as failure, not success.
            this.logger.error("启动smbus代理服务失败:{}", StringUtils.getTrace(e));
            return false;
        }
    }

    /**
     * Formats an epoch-millisecond string as {@code yyyy-MM-dd HH:mm:ss} in the JVM's
     * default time zone.
     *
     * @param str epoch milliseconds as text; may be {@code null} or empty
     * @return {@code ""} for {@code null}/empty input; the input unchanged if it contains
     *         a {@code '-'} (taken to be an already formatted date); otherwise the formatted
     *         timestamp
     * @throws NumberFormatException if the text is neither a dash-containing string nor a
     *         valid long
     */
    public static String longTodate(String str) {
        if (str == null || str.isEmpty()) {
            return "";
        }
        if (str.contains("-")) {
            return str;
        }
        long epochMillis = Long.parseLong(str);
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return formatter.format(new Date(epochMillis));
    }

    /** @return the group id used to construct the RocketMQ push consumer in {@code init()} */
    public String getConsumeGroupId() {
        return consumeGroupId;
    }

    /** @param str group id for the push consumer; read when {@code init()} builds it */
    public void setConsumeGroupId(String str) {
        consumeGroupId = str;
    }

    /** @return the name-server address applied to the consumer in {@code init()} */
    public String getConsumeNameSrv() {
        return consumeNameSrv;
    }

    /** @param str name-server address for the consumer */
    public void setConsumeNameSrv(String str) {
        consumeNameSrv = str;
    }

    /** @return the topic list text; {@code init()} splits it on {@code ','} and {@code '|'} */
    public String getConsumeTopics() {
        return consumeTopics;
    }

    /** @param str topic names separated by {@code ','} or {@code '|'} */
    public void setConsumeTopics(String str) {
        consumeTopics = str;
    }

    /** @return the batch size applied to the consumer in {@code init()} */
    public int getConsumeMessageBatchMaxSize() {
        return consumeMessageBatchMaxSize;
    }

    /** @param i maximum number of messages handed to the listener per call */
    public void setConsumeMessageBatchMaxSize(int i) {
        consumeMessageBatchMaxSize = i;
    }

    /** @return the maximum consumer thread count applied in {@code init()} */
    public int getConsumeThreadMax() {
        return consumeThreadMax;
    }

    /** @param i maximum consumer thread count */
    public void setConsumeThreadMax(int i) {
        consumeThreadMax = i;
    }

    /** @return the minimum consumer thread count applied in {@code init()} */
    public int getConsumeThreadMin() {
        return consumeThreadMin;
    }

    /** @param i minimum consumer thread count */
    public void setConsumeThreadMin(int i) {
        consumeThreadMin = i;
    }

    /** @return the group id used to construct the RocketMQ producer in {@code init()} */
    public String getProducerGroupId() {
        return producerGroupId;
    }

    /** @param str group id for the producer */
    public void setProducerGroupId(String str) {
        producerGroupId = str;
    }

    /** @return the name-server address applied to the producer in {@code init()} */
    public String getProducerNameSrv() {
        return producerNameSrv;
    }

    /** @param str name-server address for the producer */
    public void setProducerNameSrv(String str) {
        producerNameSrv = str;
    }

    /** @return the configured producer topic text (not read anywhere else in this class) */
    public String getProducerTopics() {
        return producerTopics;
    }

    /** @param str producer topic text */
    public void setProducerTopics(String str) {
        producerTopics = str;
    }
}
