package com.taobao.metamorphosis.client;

import com.taobao.gecko.core.command.Constants;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.util.ExceptionMonitor;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.gecko.service.RemotingFactory;
import com.taobao.gecko.service.config.ClientConfig;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.client.consumer.ConsisHashStrategy;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.ConsumerZooKeeper;
import com.taobao.metamorphosis.client.consumer.DefaultLoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.LoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.RecoverManager;
import com.taobao.metamorphosis.client.consumer.RecoverStorageManager;
import com.taobao.metamorphosis.client.consumer.SimpleMessageConsumer;
import com.taobao.metamorphosis.client.consumer.SubscribeInfoManager;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.client.consumer.storage.ZkOffsetStorage;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.client.producer.RoundRobinPartitionSelector;
import com.taobao.metamorphosis.client.producer.SimpleMessageProducer;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidConsumerConfigException;
import com.taobao.metamorphosis.exception.InvalidOffsetStorageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.NetworkException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.MetamorphosisWireFormatType;
import com.taobao.metamorphosis.network.StatsCommand;
import com.taobao.metamorphosis.utils.IdGenerator;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.Utils;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/MetaMessageSessionFactory.class */
public class MetaMessageSessionFactory implements MessageSessionFactory {
    private static final int STATS_OPTIMEOUT = 3000;
    protected RemotingClientWrapper remotingClient;
    private final MetaClientConfig metaClientConfig;
    private volatile ZkClient zkClient;
    protected final ProducerZooKeeper producerZooKeeper;
    private final ConsumerZooKeeper consumerZooKeeper;
    private volatile boolean shutdown;
    private final Thread shutdownHook;
    private ZkUtils.ZKConfig zkConfig;
    private final RecoverManager recoverManager;
    private final SubscribeInfoManager subscribeInfoManager;
    protected final IdGenerator sessionIdGenerator;
    protected MetaZookeeper metaZookeeper;
    public static final boolean TCP_NO_DELAY;
    public static final long MAX_SCHEDULE_WRITTEN_BYTES;
    static final char[] INVALID_GROUP_CHAR;
    private static final int MAX_RECONNECT_TIMES = Integer.valueOf(System.getProperty("metaq.client.network.max.reconnect.times", "350")).intValue();
    static final Log log = LogFactory.getLog(MetaMessageSessionFactory.class);
    private final CopyOnWriteArrayList<ZkClientChangedListener> zkClientChangedListeners = new CopyOnWriteArrayList<>();
    private final CopyOnWriteArrayList<Shutdownable> children = new CopyOnWriteArrayList<>();
    private volatile boolean isHutdownHookCalled = false;

    public RemotingClientWrapper getRemotingClient() {
        return this.remotingClient;
    }

    public SubscribeInfoManager getSubscribeInfoManager() {
        return this.subscribeInfoManager;
    }

    public MetaClientConfig getMetaClientConfig() {
        return this.metaClientConfig;
    }

    public ProducerZooKeeper getProducerZooKeeper() {
        return this.producerZooKeeper;
    }

    public ConsumerZooKeeper getConsumerZooKeeper() {
        return this.consumerZooKeeper;
    }

    public RecoverManager getRecoverStorageManager() {
        return this.recoverManager;
    }

    public CopyOnWriteArrayList<Shutdownable> getChildren() {
        return this.children;
    }

