package com.efuture.ocp.taskcore.rocketmq;

import com.alibaba.fastjson.JSON;
import com.efuture.ocp.common.rest.ServiceLogs;
import com.efuture.ocp.common.util.SpringBeanFactory;
import com.efuture.ocp.common.util.UniqueID;
import com.efuture.ocp.taskcore.consumer.ConsumerNode;
import com.efuture.ocp.taskcore.message.IMessageHandle;
import com.efuture.ocp.taskcore.message.Message;
import com.efuture.ocp.taskcore.message.MessageStatus;
import com.efuture.omd.storage.FStorageOperations;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:com/efuture/ocp/taskcore/rocketmq/RocketMqMessageHandle.class */
public class RocketMqMessageHandle implements IMessageHandle {
    private static Logger logger = Logger.getLogger(RocketMqMessageHandle.class);
    public static final String StorageOperation = "StorageOperation_task";
    public static final String message_tablename = "task_message";
    public static final String message_success_tablename = "task_message_success";
    public static final String message_error_tablename = "task_message_error";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public FStorageOperations getStorageOperations() {
        return (FStorageOperations) SpringBeanFactory.getBean(StorageOperation, FStorageOperations.class);
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW, value = "tasktransactionManager")
    public void produce(Message message) {
        message.setId(UniqueID.getUniqueID());
        message.setCreatedate(new Date());
        message.setStatus(MessageStatus.NEW.name());
        message.setKeyvalue(String.valueOf(message.getTopic()) + "-" + message.getKeyvalue());
        long currentTimeMillis = System.currentTimeMillis();
        try {
            ServiceLogs.logSendMqStart(message.getTopic(), message.getKeyvalue(), "");
            ServiceLogs.logSendMqSuccess(message.getTopic(), message.getKeyvalue(), this.rocketMQTemplate.syncSend(String.valueOf(message.getTopic()) + ":" + message.getEvent(), MessageBuilder.withPayload(message).setHeader("KEYS", message.getKeyvalue()).build()).toString(), "", currentTimeMillis);
        } catch (Exception e) {
            ServiceLogs.logSendMqError(message.getTopic(), message.getKeyvalue(), "1.1", "发送失败，后续保存到数据库-->" + e.getMessage(), "", currentTimeMillis);
            message.setMsg(e.getMessage());
            insdb(message, currentTimeMillis);
        }
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW, value = "tasktransactionManager")
    public void insdb(Message message, long j) {
        try {
            getStorageOperations().insert(message, message_tablename);
        } catch (Exception e) {
            message.setMsg(e.getMessage());
            ServiceLogs.logSendMqError(message.getTopic(), message.getKeyvalue(), "1.2", "保存数据库失败,请手工发送", JSON.toJSONString(message), j);
        }
    }

    public List<Message> pullNewMessage(ConsumerNode consumerNode) {
        logger.warn("rocketmq 不支持 pullNewMessage");
        return null;
    }

    public int lockMessage(ConsumerNode consumerNode, Message message) {
        logger.warn("rocketmq 不支持 lockMessage");
        return 0;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW, value = "tasktransactionManager")
    public void consumeCallbackMessage(ConsumerNode consumerNode, Message message) {
        if (message == null) {
            return;
        }
        try {
            message.setData("");
            if (message.getStatus().equalsIgnoreCase(MessageStatus.FAIL.name())) {
                getStorageOperations().insert(message, message_error_tablename);
            } else {
                getStorageOperations().insert(message, message_success_tablename);
            }
        } catch (Exception e) {
            message.setMsg(e.getMessage());
            ServiceLogs.logConsumeMqError(message.getTopic(), message.getKeyvalue(), "2.1", "消息回调,保存到数据库失败:" + e.getMessage(), message.getExecdate().getTime());
        }
    }
}
