package com.efuture.ocm.smbus.service;

import com.alibaba.fastjson.JSONObject;
import com.efuture.ocm.smbus.comm.ExtThreadFactory;
import com.efuture.ocm.smbus.comm.MsgCode;
import com.efuture.ocm.smbus.comm.SendMessage;
import com.efuture.ocm.smbus.comm.SmbApplicationContext;
import com.efuture.ocm.smbus.comm.StringUtils;
import com.efuture.ocm.smbus.entity.n.SmbSendchannelWithBLOBs;
import com.efuture.ocm.smbus.entity.n.SmbSourcechannelWithBLOBs;
import com.efuture.ocm.smbus.msg.IPullMessage;
import com.efuture.ocm.smbus.msg.ISendMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/ocm-smbus-core-1.0.0.jar:com/efuture/ocm/smbus/service/SmbusMsgProcessor.class */
public class SmbusMsgProcessor {
    private ExecutorService sendExcutor;
    private ExecutorService finishExcutor;
    private static Map<String, IPullMessage> pullExcutor = new HashMap();
    private static Map<String, ISendMessage> channelProcessor = new HashMap();
    private BlockingQueue<SendMessage> sendCache = new ArrayBlockingQueue(20480);
    private BlockingQueue<SendMessage> endCache = new ArrayBlockingQueue(20480);
    private Logger logger = LoggerFactory.getLogger(SmbRegisterService.class);
    private List<SendMsgProcessor> _sendExcutor = new ArrayList();
    private List<FinishMsgProcessor> _finishExcutor = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/ocm-smbus-core-1.0.0.jar:com/efuture/ocm/smbus/service/SmbusMsgProcessor$FinishMsgProcessor.class */
    public class FinishMsgProcessor implements Runnable {
        boolean isRunning;

        private FinishMsgProcessor() {
            this.isRunning = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            SmbusMsgProcessor.this.logger.info("启动发送消息完成收集处理服务");
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                try {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add((SendMessage) SmbusMsgProcessor.this.endCache.take());
                    try {
                        do {
                            SendMessage sendMessage = (SendMessage) SmbusMsgProcessor.this.endCache.poll();
                            if (sendMessage != null) {
                                arrayList.add(sendMessage);
                            }
                            break;
                        } while (arrayList.size() <= 32);
                        break;
                        ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.InfoServiceName)).updateDelivery(arrayList);
                    } catch (Exception e) {
                        SmbusMsgProcessor.this.logger.error("修改消息发送状态现错误:{},{}", arrayList.toString(), StringUtils.getTrace(e));
                    }
                } catch (InterruptedException e2) {
                    SmbusMsgProcessor.this.logger.warn("线程出现异常,中止执行,可能是结束命令.....");
                    this.isRunning = false;
                    e2.printStackTrace();
                } catch (Exception e3) {
                    SmbusMsgProcessor.this.logger.error("regular数据处理线程出现异常:" + StringUtils.getTrace(e3));
                    e3.printStackTrace();
                    this.isRunning = false;
                }
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/ocm-smbus-core-1.0.0.jar:com/efuture/ocm/smbus/service/SmbusMsgProcessor$LazyHolder.class */
    private static class LazyHolder {
        private static final SmbusMsgProcessor INSTANCE = new SmbusMsgProcessor();

        private LazyHolder() {
        }