    public MetaMessageSessionFactory(MetaClientConfig metaClientConfig) throws MetaClientException {
        try {
            checkConfig(metaClientConfig);
            this.metaClientConfig = metaClientConfig;
            ClientConfig clientConfig = new ClientConfig();
            clientConfig.setTcpNoDelay(TCP_NO_DELAY);
            clientConfig.setMaxReconnectTimes(MAX_RECONNECT_TIMES);
            clientConfig.setWireFormatType(new MetamorphosisWireFormatType());
            clientConfig.setMaxScheduleWrittenBytes(MAX_SCHEDULE_WRITTEN_BYTES);
            try {
                this.remotingClient = new RemotingClientWrapper(RemotingFactory.connect(clientConfig));
                if (this.metaClientConfig.getServerUrl() != null) {
                    connectServer(this.metaClientConfig);
                } else {
                    initZooKeeper();
                }
                this.producerZooKeeper = new ProducerZooKeeper(this.metaZookeeper, this.remotingClient, this.zkClient, metaClientConfig);
                this.sessionIdGenerator = new IdGenerator();
                this.consumerZooKeeper = initConsumerZooKeeper(this.remotingClient, this.zkClient, this.zkConfig);
                this.zkClientChangedListeners.add(this.producerZooKeeper);
                this.zkClientChangedListeners.add(this.consumerZooKeeper);
                this.subscribeInfoManager = new SubscribeInfoManager();
                this.recoverManager = new RecoverStorageManager(this.metaClientConfig, this.subscribeInfoManager);
                this.shutdownHook = new Thread() { // from class: com.taobao.metamorphosis.client.MetaMessageSessionFactory.2
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            MetaMessageSessionFactory.this.isHutdownHookCalled = true;
                            MetaMessageSessionFactory.this.shutdown();
                        } catch (MetaClientException e) {
                            MetaMessageSessionFactory.log.error("关闭session factory失败", e);
                        }
                    }
                };
                Runtime.getRuntime().addShutdownHook(this.shutdownHook);
            } catch (NotifyRemotingException e) {
                throw new NetworkException("Create remoting client failed", e);
            }
        } catch (Exception e2) {
            shutdown();
            throw new MetaClientException("Construct message session factory failed.", e2);
        } catch (MetaClientException e3) {
            shutdown();
            throw e3;
        }
    }

    protected ConsumerZooKeeper initConsumerZooKeeper(RemotingClientWrapper remotingClientWrapper, ZkClient zkClient, ZkUtils.ZKConfig zKConfig) {
        return new ConsumerZooKeeper(this.metaZookeeper, this.remotingClient, this.zkClient, this.zkConfig);
    }

    private void checkConfig(MetaClientConfig metaClientConfig) throws MetaClientException {
        if (metaClientConfig == null) {
            throw new MetaClientException("null configuration");
        }
    }

    private void connectServer(MetaClientConfig metaClientConfig) throws NetworkException {
        try {
            this.remotingClient.connect(metaClientConfig.getServerUrl());
            this.remotingClient.awaitReadyInterrupt(metaClientConfig.getServerUrl());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (NotifyRemotingException e2) {
            throw new NetworkException("Connect to " + metaClientConfig.getServerUrl() + " failed", e2);
        }
    }

    private void initZooKeeper() throws MetaClientException {
        this.zkConfig = null;
        if (this.metaClientConfig.getZkConfig() != null) {
            this.zkConfig = this.metaClientConfig.getZkConfig();
        } else {
            this.zkConfig = loadZkConfigFromLocalFile();
        }
        if (this.zkConfig == null) {
            throw new MetaClientException("No zk config offered");
        }
        this.zkClient = new ZkClient(this.zkConfig.zkConnect, this.zkConfig.zkSessionTimeoutMs, this.zkConfig.zkConnectionTimeoutMs, new ZkUtils.StringSerializer());
        this.metaZookeeper = new MetaZookeeper(this.zkClient, this.zkConfig.zkRoot);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory, com.taobao.metamorphosis.client.Shutdownable
    public synchronized void shutdown() throws MetaClientException {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        if (this.recoverManager != null) {
            this.recoverManager.shutdown();
        }
        Iterator<Shutdownable> it = this.children.iterator();
        while (it.hasNext()) {
            Shutdownable next = it.next();
            if (next != null) {
                next.shutdown();
            }
        }
        try {
            if (this.remotingClient != null) {
                this.remotingClient.stop();
            }
            if (this.zkClient != null) {
                this.zkClient.close();
            }
            if (this.isHutdownHookCalled || this.shutdownHook == null) {
                return;
            }
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        } catch (NotifyRemotingException e) {
            throw new NetworkException("Stop remoting client failed", e);
        }
    }

    private ZkUtils.ZKConfig loadZkConfigFromLocalFile() {
        try {
            Properties resourceAsProperties = Utils.getResourceAsProperties("zk.properties", "GBK");
            ZkUtils.ZKConfig zKConfig = new ZkUtils.ZKConfig();
            if (StringUtils.isNotBlank(resourceAsProperties.getProperty("zk.zkConnect"))) {
                zKConfig.zkConnect = resourceAsProperties.getProperty("zk.zkConnect");
            }
            if (StringUtils.isNotBlank(resourceAsProperties.getProperty("zk.zkSessionTimeoutMs"))) {
                zKConfig.zkSessionTimeoutMs = Integer.parseInt(resourceAsProperties.getProperty("zk.zkSessionTimeoutMs"));
            }
            if (StringUtils.isNotBlank(resourceAsProperties.getProperty("zk.zkConnectionTimeoutMs"))) {
                zKConfig.zkConnectionTimeoutMs = Integer.parseInt(resourceAsProperties.getProperty("zk.zkConnectionTimeoutMs"));
            }
            if (StringUtils.isNotBlank(resourceAsProperties.getProperty("zk.zkSyncTimeMs"))) {
                zKConfig.zkSyncTimeMs = Integer.parseInt(resourceAsProperties.getProperty("zk.zkSyncTimeMs"));
            }
            return zKConfig;
        } catch (IOException e) {
            log.error("zk配置失败", e);
            return null;
        }
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public MessageProducer createProducer(PartitionSelector partitionSelector) {
        return createProducer(partitionSelector, false);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public MessageProducer createProducer() {
        return createProducer(new RoundRobinPartitionSelector(), false);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    @Deprecated
    public MessageProducer createProducer(boolean z) {
        return createProducer(new RoundRobinPartitionSelector(), z);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    @Deprecated
    public MessageProducer createProducer(PartitionSelector partitionSelector, boolean z) {
        if (partitionSelector == null) {
            throw new IllegalArgumentException("Null partitionSelector");
        }
        return (MessageProducer) addChild(new SimpleMessageProducer(this, this.remotingClient, partitionSelector, this.producerZooKeeper, this.sessionIdGenerator.generateId()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T extends Shutdownable> T addChild(T t) {
        this.children.add(t);
        return t;
    }

    public <T extends Shutdownable> void removeChild(T t) {
        this.children.remove(t);
    }

    private synchronized MessageConsumer createConsumer0(ConsumerConfig consumerConfig, OffsetStorage offsetStorage, RecoverManager recoverManager) {
        if (consumerConfig.getServerUrl() == null) {
            consumerConfig.setServerUrl(this.metaClientConfig.getServerUrl());
        }
        if (offsetStorage == null) {
            throw new InvalidOffsetStorageException("Null offset storage");
        }
        if (!recoverManager.isStarted()) {
            recoverManager.start(this.metaClientConfig);
        }
        checkConsumerConfig(consumerConfig);
        return (MessageConsumer) addChild(new SimpleMessageConsumer(this, this.remotingClient, consumerConfig, this.consumerZooKeeper, this.producerZooKeeper, this.subscribeInfoManager, recoverManager, offsetStorage, createLoadBalanceStrategy(consumerConfig)));
    }

    protected LoadBalanceStrategy createLoadBalanceStrategy(ConsumerConfig consumerConfig) {
        switch (consumerConfig.getLoadBalanceStrategyType()) {
            case DEFAULT:
                return new DefaultLoadBalanceStrategy();
            case CONSIST:
                return new ConsisHashStrategy();
            default:
                throw new IllegalArgumentException("Unknow load balance strategy type:" + consumerConfig.getLoadBalanceStrategyType());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageConsumer createConsumer(ConsumerConfig consumerConfig, OffsetStorage offsetStorage, RecoverManager recoverManager) {
        OffsetStorage offsetStorage2 = offsetStorage;
        if (offsetStorage2 == null) {
            offsetStorage2 = new ZkOffsetStorage(this.metaZookeeper, this.zkClient);
            this.zkClientChangedListeners.add((ZkOffsetStorage) offsetStorage2);
        }
        return createConsumer0(consumerConfig, offsetStorage2, recoverManager != null ? recoverManager : this.recoverManager);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public MessageConsumer createConsumer(ConsumerConfig consumerConfig, OffsetStorage offsetStorage) {
        return createConsumer(consumerConfig, offsetStorage, this.recoverManager);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public Map<InetSocketAddress, StatsResult> getStats(String str) throws InterruptedException {
        return getStats0(null, str);
    }

    private Map<InetSocketAddress, StatsResult> getStats0(InetSocketAddress inetSocketAddress, String str) throws InterruptedException {
        String errorMsg;
        Set<String> groupSet = this.remotingClient.getGroupSet();
        if (groupSet == null || groupSet.size() <= 1) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        try {
            for (String str2 : groupSet) {
                if (!str2.equals(Constants.DEFAULT_GROUP)) {
                    URI uri = new URI(str2);
                    InetSocketAddress inetSocketAddress2 = new InetSocketAddress(uri.getHost(), uri.getPort());
                    if (inetSocketAddress == null || inetSocketAddress.equals(inetSocketAddress2)) {
                        BooleanCommand invokeToGroup = this.remotingClient.invokeToGroup(str2, new StatsCommand(Integer.valueOf(OpaqueGenerator.getNextOpaque()), str), 3000L, TimeUnit.MILLISECONDS);
                        if (invokeToGroup.getResponseStatus() == ResponseStatus.NO_ERROR && (errorMsg = invokeToGroup.getErrorMsg()) != null) {
                            parseStatsValues(inetSocketAddress2, hashMap, str2, errorMsg);
                        }
                    }
                }
            }
            return hashMap;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            throw new IllegalStateException("Get statistics from brokers failed", e2);
        }
    }

    private void parseStatsValues(InetSocketAddress inetSocketAddress, Map<InetSocketAddress, StatsResult> map, String str, String str2) throws URISyntaxException {
        String[] split = str2.split("\r\n");
        HashMap hashMap = new HashMap();
        for (String str3 : split) {
            int indexOf = str3.indexOf(" ");
            if (indexOf > 0) {
                hashMap.put(str3.substring(0, indexOf), str3.substring(indexOf + 1));
            } else {
                hashMap.put(str3, "NO VALUE");
            }
        }
        map.put(inetSocketAddress, new StatsResult(hashMap));
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public Map<InetSocketAddress, StatsResult> getStats() throws InterruptedException {
        return getStats((String) null);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public StatsResult getStats(InetSocketAddress inetSocketAddress, String str) throws InterruptedException {
        return getStats0(inetSocketAddress, str).get(inetSocketAddress);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public StatsResult getStats(InetSocketAddress inetSocketAddress) throws InterruptedException {
        return getStats(inetSocketAddress, null);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public List<Partition> getPartitionsForTopic(String str) {
        if (this.metaZookeeper == null) {
            throw new IllegalStateException("Could not talk with zookeeper to get partitions list");
        }
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(str);
        List<Partition> list = (List) this.metaZookeeper.getPartitionsForTopicsFromMaster(arrayList).get(str);
        return list == null ? Collections.emptyList() : list;
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public MessageConsumer createConsumer(ConsumerConfig consumerConfig) {
        return createConsumer(consumerConfig, null, null);
    }

    protected void checkConsumerConfig(ConsumerConfig consumerConfig) {
        if (StringUtils.isBlank(consumerConfig.getGroup())) {
            throw new InvalidConsumerConfigException("Blank group");
        }
        char[] cArr = new char[consumerConfig.getGroup().length()];
        consumerConfig.getGroup().getChars(0, cArr.length, cArr, 0);
        for (char c : cArr) {
            for (char c2 : INVALID_GROUP_CHAR) {
                if (c == c2) {
                    throw new InvalidConsumerConfigException("Group name has invalid character " + c);
                }
            }
        }
        if (consumerConfig.getFetchRunnerCount() <= 0) {
            throw new InvalidConsumerConfigException("Invalid fetchRunnerCount:" + consumerConfig.getFetchRunnerCount());
        }
        if (consumerConfig.getFetchTimeoutInMills() <= 0) {
            throw new InvalidConsumerConfigException("Invalid fetchTimeoutInMills:" + consumerConfig.getFetchTimeoutInMills());
        }
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public TopicBrowser createTopicBrowser(String str) {
        return createTopicBrowser(str, 1048576, 5L, TimeUnit.SECONDS);
    }

    @Override // com.taobao.metamorphosis.client.MessageSessionFactory
    public TopicBrowser createTopicBrowser(String str, int i, long j, TimeUnit timeUnit) {
        return new MetaTopicBrowser(str, i, TimeUnit.MILLISECONDS.convert(j, timeUnit), createConsumer(new ConsumerConfig("Just_for_Browser")), getPartitionsForTopic(str));
    }

    static {
        ExceptionMonitor.setInstance(new ExceptionMonitor() { // from class: com.taobao.metamorphosis.client.MetaMessageSessionFactory.1
            public void exceptionCaught(Throwable th) {
                boolean z = (th instanceof IOException) && th.getMessage().indexOf("Connection reset by peer") >= 0;
                if (!MetaMessageSessionFactory.log.isErrorEnabled() || z) {
                    return;
                }
                MetaMessageSessionFactory.log.error("Networking unexpected exception", th);
            }
        });
        TCP_NO_DELAY = Boolean.valueOf(System.getProperty("metaq.network.tcp_nodelay", "true")).booleanValue();
        MAX_SCHEDULE_WRITTEN_BYTES = Long.valueOf(System.getProperty("metaq.network.max_schedule_written_bytes", String.valueOf(Runtime.getRuntime().maxMemory() / 3))).longValue();
        INVALID_GROUP_CHAR = new char[]{'~', '!', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '`', '\'', '\"', ',', ';', '/', '?', '[', ']', '<', '>', '.', ':', ' '};
    }
}
