/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.consumer;

import com.taobao.gecko.core.util.StringUtils;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.ZkClientChangedListener;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.FetchManager;
import com.taobao.metamorphosis.client.consumer.FetchRequest;
import com.taobao.metamorphosis.client.consumer.LoadBalanceStrategy;
import com.taobao.metamorphosis.client.consumer.SubscriberInfo;
import com.taobao.metamorphosis.client.consumer.TopicPartitionRegInfo;
import com.taobao.metamorphosis.client.consumer.storage.OffsetStorage;
import com.taobao.metamorphosis.cluster.Broker;
import com.taobao.metamorphosis.cluster.Cluster;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.network.RemotingUtils;
import com.taobao.metamorphosis.utils.MetaZookeeper;
import com.taobao.metamorphosis.utils.ThreadUtils;
import com.taobao.metamorphosis.utils.ZkUtils;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;

public class ConsumerZooKeeper
implements ZkClientChangedListener {
    protected ZkClient zkClient;
    protected final ConcurrentHashMap<FetchManager, FutureTask<ZKLoadRebalanceListener>> consumerLoadBalanceListeners = new ConcurrentHashMap();
    private final RemotingClientWrapper remotingClient;
    private final ZkUtils.ZKConfig zkConfig;
    protected final MetaZookeeper metaZookeeper;
    private final AtomicInteger counter = new AtomicInteger(0);
    static final int MAX_N_RETRIES = 7;
    static final Log log = LogFactory.getLog(ConsumerZooKeeper.class);

    public ConsumerZooKeeper(MetaZookeeper metaZookeeper, RemotingClientWrapper remotingClient, ZkClient zkClient, ZkUtils.ZKConfig zkConfig) {
        this.metaZookeeper = metaZookeeper;
        this.zkClient = zkClient;
        this.remotingClient = remotingClient;
        this.zkConfig = zkConfig;
    }

    public void commitOffsets(FetchManager fetchManager) {
        ZKLoadRebalanceListener listener = this.getBrokerConnectionListener(fetchManager);
        if (listener != null) {
            listener.commitOffsets();
        }
    }

    public ZKLoadRebalanceListener getBrokerConnectionListener(FetchManager fetchManager) {
        FutureTask<ZKLoadRebalanceListener> task = this.consumerLoadBalanceListeners.get(fetchManager);
        if (task != null) {
            try {
                return task.get();
            }
            catch (ExecutionException e) {
                throw ThreadUtils.launderThrowable((Throwable)e.getCause());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    public void unRegisterConsumer(FetchManager fetchManager) {
        try {
            ZKLoadRebalanceListener listener;
            FutureTask<ZKLoadRebalanceListener> futureTask = this.consumerLoadBalanceListeners.remove(fetchManager);
            if (futureTask != null && (listener = futureTask.get()) != null) {
                listener.stop();
                listener.commitOffsets();
                this.zkClient.unsubscribeStateChanges((IZkStateListener)new ZKSessionExpireListenner(listener));
                MetaZookeeper metaZookeeper = this.metaZookeeper;
                metaZookeeper.getClass();
                MetaZookeeper.ZKGroupDirs dirs = new MetaZookeeper.ZKGroupDirs(metaZookeeper, listener.consumerConfig.getGroup());
                this.zkClient.unsubscribeChildChanges(dirs.consumerRegistryDir, (IZkChildListener)listener);
                log.info((Object)("unsubscribeChildChanges:" + dirs.consumerRegistryDir));
                for (String topic : listener.topicSubcriberRegistry.keySet()) {
                    String partitionPath = this.metaZookeeper.brokerTopicsSubPath + "/" + topic;
                    this.zkClient.unsubscribeChildChanges(partitionPath, (IZkChildListener)listener);
                    log.info((Object)("unsubscribeChildChanges:" + partitionPath));
                }
                listener.releaseAllPartitionOwnership();
                ZkUtils.deletePath((ZkClient)this.zkClient, (String)(((ZKLoadRebalanceListener)listener).dirs.consumerRegistryDir + "/" + listener.consumerIdString));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error((Object)"Interrupted when unRegisterConsumer", (Throwable)e);
        }
        catch (Exception e) {
            log.error((Object)"Error in unRegisterConsumer,maybe error when registerConsumer", (Throwable)e);
        }
    }

    public void registerConsumer(final ConsumerConfig consumerConfig, final FetchManager fetchManager, final ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry, final OffsetStorage offsetStorage, final LoadBalanceStrategy loadBalanceStrategy) throws Exception {
        FutureTask<ZKLoadRebalanceListener> task = new FutureTask<ZKLoadRebalanceListener>(new Callable<ZKLoadRebalanceListener>(){

            @Override
            public ZKLoadRebalanceListener call() throws Exception {
                MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
                metaZookeeper.getClass();
                MetaZookeeper.ZKGroupDirs dirs = new MetaZookeeper.ZKGroupDirs(metaZookeeper, consumerConfig.getGroup());
                String consumerUUID = ConsumerZooKeeper.this.getConsumerUUID(consumerConfig);
                String consumerUUIDString = consumerConfig.getGroup() + "_" + consumerUUID;
                ZKLoadRebalanceListener loadBalanceListener = new ZKLoadRebalanceListener(fetchManager, dirs, consumerUUIDString, consumerConfig, offsetStorage, topicSubcriberRegistry, loadBalanceStrategy);
                loadBalanceListener.start();
                return ConsumerZooKeeper.this.registerConsumerInternal(loadBalanceListener);
            }
        });
        FutureTask<ZKLoadRebalanceListener> existsTask = this.consumerLoadBalanceListeners.putIfAbsent(fetchManager, task);
        if (existsTask != null) {
            throw new MetaClientException("Consumer has been already registed");
        }
        task.run();
    }

    protected ZKLoadRebalanceListener registerConsumerInternal(ZKLoadRebalanceListener loadBalanceListener) throws UnknownHostException, InterruptedException, Exception {
        MetaZookeeper metaZookeeper = this.metaZookeeper;
        metaZookeeper.getClass();
        MetaZookeeper.ZKGroupDirs dirs = new MetaZookeeper.ZKGroupDirs(metaZookeeper, loadBalanceListener.consumerConfig.getGroup());
        String topicString = this.getTopicsString(loadBalanceListener.topicSubcriberRegistry);
        if (this.zkClient == null) {
            loadBalanceListener.fetchManager.stopFetchRunner();
            loadBalanceListener.fetchManager.resetFetchState();
            for (String topic : loadBalanceListener.topicSubcriberRegistry.keySet()) {
                SubscriberInfo subInfo = (SubscriberInfo)loadBalanceListener.topicSubcriberRegistry.get(topic);
                ConcurrentHashMap<Object, TopicPartitionRegInfo> topicPartRegInfoMap = loadBalanceListener.topicRegistry.get(topic);
                if (topicPartRegInfoMap == null) {
                    topicPartRegInfoMap = new ConcurrentHashMap();
                    loadBalanceListener.topicRegistry.put(topic, topicPartRegInfoMap);
                }
                Partition partition = new Partition(loadBalanceListener.consumerConfig.getPartition());
                long offset = loadBalanceListener.consumerConfig.getOffset();
                if (loadBalanceListener.consumerConfig.isAlwaysConsumeFromMaxOffset()) {
                    offset = Long.MAX_VALUE;
                }
                TopicPartitionRegInfo regInfo = new TopicPartitionRegInfo(topic, partition, offset);
                topicPartRegInfoMap.put(partition, regInfo);
                loadBalanceListener.fetchManager.addFetchRequest(new FetchRequest(new Broker(0, loadBalanceListener.consumerConfig.getServerUrl()), 0L, regInfo, subInfo.getMaxSize()));
            }
            loadBalanceListener.fetchManager.startFetchRunner();
        } else {
            for (int i = 0; i < 7; ++i) {
                ZkUtils.makeSurePersistentPathExists((ZkClient)this.zkClient, (String)dirs.consumerRegistryDir);
                ZkUtils.createEphemeralPathExpectConflict((ZkClient)this.zkClient, (String)(dirs.consumerRegistryDir + "/" + loadBalanceListener.consumerIdString), (String)topicString);
                this.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, (IZkChildListener)loadBalanceListener);
                for (String topic : loadBalanceListener.topicSubcriberRegistry.keySet()) {
                    String partitionPath = this.metaZookeeper.brokerTopicsSubPath + "/" + topic;
                    ZkUtils.makeSurePersistentPathExists((ZkClient)this.zkClient, (String)partitionPath);
                    this.zkClient.subscribeChildChanges(partitionPath, (IZkChildListener)loadBalanceListener);
                }
                this.zkClient.subscribeStateChanges((IZkStateListener)new ZKSessionExpireListenner(loadBalanceListener));
                if (loadBalanceListener.syncedRebalance()) break;
            }
        }
        return loadBalanceListener;
    }

    private String getTopicsString(ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry) {
        StringBuilder topicSb = new StringBuilder();
        boolean wasFirst = true;
        for (String topic : topicSubcriberRegistry.keySet()) {
            if (wasFirst) {
                wasFirst = false;
                topicSb.append(topic);
                continue;
            }
            topicSb.append(",").append(topic);
        }
        return topicSb.toString();
    }

    protected String getConsumerUUID(ConsumerConfig consumerConfig) throws Exception {
        String consumerUUID = null;
        consumerUUID = consumerConfig.getConsumerId() != null ? consumerConfig.getConsumerId() : RemotingUtils.getLocalHost() + "-" + this.getPid() + "-" + System.currentTimeMillis() + "-" + this.counter.incrementAndGet();
        return consumerUUID;
    }

    private String getPid() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        if (name.contains("@")) {
            return name.split("@")[0];
        }
        return name;
    }

    @Override
    public void onZkClientChanged(ZkClient newClient) {
        this.zkClient = newClient;
        for (FutureTask<ZKLoadRebalanceListener> task : this.consumerLoadBalanceListeners.values()) {
            try {
                ZKLoadRebalanceListener listener = task.get();
                listener.topicRegistry.clear();
                log.info((Object)("re-register consumer to zk,group=" + listener.consumerConfig.getGroup()));
                this.registerConsumerInternal(listener);
            }
            catch (Exception e) {
                log.error((Object)"reRegister consumer failed", (Throwable)e);
            }
        }
    }

    public class ZKLoadRebalanceListener
    implements IZkChildListener,
    Runnable {
        private final MetaZookeeper.ZKGroupDirs dirs;
        private final String group;
        protected final String consumerIdString;
        private final LoadBalanceStrategy loadBalanceStrategy;
        Map<String, List<String>> oldConsumersPerTopicMap = new HashMap<String, List<String>>();
        Map<String, List<String>> oldPartitionsPerTopicMap = new HashMap<String, List<String>>();
        private final Lock rebalanceLock = new ReentrantLock();
        final ConcurrentHashMap<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> topicRegistry = new ConcurrentHashMap();
        private final ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry;
        private final ConsumerConfig consumerConfig;
        private final OffsetStorage offsetStorage;
        private final FetchManager fetchManager;
        private final Thread rebalanceThread;
        private volatile boolean stopped = false;
        Set<Broker> oldBrokerSet = new HashSet<Broker>();
        private Cluster oldCluster = new Cluster();
        private final BlockingQueue<Byte> rebalanceEvents = new ArrayBlockingQueue<Byte>(10);
        private final Byte REBALANCE_EVT = 1;

        public ZKLoadRebalanceListener(FetchManager fetchManager, MetaZookeeper.ZKGroupDirs dirs, String consumerIdString, ConsumerConfig consumerConfig, OffsetStorage offsetStorage, ConcurrentHashMap<String, SubscriberInfo> topicSubcriberRegistry, LoadBalanceStrategy loadBalanceStrategy) {
            this.fetchManager = fetchManager;
            this.dirs = dirs;
            this.consumerIdString = consumerIdString;
            this.group = consumerConfig.getGroup();
            this.consumerConfig = consumerConfig;
            this.offsetStorage = offsetStorage;
            this.topicSubcriberRegistry = topicSubcriberRegistry;
            this.loadBalanceStrategy = loadBalanceStrategy;
            this.rebalanceThread = new Thread(this);
        }

        public void start() {
            this.rebalanceThread.start();
        }

        public void stop() {
            this.stopped = true;
            this.rebalanceThread.interrupt();
            try {
                this.rebalanceThread.join(500L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void commitOffsets() {
            this.offsetStorage.commitOffset(this.consumerConfig.getGroup(), this.getTopicPartitionRegInfos());
        }

        private TopicPartitionRegInfo initTopicPartitionRegInfo(String topic, String group, Partition partition, long offset) {
            this.offsetStorage.initOffset(topic, group, partition, offset);
            return new TopicPartitionRegInfo(topic, partition, offset);
        }

        public Map<String, Set<Partition>> getTopicPartitions() {
            HashMap<String, Set<Partition>> rt = new HashMap<String, Set<Partition>>();
            for (Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> entry : this.topicRegistry.entrySet()) {
                rt.put(entry.getKey(), entry.getValue().keySet());
            }
            return rt;
        }

        List<TopicPartitionRegInfo> getTopicPartitionRegInfos() {
            ArrayList<TopicPartitionRegInfo> rt = new ArrayList<TopicPartitionRegInfo>();
            for (ConcurrentHashMap<Partition, TopicPartitionRegInfo> subMap : this.topicRegistry.values()) {
                Collection<TopicPartitionRegInfo> values = subMap.values();
                if (values == null) continue;
                rt.addAll(values);
            }
            return rt;
        }

        private TopicPartitionRegInfo loadTopicPartitionRegInfo(String topic, Partition partition) {
            return this.offsetStorage.load(topic, this.consumerConfig.getGroup(), partition);
        }

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            this.rebalanceEvents.put(this.REBALANCE_EVT);
        }

        @Override
        public void run() {
            while (!this.stopped) {
                try {
                    Byte evt = this.rebalanceEvents.take();
                    if (evt == null) continue;
                    this.dropDuplicatedEvents();
                    this.syncedRebalance();
                }
                catch (InterruptedException e) {
                }
                catch (Throwable e) {
                    log.error((Object)"Rebalance failed.", e);
                }
            }
        }

        private void dropDuplicatedEvents() {
            Byte evt = null;
            int count = 0;
            while ((evt = (Byte)this.rebalanceEvents.poll()) != null) {
                ++count;
            }
            if (count > 0) {
                log.info((Object)("Drop " + count + " duplicated rebalance events"));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        boolean syncedRebalance() throws InterruptedException, Exception {
            this.rebalanceLock.lock();
            try {
                for (int i = 0; i < 7; ++i) {
                    boolean done;
                    log.info((Object)("begin rebalancing consumer " + this.consumerIdString + " try #" + i));
                    try {
                        done = this.rebalance();
                    }
                    catch (InterruptedException e) {
                        throw e;
                    }
                    catch (Throwable e) {
                        log.warn((Object)"unexpected exception occured while try rebalancing", e);
                        done = false;
                    }
                    log.warn((Object)("end rebalancing consumer " + this.consumerIdString + " try #" + i));
                    if (done) {
                        log.warn((Object)"rebalance success.");
                        boolean bl = true;
                        return bl;
                    }
                    log.warn((Object)("rebalance failed,try #" + i));
                    this.releaseAllPartitionOwnership();
                    this.resetState();
                    Thread.sleep(((ConsumerZooKeeper)ConsumerZooKeeper.this).zkConfig.zkSyncTimeMs);
                }
                log.error((Object)"rebalance failed,finally");
                boolean bl = false;
                return bl;
            }
            finally {
                this.rebalanceLock.unlock();
            }
        }

        private void resetState() {
            this.topicRegistry.clear();
            this.oldConsumersPerTopicMap.clear();
            this.oldPartitionsPerTopicMap.clear();
        }

        protected void updateFetchRunner(Cluster cluster) throws Exception {
            this.fetchManager.resetFetchState();
            HashSet<Broker> newBrokers = new HashSet<Broker>();
            for (Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> entry : this.topicRegistry.entrySet()) {
                String topic = entry.getKey();
                for (Map.Entry<Partition, TopicPartitionRegInfo> partEntry : entry.getValue().entrySet()) {
                    Partition partition = partEntry.getKey();
                    TopicPartitionRegInfo info = partEntry.getValue();
                    Broker broker = cluster.getBrokerRandom(partition.getBrokerId());
                    if (broker != null) {
                        newBrokers.add(broker);
                        SubscriberInfo subscriberInfo = this.topicSubcriberRegistry.get(topic);
                        this.fetchManager.addFetchRequest(new FetchRequest(broker, 0L, info, subscriberInfo.getMaxSize()));
                        continue;
                    }
                    log.error((Object)("Could not find broker for broker id " + partition.getBrokerId() + ", it should not happen."));
                }
            }
            for (Broker newOne : newBrokers) {
                int times = 0;
                NotifyRemotingException ne = null;
                while (times++ < 3) {
                    ConsumerZooKeeper.this.remotingClient.connectWithRef(newOne.getZKString(), this);
                    try {
                        ConsumerZooKeeper.this.remotingClient.awaitReadyInterrupt(newOne.getZKString(), 4000L);
                        log.warn((Object)("Connected to " + newOne.getZKString()));
                        break;
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Remoting client is interrupted", e);
                    }
                    catch (NotifyRemotingException e) {
                        ++times;
                        ne = e;
                    }
                }
                if (ne == null) continue;
                throw ne;
            }
            log.warn((Object)"Starting fetch runners");
            this.oldBrokerSet = newBrokers;
            this.fetchManager.startFetchRunner();
        }

        boolean rebalance() throws InterruptedException, Exception {
            Map<String, String> myConsumerPerTopicMap = this.getConsumerPerTopic(this.consumerIdString);
            Cluster cluster = ConsumerZooKeeper.this.metaZookeeper.getCluster();
            Map<String, List<String>> consumersPerTopicMap = null;
            try {
                consumersPerTopicMap = this.getConsumersPerTopic(this.group);
            }
            catch (KeeperException.NoNodeException e) {
                log.warn((Object)("maybe other consumer is rebalancing now," + e.getMessage()));
                return false;
            }
            catch (ZkNoNodeException e) {
                log.warn((Object)("maybe other consumer is rebalancing now," + e.getMessage()));
                return false;
            }
            Map<String, List<String>> partitionsPerTopicMap = this.getPartitionStringsForTopics(myConsumerPerTopicMap);
            Map<String, String> relevantTopicConsumerIdMap = this.getRelevantTopicMap(myConsumerPerTopicMap, partitionsPerTopicMap, this.oldPartitionsPerTopicMap, consumersPerTopicMap, this.oldConsumersPerTopicMap);
            if (relevantTopicConsumerIdMap.size() <= 0) {
                if (this.checkClusterChange(cluster)) {
                    log.warn((Object)"Stopping fetch runners,maybe master or slave changed");
                    this.fetchManager.stopFetchRunner();
                    this.closeOldBrokersConnections();
                    this.updateFetchRunner(cluster);
                    this.oldCluster = cluster;
                } else {
                    log.warn((Object)("Consumer " + this.consumerIdString + " with " + consumersPerTopicMap + " doesn't need to be rebalanced."));
                }
                return true;
            }
            log.warn((Object)"Stopping fetch runners");
            this.fetchManager.stopFetchRunner();
            this.closeOldBrokersConnections();
            log.warn((Object)"Comitting all offsets");
            this.commitOffsets();
            for (Map.Entry<String, String> entry : relevantTopicConsumerIdMap.entrySet()) {
                String topic = entry.getKey();
                String consumerId = entry.getValue();
                MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
                metaZookeeper.getClass();
                MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, this.group);
                List<String> curConsumers = consumersPerTopicMap.get(topic);
                List<String> curPartitions = partitionsPerTopicMap.get(topic);
                if (curConsumers == null) {
                    log.warn((Object)("Releasing partition ownerships for topic:" + topic));
                    this.releasePartitionOwnership(topic);
                    this.topicRegistry.remove(topic);
                    log.warn((Object)("There are no consumers subscribe topic " + topic));
                    continue;
                }
                if (curPartitions == null) {
                    log.warn((Object)("Releasing partition ownerships for topic:" + topic));
                    this.releasePartitionOwnership(topic);
                    this.topicRegistry.remove(topic);
                    log.warn((Object)("There are no partitions under topic " + topic));
                    continue;
                }
                List<String> newParts = this.loadBalanceStrategy.getPartitions(topic, consumerId, curConsumers, curPartitions);
                ConcurrentHashMap<Object, TopicPartitionRegInfo> partRegInfos = this.topicRegistry.get(topic);
                if (partRegInfos == null) {
                    partRegInfos = new ConcurrentHashMap();
                    this.topicRegistry.put(topic, new ConcurrentHashMap());
                }
                Set currentParts = partRegInfos.keySet();
                for (Partition partition : currentParts) {
                    if (newParts.contains(partition.toString())) continue;
                    log.warn((Object)("Releasing partition ownerships for partition:" + partition));
                    this.releasePartitionOwnership(topic, partition);
                    partRegInfos.remove(partition);
                }
                for (String string : newParts) {
                    if (currentParts.contains(new Partition(string))) continue;
                    log.warn((Object)(consumerId + " attempting to claim partition " + string));
                    if (this.ownPartition(topicDirs, string, topic, consumerId)) continue;
                    log.warn((Object)("Claim partition " + string + " failed,retry..."));
                    return false;
                }
            }
            this.updateFetchRunner(cluster);
            this.oldPartitionsPerTopicMap = partitionsPerTopicMap;
            this.oldConsumersPerTopicMap = consumersPerTopicMap;
            this.oldCluster = cluster;
            return true;
        }

        private void closeOldBrokersConnections() throws NotifyRemotingException {
            for (Broker old : this.oldBrokerSet) {
                ConsumerZooKeeper.this.remotingClient.closeWithRef(old.getZKString(), this, false);
                log.warn((Object)("Closed " + old.getZKString()));
            }
        }

        protected boolean checkClusterChange(Cluster cluster) {
            return !this.oldCluster.equals((Object)cluster);
        }

        protected Map<String, List<String>> getPartitionStringsForTopics(Map<String, String> myConsumerPerTopicMap) {
            return ConsumerZooKeeper.this.metaZookeeper.getPartitionStringsForSubTopics(myConsumerPerTopicMap.keySet());
        }

        protected boolean ownPartition(MetaZookeeper.ZKGroupTopicDirs topicDirs, String partition, String topic, String consumerThreadId) throws Exception {
            String partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition;
            try {
                ZkUtils.createEphemeralPathExpectConflict((ZkClient)ConsumerZooKeeper.this.zkClient, (String)partitionOwnerPath, (String)consumerThreadId);
            }
            catch (ZkNodeExistsException e) {
                log.info((Object)("waiting for the partition ownership to be deleted: " + partition));
                return false;
            }
            catch (Exception e) {
                throw e;
            }
            this.addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId);
            return true;
        }

        protected void addPartitionTopicInfo(MetaZookeeper.ZKGroupTopicDirs topicDirs, String partitionString, String topic, String consumerThreadId) {
            Partition partition = new Partition(partitionString);
            ConcurrentHashMap<Partition, TopicPartitionRegInfo> partitionTopicInfo = this.topicRegistry.get(topic);
            TopicPartitionRegInfo existsTopicPartitionRegInfo = this.loadTopicPartitionRegInfo(topic, partition);
            if (existsTopicPartitionRegInfo == null) {
                existsTopicPartitionRegInfo = this.initTopicPartitionRegInfo(topic, consumerThreadId, partition, this.consumerConfig.getOffset());
            }
            if (this.consumerConfig.isAlwaysConsumeFromMaxOffset()) {
                existsTopicPartitionRegInfo.getOffset().set(Long.MAX_VALUE);
            }
            partitionTopicInfo.put(partition, existsTopicPartitionRegInfo);
        }

        private void releaseAllPartitionOwnership() {
            for (Map.Entry<String, ConcurrentHashMap<Partition, TopicPartitionRegInfo>> entry : this.topicRegistry.entrySet()) {
                String topic = entry.getKey();
                MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
                metaZookeeper.getClass();
                MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, this.consumerConfig.getGroup());
                for (Partition partition : entry.getValue().keySet()) {
                    String znode = topicDirs.consumerOwnerDir + "/" + partition;
                    this.deleteOwnership(znode);
                }
            }
        }

        private void releasePartitionOwnership(String topic, Partition partition) {
            MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
            metaZookeeper.getClass();
            MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, this.consumerConfig.getGroup());
            String znode = topicDirs.consumerOwnerDir + "/" + partition;
            this.deleteOwnership(znode);
        }

        private void deleteOwnership(String znode) {
            try {
                ZkUtils.deletePath((ZkClient)ConsumerZooKeeper.this.zkClient, (String)znode);
            }
            catch (Throwable t) {
                log.error((Object)"exception during releasePartitionOwnership", t);
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("Consumer " + this.consumerIdString + " releasing " + znode));
            }
        }

        private void releasePartitionOwnership(String topic) {
            MetaZookeeper metaZookeeper = ConsumerZooKeeper.this.metaZookeeper;
            metaZookeeper.getClass();
            MetaZookeeper.ZKGroupTopicDirs topicDirs = new MetaZookeeper.ZKGroupTopicDirs(metaZookeeper, topic, this.consumerConfig.getGroup());
            ConcurrentHashMap<Partition, TopicPartitionRegInfo> partInfos = this.topicRegistry.get(topic);
            if (partInfos != null) {
                for (Partition partition : partInfos.keySet()) {
                    String znode = topicDirs.consumerOwnerDir + "/" + partition;
                    this.deleteOwnership(znode);
                }
            }
        }

        private Map<String, String> getRelevantTopicMap(Map<String, String> myConsumerPerTopicMap, Map<String, List<String>> newPartMap, Map<String, List<String>> oldPartMap, Map<String, List<String>> newConsumerMap, Map<String, List<String>> oldConsumerMap) {
            HashMap<String, String> relevantTopicThreadIdsMap = new HashMap<String, String>();
            for (Map.Entry<String, String> entry : myConsumerPerTopicMap.entrySet()) {
                String topic = entry.getKey();
                String consumerId = entry.getValue();
                if (this.listEquals(oldPartMap.get(topic), newPartMap.get(topic)) && this.listEquals(oldConsumerMap.get(topic), newConsumerMap.get(topic))) continue;
                relevantTopicThreadIdsMap.put(topic, consumerId);
            }
            return relevantTopicThreadIdsMap;
        }

        private boolean listEquals(List<String> list1, List<String> list2) {
            if (list1 == null && list2 != null) {
                return false;
            }
            if (list1 != null && list2 == null) {
                return false;
            }
            if (list1 == null && list2 == null) {
                return true;
            }
            return ((Object)list1).equals(list2);
        }

        protected Map<String, List<String>> getConsumersPerTopic(String group) throws Exception, KeeperException.NoNodeException {
            List consumers = ZkUtils.getChildren((ZkClient)ConsumerZooKeeper.this.zkClient, (String)this.dirs.consumerRegistryDir);
            if (consumers == null) {
                return Collections.emptyMap();
            }
            HashMap<String, List<String>> consumersPerTopicMap = new HashMap<String, List<String>>();
            for (String string : consumers) {
                List<String> topics = this.getTopics(string);
                for (String topic : topics) {
                    if (consumersPerTopicMap.get(topic) == null) {
                        ArrayList<String> list = new ArrayList<String>();
                        list.add(string);
                        consumersPerTopicMap.put(topic, list);
                        continue;
                    }
                    ((List)consumersPerTopicMap.get(topic)).add(string);
                }
            }
            for (Map.Entry entry : consumersPerTopicMap.entrySet()) {
                Collections.sort((List)entry.getValue());
            }
            return consumersPerTopicMap;
        }

        public Map<String, String> getConsumerPerTopic(String consumerId) throws Exception {
            List<String> topics = this.getTopics(consumerId);
            HashMap<String, String> rt = new HashMap<String, String>();
            for (String topic : topics) {
                rt.put(topic, consumerId);
            }
            return rt;
        }

        protected List<String> getTopics(String consumerId) throws Exception {
            String topicsString = ZkUtils.readData((ZkClient)ConsumerZooKeeper.this.zkClient, (String)(this.dirs.consumerRegistryDir + "/" + consumerId));
            if (StringUtils.isBlank((String)topicsString)) {
                return Collections.emptyList();
            }
            String[] topics = topicsString.split(",");
            ArrayList<String> rt = new ArrayList<String>(topics.length);
            for (String topic : topics) {
                rt.add(topic);
            }
            return rt;
        }
    }

    class ZKSessionExpireListenner
    implements IZkStateListener {
        private final String consumerIdString;
        private final ZKLoadRebalanceListener loadBalancerListener;

        public ZKSessionExpireListenner(ZKLoadRebalanceListener loadBalancerListener) {
            this.consumerIdString = loadBalancerListener.consumerIdString;
            this.loadBalancerListener = loadBalancerListener;
        }

        public void handleNewSession() throws Exception {
            log.info((Object)("ZK expired; release old broker parition ownership; re-register consumer " + this.consumerIdString));
            this.loadBalancerListener.resetState();
            ConsumerZooKeeper.this.registerConsumerInternal(this.loadBalancerListener);
        }

        public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof ZKSessionExpireListenner)) {
                return false;
            }
            ZKSessionExpireListenner other = (ZKSessionExpireListenner)obj;
            return this.loadBalancerListener.equals(other.loadBalancerListener);
        }

        public int hashCode() {
            return this.loadBalancerListener.hashCode();
        }
    }
}

