/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl;

import apache.rocketmq.v1.HeartbeatRequest;
import apache.rocketmq.v1.HeartbeatResponse;
import apache.rocketmq.v1.NotifyClientTerminationRequest;
import apache.rocketmq.v1.PollCommandRequest;
import apache.rocketmq.v1.PollCommandResponse;
import apache.rocketmq.v1.PrintThreadStackTraceCommand;
import apache.rocketmq.v1.QueryRouteRequest;
import apache.rocketmq.v1.QueryRouteResponse;
import apache.rocketmq.v1.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v1.ReportThreadStackTraceRequest;
import apache.rocketmq.v1.ReportThreadStackTraceResponse;
import apache.rocketmq.v1.Resource;
import apache.rocketmq.v1.VerifyMessageConsumptionCommand;
import com.aliyun.openservices.ons.shaded.com.google.common.collect.Sets;
import com.aliyun.openservices.ons.shaded.com.google.common.math.IntMath;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.AbstractIdleService;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.errorprone.annotations.concurrent.GuardedBy;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.commons.lang3.RandomUtils;
import com.aliyun.openservices.ons.shaded.commons.lang3.StringUtils;
import com.aliyun.openservices.ons.shaded.io.grpc.Metadata;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ErrorCode;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.Client;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManager;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientManagerFactory;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.Signature;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.misc.TopAddressing;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.misc.Validators;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Address;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.AddressScheme;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Broker;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Partition;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Permission;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.TopicRouteData;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.trace.MessageTracer;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.trace.TraceEndpointsProvider;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.UtilAll;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class ClientImpl
extends Client
implements MessageInterceptor,
TraceEndpointsProvider {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(ClientImpl.class);
    // NOTE(review): presumably the delay before re-issuing a poll after a failure (see pollCommandLater) - confirm.
    private static final long POLL_COMMAND_LATER_DELAY_MILLIS = 1000L;
    // NOTE(review): presumably the timeout of a single poll-command call - confirm against pollCommand0.
    private static final long POLL_COMMAND_TIMEOUT_MILLIS = 60000L;
    // Assigned lazily in setUp() when still null; volatile so other threads observe the assignment.
    protected volatile ClientManager clientManager;
    // Service whose state backs isRunning(); stopped by the shutdown hook registered in the constructor.
    protected final ClientService clientService = new ClientService();
    // Single-threaded executor created in the constructor; printThreadStackTrace() submits its task here.
    protected final ThreadPoolExecutor commandExecutor;
    // Initialised in setUp(), refreshed after each route update and shut down in tearDown().
    private final MessageTracer messageTracer = new MessageTracer(this);
    // Source of name server addresses for renewNameServerList().
    private final TopAddressing topAddressing = new TopAddressing();
    // Cursor into nameServerEndpointsList; starts at a random value and is advanced when a route query fails.
    private final AtomicInteger nameServerIndex = new AtomicInteger(RandomUtils.nextInt());
    // Interceptors invoked by intercept(); appended to by registerMessageInterceptor().
    @GuardedBy(value="messageInterceptorsLock")
    private final List<MessageInterceptor> messageInterceptors = new ArrayList<MessageInterceptor>();
    private final ReadWriteLock messageInterceptorsLock = new ReentrantReadWriteLock();
    // Name server endpoints, populated by setNamesrvAddr() or renewNameServerList().
    @GuardedBy(value="nameServerEndpointsListLock")
    private final List<Endpoints> nameServerEndpointsList = new ArrayList<Endpoints>();
    private final ReadWriteLock nameServerEndpointsListLock = new ReentrantReadWriteLock();
    // Per-topic futures waiting on a route fetch that is already in progress (see getRouteData()).
    @GuardedBy(value="inflightRouteFutureLock")
    private final Map<String, Set<SettableFuture<TopicRouteData>>> inflightRouteFutureTable = new HashMap<String, Set<SettableFuture<TopicRouteData>>>();
    private final Lock inflightRouteFutureLock = new ReentrantLock();
    // Handles of the periodic tasks scheduled in setUp(); cancelled in tearDown(). May remain null.
    private volatile ScheduledFuture<?> renewNameServerListFuture;
    private volatile ScheduledFuture<?> updateRouteCacheFuture;
    // Latest known route per topic; written by updateTopicRouteCache().
    private final ConcurrentMap<String, TopicRouteData> topicRouteCache = new ConcurrentHashMap<String, TopicRouteData>();

    public ClientImpl(String group) throws ClientException {
        super(group);
        this.commandExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("CommandExecutor"));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Shutdown hook is invoked, clientId={}, status={}", (Object)this.id, (Object)this.clientService.state());
            this.clientService.stopAsync().awaitTerminated();
        }));
    }

    /**
     * Subclass hook called by {@code onTopicRouteDataUpdate} with a freshly fetched route,
     * before the route is written into the topic route cache.
     *
     * @param var1 topic whose route was fetched.
     * @param var2 route data fetched for that topic.
     */
    public abstract void onTopicRouteDataUpdate0(String var1, TopicRouteData var2);

    /**
     * Builds the request that {@code notifyClientTermination} sends to every endpoint
     * in the current route set while the client is being torn down.
     *
     * @return the termination notification request.
     */
    public abstract NotifyClientTerminationRequest wrapNotifyClientTerminationRequest();

    /**
     * Reports whether the underlying {@link #clientService} is running.
     *
     * @return the value of {@code clientService.isRunning()}.
     */
    public boolean isRunning() {
        final boolean running = this.clientService.isRunning();
        return running;
    }

    /**
     * Appends an interceptor to the interceptor list while holding the write lock.
     *
     * @param messageInterceptor interceptor to be invoked by {@code intercept}.
     */
    public void registerMessageInterceptor(MessageInterceptor messageInterceptor) {
        final Lock writeLock = this.messageInterceptorsLock.writeLock();
        writeLock.lock();
        try {
            this.messageInterceptors.add(messageInterceptor);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Starts the client: registers it with the client manager (if not yet registered), initialises
     * the message tracer and schedules the periodic background tasks.
     *
     * <p>When no name server has been configured, the list is fetched once synchronously and then
     * renewed with a fixed delay of 30 seconds (initial delay 0). The topic route cache is always
     * refreshed with a fixed delay of 30 seconds after an initial delay of 10 seconds.
     *
     * @throws ClientException declared by the signature; propagated from the calls made here, if any throws it.
     */
    protected void setUp() throws ClientException {
        log.info("Begin to start the rocketmq client, clientId={}", (Object) this.id);
        if (this.clientManager == null) {
            this.clientManager = ClientManagerFactory.getInstance().registerClient(this.namespace, this);
        }
        this.messageTracer.init();
        final ScheduledExecutorService scheduler = this.clientManager.getScheduler();

        if (this.isNameServerNotSet()) {
            this.renewNameServerList();
            log.info("Name server list was not set, schedule a task to fetch and renew periodically, clientId={}", (Object) this.id);
            // Catch everything inside the task: an escaping exception would cancel the periodic schedule.
            final Runnable renewTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        ClientImpl.this.renewNameServerList();
                    } catch (Throwable cause) {
                        log.error("Exception raised while updating nameserver from top addressing, clientId={}", (Object) ClientImpl.this.id, (Object) cause);
                    }
                }
            };
            this.renewNameServerListFuture = scheduler.scheduleWithFixedDelay(renewTask, 0L, 30L, TimeUnit.SECONDS);
        }

        final Runnable refreshRouteTask = new Runnable() {
            @Override
            public void run() {
                try {
                    ClientImpl.this.updateRouteCache();
                } catch (Throwable cause) {
                    log.error("Exception raised while updating topic route cache, clientId={}", (Object) ClientImpl.this.id, (Object) cause);
                }
            }
        };
        this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(refreshRouteTask, 10L, 30L, TimeUnit.SECONDS);
        log.info("The rocketmq client starts successfully, clientId={}", (Object) this.id);
    }

    /**
     * Best-effort notification of every endpoint in the current route set that this client is
     * terminating. Any failure (including signing) is logged and swallowed.
     */
    private void notifyClientTermination() {
        log.info("Notify that client is terminated, clientId={}", (Object) this.id);
        final Set<Endpoints> targets = this.getRouteEndpointsSet();
        final NotifyClientTerminationRequest request = this.wrapNotifyClientTerminationRequest();
        try {
            // One signature is reused for all endpoints.
            final Metadata metadata = this.sign();
            for (Endpoints target : targets) {
                this.clientManager.notifyClientTermination(
                        target, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Throwable cause) {
            log.error("Exception raised while notifying client's termination, clientId={}", (Object) this.id, (Object) cause);
        }
    }

    /**
     * Shuts the client down: notifies the endpoints, cancels the periodic tasks, stops the message
     * tracer, unregisters from the client manager factory and finally drains the command executor.
     *
     * @throws InterruptedException declared by the signature; propagated from the calls made here
     *         (e.g. waiting for the command executor), if any throws it.
     */
    protected void tearDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq client, clientId={}", (Object) this.id);
        this.notifyClientTermination();

        // Either handle may be null when the corresponding task was never scheduled.
        final ScheduledFuture<?> renewTask = this.renewNameServerListFuture;
        if (renewTask != null) {
            renewTask.cancel(false);
        }
        final ScheduledFuture<?> refreshRouteTask = this.updateRouteCacheFuture;
        if (refreshRouteTask != null) {
            refreshRouteTask.cancel(false);
        }

        this.messageTracer.shutdown();
        ClientManagerFactory.getInstance().unregisterClient(this.namespace, this);

        this.commandExecutor.shutdown();
        final boolean terminated = ExecutorServices.awaitTerminated(this.commandExecutor);
        if (!terminated) {
            log.error("[Bug] Failed to shutdown command executor, clientId={}", (Object) this.id);
        }
        log.info("Shutdown the rocketmq client successfully, clientId={}", (Object) this.id);
    }

    /**
     * Runs the registered interceptors for a hook point that has no associated message;
     * delegates to the three-argument overload with a {@code null} message.
     *
     * @param hookPoint hook point being intercepted.
     * @param context   interceptor context passed through unchanged.
     */
    public void intercept(MessageHookPoint hookPoint, MessageInterceptorContext context) {
        final MessageExt noMessage = null;
        this.intercept(hookPoint, noMessage, context);
    }

    /**
     * Invokes every registered interceptor, in registration order, while holding the read lock.
     *
     * <p>An exception thrown by one interceptor is logged and does not prevent the remaining
     * interceptors from running.
     *
     * @param hookPoint  hook point being intercepted.
     * @param messageExt message being intercepted; may be {@code null} (the two-argument overload
     *                   passes {@code null}).
     * @param context    interceptor context passed through unchanged.
     */
    @Override
    public void intercept(MessageHookPoint hookPoint, MessageExt messageExt, MessageInterceptorContext context) {
        this.messageInterceptorsLock.readLock().lock();
        try {
            for (MessageInterceptor interceptor : this.messageInterceptors) {
                try {
                    interceptor.intercept(hookPoint, messageExt, context);
                }
                catch (Throwable t2) {
                    // messageExt is null for message-less hook points; dereferencing it here would
                    // throw from inside the handler and abort the remaining interceptors.
                    Object messageId = null == messageExt ? null : messageExt.getMsgId();
                    // The trailing throwable is logged as the exception by SLF4J.
                    log.warn("Exception raised while intercepting message, hookPoint={}, messageId={}, clientId={}", new Object[]{hookPoint, messageId, this.id, t2});
                }
            }
        }
        finally {
            this.messageInterceptorsLock.readLock().unlock();
        }
    }

    /**
     * Returns the scheduler owned by the client manager.
     *
     * @return the value of {@code clientManager.getScheduler()}.
     */
    public ScheduledExecutorService getScheduler() {
        final ClientManager manager = this.clientManager;
        return manager.getScheduler();
    }

    /**
     * Computes the signed request metadata for this client.
     *
     * @return metadata produced by {@code Signature.sign(this)}.
     * @throws ClientException with {@code ErrorCode.SIGNATURE_FAILURE}, wrapping whatever signing threw.
     */
    public Metadata sign() throws ClientException {
        final Metadata metadata;
        try {
            metadata = Signature.sign(this);
        } catch (Throwable cause) {
            log.error("Failed to calculate signature, clientId={}", (Object) this.id, (Object) cause);
            throw new ClientException(ErrorCode.SIGNATURE_FAILURE, cause);
        }
        return metadata;
    }

    /**
     * Collects the endpoints of every route currently held in the topic route cache.
     *
     * @return a new mutable set containing the union of {@code allEndpoints()} over all cached routes.
     */
    protected Set<Endpoints> getRouteEndpointsSet() {
        final Set<Endpoints> union = new HashSet<Endpoints>();
        for (TopicRouteData route : this.topicRouteCache.values()) {
            union.addAll(route.allEndpoints());
        }
        return union;
    }

    /**
     * Checks, under the read lock, whether the name server endpoint list is empty.
     *
     * @return {@code true} when no name server endpoints are known.
     */
    private boolean isNameServerNotSet() {
        final Lock readLock = this.nameServerEndpointsListLock.readLock();
        readLock.lock();
        try {
            return this.nameServerEndpointsList.isEmpty();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Fetches the name server list from top addressing and, if it differs from the current one,
     * replaces the current list under the write lock.
     *
     * <p>A fetch failure or an empty result leaves the current list untouched.
     */
    private void renewNameServerList() {
        log.info("Start to renew name server list for a new round, clientId={}", (Object) this.id);
        List<Endpoints> fetched;
        try {
            fetched = this.topAddressing.fetchNameServerAddresses();
        } catch (Throwable cause) {
            log.error("Failed to fetch name server list from top addressing", cause);
            return;
        }
        if (fetched.isEmpty()) {
            log.warn("Got an empty name server list, clientId={}", (Object) this.id);
            return;
        }

        final Lock writeLock = this.nameServerEndpointsListLock.writeLock();
        writeLock.lock();
        try {
            if (!this.nameServerEndpointsList.equals(fetched)) {
                this.nameServerEndpointsList.clear();
                this.nameServerEndpointsList.addAll(fetched);
            } else {
                log.debug("Name server list remains the same, name server list={}, clientId={}", (Object) this.nameServerEndpointsList, (Object) this.id);
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Stores the route for a topic in the cache and reports which endpoints became newly reachable.
     *
     * @param topic          topic whose route is being stored.
     * @param topicRouteData route to store.
     * @return a new set of the endpoints present after the update but not before it.
     */
    private synchronized Set<Endpoints> updateTopicRouteCache(String topic, TopicRouteData topicRouteData) {
        final Set<Endpoints> endpointsBefore = this.getRouteEndpointsSet();
        final TopicRouteData previous = this.topicRouteCache.put(topic, topicRouteData);
        final boolean unchanged = topicRouteData.equals(previous);
        if (!unchanged) {
            log.info("Topic route is updated, namespace={}, topic={}, clientId={}, {} => {}", this.namespace, topic, this.id, previous, topicRouteData);
        } else {
            log.info("Topic route remains the same, namespace={}, topic={}, clientId={}", this.namespace, topic, this.id);
        }
        final Set<Endpoints> endpointsAfter = this.getRouteEndpointsSet();
        // Copy the difference view into an independent set.
        final Set<Endpoints> added = new HashSet<Endpoints>(Sets.difference(endpointsAfter, endpointsBefore));
        return added;
    }

    /**
     * Handles a freshly fetched route: notifies the subclass hook, updates the cache, starts
     * command polling on every endpoint that is new to the route set and refreshes the tracer.
     *
     * @param topic          topic whose route was fetched.
     * @param topicRouteData the fetched route.
     */
    private void onTopicRouteDataUpdate(String topic, TopicRouteData topicRouteData) {
        this.onTopicRouteDataUpdate0(topic, topicRouteData);
        final Set<Endpoints> newlyAdded = this.updateTopicRouteCache(topic, topicRouteData);
        for (Endpoints added : newlyAdded) {
            log.info("Start polling command for new endpoints={}, clientId={}", (Object) added, (Object) this.id);
            this.pollCommand(added);
        }
        this.messageTracer.refresh();
    }

    /**
     * Re-fetches the route of every topic currently in the cache and applies each successful
     * result through {@code onTopicRouteDataUpdate}; failures are only logged.
     */
    private void updateRouteCache() {
        log.info("Start to update route cache for a new round, clientId={}", (Object) this.id);
        for (final String topic : this.topicRouteCache.keySet()) {
            final FutureCallback<TopicRouteData> applyRoute = new FutureCallback<TopicRouteData>() {
                @Override
                public void onSuccess(TopicRouteData fetched) {
                    ClientImpl.this.onTopicRouteDataUpdate(topic, fetched);
                }

                @Override
                public void onFailure(Throwable cause) {
                    log.error("Failed to fetch topic route for update cache, namespace={}, topic={}, clientId={}", ClientImpl.this.namespace, topic, ClientImpl.this.id, cause);
                }
            };
            final ListenableFuture<TopicRouteData> routeFuture = this.fetchTopicRoute(topic);
            Futures.addCallback(routeFuture, applyRoute, MoreExecutors.directExecutor());
        }
    }

    /**
     * Returns the route of a topic, from the cache when present, otherwise by fetching it.
     *
     * <p>Concurrent requests for the same uncached topic share a single fetch: the first caller
     * registers an in-flight entry and triggers the fetch, later callers only add their future to
     * that entry. When the fetch completes, every registered future is completed with the result
     * (or failed with a {@link ClientException} carrying {@code ErrorCode.FETCH_TOPIC_ROUTE_FAILURE}).
     *
     * @param topic topic whose route is requested.
     * @return a future completed with the topic's route data, or failed if the route could not be obtained.
     */
    protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
        final SettableFuture<TopicRouteData> future0 = SettableFuture.create();
        // Fast path: answer from the cache without taking the lock.
        TopicRouteData topicRouteData = this.topicRouteCache.get(topic);
        if (null != topicRouteData) {
            future0.set(topicRouteData);
            return future0;
        }
        this.inflightRouteFutureLock.lock();
        try {
            // Re-check under the lock: a fetch may have completed since the first lookup.
            topicRouteData = this.topicRouteCache.get(topic);
            if (null != topicRouteData) {
                future0.set(topicRouteData);
                return future0;
            }
            Set<SettableFuture<TopicRouteData>> inflightFutures = this.inflightRouteFutureTable.get(topic);
            if (null != inflightFutures) {
                // A fetch for this topic is already running; piggyback on it.
                inflightFutures.add(future0);
                return future0;
            }
            inflightFutures = new HashSet<SettableFuture<TopicRouteData>>();
            inflightFutures.add(future0);
            this.inflightRouteFutureTable.put(topic, inflightFutures);
        }
        finally {
            this.inflightRouteFutureLock.unlock();
        }
        final ListenableFuture<TopicRouteData> future = this.fetchTopicRoute(topic);
        Futures.addCallback(future, new FutureCallback<TopicRouteData>(){

            @Override
            public void onSuccess(TopicRouteData newTopicRouteData) {
                ClientImpl.this.inflightRouteFutureLock.lock();
                Set<SettableFuture<TopicRouteData>> newFutureSet = null;
                try {
                    ClientImpl.this.onTopicRouteDataUpdate(topic, newTopicRouteData);
                    newFutureSet = ClientImpl.this.inflightRouteFutureTable.remove(topic);
                    if (null == newFutureSet) {
                        log.error("[Bug] in-flight route futures was empty, namespace={}, topic={}, clientId={}", ClientImpl.this.namespace, topic, ClientImpl.this.id);
                        return;
                    }
                    log.debug("Fetch topic route successfully, namespace={}, topic={}, in-flight route future size={}, clientId={}", ClientImpl.this.namespace, topic, newFutureSet.size(), ClientImpl.this.id);
                    for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
                        newFuture.set(newTopicRouteData);
                    }
                }
                catch (Throwable t2) {
                    log.error("[Bug] Exception raises while update route data, clientId={}, namespace={}, topic={}", ClientImpl.this.id, ClientImpl.this.namespace, topic, t2);
                    // Do not strand the waiters: without this the in-flight entry would stay in the
                    // table forever and neither these nor later callers would ever be completed.
                    if (null == newFutureSet) {
                        newFutureSet = ClientImpl.this.inflightRouteFutureTable.remove(topic);
                    }
                    if (null != newFutureSet) {
                        for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
                            // No-op for futures that were already completed above.
                            newFuture.setException(new ClientException(ErrorCode.FETCH_TOPIC_ROUTE_FAILURE, t2));
                        }
                    }
                }
                finally {
                    ClientImpl.this.inflightRouteFutureLock.unlock();
                }
            }

            @Override
            public void onFailure(Throwable t2) {
                ClientImpl.this.inflightRouteFutureLock.lock();
                try {
                    final Set<SettableFuture<TopicRouteData>> newFutureSet = ClientImpl.this.inflightRouteFutureTable.remove(topic);
                    if (null == newFutureSet) {
                        log.error("[Bug] in-flight route futures was empty, namespace={}, topic={}, clientId={}", ClientImpl.this.namespace, topic, ClientImpl.this.id);
                        return;
                    }
                    log.error("Failed to fetch topic route, namespace={}, topic={}, in-flight route future size={}, clientId={}", ClientImpl.this.namespace, topic, newFutureSet.size(), ClientImpl.this.id, t2);
                    for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
                        ClientException exception = new ClientException(ErrorCode.FETCH_TOPIC_ROUTE_FAILURE, t2);
                        newFuture.setException(exception);
                    }
                }
                finally {
                    ClientImpl.this.inflightRouteFutureLock.unlock();
                }
            }
        }, MoreExecutors.directExecutor());
        return future0;
    }

    /**
     * Records and parses the name server address, replacing the current name server endpoint list.
     *
     * <p>Two forms are handled:
     * <ul>
     *   <li>{@code http://} or {@code https://} prefixed: a single domain-name endpoint (port 80
     *       when none is given). The second dot-separated label of the host is set as the region
     *       id, and when no namespace is set yet and the address matches
     *       {@code Validators.NAME_SERVER_ENDPOINT_WITH_NAMESPACE_PATTERN}, the namespace is taken
     *       from the text between the last {@code '/'} and the first {@code '.'}.</li>
     *   <li>otherwise: a {@code ;}-separated list of {@code host:port} pairs, each stored as an
     *       IPv4 endpoint.</li>
     * </ul>
     *
     * @param nameServerStr name server address string.
     * @throws ClientException with {@code ErrorCode.ILLEGAL_FORMAT} when the {@code host:port} list
     *         cannot be parsed; validation performed by {@code Validators.checkNamesrvAddr} may
     *         also throw.
     */
    public void setNamesrvAddr(String nameServerStr) throws ClientException {
        this.nameServerStr = nameServerStr;
        Validators.checkNamesrvAddr(nameServerStr);
        final Lock writeLock = this.nameServerEndpointsListLock.writeLock();
        writeLock.lock();
        try {
            this.nameServerEndpointsList.clear();

            final String scheme;
            if (nameServerStr.startsWith("http://")) {
                scheme = "http://";
            } else if (nameServerStr.startsWith("https://")) {
                scheme = "https://";
            } else {
                scheme = null;
            }

            if (null != scheme) {
                final String[] hostAndPort = nameServerStr.substring(scheme.length()).split(":");
                final String host = hostAndPort[0].replace("_", "-").toLowerCase(UtilAll.LOCALE);
                final String[] labels = host.split("\\.");
                if (labels.length >= 2) {
                    this.setRegionId(labels[1]);
                }
                final int port;
                if (hostAndPort.length >= 2) {
                    port = Integer.parseInt(hostAndPort[1]);
                } else {
                    port = 80;
                }
                final ArrayList<Address> domainAddresses = new ArrayList<Address>();
                domainAddresses.add(new Address(host, port));
                this.nameServerEndpointsList.add(new Endpoints(AddressScheme.DOMAIN_NAME, domainAddresses));
                // Only derive the namespace from the address when none has been set explicitly.
                if (StringUtils.isBlank(this.namespace)
                        && Validators.NAME_SERVER_ENDPOINT_WITH_NAMESPACE_PATTERN.matcher(nameServerStr).matches()) {
                    this.namespace = nameServerStr.substring(nameServerStr.lastIndexOf('/') + 1, nameServerStr.indexOf('.'));
                }
                return;
            }

            try {
                final String[] entries = nameServerStr.split(";");
                for (String entry : entries) {
                    final String[] parts = entry.split(":");
                    final String host = parts[0];
                    final int port = Integer.parseInt(parts[1]);
                    final ArrayList<Address> ipAddresses = new ArrayList<Address>();
                    ipAddresses.add(new Address(host, port));
                    this.nameServerEndpointsList.add(new Endpoints(AddressScheme.IPv4, ipAddresses));
                }
            } catch (Throwable cause) {
                log.error("Exception raises while parse name server address, clientId={}", (Object) this.id, (Object) cause);
                throw new ClientException(ErrorCode.ILLEGAL_FORMAT, cause);
            }
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Picks the name server endpoints addressed by the current name server index, modulo the
     * size of the list, while holding the read lock.
     *
     * @return the selected name server endpoints.
     * @throws ClientException with {@code ErrorCode.NO_AVAILABLE_NAME_SERVER} when the list is empty.
     */
    private Endpoints selectNameServerEndpoints() throws ClientException {
        final Lock readLock = this.nameServerEndpointsListLock.readLock();
        readLock.lock();
        try {
            final int size = this.nameServerEndpointsList.size();
            if (size == 0) {
                throw new ClientException(ErrorCode.NO_AVAILABLE_NAME_SERVER);
            }
            // IntMath.mod yields a non-negative index even when the counter is negative.
            final int index = IntMath.mod(this.nameServerIndex.get(), size);
            return this.nameServerEndpointsList.get(index);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Builds the protobuf resource describing this client's group.
     *
     * @return a {@code Resource} whose namespace is this client's namespace and whose name is its group.
     */
    public Resource getPbGroup() {
        final Resource groupResource = Resource.newBuilder()
                .setResourceNamespace(this.namespace)
                .setName(this.group)
                .build();
        return groupResource;
    }

    /**
     * Queries the selected name server for the route of a topic.
     *
     * <p>If the RPC itself fails, the name server index is advanced so the next call selects a
     * different name server. A {@code NOT_FOUND} status yields {@code TopicRouteData.EMPTY}; any
     * other non-OK status fails the returned future with a {@link ClientException}.
     *
     * @param topic topic whose route is queried.
     * @return a future of the topic's route data; never thrown from synchronously — failures while
     *         preparing the request are reported through the returned future.
     */
    private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
        final SettableFuture<TopicRouteData> result = SettableFuture.create();
        try {
            final Endpoints endpoints = this.selectNameServerEndpoints();
            final Resource topicResource = Resource.newBuilder()
                    .setResourceNamespace(this.namespace)
                    .setName(topic)
                    .build();
            final QueryRouteRequest request = QueryRouteRequest.newBuilder()
                    .setTopic(topicResource)
                    .setEndpoints(endpoints.toPbEndpoints())
                    .build();
            final Metadata metadata = this.sign();
            final ListenableFuture<QueryRouteResponse> responseFuture =
                    this.clientManager.queryRoute(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);

            // Side-channel callback: only reacts to transport failures by rotating the name server.
            final FutureCallback<QueryRouteResponse> rotateOnFailure = new FutureCallback<QueryRouteResponse>() {
                @Override
                public void onSuccess(QueryRouteResponse response) {
                }

                @Override
                public void onFailure(Throwable cause) {
                    log.error("Exception raised while fetch topic route from name server endpoints={}, namespace={}, topic={}, clientId={}, choose the another one for the next round.", endpoints, ClientImpl.this.namespace, topic, ClientImpl.this.id, cause);
                    ClientImpl.this.nameServerIndex.getAndIncrement();
                }
            };
            Futures.addCallback(responseFuture, rotateOnFailure, MoreExecutors.directExecutor());

            return Futures.transformAsync(responseFuture, response -> {
                final Status status = response.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    result.set(new TopicRouteData(response.getPartitionsList()));
                    return result;
                }
                if (Code.NOT_FOUND.equals(code)) {
                    log.error("Topic not found, namespace={}, topic={}, clientId={}, endpoints={}, status message=[{}]", this.namespace, topic, this.id, endpoints, status.getMessage());
                    result.set(TopicRouteData.EMPTY);
                    return result;
                }
                throw new ClientException(ErrorCode.FETCH_TOPIC_ROUTE_FAILURE, status.toString());
            }, MoreExecutors.directExecutor());
        } catch (Throwable cause) {
            result.setException(cause);
            return result;
        }
    }

    /**
     * Builds the heartbeat request that {@code doHeartbeat()} sends to every endpoint in the
     * current route set.
     *
     * @return the heartbeat request.
     */
    public abstract HeartbeatRequest wrapHeartbeatRequest();

    /**
     * Sends one heartbeat request to the given endpoints and logs the outcome.
     *
     * @param request   heartbeat request to send.
     * @param endpoints target endpoints.
     * @return the future of the heartbeat response; when signing or dispatching throws, a future
     *         already failed with that throwable.
     */
    protected ListenableFuture<HeartbeatResponse> doHeartbeat(HeartbeatRequest request, final Endpoints endpoints) {
        try {
            final Metadata metadata = this.sign();
            final ListenableFuture<HeartbeatResponse> responseFuture =
                    this.clientManager.heartbeat(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            final FutureCallback<HeartbeatResponse> logOutcome = new FutureCallback<HeartbeatResponse>() {
                @Override
                public void onSuccess(HeartbeatResponse response) {
                    final Status status = response.getCommon().getStatus();
                    final Code code = Code.forNumber(status.getCode());
                    if (Code.OK.equals(code)) {
                        log.info("Send heartbeat successfully, endpoints={}, clientId={}", (Object) endpoints, (Object) ClientImpl.this.id);
                    } else {
                        log.warn("Failed to send heartbeat, code={}, status message=[{}], endpoints={}, clientId={}", code, status.getMessage(), endpoints, ClientImpl.this.id);
                    }
                }

                @Override
                public void onFailure(Throwable cause) {
                    log.warn("Failed to send heartbeat, endpoints={}, clientId={}", endpoints, ClientImpl.this.id, cause);
                }
            };
            Futures.addCallback(responseFuture, logOutcome, MoreExecutors.directExecutor());
            return responseFuture;
        } catch (Throwable cause) {
            log.error("Exception raised while heartbeat, endpoints={}, clientId={}", endpoints, this.id, cause);
            final SettableFuture<HeartbeatResponse> failed = SettableFuture.create();
            failed.setException(cause);
            return failed;
        }
    }

    /**
     * Sends the same heartbeat request to every endpoint in the current route set.
     */
    @Override
    public void doHeartbeat() {
        final Set<Endpoints> targets = this.getRouteEndpointsSet();
        final HeartbeatRequest heartbeat = this.wrapHeartbeatRequest();
        for (Endpoints target : targets) {
            this.doHeartbeat(heartbeat, target);
        }
    }

    /**
     * Builds the request that {@code pollCommand} sends when polling an endpoint for commands.
     *
     * @return the poll-command request.
     */
    public abstract PollCommandRequest wrapPollCommandRequest();

    /**
     * Called by {@code onPollCommandResponse} when a verify-message-consumption command is
     * received. This implementation does nothing.
     *
     * @param endpoints endpoints the command was polled from.
     * @param command   the received command.
     */
    public void verifyMessageConsumption(Endpoints endpoints, VerifyMessageConsumptionCommand command) {
    }

    /**
     * Called by {@code onPollCommandResponse} when a recover-orphaned-transaction command is
     * received. This implementation does nothing.
     *
     * @param endpoints endpoints the command was polled from.
     * @param command   the received command.
     */
    public void recoverOrphanedTransaction(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
    }

    /**
     * Handles a print-thread-stack-trace command: on the command executor, captures the thread
     * stack trace, reports it back to the given endpoints and logs the outcome.
     *
     * <p>Failures while building or sending the report are routed to the same failure callback as
     * RPC failures; a failure to submit the task is logged and swallowed.
     *
     * @param endpoints endpoints the command was polled from and the report is sent to.
     * @param command   the received command; its command id is echoed in the report.
     */
    public void printThreadStackTrace(Endpoints endpoints, PrintThreadStackTraceCommand command) {
        final String commandId = command.getCommandId();
        Runnable task = () -> {
            // Typed with the response type so the callback below type-checks (no raw/Object futures).
            ListenableFuture<ReportThreadStackTraceResponse> future;
            try {
                String threadStackTrace = UtilAll.stackTrace();
                ReportThreadStackTraceRequest request = ReportThreadStackTraceRequest.newBuilder().setThreadStackTrace(threadStackTrace).setCommandId(commandId).build();
                Metadata metadata = this.sign();
                future = this.clientManager.reportThreadStackTrace(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (Throwable t2) {
                // Convert a synchronous failure into a failed future so it reaches onFailure below.
                SettableFuture<ReportThreadStackTraceResponse> future0 = SettableFuture.create();
                future0.setException(t2);
                future = future0;
            }
            Futures.addCallback(future, new FutureCallback<ReportThreadStackTraceResponse>(){

                @Override
                public void onSuccess(ReportThreadStackTraceResponse response) {
                    Status status = response.getCommon().getStatus();
                    Code code = Code.forNumber(status.getCode());
                    if (!Code.OK.equals(code)) {
                        log.error("Failed to report thread stack trace, clientId={}, commandId={}, code={}, status message=[{}]", ClientImpl.this.id, commandId, code, status.getMessage());
                        return;
                    }
                    log.info("Report thread stack trace response, clientId={}, commandId={}", (Object)ClientImpl.this.id, (Object)commandId);
                }

                @Override
                public void onFailure(Throwable t2) {
                    log.error("Exception raised while reporting thread stack trace, clientId={}, commandId={}", ClientImpl.this.id, commandId, t2);
                }
            }, MoreExecutors.directExecutor());
        };
        try {
            this.commandExecutor.submit(task);
        }
        catch (Throwable t2) {
            log.error("[Bug] Exception raised while submitting task to print thread stack trace, clientId={}, commandId={}", this.id, commandId, t2);
        }
    }

    /**
     * Dispatches a polled command to its handler according to the response's type case, then
     * immediately issues the next poll against the same endpoints. Type cases not listed here
     * are ignored.
     *
     * @param endpoints endpoints the response was polled from.
     * @param response  the poll-command response.
     */
    private void onPollCommandResponse(Endpoints endpoints, PollCommandResponse response) {
        switch (response.getTypeCase()) {
            case PRINT_THREAD_STACK_TRACE_COMMAND: {
                final PrintThreadStackTraceCommand printCommand = response.getPrintThreadStackTraceCommand();
                log.info("Receive command to print thread stack trace, clientId={}, commandId={}", (Object) this.id, (Object) printCommand.getCommandId());
                this.printThreadStackTrace(endpoints, printCommand);
                break;
            }
            case VERIFY_MESSAGE_CONSUMPTION_COMMAND: {
                final VerifyMessageConsumptionCommand verifyCommand = response.getVerifyMessageConsumptionCommand();
                log.info("Receive command to verify message consumption, clientId={}, commandId={}", (Object) this.id, (Object) verifyCommand.getCommandId());
                this.verifyMessageConsumption(endpoints, verifyCommand);
                break;
            }
            case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
                log.info("Receive command to recover orphaned transaction, clientId={}", (Object) this.id);
                final RecoverOrphanedTransactionCommand recoverCommand = response.getRecoverOrphanedTransactionCommand();
                this.recoverOrphanedTransaction(endpoints, recoverCommand);
                break;
            }
            case NOOP_COMMAND: {
                log.debug("Receive noop command, clientId={}", (Object) this.id);
                break;
            }
        }
        // Keep the poll loop going regardless of which command (if any) was handled.
        this.pollCommand(endpoints);
    }

    /**
     * Sends a single poll-command request to the given endpoints and wires up the handling of
     * its outcome.
     *
     * <p>Nothing is sent when the endpoints are no longer contained in the route endpoints set.
     * A successful response is passed to {@code onPollCommandResponse}; a failed future, an
     * exception thrown while handling the response, or an exception thrown while issuing the
     * request all lead to {@code pollCommandLater} being called for the same endpoints.
     *
     * @param endpoints the endpoints to poll.
     */
    private void pollCommand(final Endpoints endpoints) {
        try {
            final PollCommandRequest pollRequest = this.wrapPollCommandRequest();
            final Set<Endpoints> activeEndpoints = this.getRouteEndpointsSet();
            final boolean stillRouted = activeEndpoints.contains(endpoints);
            if (!stillRouted) {
                log.info("Endpoints was removed, no need to poll command, endpoints={}, clientId={}", endpoints, this.id);
                return;
            }
            final ListenableFuture<PollCommandResponse> pending = this.pollCommand0(endpoints, pollRequest);
            final FutureCallback<PollCommandResponse> outcomeHandler = new FutureCallback<PollCommandResponse>(){

                @Override
                public void onSuccess(PollCommandResponse polled) {
                    try {
                        ClientImpl.this.onPollCommandResponse(endpoints, polled);
                    }
                    catch (Throwable handlingError) {
                        log.error("[Bug] Exception raised while handling polling response, would call later, endpoints={}, clientId={}", endpoints, ClientImpl.this.id, handlingError);
                        ClientImpl.this.pollCommandLater(endpoints);
                    }
                }

                @Override
                public void onFailure(Throwable pollError) {
                    log.error("Exception raised while polling command, would call later, endpoints={}, clientId={}", endpoints, ClientImpl.this.id, pollError);
                    ClientImpl.this.pollCommandLater(endpoints);
                }
            };
            // The callback is registered with the direct executor, as in the rest of this class.
            Futures.addCallback(pending, outcomeHandler, MoreExecutors.directExecutor());
        }
        catch (Throwable issueError) {
            log.error("Exception raised while polling command, would call later, endpoints={}, clientId={}", endpoints, this.id, issueError);
            this.pollCommandLater(endpoints);
        }
    }

    /**
     * Performs the actual poll-command call through the client manager.
     *
     * <p>Metadata is obtained from {@code sign()} and passed along with the request and the
     * value {@code 60000} in milliseconds (presumably the call timeout; confirm against the
     * client manager). This method does not throw: any {@link Throwable} raised while preparing
     * or issuing the call is reported through a failed future instead.
     *
     * @param endpoints the endpoints to send the request to.
     * @param request   the poll-command request to send.
     * @return the future returned by the client manager, or a future already failed with the
     *         raised {@link Throwable}.
     */
    private ListenableFuture<PollCommandResponse> pollCommand0(Endpoints endpoints, PollCommandRequest request) {
        try {
            final Metadata callMetadata = this.sign();
            return this.clientManager.pollCommand(endpoints, callMetadata, request, 60000L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable cause) {
            // Surface the failure asynchronously so callers have a single error path.
            final SettableFuture<PollCommandResponse> failed = SettableFuture.create();
            failed.setException(cause);
            return failed;
        }
    }

    /**
     * Schedules another {@code pollCommand} attempt for the given endpoints after a delay of
     * 1000 milliseconds, using the scheduler obtained from the client manager.
     *
     * <p>If submitting to the scheduler fails and the scheduler reports that it is shut down,
     * the retry is abandoned without logging. Otherwise the failure is logged and scheduling is
     * attempted again.
     *
     * @param endpoints the endpoints to poll again.
     */
    private void pollCommandLater(Endpoints endpoints) {
        ScheduledExecutorService scheduler = this.clientManager.getScheduler();
        try {
            scheduler.schedule(() -> {
                try {
                    this.pollCommand(endpoints);
                }
                catch (Throwable t2) {
                    // Do not swallow the failure silently: record it before rescheduling so an
                    // unexpected error in the polling path stays diagnosable.
                    log.error("[Bug] Exception raised while polling command, would call later, endpoints={}, clientId={}", endpoints, this.id, t2);
                    this.pollCommandLater(endpoints);
                }
            }, 1000L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable t2) {
            if (scheduler.isShutdown()) {
                // The scheduler is shut down, so there is nothing left to retry on.
                return;
            }
            log.error("[Bug] Failed to schedule polling command, clientId={}", (Object)this.id, (Object)t2);
            // NOTE(review): this retry is synchronous and has no back-off; if schedule() keeps
            // failing while the scheduler is not shut down, the recursion is unbounded.
            this.pollCommandLater(endpoints);
        }
    }

    /**
     * Collects the endpoints eligible as trace candidates from the cached topic routes.
     *
     * <p>A partition contributes its broker's endpoints only when the broker id is {@code 0}
     * and the partition's permission is not {@code Permission.NONE}. Duplicates across topics
     * and partitions are removed; the order of the returned list is unspecified.
     *
     * @return a new list of the distinct eligible endpoints, empty if there are none.
     */
    @Override
    public List<Endpoints> getTraceCandidates() {
        final Set<Endpoints> candidates = new HashSet<>();
        for (TopicRouteData route : this.topicRouteCache.values()) {
            for (Partition partition : route.getPartitions()) {
                final Broker broker = partition.getBroker();
                final boolean eligible = broker.getId() == 0
                        && !Permission.NONE.equals(partition.getPermission());
                if (eligible) {
                    candidates.add(broker.getEndpoints());
                }
            }
        }
        return new ArrayList<>(candidates);
    }

    /**
     * Adapter that exposes the enclosing client's lifecycle through
     * {@code AbstractIdleService}: the service's start-up hook delegates to the enclosing
     * instance's {@code setUp()} and its shut-down hook delegates to {@code tearDown()}.
     *
     * <p>This is a non-static inner class, so each instance is bound to the {@code ClientImpl}
     * that created it.
     */
    public class ClientService
    extends AbstractIdleService {
        /**
         * Starts the enclosing client by calling its {@code setUp()}.
         *
         * @throws Exception whatever {@code setUp()} throws.
         */
        @Override
        protected void startUp() throws Exception {
            ClientImpl.this.setUp();
        }

        /**
         * Stops the enclosing client by calling its {@code tearDown()}.
         *
         * @throws Exception whatever {@code tearDown()} throws.
         */
        @Override
        protected void shutDown() throws Exception {
            ClientImpl.this.tearDown();
        }
    }
}

