package com.efuture.taskflow.taskmanager;

import com.efuture.ocp.common.distributedLock.DLock;
import com.efuture.ocp.common.exception.ServiceException;
import com.efuture.ocp.common.rest.ServiceLogs;
import com.efuture.ocp.common.util.CacheUtils;
import com.efuture.taskflow.TaskComponentFactory;
import com.efuture.taskflow.TaskConstant;
import com.efuture.taskflow.entity.Task;
import com.efuture.taskflow.entity.TaskErrorInfo;
import com.efuture.taskflow.exception.TaskExceptionCode;
import com.efuture.taskflow.param.TaskParam;
import com.efuture.taskflow.repository.TaskRepository;
import com.efuture.taskflow.taskdata.TaskDataQuery;
import com.efuture.taskflow.work.TaskWorkerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

@Component("defaultTaskStatusManager")
/* loaded from: input_file:com/efuture/taskflow/taskmanager/TaskStatusManagerImpl.class */
public class TaskStatusManagerImpl implements TaskStatusManager {
    public static String logtype = "TaskStatusManager";

    @Autowired
    TaskRepository taskRepo;
    LinkedBlockingQueue<TaskStatusChangeEvent> taskStatusChangedEvents = new LinkedBlockingQueue<>(50000);
    boolean ibExit = false;
    Thread taskStatusChangedHandleThread = null;

    public TaskStatusManagerImpl() {
        start();
    }

    public TaskRepository getTaskRepo() {
        return this.taskRepo;
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    public void taskStatusChanged(Task task, int i) {
        int task_status = task.getTask_status();
        if (i == 99 && task.getTot_subtask_num() == 0) {
            i = 200;
        }
        task.setTask_status(i);
        setValToCached(task.getStatusKey(), task.getStatusValue());
        publishEvent(new TaskStatusChangeEvent(task, task_status));
    }

    private TaskStatusManager getTaskStatusManager() {
        return TaskComponentFactory.getTaskStatusManager();
    }

    private TaskWorkerFactory getTaskWorkerFactory() {
        return TaskComponentFactory.getTaskWorkerFactory();
    }

    private TaskDataQuery getTaskDataQuery() {
        return TaskComponentFactory.getTaskDataQuery();
    }

    public void publishEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
        if (!taskStatusChangeEvent.isSaveImmediately()) {
            this.taskStatusChangedEvents.add(taskStatusChangeEvent);
            return;
        }
        try {
            getTaskStatusManager().updateTask(taskStatusChangeEvent.getTask(), taskStatusChangeEvent.getOldStatus());
        } catch (Exception e) {
            ServiceLogs.errLog(logtype, e, "更新task[{0}]状态时错误,放入缓冲队列", new Object[]{taskStatusChangeEvent.getTask().getBillno()});
            this.taskStatusChangedEvents.add(taskStatusChangeEvent);
        }
    }

