package com.taobao.metamorphosis.client.extension.producer;

import com.taobao.common.store.Store;
import com.taobao.common.store.journal.JournalStore;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.GetRecoverStorageErrorException;
import com.taobao.metamorphosis.exception.UnknowCodecTypeException;
import com.taobao.metamorphosis.utils.IdWorker;
import com.taobao.metamorphosis.utils.NamedThreadFactory;
import com.taobao.metamorphosis.utils.codec.Deserializer;
import com.taobao.metamorphosis.utils.codec.Serializer;
import com.taobao.metamorphosis.utils.codec.impl.Hessian1Deserializer;
import com.taobao.metamorphosis.utils.codec.impl.Hessian1Serializer;
import com.taobao.metamorphosis.utils.codec.impl.JavaDeserializer;
import com.taobao.metamorphosis.utils.codec.impl.JavaSerializer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/extension/producer/LocalMessageStorageManager.class */
public class LocalMessageStorageManager implements MessageRecoverManager {
    protected static final String SPLIT = "@";
    protected final ConcurrentHashMap<String, FutureTask<Store>> topicStoreMap;
    protected final ConcurrentHashMap<String, FutureTask<Boolean>> topicRecoverTaskMap;
    private final Serializer serializer;
    protected final Deserializer deserializer;
    private final IdWorker idWorker;
    public String META_LOCALMESSAGE_PATH;
    private final String META_LOCALMESSAGE_CODEC_TYPE;
    protected final ThreadPoolExecutor threadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    protected MessageRecoverManager.MessageRecoverer messageRecoverer;
    static final Log log = LogFactory.getLog(LocalMessageStorageManager.class);
    public static final String DEFAULT_META_LOCALMESSAGE_PATH = System.getProperty("meta.localmessage.path", System.getProperty("user.home") + File.separator + ".meta_localmessage");

    public LocalMessageStorageManager(MetaClientConfig metaClientConfig) {
        this(metaClientConfig, DEFAULT_META_LOCALMESSAGE_PATH, null);
    }