        static {
            INSTANCE.init();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/ocm-smbus-core-1.0.0.jar:com/efuture/ocm/smbus/service/SmbusMsgProcessor$SendMsgProcessor.class */
    public class SendMsgProcessor implements Runnable {
        boolean isRunning;

        private SendMsgProcessor() {
            this.isRunning = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            SmbusMsgProcessor.this.logger.info("启动发送消息服务");
            while (!Thread.currentThread().isInterrupted() && this.isRunning) {
                try {
                    SendMessage sendMessage = (SendMessage) SmbusMsgProcessor.this.sendCache.take();
                    try {
                        ISendMessage iSendMessage = (ISendMessage) SmbusMsgProcessor.channelProcessor.get(sendMessage.getSendChannel());
                        if (iSendMessage == null) {
                            synchronized (SmbusMsgProcessor.channelProcessor) {
                                iSendMessage = (ISendMessage) SmbusMsgProcessor.channelProcessor.get(sendMessage.getSendChannel());
                                if (iSendMessage == null) {
                                    iSendMessage = SmbusMsgProcessor.this.getSendChannel(sendMessage);
                                }
                            }
                        }
                        if (iSendMessage == null) {
                            SmbusMsgProcessor.this.logger.error("消息没有找到处理器:", sendMessage.toString());
                            sendMessage.setStatus(MsgCode.FAIL_NOCHANNEL);
                        } else {
                            iSendMessage.sendMessage(sendMessage);
                        }
                        SmbusMsgProcessor.this.endCache.put(sendMessage);
                    } catch (Exception e) {
                        SmbusMsgProcessor.this.logger.error("消息发送出现错误:{},{}", sendMessage.toString(), StringUtils.getTrace(e));
                    }
                } catch (InterruptedException e2) {
                    SmbusMsgProcessor.this.logger.warn("线程出现异常,中止执行,可能是结束命令.....");
                    this.isRunning = false;
                    e2.printStackTrace();
                } catch (Exception e3) {
                    SmbusMsgProcessor.this.logger.error("regular数据处理线程出现异常:" + StringUtils.getTrace(e3));
                    e3.printStackTrace();
                    this.isRunning = false;
                }
            }
        }
    }

    public static SmbusMsgProcessor getInstance() {
        return LazyHolder.INSTANCE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void init() {
        SmbusInfoService smbusInfoService = (SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.InfoServiceName);
        SmbCommService smbCommService = (SmbCommService) SmbApplicationContext.getInstance().getBean(SmbCommService.CommServiceName);
        this.sendExcutor = Executors.newFixedThreadPool(smbCommService.getSend_message_worker_num(), new ExtThreadFactory("send-processor"));
        this.finishExcutor = Executors.newFixedThreadPool(smbCommService.getFinish_message_worker_num(), new ExtThreadFactory("finish-processor"));
        for (int i = 0; i < smbCommService.getFinish_message_worker_num(); i++) {
            SendMsgProcessor sendMsgProcessor = new SendMsgProcessor();
            this._sendExcutor.add(sendMsgProcessor);
            this.sendExcutor.execute(sendMsgProcessor);
        }
        for (int i2 = 0; i2 < smbCommService.getSend_message_worker_num(); i2++) {
            FinishMsgProcessor finishMsgProcessor = new FinishMsgProcessor();
            this._finishExcutor.add(finishMsgProcessor);
            this.finishExcutor.execute(finishMsgProcessor);
        }
        for (SmbSourcechannelWithBLOBs smbSourcechannelWithBLOBs : smbusInfoService.listSrcChannel()) {
            IPullMessage srcChannel = getSrcChannel(smbSourcechannelWithBLOBs.getBh(), smbSourcechannelWithBLOBs.getLx(), smbSourcechannelWithBLOBs.getPz());
            if (srcChannel != null && !pullExcutor.containsKey(smbSourcechannelWithBLOBs.getBh())) {
                synchronized (pullExcutor) {
                    if (!pullExcutor.containsKey(smbSourcechannelWithBLOBs.getBh())) {
                        pullExcutor.put(smbSourcechannelWithBLOBs.getBh(), srcChannel);
                        srcChannel.start();
                    }
                }
            }
        }
    }

    public void shutdown() {
        this.sendExcutor.shutdown();
        this.finishExcutor.shutdown();
        Iterator<Map.Entry<String, IPullMessage>> it = pullExcutor.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().shutdown();
        }
    }

    public boolean statPull(String str, String str2) {
        SmbSourcechannelWithBLOBs srcChannel = ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.InfoServiceName)).getSrcChannel(str, str2);
        if (srcChannel == null) {
            return false;
        }
        IPullMessage srcChannel2 = getSrcChannel(srcChannel.getBh(), srcChannel.getLx(), srcChannel.getPz());
        boolean z = false;
        if (srcChannel2 != null && !pullExcutor.containsKey(srcChannel.getBh())) {
            synchronized (pullExcutor) {
                if (!pullExcutor.containsKey(srcChannel.getBh())) {
                    pullExcutor.put(srcChannel.getBh(), srcChannel2);
                    srcChannel2.start();
                    z = true;
                }
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ISendMessage getSendChannel(SendMessage sendMessage) {
        SmbSendchannelWithBLOBs sendChannel = ((SmbusInfoService) SmbApplicationContext.getInstance().getBean(SmbCommService.InfoServiceName)).getSendChannel(sendMessage.getEntId(), sendMessage.getSendChannel());
        if (sendChannel == null) {
            return null;
        }
        String string = JSONObject.parseObject(sendChannel.getPz()).getString("channel");
        if (string == null) {
            string = "";
        }
        String trim = string.trim();
        if (trim.equals("")) {
            if (sendChannel.getLx().equals(SmbCommService.SENDCHANNEL_WECHAT)) {
                trim = SmbCommService.DEFAULT_WECHAT_SENDCHANNEL_CLASS;
            } else if (!sendChannel.getLx().equals(SmbCommService.SENDCHANNEL_TECHSMS) && !sendChannel.getLx().equals(SmbCommService.SENDCHANNEL_AILSMS) && !sendChannel.getLx().equals(SmbCommService.SENDCHANNEL_AILPAY)) {
                this.logger.warn("没有找到发送通道实现:{}", sendChannel.toString());
            }
        }
        if ("".equals(trim)) {
            return null;
        }
        try {
            ISendMessage iSendMessage = (ISendMessage) Thread.currentThread().getContextClassLoader().loadClass(trim).newInstance();
            iSendMessage.init(sendMessage.getSendChannel(), sendChannel.getLx(), sendChannel.getPz());
            return iSendMessage;
        } catch (Exception e) {
            e.printStackTrace();
            this.logger.error("加载通道实现异常:{},{}", sendMessage.getSendChannel(), StringUtils.getTrace(e));
            return null;
        }
    }

    private IPullMessage getSrcChannel(String str, String str2, String str3) {
        String string = JSONObject.parseObject(str3).getString("channel");
        if (string == null) {
            string = "";
        }
        String trim = string.trim();
        if (trim.equals("")) {
            if (str2.equals(SmbCommService.SRCCHANNEL_ROCKETMQ)) {
                trim = SmbCommService.DEFAULT_ROCKETMQ_SRCCHANNEL_CLASS;
            } else {
                this.logger.warn("没有找获取消息通道实现:{}", str3);
            }
        }
        if ("".equals(trim)) {
            return null;
        }
        try {
            IPullMessage iPullMessage = (IPullMessage) Thread.currentThread().getContextClassLoader().loadClass(trim).newInstance();
            iPullMessage.init(str, str2, str3, this.sendCache);
            return iPullMessage;
        } catch (Exception e) {
            e.printStackTrace();
            this.logger.error("加载获取消息通道实现异常:{},{}", str, StringUtils.getTrace(e));
            return null;
        }
    }

    public boolean sendMessage(SendMessage sendMessage) {
        try {
            this.sendCache.put(sendMessage);
            return true;
        } catch (InterruptedException e) {
            this.logger.error("异常:{},{}", sendMessage, StringUtils.getTrace(e));
            e.printStackTrace();
            return false;
        }
    }

    public boolean sendMessage(String str) {
        List<SendMessage> translateMessage = ((SmbusMsgService) SmbApplicationContext.getInstance().getBean(SmbCommService.MsgServiceName)).translateMessage("test", "test", str);
        if (translateMessage.isEmpty()) {
            this.logger.warn("忽略发送:{}", str);
        }
        try {
            Iterator<SendMessage> it = translateMessage.iterator();
            while (it.hasNext()) {
                this.sendCache.put(it.next());
            }
            return true;
        } catch (InterruptedException e) {
            this.logger.error("异常:{},{}", str, StringUtils.getTrace(e));
            e.printStackTrace();
            return false;
        }
    }
}
