package com.efuture.job.model;

import cn.hutool.core.exceptions.ExceptionUtil;
import com.efuture.common.utils.MessageFormatUtils;
import com.efuture.common.utils.UniqueUtils;
import com.efuture.job.PropertiesJob;
import com.efuture.job.component.handle.complete.SqlOnComplete;
import com.efuture.job.spi.Input;
import com.efuture.job.spi.JobHandle;
import com.efuture.job.spi.Output;
import com.efuture.job.spi.PipelineTransform;
import com.efuture.job.utils.FutureJobLog;
import com.efuture.job.utils.TransformBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/efuture/job/model/JobContext.class */
/**
 * Mutable runtime state of a single job execution: configuration, sharding info, progress
 * counters, the back-pressure queue of batches waiting to be saved, and a bounded in-memory
 * log buffer.
 *
 * <p>Status values written by this class: {@code 1} (start begun), {@code 9} (start handler
 * finished), {@code 10} (input running), {@code 20} (input succeeded), {@code 25} (input
 * failed), {@code 30} (input complete), {@code 99} (complete), {@code 100} (success), plus
 * {@code PropertiesJob.JOB_STATUS.exception}. The status only ever moves upwards; see
 * {@link #setStatus(int)}.
 *
 * <p>A context created through {@link #JobContext(SqlOnComplete)} has no job configuration, no
 * job id and no save queue of its own. Logging and the queue helpers tolerate that state, but
 * {@link #getLogId()} and {@link #getJobHandleName()} still require a configuration.
 */
public class JobContext {
    /** Capacity of the in-memory log buffer; the oldest entries are evicted once it is full. */
    private static final int LOG_BUFFER_CAPACITY = 5000;

    /** Maximum number of one-second waits performed by {@link #waitQueue()}. */
    private static final int MAX_WAIT_ROUNDS = 60;

    static final Logger logger = LoggerFactory.getLogger(JobContext.class);

    String jobId;
    JobConfigBean jobConfig;
    ShardingInfo shardingInfo;
    JobTrans startTransInfo;
    /**
     * Lazily created by {@link #getCurBatch()}; volatile so that the unsynchronized null check
     * there observes a fully published instance.
     */
    volatile BatchContext curBatch;
    JobHandle handle;
    SqlOnComplete postJobHandle;
    AtomicBoolean hasException = new AtomicBoolean(false);
    String lastErrorMsg = "";
    int status = 0;
    int readNum;
    boolean retry = false;
    AtomicInteger procNum = new AtomicInteger();
    AtomicInteger sucNum = new AtomicInteger();
    AtomicInteger errNum = new AtomicInteger();
    private final ArrayBlockingQueue<String> logMsg = new ArrayBlockingQueue<>(LOG_BUFFER_CAPACITY);
    private ArrayBlockingQueue<String> queue = null;
    public Function<JobContext, Input> getInputFunc = null;
    public Function<JobContext, Output> getOutputFunc = null;
    JobContext superContext;
    PipelineTransform transform = null;

    /**
     * Creates a context for the given job configuration, generates a new job id and allocates
     * the save queue with the configured capacity.
     *
     * @param jobConfigBean configuration of the job to run
     */
    public JobContext(JobConfigBean jobConfigBean) {
        this.jobConfig = jobConfigBean;
        this.jobId = String.valueOf(UniqueUtils.genPhKey());
        init();
    }

    /**
     * Creates a context around a post-job handler. No job configuration, job id or save queue
     * is set up by this constructor.
     *
     * @param sqlOnComplete handler stored as {@code postJobHandle}
     */
    public JobContext(SqlOnComplete sqlOnComplete) {
        this.postJobHandle = sqlOnComplete;
    }

    /** Allocates the save queue using the capacity from the job configuration. */
    private void init() {
        this.queue = new ArrayBlockingQueue<>(this.jobConfig.getSaveCapacity());
    }

    /**
     * Returns the transform pipeline, building it through {@code TransformBuilder} on first use.
     *
     * @return the cached pipeline transform
     */
    public PipelineTransform getTransform() {
        if (this.transform == null) {
            this.transform = TransformBuilder.build(this);
        }
        return this.transform;
    }

    public boolean isRetry() {
        return this.retry;
    }

    public void setRetry(boolean z) {
        this.retry = z;
    }

    public JobHandle getHandle() {
        return this.handle;
    }

    public void setHandle(JobHandle jobHandle) {
        this.handle = jobHandle;
    }

    public ShardingInfo getShardingInfo() {
        return this.shardingInfo;
    }

    public void setShardingInfo(ShardingInfo shardingInfo) {
        this.shardingInfo = shardingInfo;
    }

    public JobConfigBean getJobConfig() {
        return this.jobConfig;
    }

    /**
     * Returns the identifier used in log output, formed as {@code <jobName>-<jobId>}.
     * Requires a job configuration.
     */
    public String getLogId() {
        return this.jobConfig.getJobName() + "-" + getJobId();
    }

    public JobTrans getStartTransInfo() {
        return this.startTransInfo;
    }