    public LocalMessageStorageManager(MetaClientConfig metaClientConfig, String str, MessageRecoverManager.MessageRecoverer messageRecoverer) {
        this.topicStoreMap = new ConcurrentHashMap<>();
        this.topicRecoverTaskMap = new ConcurrentHashMap<>();
        this.idWorker = new IdWorker(0L);
        this.META_LOCALMESSAGE_CODEC_TYPE = System.getProperty("meta.localmessage.codec", "java");
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.META_LOCALMESSAGE_PATH = StringUtils.isNotBlank(str) ? str : DEFAULT_META_LOCALMESSAGE_PATH;
        this.messageRecoverer = messageRecoverer;
        if (this.META_LOCALMESSAGE_CODEC_TYPE.equals("java")) {
            this.serializer = new JavaSerializer();
            this.deserializer = new JavaDeserializer();
        } else {
            if (!this.META_LOCALMESSAGE_CODEC_TYPE.equals("hessian1")) {
                throw new UnknowCodecTypeException(this.META_LOCALMESSAGE_CODEC_TYPE);
            }
            this.serializer = new Hessian1Serializer();
            this.deserializer = new Hessian1Deserializer();
        }
        this.threadPoolExecutor = new ThreadPoolExecutor(metaClientConfig.getRecoverThreadCount(), metaClientConfig.getRecoverThreadCount(), 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new NamedThreadFactory("SendRecover-thread"), new ThreadPoolExecutor.CallerRunsPolicy());
        makeDataDir();
        loadStores();
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager.1
            @Override // java.lang.Runnable
            public void run() {
                LocalMessageStorageManager.log.info("开始尝试发送本地缓存的消息...");
                LocalMessageStorageManager.this.recover();
            }
        }, 0L, metaClientConfig.getRecoverMessageIntervalInMills(), TimeUnit.MILLISECONDS);
    }

    private void loadStores() {
        for (File file : new File(this.META_LOCALMESSAGE_PATH).listFiles()) {
            if (file.isDirectory()) {
                String name = file.getName();
                String[] split = name.split(SPLIT);
                if (split.length == 2) {
                    log.info("加载local message storage " + name + " ...");
                    getOrCreateStore(split[0], new Partition(split[1]));
                }
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public void recover() {
        Set<String> keySet = this.topicStoreMap.keySet();
        if (keySet == null || keySet.size() == 0) {
            log.info("SendRecover没有需要恢复的消息");
            return;
        }
        if (this.messageRecoverer == null) {
            log.warn("messageRecoverer还未设置");
            return;
        }
        for (String str : keySet) {
            String[] split = str.split(SPLIT);
            String str2 = split[0];
            Partition partition = new Partition(split[1]);
            int messageCount = getMessageCount(str2, partition);
            log.info(str + "需要恢复的条数:" + messageCount);
            if (messageCount > 0 && !recover(str2, partition, this.messageRecoverer)) {
                log.info("SendRecover发送恢复任务正在运行,不需要重新启动,name=" + str);
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public boolean recover(final String str, final Partition partition, final MessageRecoverManager.MessageRecoverer messageRecoverer) {
        final String generateKey = generateKey(str, partition);
        FutureTask<Boolean> futureTask = new FutureTask<>(new Callable<Boolean>() { // from class: com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                AtomicLong atomicLong = new AtomicLong(0L);
                try {
                    try {
                        innerRecover(LocalMessageStorageManager.this.getOrCreateStore(str, partition), messageRecoverer, atomicLong, generateKey);
                        LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,name=" + generateKey + ",恢复消息" + atomicLong.get() + "条");
                        LocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    } catch (Throwable th) {
                        LocalMessageStorageManager.log.error("SendRecover发送消息恢复失败,name=" + generateKey, th);
                        LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,name=" + generateKey + ",恢复消息" + atomicLong.get() + "条");
                        LocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    }
                    return true;
                } catch (Throwable th2) {
                    LocalMessageStorageManager.log.info("SendRecover执行完毕移除发送恢复任务,name=" + generateKey + ",恢复消息" + atomicLong.get() + "条");
                    LocalMessageStorageManager.this.topicRecoverTaskMap.remove(generateKey);
                    throw th2;
                }
            }

            private void innerRecover(Store store, MessageRecoverManager.MessageRecoverer messageRecoverer2, AtomicLong atomicLong, String str2) throws IOException, Exception {
                Iterator it = store.iterator();
                while (it.hasNext()) {
                    byte[] bArr = (byte[]) it.next();
                    messageRecoverer2.handle((Message) LocalMessageStorageManager.this.deserializer.decodeObject(store.get(bArr)));
                    try {
                        store.remove(bArr);
                        atomicLong.incrementAndGet();
                        if (atomicLong.get() % 20000 == 0) {
                            LocalMessageStorageManager.log.info("SendRecover " + str2 + "已恢复消息条数:" + atomicLong.get());
                        }
                    } catch (IOException e) {
                        LocalMessageStorageManager.log.error("SendRecover remove message failed", e);
                    }
                }
            }
        });
        if (this.topicRecoverTaskMap.putIfAbsent(generateKey, futureTask) == null) {
            this.threadPoolExecutor.submit(futureTask);
            return true;
        }
        if (!log.isDebugEnabled()) {
            return false;
        }
        log.debug("SendRecover发送恢复任务正在运行,不需要重新启动,name=" + generateKey);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Store getOrCreateStore(String str, Partition partition) {
        getOrCreateStore0(str, Partition.RandomPartiton);
        return getOrCreateStore0(str, partition);
    }

    private Store getOrCreateStore0(String str, Partition partition) {
        final String generateKey = generateKey(str, partition);
        FutureTask<Store> futureTask = this.topicStoreMap.get(generateKey);
        if (futureTask != null) {
            return getStore(generateKey, futureTask);
        }
        FutureTask<Store> futureTask2 = new FutureTask<>(new Callable<Store>() { // from class: com.taobao.metamorphosis.client.extension.producer.LocalMessageStorageManager.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Store call() throws Exception {
                File file = new File(LocalMessageStorageManager.this.META_LOCALMESSAGE_PATH + File.separator + generateKey);
                if (!file.exists()) {
                    file.mkdir();
                }
                return newStore(generateKey);
            }

            private Store newStore(String str2) throws IOException {
                return LocalMessageStorageManager.this.newStore(str2);
            }
        });
        FutureTask<Store> putIfAbsent = this.topicStoreMap.putIfAbsent(generateKey, futureTask2);
        if (putIfAbsent == null) {
            futureTask2.run();
            putIfAbsent = futureTask2;
        }
        return getStore(generateKey, putIfAbsent);
    }

    private Store getStore(String str, FutureTask<Store> futureTask) {
        try {
            return futureTask.get();
        } catch (Throwable th) {
            log.error("获取topic=" + str + "对应的store失败", th);
            throw new GetRecoverStorageErrorException("获取topic=" + str + "对应的store失败", th);
        }
    }

    private void makeDataDir() {
        File file = new File(this.META_LOCALMESSAGE_PATH);
        if (file.exists()) {
            return;
        }
        file.mkdir();
    }

    @Override // com.taobao.metamorphosis.client.Shutdownable
    public void shutdown() {
        for (Map.Entry<String, FutureTask<Store>> entry : this.topicStoreMap.entrySet()) {
            try {
                getStore(entry.getKey(), entry.getValue()).close();
            } catch (IOException e) {
            }
        }
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public void append(Message message, Partition partition) throws IOException {
        Store orCreateStore = getOrCreateStore(message.getTopic(), partition);
        ByteBuffer allocate = ByteBuffer.allocate(16);
        allocate.putLong(this.idWorker.nextId());
        orCreateStore.add(allocate.array(), this.serializer.encodeObject(message));
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public int getMessageCount(String str, Partition partition) {
        String generateKey = generateKey(str, partition);
        FutureTask<Store> futureTask = this.topicStoreMap.get(generateKey);
        if (futureTask != null) {
            return getStore(generateKey, futureTask).size();
        }
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String generateKey(String str, Partition partition) {
        return str + SPLIT + partition;
    }

    @Override // com.taobao.metamorphosis.client.extension.producer.MessageRecoverManager
    public synchronized void setMessageRecoverer(MessageRecoverManager.MessageRecoverer messageRecoverer) {
        if (this.messageRecoverer == null) {
            this.messageRecoverer = messageRecoverer;
        }
    }

    protected Store newStore(String str) throws IOException {
        return new JournalStore(this.META_LOCALMESSAGE_PATH + File.separator + str, str);
    }
}
