package com.efuture.ocp.taskcore.message;

import com.efuture.ocp.common.util.DateUtils;
import com.efuture.ocp.common.util.SpringBeanFactory;
import com.efuture.ocp.common.util.UniqueID;
import com.efuture.ocp.taskcore.consumer.ConsumerNode;
import com.efuture.omd.storage.FStorageOperations;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.tags.BindTag;

/* loaded from: input_file:BOOT-INF/lib/ocp-taskcore-1.0.0.jar:com/efuture/ocp/taskcore/message/DBMessageHandle.class */
/**
 * {@link IMessageHandle} implementation that keeps task messages in tables reached through the
 * {@link FStorageOperations} bean named {@link #StorageOperation}.
 *
 * <p>Three tables are used: {@link #message_tablename} holds messages that are waiting or in
 * progress, {@link #message_success_tablename} receives messages reported as {@code SUCCESS} or
 * {@code DUP}, and {@link #message_error_tablename} receives messages whose retry budget is
 * exhausted.
 */
public class DBMessageHandle implements IMessageHandle {
    /** Name of the {@link FStorageOperations} bean looked up through {@link SpringBeanFactory}. */
    public static final String StorageOperation = "StorageOperation_task";
    /** Table holding messages that are waiting for, or currently in, processing. */
    public static final String message_tablename = "task_message";
    /** Table that receives messages reported as successful or duplicate. */
    public static final String message_success_tablename = "task_message_success";
    /** Table that receives messages that ran out of retries. */
    public static final String message_error_tablename = "task_message_error";
    private static final Logger logger = Logger.getLogger(DBMessageHandle.class);
    /**
     * Back-off schedule indexed by {@code retryCount - 1}; each value is handed to
     * {@code DateUtils.addSeconds}. Retry counts beyond the table reuse the last entry.
     */
    public static final int[] delaydate = {2, 4, 8, 16, 32, 64, 128, 256, 600, 1200};

    /** Name of the status field used in queries and updates. */
    private static final String STATUS_FIELD = "status";
    /** Limit applied to the query that claims {@code NEW} messages for a node. */
    private static final int CLAIM_QUERY_LIMIT = 100;

    /**
     * Looks up the storage bean used for every table operation of this class.
     *
     * @return the {@link FStorageOperations} bean registered under {@link #StorageOperation}
     */
    public FStorageOperations getStorageOperations() {
        return (FStorageOperations) SpringBeanFactory.getBean(StorageOperation, FStorageOperations.class);
    }

    /**
     * Stores a new message in {@link #message_tablename}.
     *
     * <p>The passed message is modified before it is inserted: it receives a fresh id, the current
     * time as creation date, status {@code NEW}, and its key value is prefixed with
     * {@code "<topic>-"}.
     *
     * @param message the message to enqueue; mutated in place
     * @return always {@code 0}
     */
    @Override // com.efuture.ocp.taskcore.message.IMessageHandle
    @Transactional(propagation = Propagation.REQUIRES_NEW, value = "tasktransactionManager")
    public int produce(Message message) {
        message.setId(UniqueID.getUniqueID());
        message.setCreatedate(new Date());
        message.setStatus(MessageStatus.NEW.name());
        message.setKeyvalue(message.getTopic() + "-" + message.getKeyvalue());
        getStorageOperations().insert(message, message_tablename);
        return 0;
    }

