/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.zookeeper;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor;
import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap;
import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMultiMap;
import io.vertx.spi.cluster.zookeeper.impl.ZKSyncMap;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ZookeeperClusterManager
implements ClusterManager,
PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperClusterManager.class);
    private Vertx vertx;
    private NodeListener nodeListener;
    private PathChildrenCache clusterNodes;
    private volatile boolean active;
    private String nodeID;
    private CuratorFramework curator;
    private boolean customCuratorCluster;
    private RetryPolicy retryPolicy;
    private Map<String, ZKLock> locks = new ConcurrentHashMap<String, ZKLock>();
    private Map<String, AsyncMap<?, ?>> asyncMapCache = new ConcurrentHashMap();
    private Map<String, AsyncMultiMap<?, ?>> asyncMultiMapCache = new ConcurrentHashMap();
    private static final String DEFAULT_CONFIG_FILE = "default-zookeeper.json";
    private static final String CONFIG_FILE = "zookeeper.json";
    private static final String ZK_SYS_CONFIG_KEY = "vertx.zookeeper.config";
    private JsonObject conf = new JsonObject();
    private static final String ZK_PATH_LOCKS = "/locks/";
    private static final String ZK_PATH_COUNTERS = "/counters/";
    private static final String ZK_PATH_CLUSTER_NODE = "/cluster/nodes/";
    private static final String ZK_PATH_CLUSTER_NODE_WITHOUT_SLASH = "/cluster/nodes";

    public ZookeeperClusterManager() {
        String resourceLocation = System.getProperty(ZK_SYS_CONFIG_KEY, CONFIG_FILE);
        this.loadProperties(resourceLocation);
    }

    public ZookeeperClusterManager(CuratorFramework curator) {
        this(curator, UUID.randomUUID().toString());
    }

    public ZookeeperClusterManager(String resourceLocation) {
        this.loadProperties(resourceLocation);
    }

    public ZookeeperClusterManager(CuratorFramework curator, String nodeID) {
        Objects.requireNonNull(curator, "The Curator instance cannot be null.");
        Objects.requireNonNull(nodeID, "The nodeID cannot be null.");
        this.curator = curator;
        this.nodeID = nodeID;
        this.customCuratorCluster = true;
    }

    public ZookeeperClusterManager(JsonObject config) {
        this.conf = config;
    }

    ZookeeperClusterManager(RetryPolicy retryPolicy, CuratorFramework curator) {
        Objects.requireNonNull(retryPolicy, "The retry policy cannot be null.");
        Objects.requireNonNull(curator, "The Curator instance cannot be null.");
        this.retryPolicy = retryPolicy;
        this.curator = curator;
        this.nodeID = UUID.randomUUID().toString();
        this.customCuratorCluster = true;
    }

    private void loadProperties(String resourceLocation) {
        try {
            String line;
            BufferedReader reader = new BufferedReader(new InputStreamReader(new BufferedInputStream(this.getConfigStream(resourceLocation))));
            StringBuilder sb = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
            this.conf = new JsonObject(sb.toString());
            log.info((Object)("Loaded zookeeper.json file from resourceLocation=" + resourceLocation));
        }
        catch (FileNotFoundException e) {
            log.error((Object)"Could not find zookeeper config file", (Throwable)e);
        }
        catch (IOException e) {
            log.error((Object)"Failed to load zookeeper config", (Throwable)e);
        }
    }

    private InputStream getConfigStream(String resourceLocation) throws FileNotFoundException {
        ClassLoader ctxClsLoader = Thread.currentThread().getContextClassLoader();
        InputStream is = null;
        if (ctxClsLoader != null) {
            is = ctxClsLoader.getResourceAsStream(resourceLocation);
        }
        if (is == null && !resourceLocation.equals(CONFIG_FILE)) {
            is = new FileInputStream(resourceLocation);
        } else if (is == null && resourceLocation.equals(CONFIG_FILE) && (is = this.getClass().getClassLoader().getResourceAsStream(resourceLocation)) == null) {
            is = this.getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE);
        }
        return is;
    }

    public void setConfig(JsonObject conf) {
        this.conf = conf;
    }

    public JsonObject getConfig() {
        return this.conf;
    }

    public CuratorFramework getCuratorFramework() {
        return this.curator;
    }

    public void setVertx(Vertx vertx) {
        this.vertx = vertx;
    }

    public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
        this.vertx.executeBlocking(event -> {
            AsyncMultiMap asyncMultiMap = this.asyncMultiMapCache.computeIfAbsent(name, key -> new ZKAsyncMultiMap(this.vertx, this.curator, name));
            event.complete((Object)asyncMultiMap);
        }, handler);
    }

    public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> handler) {
        AsyncMapTTLMonitor asyncMapTTLMonitor = AsyncMapTTLMonitor.getInstance(this.vertx, this);
        this.vertx.executeBlocking(event -> {
            AsyncMap zkAsyncMap = this.asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap(this.vertx, this.curator, asyncMapTTLMonitor, name));
            event.complete((Object)zkAsyncMap);
        }, handler);
    }

    public <K, V> Map<K, V> getSyncMap(String name) {
        return new ZKSyncMap(this.curator, name);
    }

    public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
        ContextImpl context = (ContextImpl)this.vertx.getOrCreateContext();
        context.executeBlocking(() -> {
            ZKLock lock = this.locks.get(name);
            if (lock == null) {
                InterProcessSemaphoreMutex mutexLock = new InterProcessSemaphoreMutex(this.curator, ZK_PATH_LOCKS + name);
                lock = new ZKLock(mutexLock);
            }
            try {
                if (lock.getLock().acquire(timeout, TimeUnit.MILLISECONDS)) {
                    this.locks.putIfAbsent(name, lock);
                    return lock;
                }
                throw new VertxException("Timed out waiting to get lock " + name);
            }
            catch (Exception e) {
                throw new VertxException("get lock exception", (Throwable)e);
            }
        }, resultHandler);
    }

    public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
        this.vertx.executeBlocking(future -> {
            try {
                Objects.requireNonNull(name);
                future.complete((Object)new ZKCounter(name, this.retryPolicy));
            }
            catch (Exception e) {
                future.fail((Throwable)new VertxException((Throwable)e));
            }
        }, resultHandler);
    }

    public String getNodeID() {
        return this.nodeID;
    }

    public List<String> getNodes() {
        return this.clusterNodes.getCurrentData().stream().map(e -> new String(e.getData())).collect(Collectors.toList());
    }

    public void nodeListener(NodeListener listener) {
        this.nodeListener = listener;
    }

    private void addLocalNodeID() throws VertxException {
        this.clusterNodes = new PathChildrenCache(this.curator, ZK_PATH_CLUSTER_NODE_WITHOUT_SLASH, true);
        this.clusterNodes.getListenable().addListener((Object)this);
        try {
            this.clusterNodes.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            ((ACLBackgroundPathAndBytesable)this.curator.create().withMode(CreateMode.EPHEMERAL)).forPath(ZK_PATH_CLUSTER_NODE + this.nodeID, this.nodeID.getBytes());
        }
        catch (Exception e) {
            throw new VertxException((Throwable)e);
        }
    }

    public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
        this.vertx.executeBlocking(future -> {
            if (!this.active) {
                this.active = true;
                if (this.customCuratorCluster) {
                    try {
                        this.addLocalNodeID();
                        future.complete();
                    }
                    catch (VertxException e) {
                        future.fail((Throwable)e);
                    }
                    return;
                }
                if (this.curator == null) {
                    this.retryPolicy = new ExponentialBackoffRetry(this.conf.getJsonObject("retry", new JsonObject()).getInteger("initialSleepTime", Integer.valueOf(1000)).intValue(), this.conf.getJsonObject("retry", new JsonObject()).getInteger("maxTimes", Integer.valueOf(5)).intValue(), this.conf.getJsonObject("retry", new JsonObject()).getInteger("intervalTimes", Integer.valueOf(10000)).intValue());
                    String hosts = System.getProperty("vertx.zookeeper.hosts");
                    if (hosts == null) {
                        hosts = this.conf.getString("zookeeperHosts", "127.0.0.1");
                    }
                    log.info((Object)("Zookeeper hosts set to " + hosts));
                    this.curator = CuratorFrameworkFactory.builder().connectString(hosts).namespace(this.conf.getString("rootPath", "io.vertx")).sessionTimeoutMs(this.conf.getInteger("sessionTimeout", Integer.valueOf(20000)).intValue()).connectionTimeoutMs(this.conf.getInteger("connectTimeout", Integer.valueOf(3000)).intValue()).retryPolicy(this.retryPolicy).build();
                }
                this.curator.start();
                while (this.curator.getState() != CuratorFrameworkState.STARTED) {
                    try {
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException e) {
                        if (this.curator.getState() == CuratorFrameworkState.STARTED) continue;
                        future.fail("zookeeper client being interrupted while starting.");
                    }
                }
                this.nodeID = UUID.randomUUID().toString();
                try {
                    this.addLocalNodeID();
                    future.complete();
                }
                catch (Exception e) {
                    future.fail((Throwable)e);
                }
            }
        }, resultHandler);
    }

    public void leave(Handler<AsyncResult<Void>> resultHandler) {
        this.vertx.executeBlocking(future -> {
            ZookeeperClusterManager zookeeperClusterManager = this;
            synchronized (zookeeperClusterManager) {
                if (this.active) {
                    this.active = false;
                    try {
                        ((ErrorListenerPathable)this.curator.delete().deletingChildrenIfNeeded().inBackground((client, event) -> {
                            if (event.getType() == CuratorEventType.DELETE) {
                                if (this.customCuratorCluster) {
                                    future.complete();
                                } else if (this.curator.getState() == CuratorFrameworkState.STARTED) {
                                    this.curator.close();
                                    future.complete();
                                }
                            }
                        })).forPath(ZK_PATH_CLUSTER_NODE + this.nodeID);
                        AsyncMapTTLMonitor.getInstance(this.vertx, this).stop();
                    }
                    catch (Exception e) {
                        log.error((Object)e);
                        future.fail((Throwable)e);
                    }
                } else {
                    future.complete();
                }
            }
        }, resultHandler);
    }

    public boolean isActive() {
        return this.active;
    }

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if (!this.active) {
            return;
        }
        switch (event.getType()) {
            case CHILD_ADDED: {
                try {
                    if (this.nodeListener == null) break;
                    this.nodeListener.nodeAdded(new String(event.getData().getData()));
                }
                catch (Throwable t) {
                    log.error((Object)"Failed to handle memberAdded", t);
                }
                break;
            }
            case CHILD_REMOVED: {
                try {
                    if (this.nodeListener == null) break;
                    this.nodeListener.nodeLeft(new String(event.getData().getData()));
                }
                catch (Throwable t) {
                    log.error((Object)"Failed to handle memberRemoved", t);
                }
                break;
            }
            case CHILD_UPDATED: {
                log.warn((Object)("Weird event that update cluster node. path:" + event.getData().getPath()));
                break;
            }
            case CONNECTION_SUSPENDED: {
                this.locks.values().forEach(ZKLock::release);
                break;
            }
            case CONNECTION_LOST: {
                this.locks.values().forEach(ZKLock::release);
                this.locks.clear();
            }
        }
    }

    private class ZKLock
    implements Lock {
        private final InterProcessSemaphoreMutex lock;

        private ZKLock(InterProcessSemaphoreMutex lock) {
            this.lock = lock;
        }

        InterProcessSemaphoreMutex getLock() {
            return this.lock;
        }

        public void release() {
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    this.lock.release();
                }
                catch (Exception e) {
                    log.error((Object)e);
                }
                future.complete();
            }, false, null);
        }
    }

    private class ZKCounter
    implements Counter {
        private DistributedAtomicLong atomicLong;
        private String counterPath;

        public ZKCounter(String nodeName, RetryPolicy retryPolicy) throws Exception {
            this.counterPath = ZookeeperClusterManager.ZK_PATH_COUNTERS + nodeName;
            this.atomicLong = new DistributedAtomicLong(ZookeeperClusterManager.this.curator, this.counterPath, retryPolicy);
        }

        public void get(Handler<AsyncResult<Long>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    future.complete(this.atomicLong.get().preValue());
                }
                catch (Exception e) {
                    future.fail((Throwable)new VertxException((Throwable)e));
                }
            }, resultHandler);
        }

        public void incrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            this.increment(true, resultHandler);
        }

        public void getAndIncrement(Handler<AsyncResult<Long>> resultHandler) {
            this.increment(false, resultHandler);
        }

        private void increment(boolean post, Handler<AsyncResult<Long>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    long returnValue = 0L;
                    if (this.atomicLong.get().succeeded()) {
                        returnValue = (Long)this.atomicLong.get().preValue();
                    }
                    if (this.atomicLong.increment().succeeded()) {
                        future.complete((Object)(post ? (Long)this.atomicLong.get().postValue() : Long.valueOf(returnValue)));
                    } else {
                        future.fail((Throwable)new VertxException("increment value failed."));
                    }
                }
                catch (Exception e) {
                    future.fail((Throwable)new VertxException((Throwable)e));
                }
            }, resultHandler);
        }

        public void decrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    if (this.atomicLong.decrement().succeeded()) {
                        future.complete(this.atomicLong.get().postValue());
                    } else {
                        future.fail((Throwable)new VertxException("decrement value failed."));
                    }
                }
                catch (Exception e) {
                    future.fail((Throwable)new VertxException((Throwable)e));
                }
            }, resultHandler);
        }

        public void addAndGet(long value, Handler<AsyncResult<Long>> resultHandler) {
            this.add(value, true, resultHandler);
        }

        public void getAndAdd(long value, Handler<AsyncResult<Long>> resultHandler) {
            this.add(value, false, resultHandler);
        }

        private void add(long value, boolean post, Handler<AsyncResult<Long>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    long returnValue = 0L;
                    if (this.atomicLong.get().succeeded()) {
                        returnValue = (Long)this.atomicLong.get().preValue();
                    }
                    if (this.atomicLong.add(Long.valueOf(value)).succeeded()) {
                        future.complete((Object)(post ? (Long)this.atomicLong.get().postValue() : Long.valueOf(returnValue)));
                    } else {
                        future.fail((Throwable)new VertxException("add value failed."));
                    }
                }
                catch (Exception e) {
                    future.fail((Throwable)new VertxException((Throwable)e));
                }
            }, resultHandler);
        }

        public void compareAndSet(long expected, long value, Handler<AsyncResult<Boolean>> resultHandler) {
            Objects.requireNonNull(resultHandler);
            ZookeeperClusterManager.this.vertx.executeBlocking(future -> {
                try {
                    if (this.atomicLong.get().succeeded() && (Long)this.atomicLong.get().preValue() == 0L) {
                        this.atomicLong.initialize(Long.valueOf(0L));
                    }
                    future.complete((Object)this.atomicLong.compareAndSet(Long.valueOf(expected), Long.valueOf(value)).succeeded());
                }
                catch (Exception e) {
                    future.fail((Throwable)new VertxException((Throwable)e));
                }
            }, resultHandler);
        }
    }
}

