/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client;

import com.taobao.gecko.core.command.Constants;
import com.taobao.gecko.core.command.RequestCommand;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.util.ExceptionMonitor;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.gecko.service.RemotingFactory;
import com.taobao.gecko.service.config.ClientConfig;
import com.taobao.gecko.service.config.WireFormatType;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaTopicBrowser;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.Shutdownable;
import com.taobao.metamorphosis.client.StatsResult;
import com.taobao.metamorphosis.client.TopicBrowser;
import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.consumer.ConsisHashStrategy;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.ConsumerZooKeeper;
import com.taobao.metamorphosis.client.consumer.DefaultLoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.LoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.RecoverManager;
import com.taobao.metamorphosis.client.consumer.RecoverStorageManager;
import com.taobao.metamorphosis.client.consumer.SimpleMessageConsumer;
import com.taobao.metamorphosis.client.consumer.SubscribeInfoManager;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.client.consumer.storage.ZkOffsetStorage;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.client.producer.ProducerZooKeeper;
import com.taobao.metamorphosis.client.producer.RoundRobinPartitionSelector;
import com.taobao.metamorphosis.client.producer.SimpleMessageProducer;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidConsumerConfigException;
import com.taobao.metamorphosis.exception.InvalidOffsetStorageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.NetworkException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.MetamorphosisWireFormatType;
import com.taobao.metamorphosis.network.StatsCommand;
import com.taobao.metamorphosis.utils.IdGenerator;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.Utils;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MetaMessageSessionFactory
implements MessageSessionFactory {
    private static final int MAX_RECONNECT_TIMES = Integer.valueOf(System.getProperty("metaq.client.network.max.reconnect.times", "350"));
    private static final int STATS_OPTIMEOUT = 3000;
    protected RemotingClientWrapper remotingClient;
    private final MetaClientConfig metaClientConfig;
    private volatile ZkClient zkClient;
    static final Log log = LogFactory.getLog(MetaMessageSessionFactory.class);
    private final CopyOnWriteArrayList<ZkClientChangedListener> zkClientChangedListeners = new CopyOnWriteArrayList();
    protected final ProducerZooKeeper producerZooKeeper;
    private final ConsumerZooKeeper consumerZooKeeper;
    private final CopyOnWriteArrayList<Shutdownable> children = new CopyOnWriteArrayList();
    private volatile boolean shutdown;
    private volatile boolean isHutdownHookCalled = false;
    private final Thread shutdownHook;
    private ZkUtils.ZKConfig zkConfig;
    private final RecoverManager recoverManager;
    private final SubscribeInfoManager subscribeInfoManager;
    protected final IdGenerator sessionIdGenerator;
    protected MetaZookeeper metaZookeeper;
    public static final boolean TCP_NO_DELAY;
    public static final long MAX_SCHEDULE_WRITTEN_BYTES;
    static final char[] INVALID_GROUP_CHAR;

    public RemotingClientWrapper getRemotingClient() {
        return this.remotingClient;
    }

    public SubscribeInfoManager getSubscribeInfoManager() {
        return this.subscribeInfoManager;
    }

    public MetaClientConfig getMetaClientConfig() {
        return this.metaClientConfig;
    }

    public ProducerZooKeeper getProducerZooKeeper() {
        return this.producerZooKeeper;
    }

    public ConsumerZooKeeper getConsumerZooKeeper() {
        return this.consumerZooKeeper;
    }

    public RecoverManager getRecoverStorageManager() {
        return this.recoverManager;
    }

    public CopyOnWriteArrayList<Shutdownable> getChildren() {
        return this.children;
    }

    public MetaMessageSessionFactory(MetaClientConfig metaClientConfig) throws MetaClientException {
        try {
            this.checkConfig(metaClientConfig);
            this.metaClientConfig = metaClientConfig;
            ClientConfig clientConfig = new ClientConfig();
            clientConfig.setTcpNoDelay(TCP_NO_DELAY);
            clientConfig.setMaxReconnectTimes(MAX_RECONNECT_TIMES);
            clientConfig.setWireFormatType((WireFormatType)new MetamorphosisWireFormatType());
            clientConfig.setMaxScheduleWrittenBytes(MAX_SCHEDULE_WRITTEN_BYTES);
            try {
                this.remotingClient = new RemotingClientWrapper(RemotingFactory.connect((ClientConfig)clientConfig));
            }
            catch (NotifyRemotingException e) {
                throw new NetworkException("Create remoting client failed", (Throwable)e);
            }
            if (this.metaClientConfig.getServerUrl() != null) {
                this.connectServer(this.metaClientConfig);
            } else {
                this.initZooKeeper();
            }
            this.producerZooKeeper = new ProducerZooKeeper(this.metaZookeeper, this.remotingClient, this.zkClient, metaClientConfig);
            this.sessionIdGenerator = new IdGenerator();
            this.consumerZooKeeper = this.initConsumerZooKeeper(this.remotingClient, this.zkClient, this.zkConfig);
            this.zkClientChangedListeners.add(this.producerZooKeeper);
            this.zkClientChangedListeners.add(this.consumerZooKeeper);
            this.subscribeInfoManager = new SubscribeInfoManager();
            this.recoverManager = new RecoverStorageManager(this.metaClientConfig, this.subscribeInfoManager);
            this.shutdownHook = new Thread(){

                @Override
                public void run() {
                    try {
                        MetaMessageSessionFactory.this.isHutdownHookCalled = true;
                        MetaMessageSessionFactory.this.shutdown();
                    }
                    catch (MetaClientException e) {
                        log.error((Object)"\u5173\u95edsession factory\u5931\u8d25", (Throwable)e);
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        }
        catch (MetaClientException e) {
            this.shutdown();
            throw e;
        }
        catch (Exception e) {
            this.shutdown();
            throw new MetaClientException("Construct message session factory failed.", (Throwable)e);
        }
    }

    protected ConsumerZooKeeper initConsumerZooKeeper(RemotingClientWrapper remotingClientWrapper, ZkClient zkClient2, ZkUtils.ZKConfig config) {
        return new ConsumerZooKeeper(this.metaZookeeper, this.remotingClient, this.zkClient, this.zkConfig);
    }

    private void checkConfig(MetaClientConfig metaClientConfig) throws MetaClientException {
        if (metaClientConfig == null) {
            throw new MetaClientException("null configuration");
        }
    }

    private void connectServer(MetaClientConfig metaClientConfig) throws NetworkException {
        try {
            this.remotingClient.connect(metaClientConfig.getServerUrl());
            this.remotingClient.awaitReadyInterrupt(metaClientConfig.getServerUrl());
        }
        catch (NotifyRemotingException e) {
            throw new NetworkException("Connect to " + metaClientConfig.getServerUrl() + " failed", (Throwable)e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void initZooKeeper() throws MetaClientException {
        this.zkConfig = null;
        this.zkConfig = this.metaClientConfig.getZkConfig() != null ? this.metaClientConfig.getZkConfig() : this.loadZkConfigFromLocalFile();
        if (this.zkConfig == null) {
            throw new MetaClientException("No zk config offered");
        }
        this.zkClient = new ZkClient(this.zkConfig.zkConnect, this.zkConfig.zkSessionTimeoutMs, this.zkConfig.zkConnectionTimeoutMs, (ZkSerializer)new ZkUtils.StringSerializer());
        this.metaZookeeper = new MetaZookeeper(this.zkClient, this.zkConfig.zkRoot);
    }

    @Override
    public synchronized void shutdown() throws MetaClientException {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        if (this.recoverManager != null) {
            this.recoverManager.shutdown();
        }
        for (Shutdownable child : this.children) {
            if (child == null) continue;
            child.shutdown();
        }
        try {
            if (this.remotingClient != null) {
                this.remotingClient.stop();
            }
        }
        catch (NotifyRemotingException e) {
            throw new NetworkException("Stop remoting client failed", (Throwable)e);
        }
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        if (!this.isHutdownHookCalled && this.shutdownHook != null) {
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        }
    }

    private ZkUtils.ZKConfig loadZkConfigFromLocalFile() {
        try {
            Properties properties = Utils.getResourceAsProperties((String)"zk.properties", (String)"GBK");
            ZkUtils.ZKConfig zkConfig = new ZkUtils.ZKConfig();
            if (StringUtils.isNotBlank((String)properties.getProperty("zk.zkConnect"))) {
                zkConfig.zkConnect = properties.getProperty("zk.zkConnect");
            }
            if (StringUtils.isNotBlank((String)properties.getProperty("zk.zkSessionTimeoutMs"))) {
                zkConfig.zkSessionTimeoutMs = Integer.parseInt(properties.getProperty("zk.zkSessionTimeoutMs"));
            }
            if (StringUtils.isNotBlank((String)properties.getProperty("zk.zkConnectionTimeoutMs"))) {
                zkConfig.zkConnectionTimeoutMs = Integer.parseInt(properties.getProperty("zk.zkConnectionTimeoutMs"));
            }
            if (StringUtils.isNotBlank((String)properties.getProperty("zk.zkSyncTimeMs"))) {
                zkConfig.zkSyncTimeMs = Integer.parseInt(properties.getProperty("zk.zkSyncTimeMs"));
            }
            return zkConfig;
        }
        catch (IOException e) {
            log.error((Object)"zk\u914d\u7f6e\u5931\u8d25", (Throwable)e);
            return null;
        }
    }

    @Override
    public MessageProducer createProducer(PartitionSelector partitionSelector) {
        return this.createProducer(partitionSelector, false);
    }

    @Override
    public MessageProducer createProducer() {
        return this.createProducer(new RoundRobinPartitionSelector(), false);
    }

    @Override
    @Deprecated
    public MessageProducer createProducer(boolean ordered) {
        return this.createProducer(new RoundRobinPartitionSelector(), ordered);
    }

    @Override
    @Deprecated
    public MessageProducer createProducer(PartitionSelector partitionSelector, boolean ordered) {
        if (partitionSelector == null) {
            throw new IllegalArgumentException("Null partitionSelector");
        }
        return this.addChild(new SimpleMessageProducer(this, this.remotingClient, partitionSelector, this.producerZooKeeper, this.sessionIdGenerator.generateId()));
    }

    protected <T extends Shutdownable> T addChild(T child) {
        this.children.add(child);
        return child;
    }

    public <T extends Shutdownable> void removeChild(T child) {
        this.children.remove(child);
    }

    private synchronized MessageConsumer createConsumer0(ConsumerConfig consumerConfig, OffsetStorage offsetStorage, RecoverManager recoverManager0) {
        if (consumerConfig.getServerUrl() == null) {
            consumerConfig.setServerUrl(this.metaClientConfig.getServerUrl());
        }
        if (offsetStorage == null) {
            throw new InvalidOffsetStorageException("Null offset storage");
        }
        if (!recoverManager0.isStarted()) {
            recoverManager0.start(this.metaClientConfig);
        }
        this.checkConsumerConfig(consumerConfig);
        return this.addChild(new SimpleMessageConsumer(this, this.remotingClient, consumerConfig, this.consumerZooKeeper, this.producerZooKeeper, this.subscribeInfoManager, recoverManager0, offsetStorage, this.createLoadBalanceStrategy(consumerConfig)));
    }

    protected LoadBalanceStrategy createLoadBalanceStrategy(ConsumerConfig consumerConfig) {
        switch (consumerConfig.getLoadBalanceStrategyType()) {
            case DEFAULT: {
                return new DefaultLoadBalanceStrategy();
            }
            case CONSIST: {
                return new ConsisHashStrategy();
            }
        }
        throw new IllegalArgumentException("Unknow load balance strategy type:" + (Object)((Object)consumerConfig.getLoadBalanceStrategyType()));
    }

    protected MessageConsumer createConsumer(ConsumerConfig consumerConfig, OffsetStorage offsetStorage, RecoverManager recoverManager0) {
        OffsetStorage offsetStorageCopy = offsetStorage;
        if (offsetStorageCopy == null) {
            offsetStorageCopy = new ZkOffsetStorage(this.metaZookeeper, this.zkClient);
            this.zkClientChangedListeners.add((ZkOffsetStorage)offsetStorageCopy);
        }
        return this.createConsumer0(consumerConfig, offsetStorageCopy, recoverManager0 != null ? recoverManager0 : this.recoverManager);
    }

    @Override
    public MessageConsumer createConsumer(ConsumerConfig consumerConfig, OffsetStorage offsetStorage) {
        return this.createConsumer(consumerConfig, offsetStorage, this.recoverManager);
    }

    @Override
    public Map<InetSocketAddress, StatsResult> getStats(String item) throws InterruptedException {
        return this.getStats0(null, item);
    }

    private Map<InetSocketAddress, StatsResult> getStats0(InetSocketAddress target, String item) throws InterruptedException {
        Set<String> groups = this.remotingClient.getGroupSet();
        if (groups == null || groups.size() <= 1) {
            return Collections.emptyMap();
        }
        HashMap<InetSocketAddress, StatsResult> rt = new HashMap<InetSocketAddress, StatsResult>();
        try {
            for (String group : groups) {
                String body;
                BooleanCommand resp;
                if (group.equals(Constants.DEFAULT_GROUP)) continue;
                URI uri = new URI(group);
                InetSocketAddress sockAddr = new InetSocketAddress(uri.getHost(), uri.getPort());
                if (target != null && !target.equals(sockAddr) || (resp = (BooleanCommand)this.remotingClient.invokeToGroup(group, (RequestCommand)new StatsCommand(Integer.valueOf(OpaqueGenerator.getNextOpaque()), item), 3000L, TimeUnit.MILLISECONDS)).getResponseStatus() != ResponseStatus.NO_ERROR || (body = resp.getErrorMsg()) == null) continue;
                this.parseStatsValues(sockAddr, rt, group, body);
            }
            return rt;
        }
        catch (InterruptedException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IllegalStateException("Get statistics from brokers failed", e);
        }
    }

    private void parseStatsValues(InetSocketAddress sockAddr, Map<InetSocketAddress, StatsResult> rt, String group, String body) throws URISyntaxException {
        String[] lines = body.split("\r\n");
        HashMap<String, String> values = new HashMap<String, String>();
        for (String line : lines) {
            int index = line.indexOf(" ");
            if (index > 0) {
                String key = line.substring(0, index);
                String value = line.substring(index + 1);
                values.put(key, value);
                continue;
            }
            values.put(line, "NO VALUE");
        }
        rt.put(sockAddr, new StatsResult(values));
    }

    @Override
    public Map<InetSocketAddress, StatsResult> getStats() throws InterruptedException {
        return this.getStats((String)null);
    }

    @Override
    public StatsResult getStats(InetSocketAddress target, String item) throws InterruptedException {
        return this.getStats0(target, item).get(target);
    }

    @Override
    public StatsResult getStats(InetSocketAddress target) throws InterruptedException {
        return this.getStats(target, null);
    }

    @Override
    public List<Partition> getPartitionsForTopic(String topic) {
        if (this.metaZookeeper != null) {
            ArrayList<String> topics = new ArrayList<String>(1);
            topics.add(topic);
            List rt = (List)this.metaZookeeper.getPartitionsForTopicsFromMaster(topics).get(topic);
            if (rt == null) {
                return Collections.emptyList();
            }
            return rt;
        }
        throw new IllegalStateException("Could not talk with zookeeper to get partitions list");
    }

    @Override
    public MessageConsumer createConsumer(ConsumerConfig consumerConfig) {
        return this.createConsumer(consumerConfig, null, null);
    }

    protected void checkConsumerConfig(ConsumerConfig consumerConfig) {
        if (StringUtils.isBlank((String)consumerConfig.getGroup())) {
            throw new InvalidConsumerConfigException("Blank group");
        }
        char[] chary = new char[consumerConfig.getGroup().length()];
        consumerConfig.getGroup().getChars(0, chary.length, chary, 0);
        for (char ch : chary) {
            for (char invalid : INVALID_GROUP_CHAR) {
                if (ch != invalid) continue;
                throw new InvalidConsumerConfigException("Group name has invalid character " + ch);
            }
        }
        if (consumerConfig.getFetchRunnerCount() <= 0) {
            throw new InvalidConsumerConfigException("Invalid fetchRunnerCount:" + consumerConfig.getFetchRunnerCount());
        }
        if (consumerConfig.getFetchTimeoutInMills() <= 0L) {
            throw new InvalidConsumerConfigException("Invalid fetchTimeoutInMills:" + consumerConfig.getFetchTimeoutInMills());
        }
    }

    @Override
    public TopicBrowser createTopicBrowser(String topic) {
        return this.createTopicBrowser(topic, 0x100000, 5L, TimeUnit.SECONDS);
    }

    @Override
    public TopicBrowser createTopicBrowser(String topic, int maxSize, long timeout, TimeUnit timeUnit) {
        MessageConsumer consumer = this.createConsumer(new ConsumerConfig("Just_for_Browser"));
        return new MetaTopicBrowser(topic, maxSize, TimeUnit.MILLISECONDS.convert(timeout, timeUnit), consumer, this.getPartitionsForTopic(topic));
    }

    static {
        ExceptionMonitor.setInstance((ExceptionMonitor)new ExceptionMonitor(){

            public void exceptionCaught(Throwable cause) {
                boolean isResetConnEx;
                boolean bl = isResetConnEx = cause instanceof IOException && cause.getMessage().indexOf("Connection reset by peer") >= 0;
                if (log.isErrorEnabled() && !isResetConnEx) {
                    log.error((Object)"Networking unexpected exception", cause);
                }
            }
        });
        TCP_NO_DELAY = Boolean.valueOf(System.getProperty("metaq.network.tcp_nodelay", "true"));
        MAX_SCHEDULE_WRITTEN_BYTES = Long.valueOf(System.getProperty("metaq.network.max_schedule_written_bytes", String.valueOf(Runtime.getRuntime().maxMemory() / 3L)));
        INVALID_GROUP_CHAR = new char[]{'~', '!', '#', '$', '%', '^', '&', '*', '(', ')', '+', '=', '`', '\'', '\"', ',', ';', '/', '?', '[', ']', '<', '>', '.', ':', ' '};
    }
}