    private void start() {
        this.taskStatusChangedHandleThread = new Thread(new Runnable() { // from class: com.efuture.taskflow.taskmanager.TaskStatusManagerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    TaskStatusManagerImpl.this.doHandleTaskStatusChanged();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "taskStatusChangedHandle");
        this.taskStatusChangedHandleThread.start();
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @DLock(key = "#args[0].getLockKey()")
    public void refreshTaskStatus(Task task) {
        Task syncTaskStatus = syncTaskStatus(task);
        if (syncTaskStatus.isNotStart()) {
            handleNotStartTaskStatus(syncTaskStatus);
            return;
        }
        if (syncTaskStatus.isExecing()) {
            handleExecingTaskStatus(syncTaskStatus);
        } else if (syncTaskStatus.isWaitSubComplete()) {
            handleWaitSubCompleteTaskStatus(syncTaskStatus);
        } else if (syncTaskStatus.isStoped()) {
            handleStopTaskStatus(syncTaskStatus);
        }
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void refreshTaskError(Task task, String str) {
        int intVal = TaskParam.TASK_PUBLIC.MAX_CHECK_COUNT.getIntVal(task.getEnt_id());
        task.setExec_count(task.getExec_count() + 1);
        task.setLast_exec_error(str);
        task.setLast_exec_date(System.currentTimeMillis() / 1000);
        if (task.getExec_count() > intVal) {
            onOverMaxCheckTimes(task);
        } else {
            getTaskRepo().updateTask(task, "last_exec_date,exec_count,last_exec_error");
        }
    }

    private void handleStopTaskStatus(Task task) {
        onAllComplete(task);
    }

    private void handleWaitSubCompleteTaskStatus(Task task) {
        if (task.isWaitSubComplete()) {
            checkSubTaskStatus(task);
        }
    }

    private void doMoveToErrorList(Task task) {
        getTaskRepo().moveTaskToErrorList(task);
    }

    private void doUpdateSubtaskStatus(Task task) {
        getTaskRepo().updateTask(task, "success_subtask_num,error_subtask_num,exec_count");
    }

    private void checkSubTaskStatus(Task task) {
        int exec_count = task.getExec_count();
        int intVal = TaskParam.TASK_PUBLIC.MAX_CHECK_COUNT.getIntVal(task.getEnt_id());
        task.setSubTaskStatusInfo(getTaskRepo().querySubTaskStatus(task.getEnt_id(), task.getBillno()));
        if (task.isAllSubCompleted()) {
            task.setTask_status(TaskConstant.TASK_STATUS.ALL_COMPLETE);
            onAllComplete(task);
        } else if (exec_count >= intVal) {
            onOverMaxCheckTimes(task);
        } else {
            task.setExec_count(exec_count + 1);
            doUpdateSubtaskStatus(task);
        }
    }

    private void handleExecingTaskStatus(Task task) {
        if (task.hasSub()) {
            checkSubTaskStatus(task);
        } else {
            checkThisTaskStatus(task);
        }
    }

    private void onOvertime(Task task) {
        task.setTask_status(TaskConstant.TASK_STATUS.OVER_TIME);
        if (StringUtils.isEmpty(task.getLast_exec_errorCode())) {
            task.setLast_exec_errorCode(String.valueOf(TaskConstant.TASK_STATUS.OVER_TIME));
            task.setLast_exec_error("任务超时");
            task.setLast_exec_errorGroup("unknown");
        }
        doMoveToErrorList(task);
    }

    private void onOverMaxCheckTimes(Task task) {
        task.setTask_status(TaskConstant.TASK_STATUS.MAX_EXEC_NUM);
        if (StringUtils.isEmpty(task.getLast_exec_errorCode())) {
            task.setLast_exec_errorCode(String.valueOf(TaskConstant.TASK_STATUS.MAX_EXEC_NUM));
            task.setLast_exec_error("超过重试次数");
            task.setLast_exec_errorGroup("unknown");
        }
        doMoveToErrorList(task);
    }

    private void checkThisTaskStatus(Task task) {
        if (getTaskRepo().checkTaskIsExecComplete(task)) {
            task.setTask_status(TaskConstant.TASK_STATUS.ALL_COMPLETE);
            onAllComplete(task);
            return;
        }
        TaskErrorInfo queryTaskErrorInfo = getTaskRepo().queryTaskErrorInfo(task.getPh_key());
        if (new Date().getTime() - (((TaskParam.TASK_PUBLIC.TASK_TIMEOUT_HOURS.getIntVal(task.getEnt_id()) * 60) * 60) * 1000) >= queryTaskErrorInfo.getLastExecDate()) {
            onOvertime(task);
            return;
        }
        if (queryTaskErrorInfo.getErrorCount() == 0) {
            task.setExec_count(task.getExec_count() + 1);
        } else {
            task.setExec_count(queryTaskErrorInfo.getErrorCount());
        }
        task.setLast_exec_date(queryTaskErrorInfo.getLastExecDate());
        task.setLast_exec_error(queryTaskErrorInfo.getLastExecError());
        task.setLast_exec_errorGroup(queryTaskErrorInfo.getLast_exec_errorGroup());
        task.setLast_exec_errorCode(queryTaskErrorInfo.getLast_exec_errorCode());
        if (task.getExec_count() + 1 >= TaskParam.TASK_PUBLIC.MAX_CHECK_COUNT.getIntVal(task.getEnt_id())) {
            onOverMaxCheckTimes(task);
        } else {
            getTaskRepo().updateTask(task, "exec_count,last_exec_date,last_exec_error,last_exec_errorCode,last_exec_errorGroup");
        }
    }

    private void handleNotStartTaskStatus(Task task) {
        task.getExec_count();
        TaskParam.TASK_PUBLIC.MAX_CHECK_COUNT.getIntVal(task.getEnt_id());
        try {
            if (((new Date().getTime() - task.getPh_timestamp().getTime()) / 1000) / 60 >= TaskParam.TASK_PUBLIC.START_TIMEOUT_MINS.getIntVal(task.getEnt_id())) {
                try {
                    Object taskData = getTaskDataQuery().getTaskData(task);
                    if (taskData == null) {
                        TaskExceptionCode.GET_TASKDATA_ERROR.throwThisException(task.getBillno(), "未知原因");
                    } else {
                        task.setData(taskData);
                        if (task.getData() != null) {
                            getTaskWorkerFactory().getWorker(task.getTask_group(), task.getTask_type()).receiveTask(task);
                        }
                    }
                } catch (ServiceException e) {
                    throw e;
                } catch (Exception e2) {
                    TaskExceptionCode.GET_TASKDATA_ERROR.throwThisException(task.getBillno(), e2.getMessage());
                }
            }
        } catch (ServiceException e3) {
            throw e3;
        } catch (Exception e4) {
            ServiceLogs.errLog(logtype, e4, "重启启动[{0}]任务时错误,错误信息[{1}]", new Object[]{task.getBillno(), e4.getMessage()});
            TaskExceptionCode.GET_TASKDATA_ERROR.throwThisException(task.getBillno(), e4.getMessage());
        }
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    public Task syncTaskStatus(Task task) {
        if (task.getRun_mode() == 2) {
            return task;
        }
        String valFromCached = getValFromCached(task.getStatusKey());
        String valFromCached2 = getValFromCached(task.getParentStatusKey());
        if (valFromCached == null) {
            Task findTaskById = this.taskRepo.findTaskById(task.getEnt_id(), task.getPh_key());
            if (findTaskById != null) {
                task.setStatusValue(findTaskById.getStatusValue());
                setValToCached(task.getStatusKey(), task.getStatusValue());
            }
        } else {
            if (valFromCached2 != null && task.convertToTaskStatus(valFromCached2).getTask_status() == 302) {
                taskStatusChanged(task, TaskConstant.TASK_STATUS.STOP);
                return task;
            }
            int task_status = task.getTask_status();
            if (task.setStatusValue(valFromCached)) {
                int task_status2 = task.getTask_status();
                task.setTask_status(task_status);
                taskStatusChanged(task, task_status2);
            }
        }
        return task;
    }

    protected void doHandleTaskStatusChanged() throws InterruptedException {
        do {
            try {
                changeStatus(getTaskStatusChangedEvents());
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(10000L);
        } while (!this.ibExit);
    }

    private void changeStatus(List<TaskStatusChangeEvent> list) {
        long currentTimeMillis = System.currentTimeMillis();
        ServiceLogs.truedebuglog(logtype, "获取到[{0}]条状态变更", 0L, new Object[]{Integer.valueOf(list.size())});
        HashMap hashMap = new HashMap();
        for (TaskStatusChangeEvent taskStatusChangeEvent : list) {
            String billno = taskStatusChangeEvent.getTask().getBillno();
            TaskStatusChangeEvent taskStatusChangeEvent2 = (TaskStatusChangeEvent) hashMap.get(billno);
            if (taskStatusChangeEvent2 == null || taskStatusChangeEvent2.getTimestamp() < taskStatusChangeEvent.getTimestamp()) {
                hashMap.put(billno, taskStatusChangeEvent);
            }
        }
        ServiceLogs.truedebuglog(logtype, "合并后还有[{0}]条状态变更", currentTimeMillis, new Object[]{Integer.valueOf(hashMap.size())});
        int i = 0;
        for (TaskStatusChangeEvent taskStatusChangeEvent3 : hashMap.values()) {
            try {
                getTaskStatusManager().updateTask(taskStatusChangeEvent3.getTask(), taskStatusChangeEvent3.getOldStatus());
            } catch (Exception e) {
                i++;
            }
        }
        ServiceLogs.truedebuglog(logtype, "处理[{0}]条状态变更,失败[{1}]条", currentTimeMillis, new Object[]{Integer.valueOf(hashMap.size()), Integer.valueOf(i)});
    }

    @Transactional
    public void onAllComplete(Task task) {
        getTaskRepo().moveTaskToSuccess(task);
        TaskComponentFactory.getTaskWorkerFactory().getWorker(task.getTask_group(), task.getTask_type()).onComplete(task);
    }

    @Transactional
    public void onWaitSubComplete(Task task) {
        getTaskRepo().moveTaskToWaitSub(task);
    }

    @Transactional
    public void updateToDb(Task task) {
        String str = "task_status";
        if (task.getExecReturn() != null && task.getExecReturn().getNeedUpdateCols() != null) {
            str = str + "," + task.getExecReturn().getNeedUpdateCols();
        }
        getTaskRepo().updateTask(task, str);
    }

    private List<TaskStatusChangeEvent> getTaskStatusChangedEvents() {
        ArrayList arrayList = new ArrayList(10000);
        this.taskStatusChangedEvents.drainTo(arrayList, 10000);
        return arrayList;
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void updateTask(Task task, int i) {
        try {
            if (task.getTask_status() == 99) {
                onWaitSubComplete(task);
            } else if (task.getTask_status() == 200) {
                onAllComplete(task);
            } else {
                updateToDb(task);
            }
        } catch (Exception e) {
            ServiceLogs.errLog(logtype, e, "更新任务[{0}]状态失败", new Object[]{task.getBillno()});
            throw e;
        }
    }

    private String getValFromCached(String str) {
        Object data = CacheUtils.getCacheUtils().getData(str);
        if (data != null) {
            return data.toString();
        }
        return null;
    }

    private void setValToCached(String str, String str2, int i) {
        CacheUtils.getCacheUtils().putData(str, str2, i * 60);
    }

    private void setValToCached(String str, String str2) {
        setValToCached(str, str2, 1440);
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    public void faildown(Task task, int i) {
        task.setTask_status(i);
        getTaskRepo().moveTaskToErrorList(task);
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    public void clearCachedStatus(Task task) {
        CacheUtils.getCacheUtils().deleteData(task.getStatusKey());
    }

    @Override // com.efuture.taskflow.taskmanager.TaskStatusManager
    public void stop(Task task) {
        getTaskRepo().updateSubTaskStatus(task.getBillno(), TaskConstant.TASK_STATUS.STOP);
        for (Task task2 : getTaskRepo().querySubTaskNotLast(task.getBillno())) {
            task2.setTask_status(TaskConstant.TASK_STATUS.STOP);
            setValToCached(task2.getStatusKey(), task2.getStatusValue());
        }
    }
}
