package com.efuture.ocp.taskcore.service;

import com.efuture.ocp.taskcore.consumer.ConsumerExecType;
import com.efuture.ocp.taskcore.consumer.ConsumerNode;
import com.efuture.ocp.taskcore.consumer.ConsumerReg;
import com.efuture.ocp.taskcore.consumer.IConsumerHandle;
import com.efuture.ocp.taskcore.consumer.IConsumerRegService;
import com.efuture.ocp.taskcore.message.IMessageHandle;
import com.efuture.ocp.taskcore.message.Message;
import com.efuture.ocp.taskcore.message.MessageConfig;
import com.efuture.ocp.taskcore.message.MessageStatus;
import java.util.Iterator;
import java.util.List;
import org.apache.log4j.Logger;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/efuture/ocp/taskcore/service/PullMessageTaskConsumer.class */
public class PullMessageTaskConsumer implements TaskConsumerService {
    private static Logger logger = Logger.getLogger(PullMessageTaskConsumer.class);
    private ConsumerReg consumerreg;
    private ConsumerNode nodeconfig;
    private IConsumerRegService consumerRegService;
    private IConsumerHandle consumerHandler;
    private IMessageHandle messagehandle;
    private IMessageDupHandle messagedup;
    private boolean ib_init = false;
    private boolean canrun;
    private String topic;

    public PullMessageTaskConsumer(IConsumerRegService iConsumerRegService, IConsumerHandle iConsumerHandle, IMessageHandle iMessageHandle, IMessageDupHandle iMessageDupHandle, boolean z, String str) {
        this.canrun = true;
        this.consumerRegService = iConsumerRegService;
        this.consumerHandler = iConsumerHandle;
        this.messagehandle = iMessageHandle;
        this.messagedup = iMessageDupHandle;
        this.canrun = z;
        this.topic = str;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public boolean isCanrun() {
        return this.canrun;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setCanrun(boolean z) {
        this.canrun = z;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public IConsumerHandle getConsumerHandler() {
        return this.consumerHandler;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public IMessageHandle getMessagehandle() {
        return this.messagehandle;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public IMessageDupHandle getMessagedup() {
        return this.messagedup;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public String getTopic() {
        return this.topic;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setConsumerHandler(IConsumerHandle iConsumerHandle) {
        this.consumerHandler = iConsumerHandle;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setMessagehandle(IMessageHandle iMessageHandle) {
        this.messagehandle = iMessageHandle;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setMessagedup(IMessageDupHandle iMessageDupHandle) {
        this.messagedup = iMessageDupHandle;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public IConsumerRegService getConsumerRegService() {
        return this.consumerRegService;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setConsumerRegService(IConsumerRegService iConsumerRegService) {
        this.consumerRegService = iConsumerRegService;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void setTopic(String str) {
        this.topic = str;
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void init() {
        this.consumerreg = this.consumerRegService.reg(this.topic);
        if (this.consumerreg == null || StringUtils.isEmpty(this.consumerreg.getConsumerkey())) {
            logger.error(String.valueOf(this.topic) + ">>>>>>>PullMessageTaskConsumer:注册失败");
            return;
        }
        this.nodeconfig = new ConsumerNode();
        this.nodeconfig.setNodekey(this.consumerreg.getConsumerkey());
        this.nodeconfig.setNodestatus(this.consumerreg.getStatus());
        MessageConfig messageConfigbyTopic = TaskUtils.getMessageConfigbyTopic(this.topic);
        if (messageConfigbyTopic == null) {
            logger.error(String.valueOf(this.topic) + ">>>>>>>PullMessageTaskConsumer:获取消息配置失败");
        } else {
            this.nodeconfig.setMessageConfig(messageConfigbyTopic);
            this.ib_init = true;
        }
    }

    @Override // com.efuture.ocp.taskcore.service.TaskConsumerService
    public void consumer() {
        if (isCanrun()) {
            if (!this.ib_init) {
                init();
                if (!this.ib_init) {
                    logger.error(String.valueOf(this.topic) + ">>>>>>>PullMessageTaskConsumer:消费者初始化失败,不能执行!");
                    return;
                }
            }
            String exectype = this.nodeconfig.getMessageConfig().getExectype();
            do {
                List<Message> message = getMessage();
                if (message == null || message.size() <= 0) {
                    return;
                }
                Iterator<Message> it = message.iterator();
                while (it.hasNext()) {
                    Message next = it.next();
                    if (lockMessage(next) >= 1) {
                        if (this.messagedup.checkMessageDup(next) < 1) {
                            next.setStatus(MessageStatus.DUP.name());
                            next.setMsg("消息重复,不执行");
                        } else {
                            try {
                                doConsumer(next);
                                next.setStatus(MessageStatus.SUCCESS.name());
                            } catch (Exception e) {
                                logger.error("", e);
                                next.setStatus(MessageStatus.FAIL.name());
                                String message2 = e.getMessage();
                                if (StringUtils.isEmpty(message2)) {
                                    next.setMsg("Unknown Exception");
                                } else {
                                    if (message2.length() > 1900) {
                                        message2 = message2.substring(0, 1900);
                                    }
                                    next.setMsg(message2);
                                }
                                this.messagedup.delMessageDup(next);
                            } finally {
                                consumeCallbackMessage(next);
                            }
                        }
                    }
                }
                if (StringUtils.isEmpty(exectype) || !exectype.equals(ConsumerExecType.BATCH.name())) {
                    return;
                }
            } while (1 != 0);
        }
    }

    public void doConsumer(Message message) throws Exception {
        this.consumerHandler.consume(this.nodeconfig, message);
    }

    private List<Message> getMessage() {
        return this.messagehandle.pullNewMessage(this.nodeconfig);
    }

    private int lockMessage(Message message) {
        return this.messagehandle.lockMessage(this.nodeconfig, message);
    }

    public void consumeCallbackMessage(Message message) {
        this.messagehandle.consumeCallbackMessage(this.nodeconfig, message);
    }
}
