/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.producer;

import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.producer.PartitionSelector;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.ThreadUtils;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ProducerZooKeeper
implements ZkClientChangedListener {
    /** Remoting client used by BrokerConnectionListener to connect to and close broker URLs. */
    private final RemotingClientWrapper remotingClient;
    /** Per-topic listener, held in a FutureTask so that callers of getBrokerConnectionListener wait on one shared initialization. */
    private final ConcurrentHashMap<String, FutureTask<BrokerConnectionListener>> topicConnectionListeners = new ConcurrentHashMap();
    /** Client configuration; a non-null getServerUrl() short-circuits broker and partition selection. */
    private final MetaClientConfig metaClientConfig;
    /** Current ZooKeeper client; publishTopic tolerates null here, and onZkClientChanged replaces it. */
    private ZkClient zkClient;
    /** Supplies the publish path prefix (brokerTopicsPubPath) and the broker/partition lookups. */
    private final MetaZookeeper metaZookeeper;
    /** Fallback topic consulted when the requested topic has no listener; null until setDefaultTopic is called. */
    private String defaultTopic;
    static final Log log = LogFactory.getLog(ProducerZooKeeper.class);
    /** Broker-change callbacks registered per topic. */
    private final ConcurrentHashMap<String, CopyOnWriteArraySet<BrokerChangeListener>> brokerChangeListeners = new ConcurrentHashMap();

    /**
     * Registers a callback that is invoked when the brokers of a topic change.
     *
     * @param topic    topic the listener is interested in
     * @param listener callback to register; the backing collection is a set, so
     *                 registering an equal listener again adds nothing
     */
    public void onBrokerChange(String topic, BrokerChangeListener listener) {
        this.getListenerList(topic).add(listener);
    }

    /**
     * Removes a previously registered broker-change callback.
     *
     * @param topic    topic the listener was registered for
     * @param listener callback to remove; removing one that is not registered is a no-op
     */
    public void deregisterBrokerChangeListener(String topic, BrokerChangeListener listener) {
        this.getListenerList(topic).remove(listener);
    }

    /**
     * Invokes every broker-change callback registered for the topic.
     *
     * @param topic topic whose brokers changed
     */
    public void notifyBrokersChange(String topic) {
        CopyOnWriteArraySet<BrokerChangeListener> registered = this.getListenerList(topic);
        for (BrokerChangeListener callback : registered) {
            try {
                callback.brokersChanged(topic);
            }
            catch (Exception failure) {
                // A failing callback is logged and skipped so the remaining ones still run.
                log.error("Notify brokers changed failed", failure);
            }
        }
    }

    /**
     * Returns the listener set for a topic, creating and registering an empty one
     * if the topic has none yet.
     *
     * @param topic topic to look up
     * @return the set stored in the map for this topic, never null
     */
    private CopyOnWriteArraySet<BrokerChangeListener> getListenerList(String topic) {
        CopyOnWriteArraySet<BrokerChangeListener> existing = this.brokerChangeListeners.get(topic);
        if (existing != null) {
            return existing;
        }
        CopyOnWriteArraySet<BrokerChangeListener> fresh = new CopyOnWriteArraySet<BrokerChangeListener>();
        // putIfAbsent returns the set another thread installed first, if any; that one wins.
        CopyOnWriteArraySet<BrokerChangeListener> winner = this.brokerChangeListeners.putIfAbsent(topic, fresh);
        return winner != null ? winner : fresh;
    }

    /**
     * Creates the producer-side ZooKeeper helper; only stores its collaborators.
     *
     * @param metaZookeeper    source of the publish path and broker/partition lookups
     * @param remotingClient   client used to connect to and close brokers
     * @param zkClient         initial ZooKeeper client; publishTopic skips the ZooKeeper
     *                         registration while this is null
     * @param metaClientConfig client configuration
     */
    public ProducerZooKeeper(MetaZookeeper metaZookeeper, RemotingClientWrapper remotingClient, ZkClient zkClient, MetaClientConfig metaClientConfig) {
        this.metaClientConfig = metaClientConfig;
        this.zkClient = zkClient;
        this.remotingClient = remotingClient;
        this.metaZookeeper = metaZookeeper;
    }

    /**
     * Publishes a topic on behalf of {@code ref}. The first caller for a topic
     * creates its {@link BrokerConnectionListener}; later callers only add their
     * reference to the existing listener.
     *
     * @param topic topic to publish
     * @param ref   object recorded in the listener's reference set
     */
    public void publishTopic(final String topic, final Object ref) {
        // Fast path: a task is already registered for this topic.
        if (this.topicConnectionListeners.get(topic) != null) {
            this.addRef(topic, ref);
            return;
        }
        Callable<BrokerConnectionListener> initializer = new Callable<BrokerConnectionListener>(){

            @Override
            public BrokerConnectionListener call() throws Exception {
                BrokerConnectionListener created = new BrokerConnectionListener(topic);
                // Without a ZooKeeper client the listener is created but not subscribed.
                if (ProducerZooKeeper.this.zkClient != null) {
                    ProducerZooKeeper.this.publishTopicInternal(topic, created);
                }
                created.references.add(ref);
                return created;
            }
        };
        FutureTask<BrokerConnectionListener> candidate = new FutureTask<BrokerConnectionListener>(initializer);
        if (this.topicConnectionListeners.putIfAbsent(topic, candidate) != null) {
            // Another thread registered its task first; just attach to that one.
            this.addRef(topic, ref);
            return;
        }
        // This thread won the registration, so it runs the initialization itself.
        candidate.run();
    }

    /**
     * Adds {@code ref} to the reference set of the topic's listener.
     *
     * NOTE(review): getBrokerConnectionListener can return null (no task for the
     * topic, or an interrupted wait); this method does not guard against that.
     *
     * @param topic topic whose listener should record the reference
     * @param ref   reference to record
     */
    private void addRef(String topic, Object ref) {
        BrokerConnectionListener target = this.getBrokerConnectionListener(topic);
        // Set.add leaves the set unchanged when the element is already present.
        target.references.add(ref);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops {@code ref} from the topic's listener and, when no references remain,
     * removes the listener from the map and disposes it.
     *
     * @param topic topic being unpublished
     * @param ref   reference to release
     */
    public void unPublishTopic(String topic, Object ref) {
        BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
        if (listener == null) {
            return;
        }
        synchronized (listener.references) {
            // Look the listener up again now that the monitor is held.
            if (this.getBrokerConnectionListener(topic) == null) {
                return;
            }
            listener.references.remove(ref);
            if (!listener.references.isEmpty()) {
                return;
            }
            this.topicConnectionListeners.remove(topic);
            listener.dispose();
        }
    }

    /**
     * Ensures the topic's publish path exists in ZooKeeper, subscribes the listener
     * to child changes on it, and then performs an initial broker refresh.
     *
     * @param topic    topic to publish
     * @param listener listener to subscribe and refresh
     * @throws Exception if any of the ZooKeeper or refresh steps fails
     */
    private void publishTopicInternal(String topic, BrokerConnectionListener listener) throws Exception, NotifyRemotingException, InterruptedException {
        final String topicPubPath = this.metaZookeeper.brokerTopicsPubPath + "/" + topic;
        ZkUtils.makeSurePersistentPathExists(this.zkClient, topicPubPath);
        this.zkClient.subscribeChildChanges(topicPubPath, listener);
        listener.syncedUpdateBrokersInfo();
    }

    /**
     * Returns the listener for a topic, waiting for its initialization task to finish.
     *
     * @param topic topic to look up
     * @return the listener, or null when no task is registered for the topic or the
     *         wait was interrupted (the interrupt flag is restored in that case)
     */
    BrokerConnectionListener getBrokerConnectionListener(String topic) {
        FutureTask<BrokerConnectionListener> pending = this.topicConnectionListeners.get(topic);
        if (pending == null) {
            return null;
        }
        try {
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch (ExecutionException failed) {
            // Surface the initialization failure via ThreadUtils.launderThrowable.
            throw ThreadUtils.launderThrowable(failed.getCause());
        }
    }

    /**
     * Collects the broker URLs that host at least one partition of the topic.
     *
     * @param topic topic to look up
     * @return the distinct URLs found; an empty set when the topic has no listener
     *         or no partition list
     */
    Set<String> getServerUrlSetByTopic(String topic) {
        BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
        if (listener == null) {
            return Collections.emptySet();
        }
        // Read the volatile snapshot once so both maps come from the same update.
        BrokersInfo snapshot = listener.brokersInfo;
        List<Partition> partitions = snapshot.oldTopicPartitionMap.get(topic);
        if (partitions == null) {
            return Collections.emptySet();
        }
        HashSet<String> urls = new HashSet<String>();
        for (Partition partition : partitions) {
            String url = snapshot.oldBrokerStringMap.get(partition.getBrokerId());
            if (url != null) {
                urls.add(url);
            }
        }
        return urls;
    }

    /**
     * Sets the fallback topic and publishes it.
     *
     * @param topic topic to use as the default
     * @param ref   reference passed on to {@link #publishTopic(String, Object)}
     * @throws IllegalStateException if a different default topic was already set
     */
    public synchronized void setDefaultTopic(String topic, Object ref) {
        boolean conflictsWithExisting = this.defaultTopic != null && !this.defaultTopic.equals(topic);
        if (conflictsWithExisting) {
            throw new IllegalStateException("Default topic has been setup already:" + this.defaultTopic);
        }
        this.defaultTopic = topic;
        this.publishTopic(topic, ref);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selects a partition of {@code topic} among those hosted on the broker whose
     * URL equals {@code serverUrl}. Falls back to the default topic when the topic
     * has no listener. The message is switched to read-only via
     * {@code setReadOnly(true)} for the duration of the call and its previous flag
     * is restored afterwards.
     *
     * @param topic     topic to select a partition for
     * @param msg       message handed to the selector
     * @param selector  strategy that picks one partition from the candidates
     * @param serverUrl broker URL the partition must live on
     * @return whatever the selector (or the default-topic fallback) returns
     * @throws MetaClientException if the selector throws it
     */
    Partition selectPartition(String topic, Message msg, PartitionSelector selector, String serverUrl) throws MetaClientException {
        boolean oldReadOnly = msg.isReadOnly();
        try {
            msg.setReadOnly(true);
            BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(topic);
            if (brokerConnectionListener == null) {
                return this.selectDefaultPartition(topic, msg, selector, serverUrl);
            }
            BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
            List<Partition> partitions = brokersInfo.oldTopicPartitionMap.get(topic);
            if (partitions == null) {
                // No partition list is known for the topic. Hand the missing list to the
                // selector, as the three-argument overload does, instead of failing with
                // a NullPointerException while filtering.
                return selector.getPartition(topic, partitions, msg);
            }
            Map<Integer, String> brokerStringMap = brokersInfo.oldBrokerStringMap;
            ArrayList<Partition> partitionsForSelect = new ArrayList<Partition>();
            for (Partition partition : partitions) {
                if (serverUrl.equals(brokerStringMap.get(partition.getBrokerId()))) {
                    partitionsForSelect.add(partition);
                }
            }
            return selector.getPartition(topic, partitionsForSelect, msg);
        }
        finally {
            msg.setReadOnly(oldReadOnly);
        }
    }

    /**
     * Resolves the broker URL to use for a partition of a topic.
     *
     * @param topic     topic the partition belongs to
     * @param partition partition whose broker id is looked up; may be null
     * @return the configured server URL when one is set; otherwise the URL mapped to
     *         the partition's broker id (via the topic's listener, or the default
     *         topic when the topic has none); null when {@code partition} is null
     *         or nothing is mapped
     */
    public String selectBroker(String topic, Partition partition) {
        // An explicitly configured server URL overrides any lookup.
        if (this.metaClientConfig.getServerUrl() != null) {
            return this.metaClientConfig.getServerUrl();
        }
        if (partition == null) {
            return null;
        }
        BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
        if (listener == null) {
            return this.selectDefaultBroker(topic, partition);
        }
        return listener.brokersInfo.oldBrokerStringMap.get(partition.getBrokerId());
    }

    /**
     * Looks up the partition's broker URL through the default topic's listener.
     *
     * @param topic     requested topic (not used by the lookup itself)
     * @param partition partition whose broker id is looked up
     * @return the mapped URL, or null when no default topic is set, it has no
     *         listener, or the broker id is not mapped
     */
    private String selectDefaultBroker(String topic, Partition partition) {
        if (this.defaultTopic == null) {
            return null;
        }
        BrokerConnectionListener fallback = this.getBrokerConnectionListener(this.defaultTopic);
        if (fallback == null) {
            return null;
        }
        return fallback.brokersInfo.oldBrokerStringMap.get(partition.getBrokerId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selects a partition for a message. The message is switched to read-only via
     * {@code setReadOnly(true)} for the duration of the call and its previous flag
     * is restored afterwards.
     *
     * @param topic             topic to select a partition for
     * @param message           message handed to the selector
     * @param partitionSelector strategy that picks one partition from the candidates
     * @return {@code Partition.RandomPartiton} when a server URL is configured;
     *         otherwise the selector's choice for the topic, or the default-topic
     *         fallback when the topic has no listener
     * @throws MetaClientException if the selector throws it
     */
    public Partition selectPartition(String topic, Message message, PartitionSelector partitionSelector) throws MetaClientException {
        final boolean wasReadOnly = message.isReadOnly();
        try {
            message.setReadOnly(true);
            if (this.metaClientConfig.getServerUrl() != null) {
                return Partition.RandomPartiton;
            }
            BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
            if (listener == null) {
                return this.selectDefaultPartition(topic, message, partitionSelector, null);
            }
            List<Partition> candidates = listener.brokersInfo.oldTopicPartitionMap.get(topic);
            return partitionSelector.getPartition(topic, candidates, message);
        }
        finally {
            message.setReadOnly(wasReadOnly);
        }
    }

    /**
     * Selects a partition of the default topic, optionally restricted to the
     * partitions hosted on the broker whose URL equals {@code serverUrl}.
     *
     * @param topic             requested topic (not used; the default topic is used instead)
     * @param message           message handed to the selector
     * @param partitionSelector strategy that picks one partition from the candidates
     * @param serverUrl         broker URL to restrict to, or null for no restriction
     * @return the selector's choice, or null when no default topic is set or it has
     *         no listener
     * @throws MetaClientException if the selector throws it
     */
    private Partition selectDefaultPartition(String topic, Message message, PartitionSelector partitionSelector, String serverUrl) throws MetaClientException {
        if (this.defaultTopic == null) {
            return null;
        }
        BrokerConnectionListener brokerConnectionListener = this.getBrokerConnectionListener(this.defaultTopic);
        if (brokerConnectionListener == null) {
            return null;
        }
        BrokersInfo brokersInfo = brokerConnectionListener.brokersInfo;
        List<Partition> partitions = brokersInfo.oldTopicPartitionMap.get(this.defaultTopic);
        if (serverUrl == null || partitions == null) {
            // Nothing to filter: either no URL restriction was requested, or no partition
            // list is known. In the latter case the missing list goes to the selector,
            // as in the unrestricted path, rather than causing a NullPointerException
            // in the filter loop.
            return partitionSelector.getPartition(this.defaultTopic, partitions, message);
        }
        Map<Integer, String> brokerStringMap = brokersInfo.oldBrokerStringMap;
        ArrayList<Partition> partitionsForSelect = new ArrayList<Partition>();
        for (Partition partition : partitions) {
            if (serverUrl.equals(brokerStringMap.get(partition.getBrokerId()))) {
                partitionsForSelect.add(partition);
            }
        }
        return partitionSelector.getPartition(this.defaultTopic, partitionsForSelect, message);
    }

    /**
     * Switches to a new ZooKeeper client and re-publishes every known topic on it.
     * A failure for one topic is logged and does not prevent the remaining topics
     * from being re-published; an interrupt stops the loop and restores the
     * thread's interrupt flag.
     *
     * @param newClient the ZooKeeper client to use from now on
     */
    @Override
    public void onZkClientChanged(ZkClient newClient) {
        this.zkClient = newClient;
        for (String topic : this.topicConnectionListeners.keySet()) {
            try {
                log.info((Object)("re-publish topic to zk,topic=" + topic));
                BrokerConnectionListener listener = this.getBrokerConnectionListener(topic);
                if (listener == null) {
                    // The topic was removed meanwhile or the wait was interrupted;
                    // there is nothing to subscribe for it.
                    continue;
                }
                this.publishTopicInternal(topic, listener);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                // Keep going: one broken topic must not block the others.
                log.error((Object)"\u91cd\u65b0\u8bbe\u7f6ezKClient\u5931\u8d25", (Throwable)e);
            }
        }
    }

    final class BrokerConnectionListener
    implements IZkChildListener {
        final Lock lock = new ReentrantLock();
        volatile BrokersInfo brokersInfo = new BrokersInfo(new TreeMap<Integer, String>(), new HashMap<String, List<Partition>>());
        final String topic;
        final Set<Object> references = Collections.synchronizedSet(new HashSet());

        public BrokerConnectionListener(String topic) {
            this.topic = topic;
        }

        void dispose() {
            String partitionPath = ((ProducerZooKeeper)ProducerZooKeeper.this).metaZookeeper.brokerTopicsPubPath + "/" + this.topic;
            ProducerZooKeeper.this.zkClient.unsubscribeChildChanges(partitionPath, (IZkChildListener)this);
        }

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            this.syncedUpdateBrokersInfo();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void syncedUpdateBrokersInfo() throws NotifyRemotingException, InterruptedException {
            this.lock.lock();
            try {
                Map newBrokerStringMap = ProducerZooKeeper.this.metaZookeeper.getMasterBrokersByTopic(this.topic);
                ArrayList<String> topics = new ArrayList<String>(1);
                topics.add(this.topic);
                Map newTopicPartitionMap = ProducerZooKeeper.this.metaZookeeper.getPartitionsForTopicsFromMaster(topics);
                log.warn((Object)("Begin receiving broker changes for topic " + this.topic + ",broker ids:" + newTopicPartitionMap));
                boolean changed = !((Object)this.brokersInfo.oldBrokerStringMap).equals(newBrokerStringMap);
                for (Map.Entry<Integer, String> entry : this.brokersInfo.oldBrokerStringMap.entrySet()) {
                    String oldBrokerString = entry.getValue();
                    ProducerZooKeeper.this.remotingClient.closeWithRef(oldBrokerString, this, false);
                    log.warn((Object)("Closed " + oldBrokerString));
                }
                for (Map.Entry<Integer, String> entry : newBrokerStringMap.entrySet()) {
                    String newBrokerString = entry.getValue();
                    ProducerZooKeeper.this.remotingClient.connectWithRef(newBrokerString, this);
                    try {
                        ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString, 10000L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Connecting to broker is interrupted", e);
                    }
                    log.warn((Object)("Connected to " + newBrokerString));
                }
                this.brokersInfo = new BrokersInfo(newBrokerStringMap, newTopicPartitionMap);
                if (changed) {
                    ProducerZooKeeper.this.notifyBrokersChange(this.topic);
                }
                log.warn((Object)("End receiving broker changes for topic " + this.topic));
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Callback invoked by {@code notifyBrokersChange} for a topic.
     */
    public interface BrokerChangeListener {
        /**
         * Called with the topic passed to {@code notifyBrokersChange}.
         *
         * @param var1 the topic
         */
        void brokersChanged(String var1);
    }

    /**
     * Snapshot pairing a broker map with a partition map. The maps passed in are
     * stored by reference, not copied.
     */
    public static class BrokersInfo {
        /** Broker id to broker URL string. */
        final Map<Integer, String> oldBrokerStringMap;
        /** Topic name to that topic's partitions. */
        final Map<String, List<Partition>> oldTopicPartitionMap;

        /**
         * @param oldBrokerStringMap   broker id to broker URL string
         * @param oldTopicPartitionMap topic name to partition list
         */
        public BrokersInfo(Map<Integer, String> oldBrokerStringMap, Map<String, List<Partition>> oldTopicPartitionMap) {
            this.oldTopicPartitionMap = oldTopicPartitionMap;
            this.oldBrokerStringMap = oldBrokerStringMap;
        }
    }
}

