/*
 * Decompiled with CFR 0.152.
 */
package com.efuture.ocp.taskcore.message;

import com.efuture.ocp.common.util.DateUtils;
import com.efuture.ocp.common.util.SpringBeanFactory;
import com.efuture.ocp.common.util.UniqueID;
import com.efuture.ocp.taskcore.consumer.ConsumerNode;
import com.efuture.ocp.taskcore.message.IMessageHandle;
import com.efuture.ocp.taskcore.message.Message;
import com.efuture.ocp.taskcore.message.MessageStatus;
import com.efuture.omd.storage.FStorageOperations;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * {@link IMessageHandle} implementation that keeps the message queue in storage tables
 * accessed through an {@link FStorageOperations} bean.
 *
 * <p>Messages are inserted into {@link #message_tablename}; once consumed they are either
 * moved to {@link #message_success_tablename}, rescheduled with a later {@code delaydate},
 * or moved to {@link #message_error_tablename}.
 */
public class DBMessageHandle
implements IMessageHandle {
    private static final Logger logger = Logger.getLogger(DBMessageHandle.class);
    /** Name of the Spring bean that provides the {@link FStorageOperations} used by this class. */
    public static final String StorageOperation = "StorageOperation_task";
    /** Table holding messages that are waiting for, or currently in, processing. */
    public static final String message_tablename = "task_message";
    /** Table receiving messages whose status is SUCCESS or DUP. */
    public static final String message_success_tablename = "task_message_success";
    /** Table receiving messages that exhausted their retries. */
    public static final String message_error_tablename = "task_message_error";
    /** Retry delays in seconds, indexed by (retry count - 1); the last entry is reused for later retries. */
    public static final int[] delaydate = new int[]{2, 4, 8, 16, 32, 64, 128, 256, 600, 1200};

    /**
     * Looks up the storage bean registered under {@link #StorageOperation}.
     *
     * @return the storage operations bean returned by {@link SpringBeanFactory}
     */
    public FStorageOperations getStorageOperations() {
        return (FStorageOperations)SpringBeanFactory.getBean(StorageOperation, FStorageOperations.class);
    }

    /**
     * Stamps the message with a fresh id, the current creation date and status NEW, prefixes
     * its key value with {@code "<topic>-"}, and inserts it into {@link #message_tablename}.
     *
     * <p>NOTE(review): {@link #pullNewMessage} only claims rows whose {@code delaydate} is
     * not after "now", and this method does not set {@code delaydate}; presumably it is
     * already populated on {@code msg} - confirm against {@code Message} and its callers.
     *
     * @param msg the message to enqueue; it is modified in place
     * @return always {@code 0}
     */
    @Override
    @Transactional(propagation=Propagation.REQUIRES_NEW, value="tasktransactionManager")
    public int produce(Message msg) {
        msg.setId(UniqueID.getUniqueID());
        msg.setCreatedate(new Date());
        msg.setStatus(MessageStatus.NEW.name());
        msg.setKeyvalue(msg.getTopic() + "-" + msg.getKeyvalue());
        // Cast retained so the same insert overload as before is selected.
        this.getStorageOperations().insert((Object)msg, message_tablename);
        return 0;
    }

    /**
     * Returns messages of the node's topic that are in status ING and assigned to the node.
     *
     * <p>If such messages already exist they are returned directly (at most one page).
     * Otherwise NEW messages of the topic whose {@code delaydate} is not after the current
     * time are updated to status ING with this node as {@code execnode}, and the
     * assigned-to-this-node query is run again.
     *
     * @param node the consuming node; supplies the topic, page size and node key
     * @return the messages currently assigned to {@code node}, limited to the page size
     */
    @Override
    public List<Message> pullNewMessage(ConsumerNode node) {
        Date now = new Date();
        Object topic = node.getMessageConfig().getTopic();
        Object nodeKey = node.getNodekey();
        FStorageOperations storage = this.getStorageOperations();

        Query assignedToNode = new Query(Criteria.where("topic").is(topic)
                .and("status").is(MessageStatus.ING.name())
                .and("execnode").is(nodeKey));
        assignedToNode.limit(node.getMessageConfig().getPagesize());

        // Finish work already claimed by this node before claiming more.
        List<Message> alreadyClaimed = this.selectMessages(storage, assignedToNode);
        if (!alreadyClaimed.isEmpty()) {
            return alreadyClaimed;
        }

        Query claimable = new Query(Criteria.where("topic").is(topic)
                .and("status").is(MessageStatus.NEW.name())
                .and("delaydate").lte(now));
        // NOTE(review): the claim limit is a fixed 100 while the read-back above uses the
        // configured page size; whether update() honours a query limit depends on
        // FStorageOperations - confirm.
        claimable.limit(100);

        Update claim = new Update();
        claim.set("status", MessageStatus.ING.name());
        claim.set("execdate", now);
        claim.set("execnode", nodeKey);
        storage.update(claimable, claim, message_tablename);

        return this.selectMessages(storage, assignedToNode);
    }

    /**
     * No-op lock hook.
     *
     * @param node the consuming node (unused)
     * @param msg the message to lock (unused)
     * @return always {@code 1}
     */
    @Override
    public int lockMessage(ConsumerNode node, Message msg) {
        return 1;
    }

    /**
     * Records the outcome of consuming {@code msg}.
     *
     * <ul>
     *   <li>Status SUCCESS or DUP (case-insensitive): the message is moved to
     *       {@link #message_success_tablename}.</li>
     *   <li>Otherwise, while the retry count is below the configured
     *       {@code getMaxdelaytime()} value: the retry count is incremented and the row is
     *       reset to NEW with a {@code delaydate} pushed out by the matching entry of
     *       {@link #delaydate}.</li>
     *   <li>Otherwise: the message is moved to {@link #message_error_tablename}.</li>
     * </ul>
     *
     * <p>A {@code null} status is treated like any other non-success status instead of
     * raising a {@link NullPointerException}.
     *
     * @param node the consuming node; supplies the retry limit
     * @param msg the consumed message carrying its resulting status
     */
    @Override
    @Transactional(propagation=Propagation.REQUIRES_NEW, value="tasktransactionManager")
    public void consumeCallbackMessage(ConsumerNode node, Message msg) {
        String status = msg.getStatus();
        // Constant-first comparison keeps a null status from throwing.
        boolean finished = MessageStatus.SUCCESS.name().equalsIgnoreCase(status)
                || MessageStatus.DUP.name().equalsIgnoreCase(status);
        if (finished) {
            this.moveOutOfQueue(msg, message_success_tablename);
            return;
        }
        if (msg.getRetryCount() < node.getMessageConfig().getMaxdelaytime()) {
            this.scheduleRetry(msg);
            return;
        }
        this.moveOutOfQueue(msg, message_error_tablename);
    }

    /** Runs {@code query} against the message table and returns the rows as messages. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private List<Message> selectMessages(FStorageOperations storage, Query query) {
        // The declared return type of select() is not visible from this file, so the raw
        // List of the original code is kept and converted on return.
        List rows = storage.select(query, Message.class, message_tablename);
        return rows;
    }

    /** Inserts {@code msg} into {@code targetTable} and deletes its row from the message table. */
    private void moveOutOfQueue(Message msg, String targetTable) {
        FStorageOperations storage = this.getStorageOperations();
        // Cast retained so the same insert overload as before is selected.
        storage.insert((Object)msg, targetTable);
        storage.delete(new Query(Criteria.where("id").is(msg.getId())), message_tablename);
    }

    /** Increments the retry count of {@code msg} and resets its row to NEW with a later delaydate. */
    private void scheduleRetry(Message msg) {
        msg.setRetryCount(msg.getRetryCount() + 1);

        Update retry = new Update();
        retry.set("status", MessageStatus.NEW.name());
        retry.set("retryCount", msg.getRetryCount());
        retry.set("msg", msg.getMsg());
        Date nextAttempt = DateUtils.addSeconds(new Date(), DBMessageHandle.backoffSeconds(msg.getRetryCount()));
        retry.set("delaydate", nextAttempt);

        Query byId = new Query(Criteria.where("id").is(msg.getId()));
        this.getStorageOperations().update(byId, retry, message_tablename);
    }

    /** Returns the delay in seconds for the given retry count, clamped to the bounds of {@link #delaydate}. */
    private static int backoffSeconds(int retryCount) {
        int index = Math.min(Math.max(retryCount, 1), delaydate.length) - 1;
        return delaydate[index];
    }
}