    /**
     * Returns the messages of the node's topic that this node should work on.
     *
     * <p>Messages already marked {@code ING} for this node are returned first (at most one page).
     * Only when there are none does the method mark due {@code NEW} messages
     * ({@code delaydate <= now}) as {@code ING} for this node and then re-read them with the
     * same page-limited query.
     *
     * @param consumerNode the node asking for work; supplies topic, page size and node key
     * @return the messages currently assigned to the node, as returned by the storage layer
     */
    @Override // com.efuture.ocp.taskcore.message.IMessageHandle
    public List<Message> pullNewMessage(ConsumerNode consumerNode) {
        Date now = new Date();
        FStorageOperations storage = getStorageOperations();

        // Messages this node has already claimed but not yet finished.
        Query assignedQuery = new Query(Criteria.where("topic").is(consumerNode.getMessageConfig().getTopic())
                .and(STATUS_FIELD).is(MessageStatus.ING.name())
                .and("execnode").is(consumerNode.getNodekey()));
        assignedQuery.limit(consumerNode.getMessageConfig().getPagesize());
        List<Message> assigned = storage.select(assignedQuery, Message.class, message_tablename);
        if (assigned != null && !assigned.isEmpty()) {
            return assigned;
        }

        // Nothing pending: claim due NEW messages for this node, then read them back.
        Query dueQuery = new Query(Criteria.where("topic").is(consumerNode.getMessageConfig().getTopic())
                .and(STATUS_FIELD).is(MessageStatus.NEW.name())
                .and("delaydate").lte(now));
        Update claim = new Update();
        claim.set(STATUS_FIELD, MessageStatus.ING.name());
        claim.set("execdate", now);
        claim.set("execnode", consumerNode.getNodekey());
        dueQuery.limit(CLAIM_QUERY_LIMIT);
        storage.update(dueQuery, claim, message_tablename);
        return storage.select(assignedQuery, Message.class, message_tablename);
    }

    /**
     * No-op lock: this implementation performs no locking of its own.
     *
     * @param consumerNode ignored
     * @param message ignored
     * @return always {@code 1}
     */
    @Override // com.efuture.ocp.taskcore.message.IMessageHandle
    public int lockMessage(ConsumerNode consumerNode, Message message) {
        return 1;
    }

    /**
     * Records the outcome of processing a message.
     *
     * <ul>
     *   <li>Status {@code SUCCESS} or {@code DUP} (case-insensitive): the message is moved to
     *       {@link #message_success_tablename}.</li>
     *   <li>Otherwise, if its retry count has reached the configured {@code maxdelaytime}: the
     *       message is moved to {@link #message_error_tablename}.</li>
     *   <li>Otherwise: the retry count is incremented and the stored message is reset to
     *       {@code NEW} with a later {@code delaydate} taken from {@link #delaydate}.</li>
     * </ul>
     *
     * <p>A {@code null} status is treated as a failure instead of raising a
     * {@link NullPointerException}.
     *
     * @param consumerNode the node that processed the message; supplies the retry limit
     * @param message the processed message carrying the resulting status and text
     */
    @Override // com.efuture.ocp.taskcore.message.IMessageHandle
    @Transactional(propagation = Propagation.REQUIRES_NEW, value = "tasktransactionManager")
    public void consumeCallbackMessage(ConsumerNode consumerNode, Message message) {
        String status = message.getStatus();
        // Constant-first comparison keeps a missing status from throwing.
        boolean finished = MessageStatus.SUCCESS.name().equalsIgnoreCase(status)
                || MessageStatus.DUP.name().equalsIgnoreCase(status);
        if (finished) {
            moveToArchive(message, message_success_tablename);
            return;
        }
        if (message.getRetryCount() >= consumerNode.getMessageConfig().getMaxdelaytime()) {
            moveToArchive(message, message_error_tablename);
            return;
        }

        // Schedule another attempt after the back-off that matches the new retry count.
        message.setRetryCount(message.getRetryCount() + 1);
        Update retry = new Update();
        retry.set(STATUS_FIELD, MessageStatus.NEW.name());
        retry.set("retryCount", Integer.valueOf(message.getRetryCount()));
        retry.set("msg", message.getMsg());
        retry.set("delaydate", DateUtils.addSeconds(new Date(), retryDelay(message.getRetryCount())));
        getStorageOperations().update(byId(message), retry, message_tablename);
    }

    /** Inserts the message into {@code archiveTable} and deletes it from the live table. */
    private void moveToArchive(Message message, String archiveTable) {
        FStorageOperations storage = getStorageOperations();
        storage.insert(message, archiveTable);
        storage.delete(byId(message), message_tablename);
    }

    /** Builds the query that selects the stored row of the given message by its id. */
    private static Query byId(Message message) {
        return new Query(Criteria.where("id").is(Long.valueOf(message.getId())));
    }

    /**
     * Picks the back-off value for a retry count, clamping the index to the bounds of
     * {@link #delaydate} so that counts below 1 or above the table length cannot overflow it.
     */
    private static int retryDelay(int retryCount) {
        int position = Math.min(Math.max(retryCount, 1), delaydate.length);
        return delaydate[position - 1];
    }
}