    public void setStartTransInfo(JobTrans jobTrans) {
        this.startTransInfo = jobTrans;
    }

    public JobContext getSuperContext() {
        return this.superContext;
    }

    public void setSuperContext(JobContext jobContext) {
        this.superContext = jobContext;
    }

    /**
     * Registers a pending batch in the save queue, blocking for up to 60 seconds while the
     * queue is full.
     *
     * @param str token identifying the pending batch
     * @throws InterruptedException if interrupted while waiting for free capacity
     * @throws IllegalStateException if this context was created without a save queue
     * @throws RuntimeException if no capacity became available within 60 seconds
     */
    public void offerToQueue(String str) throws InterruptedException {
        if (this.queue == null) {
            throw new IllegalStateException("save queue is not initialised for this context");
        }
        if (!this.queue.offer(str, 60L, TimeUnit.SECONDS)) {
            throw new RuntimeException("等待保存超时:" + str);
        }
    }

    /** Removes one entry from the save queue, if there is one. */
    public void pollFromQueue() {
        if (this.queue != null) {
            this.queue.poll();
        }
    }

    /**
     * Waits until the save queue is empty, polling once per second for at most
     * {@value #MAX_WAIT_ROUNDS} rounds. Returns immediately (after logging) once an exception
     * has been recorded on this context.
     *
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    public void waitQueue() throws InterruptedException {
        int remainingRounds = MAX_WAIT_ROUNDS;
        while (!hasException()) {
            int size = pendingBatches();
            if (size == 0) {
                return;
            }
            FutureJobLog.log("还有[{}]批数据未保存,等待[{}]秒", Integer.valueOf(size), 1);
            Thread.sleep(1000L);
            remainingRounds--;
            if (remainingRounds <= 0) {
                // Give up waiting; the caller proceeds with batches still pending.
                return;
            }
        }
        FutureJobLog.log("发生错误,直接退出", new Object[0]);
    }

    /** Number of batches still waiting in the save queue; 0 when no queue was allocated. */
    private int pendingBatches() {
        return this.queue == null ? 0 : this.queue.size();
    }

    public int getSucNum() {
        return this.sucNum.get();
    }

    public int getErrNum() {
        return this.errNum.get();
    }

    public boolean hasException() {
        return this.hasException.get();
    }

    public void addSucNum(int i) {
        this.sucNum.addAndGet(i);
    }

    public void addErrNum(int i) {
        this.errNum.addAndGet(i);
    }

    public String getJobId() {
        return this.jobId;
    }

    /**
     * Returns the current batch context, creating the initial {@code "output"} batch on first
     * access.
     */
    public BatchContext getCurBatch() {
        if (this.curBatch == null) {
            initBatchContext();
        }
        return this.curBatch;
    }

    public void setCurBatch(BatchContext batchContext) {
        this.curBatch = batchContext;
    }

    public String getLastErrorMsg() {
        return this.lastErrorMsg;
    }

    public void setLastErrorMsg(String str) {
        this.lastErrorMsg = str;
    }

    /**
     * Appends a formatted message to the log buffer. When a super context is set, the call is
     * delegated to it and nothing is stored locally.
     *
     * <p>The buffer is bounded: when it is full the oldest entries are discarded so that
     * logging never throws.
     *
     * @param th optional throwable whose stack trace is appended to the message; may be null
     * @param str message pattern passed to {@code MessageFormatUtils.format}
     * @param objArr pattern arguments
     */
    public void log(Throwable th, String str, Object... objArr) {
        if (null != this.superContext) {
            this.superContext.log(th, str, objArr);
            return;
        }
        String format = MessageFormatUtils.format(str, objArr);
        if (th != null) {
            format = format + "-->" + ExceptionUtil.stacktraceToString(th);
        }
        // offer() instead of add(): add() throws IllegalStateException on a full buffer.
        while (!this.logMsg.offer(format)) {
            this.logMsg.poll();
        }
        if (isDebugEnabled()) {
            logger.info(format);
        }
    }

    /**
     * Drains and returns all buffered log messages. When a super context is set, its buffer is
     * drained instead.
     *
     * @return the drained messages, or {@code null} when the buffer was empty
     */
    public List<String> getLogMsg() {
        if (null != this.superContext) {
            return this.superContext.getLogMsg();
        }
        List<String> drained = new ArrayList<>();
        this.logMsg.drainTo(drained);
        // Callers receive null rather than an empty list when nothing was logged.
        return drained.isEmpty() ? null : drained;
    }

    /**
     * Moves the status to 99 once input has finished (status of at least 20) and either
     * nothing was read or every read batch has left the save queue.
     */
    public void checkIsComplete() {
        if (this.status < 20) {
            return;
        }
        boolean nothingRead = this.readNum == 0;
        boolean allSaved = this.readNum > 0 && pendingBatches() == 0;
        if (nothingRead || allSaved) {
            setStatus(99);
        }
    }

    /**
     * Re-evaluates completion if necessary and reports whether the status is at least 99.
     */
    public boolean isComplete() {
        if (this.status < 99) {
            checkIsComplete();
        }
        return this.status >= 99;
    }

    /** Returns true only when the status is exactly 100. */
    public boolean isSuccess() {
        return this.status == 100;
    }

    /** Returns a one-line summary of the read, transformed, written and failed counters. */
    public String getInfo() {
        return MessageFormatUtils.format("已读取[{0}]条数据 ,转换条数[{1}], 写入成功[{2}],写入失败[{3}]", new Object[]{Integer.valueOf(this.readNum), Integer.valueOf(this.procNum.get()), Integer.valueOf(this.sucNum.get()), Integer.valueOf(this.errNum.get())});
    }

    /** Logs a message without a throwable. */
    public void info(String str, Object... objArr) {
        log(null, str, objArr);
    }

    /** Logs a message together with the stack trace of {@code th}. */
    public void error(Throwable th, String str, Object... objArr) {
        log(th, str, objArr);
    }

    /** Logs a message only when debug output is enabled for this context. */
    public void debug(String str, Object... objArr) {
        if (isDebugEnabled()) {
            log(null, str, objArr);
        }
    }

    /**
     * Resolves the debug flag from this context's configuration, falling back to the super
     * context when this context has no configuration of its own.
     */
    private boolean isDebugEnabled() {
        if (this.jobConfig != null) {
            return this.jobConfig.isDebug();
        }
        return this.superContext != null && this.superContext.isDebugEnabled();
    }

    /** Returns the job type from the configuration. Requires a job configuration. */
    public String getJobHandleName() {
        return this.jobConfig.getJobType();
    }

    /**
     * Raises the status to {@code i}. A request that would not increase the status is ignored
     * and reported through {@link #info(String, Object...)}.
     *
     * @param i requested status
     */
    public void setStatus(int i) {
        if (this.status >= i) {
            info("状态变更-->输入的状态[{0}]比当前状态[{1}]小,不执行", Integer.valueOf(i), Integer.valueOf(this.status));
        } else {
            this.status = i;
            debug("状态变更-->输入的状态[{0}] 当前状态[{1}]", Integer.valueOf(i), Integer.valueOf(this.status));
        }
    }

    public int getStatus() {
        return this.status;
    }

    /** Sets status 1, runs the handler's job-start callback, then sets status 9. */
    public void start() {
        setStatus(1);
        this.handle.onJobStart(this);
        setStatus(9);
    }

    /** Runs the handler's job-complete callback, then sets status 100. */
    public void complete() {
        this.handle.onJobComplete(this);
        setStatus(100);
    }

    /**
     * Sets status 10 and lets the handler perform the input phase.
     *
     * @throws InterruptedException propagated from the handler
     */
    public void input() throws InterruptedException {
        setStatus(10);
        this.handle.doInput(this);
    }

    /**
     * Passes one input row to the handler and increments the read counter afterwards.
     *
     * @param map the row that was read
     * @throws InterruptedException propagated from the handler
     */
    public void handleRow(Map<String, Object> map) throws InterruptedException {
        this.handle.inputRowHandle(this, map);
        this.readNum++;
    }

    /** Marks the input phase as succeeded (status 20). */
    public void inputSucceed() {
        setStatus(20);
    }

    /**
     * Marks the input phase as failed (status 25) and logs the cause.
     *
     * @param th the failure raised while reading
     */
    public void inputFailed(Throwable th) {
        setStatus(25);
        error(th, "读数据时发生错误", new Object[0]);
    }

    /** Marks the input phase as complete (status 30). */
    public void inputComplete() {
        setStatus(30);
    }

    /**
     * Records a failure using the throwable's message. When the throwable carries no message
     * its {@code toString()} is used, so the stored error text still names the exception type.
     *
     * @param th the failure; a null value records a null error message
     */
    public void setExceptionStatus(Throwable th) {
        String message = null;
        if (th != null) {
            message = th.getMessage() != null ? th.getMessage() : th.toString();
        }
        setExceptionStatus(message);
    }

    /**
     * Flags this context as failed, requests the exception status and stores the error text.
     *
     * @param str error text stored as the last error message
     */
    public void setExceptionStatus(String str) {
        this.hasException.set(true);
        setStatus(PropertiesJob.JOB_STATUS.exception);
        this.lastErrorMsg = str;
    }

    /**
     * Reports whether the status is at least 20. Note that the failed-input status 25 also
     * satisfies this check.
     */
    public boolean isInputCompleted() {
        return this.status >= 20;
    }

    /** Creates the initial {@code "output"} batch exactly once. */
    private synchronized void initBatchContext() {
        if (this.curBatch == null) {
            this.curBatch = new BatchContext(this, "output");
        }
    }

    /**
     * Replaces the current batch with a new one derived from it. Does nothing except emit a
     * debug message when no batch exists yet.
     */
    public void switchToNextBatch() {
        BatchContext current = this.curBatch;
        if (current == null) {
            debug("curBatch is null 不能做批次切换", new Object[0]);
        } else {
            this.curBatch = new BatchContext(current);
        }
    }
}
