/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer;

import apache.rocketmq.v1.Encoding;
import apache.rocketmq.v1.EndTransactionRequest;
import apache.rocketmq.v1.EndTransactionResponse;
import apache.rocketmq.v1.HealthCheckRequest;
import apache.rocketmq.v1.HealthCheckResponse;
import apache.rocketmq.v1.HeartbeatRequest;
import apache.rocketmq.v1.Message;
import apache.rocketmq.v1.NotifyClientTerminationRequest;
import apache.rocketmq.v1.PollCommandRequest;
import apache.rocketmq.v1.ProducerData;
import apache.rocketmq.v1.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v1.Resource;
import apache.rocketmq.v1.SendMessageRequest;
import apache.rocketmq.v1.SendMessageResponse;
import apache.rocketmq.v1.SystemAttribute;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Preconditions;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Stopwatch;
import com.aliyun.openservices.ons.shaded.com.google.common.collect.Sets;
import com.aliyun.openservices.ons.shaded.com.google.common.math.IntMath;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListeningExecutorService;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.ByteString;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Timestamps;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.commons.lang3.StringUtils;
import com.aliyun.openservices.ons.shaded.io.grpc.Metadata;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ClientException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ErrorCode;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.exception.ServerException;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.ClientImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.producer.SendingTopicRouteData;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPointStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImplAccessor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.MessageType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.misc.Validators;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.MessageGroupQueueSelector;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.MessageQueueSelector;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.SendCallback;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.SendResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.Transaction;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.TransactionChecker;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.TransactionImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.producer.TransactionResolution;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Partition;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.TopicRouteData;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.UtilAll;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ProducerImpl
extends ClientImpl {
    public static final int MESSAGE_COMPRESSION_THRESHOLD = 4096;
    public static final int MESSAGE_COMPRESSION_LEVEL = 5;
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
    private int maxAttempts = 3;
    private long sendMessageTimeoutMillis = 5000L;
    private long transactionResolveDelayMillis = 5000L;
    private TransactionChecker transactionChecker;
    private final ExecutorService defaultSendCallbackExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("SendCallbackWorker"));
    private ExecutorService customSendCallbackExecutor = null;
    private final ConcurrentMap<String, SendingTopicRouteData> sendingRouteDataCache = new ConcurrentHashMap<String, SendingTopicRouteData>();
    private final Set<Endpoints> isolatedEndpointsSet = Collections.newSetFromMap(new ConcurrentHashMap());

    public ProducerImpl(String group) throws ClientException {
        super(group);
    }

    private void preconditionCheck(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message) throws ClientException {
        if (!this.clientService.isRunning()) {
            throw new ClientException(ErrorCode.NOT_STARTED, "Please invoke #start() first!");
        }
        Validators.checkMessage(message);
    }

    @Override
    protected void setUp() throws ClientException {
        log.info("Begin to start the rocketmq producer, clientId={}", (Object)this.id);
        super.setUp();
        log.info("The rocketmq producer starts successfully, clientId={}", (Object)this.id);
    }

    @Override
    protected void tearDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq producer, clientId={}", (Object)this.id);
        super.tearDown();
        this.defaultSendCallbackExecutor.shutdown();
        if (!ExecutorServices.awaitTerminated(this.defaultSendCallbackExecutor)) {
            log.error("[Bug] Failed to shutdown default send callback executor, clientId={}", (Object)this.id);
        }
        log.info("Shutdown the rocketmq producer successfully, clientId={}", (Object)this.id);
    }

    public void start() {
        this.clientService.startAsync().awaitRunning();
    }

    public void shutdown() {
        this.clientService.stopAsync().awaitTerminated();
    }

    public void isolateEndpoints(Endpoints endpoints) {
        this.isolatedEndpointsSet.add(endpoints);
    }

    @Override
    public void doHealthCheck() {
        Set<Endpoints> routeEndpointsSet = this.getRouteEndpointsSet();
        HashSet<Endpoints> expired = new HashSet<Endpoints>(Sets.difference(routeEndpointsSet, this.isolatedEndpointsSet));
        this.isolatedEndpointsSet.removeAll(expired);
        HealthCheckRequest request = HealthCheckRequest.newBuilder().build();
        for (final Endpoints endpoints : this.isolatedEndpointsSet) {
            Metadata metadata;
            try {
                metadata = this.sign();
            }
            catch (Throwable t2) {
                continue;
            }
            ListenableFuture<HealthCheckResponse> future = this.clientManager.healthCheck(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            Futures.addCallback(future, new FutureCallback<HealthCheckResponse>(){

                @Override
                public void onSuccess(HealthCheckResponse response) {
                    Status status = response.getCommon().getStatus();
                    Code code = Code.forNumber(status.getCode());
                    if (Code.OK.equals(code)) {
                        ProducerImpl.this.isolatedEndpointsSet.remove(endpoints);
                        log.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", (Object)ProducerImpl.this.id, (Object)endpoints);
                        return;
                    }
                    log.warn("Failed to rejoin the endpoints which is isolated before, clientId={}, code={}, status message=[{}], endpoints={}", ProducerImpl.this.id, code, status.getMessage(), endpoints);
                }

                @Override
                public void onFailure(Throwable t2) {
                    log.error("Failed to do health check, clientId={}, endpoints={}", ProducerImpl.this.id, endpoints, t2);
                }
            }, MoreExecutors.directExecutor());
        }
    }

    public void setCallbackExecutor(ExecutorService executor) {
        this.customSendCallbackExecutor = Preconditions.checkNotNull(executor, "executor");
    }

    public ExecutorService getSendCallbackExecutor() {
        if (null != this.customSendCallbackExecutor) {
            return this.customSendCallbackExecutor;
        }
        return this.defaultSendCallbackExecutor;
    }

    private SendMessageRequest wrapSendMessageRequest(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, Partition partition) {
        Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(message.getTopic()).build();
        SystemAttribute.Builder systemAttributeBuilder = SystemAttribute.newBuilder().setTag(message.getTag()).addAllKeys(message.getKeysList()).setMessageId(message.getMessageExt().getMsgId()).setBornTimestamp(Timestamps.fromMillis(message.getBornTimeMillis())).setBornHost(message.getBornHost()).setPartitionId(partition.getId()).setProducerGroup(this.getPbGroup());
        com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.Encoding encoding = com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.Encoding.IDENTITY;
        byte[] body = message.getBody();
        if (body.length > 4096) {
            try {
                body = UtilAll.compressBytesGzip(body, 5);
                encoding = com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.Encoding.GZIP;
            }
            catch (IOException e) {
                log.warn("Failed to compress message, clientId={}, messageId={}", this.id, message.getMsgId(), e);
            }
        }
        switch (encoding) {
            case GZIP: {
                systemAttributeBuilder.setBodyEncoding(Encoding.GZIP);
                break;
            }
            default: {
                systemAttributeBuilder.setBodyEncoding(Encoding.IDENTITY);
            }
        }
        MessageImpl messageImpl = MessageImplAccessor.getMessageImpl(message);
        String traceContext = messageImpl.getSystemAttribute().getTraceContext();
        if (null != traceContext) {
            systemAttributeBuilder.setTraceContext(traceContext);
        }
        switch (messageImpl.getSystemAttribute().getMessageType()) {
            case FIFO: {
                systemAttributeBuilder.setMessageType(apache.rocketmq.v1.MessageType.FIFO);
                String messageGroup = message.getMessageGroup();
                if (null == messageGroup) break;
                systemAttributeBuilder.setMessageGroup(messageGroup);
                break;
            }
            case DELAY: {
                systemAttributeBuilder.setMessageType(apache.rocketmq.v1.MessageType.DELAY);
                int delayTimeLevel = message.getDelayTimeLevel();
                long deliveryTimestamp = message.getDelayTimeMillis();
                if (delayTimeLevel > 0) {
                    systemAttributeBuilder.setDelayLevel(delayTimeLevel);
                    break;
                }
                if (deliveryTimestamp <= 0L) break;
                systemAttributeBuilder.setDeliveryTimestamp(Timestamps.fromMillis(deliveryTimestamp));
                break;
            }
            case TRANSACTION: {
                systemAttributeBuilder.setMessageType(apache.rocketmq.v1.MessageType.TRANSACTION);
                break;
            }
            default: {
                systemAttributeBuilder.setMessageType(apache.rocketmq.v1.MessageType.NORMAL);
            }
        }
        SystemAttribute systemAttribute = systemAttributeBuilder.build();
        Message msg = Message.newBuilder().setTopic(topicResource).setSystemAttribute(systemAttribute).putAllUserAttribute(message.getUserProperties()).setBody(ByteString.copyFrom(body)).build();
        return SendMessageRequest.newBuilder().setMessage(msg).build();
    }

    public SendResult send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message) throws ClientException, InterruptedException, ServerException, TimeoutException {
        return this.send(message, this.sendMessageTimeoutMillis);
    }

    public SendResult send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, long timeoutMillis) throws ClientException, InterruptedException, TimeoutException, ServerException {
        this.preconditionCheck(message);
        ListenableFuture<SendResult> future = this.send0(message, this.maxAttempts);
        Futures.withTimeout(future, timeoutMillis, TimeUnit.MILLISECONDS, this.getScheduler());
        try {
            return (SendResult)future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw this.onExecutionException(e);
        }
    }

    public void send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, SendCallback sendCallback) throws ClientException, InterruptedException {
        this.send(message, sendCallback, this.sendMessageTimeoutMillis);
    }

    public void send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, final SendCallback sendCallback, long timeoutMillis) throws ClientException, InterruptedException {
        this.preconditionCheck(message);
        ListenableFuture<SendResult> future = this.send0(message, this.maxAttempts);
        Futures.withTimeout(future, timeoutMillis, TimeUnit.MILLISECONDS, this.getScheduler());
        final ExecutorService sendCallbackExecutor = this.getSendCallbackExecutor();
        final String messageId = message.getMsgId();
        Futures.addCallback(future, new FutureCallback<SendResult>(){

            @Override
            public void onSuccess(SendResult sendResult) {
                try {
                    sendCallbackExecutor.submit(() -> {
                        try {
                            sendCallback.onSuccess(sendResult);
                        }
                        catch (Throwable t2) {
                            log.error("Exception raised in SendCallback#onSuccess, namespace={}, messageId={}, clientId={}", ProducerImpl.this.namespace, messageId, ProducerImpl.this.id, t2);
                        }
                    });
                }
                catch (Throwable t2) {
                    log.error("Exception occurs while submitting task to send callback executor, namespace={}, messageId={}, clientId={}", ProducerImpl.this.namespace, messageId, ProducerImpl.this.id, t2);
                }
            }

            @Override
            public void onFailure(Throwable t2) {
                try {
                    sendCallbackExecutor.submit(() -> {
                        try {
                            sendCallback.onException(t2);
                        }
                        catch (Throwable t1) {
                            log.error("Exception occurs in SendCallback#onException, namespace={}, messageId={}, clientId={}", ProducerImpl.this.namespace, messageId, ProducerImpl.this.id, t1);
                        }
                    });
                }
                catch (Throwable t0) {
                    log.error("Exception occurs while submitting task to send callback executor, namespace={}, messageId={}, clientId={}", ProducerImpl.this.namespace, messageId, ProducerImpl.this.id, t0);
                }
            }
        }, MoreExecutors.directExecutor());
    }

    public void sendOneway(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message) throws ClientException {
        this.preconditionCheck(message);
        this.send0(message, 1);
    }

    public SendResult send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, String messageGroup) throws ServerException, ClientException, InterruptedException, TimeoutException {
        if (StringUtils.isBlank(messageGroup)) {
            throw new ClientException(ErrorCode.ILLEGAL_FORMAT, "message group is blank");
        }
        MessageImpl messageImpl = MessageImplAccessor.getMessageImpl(message);
        messageImpl.getSystemAttribute().setMessageGroup(messageGroup);
        MessageGroupQueueSelector selector = new MessageGroupQueueSelector(messageGroup);
        return this.send(message, selector, null);
    }

    public SendResult send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, MessageQueueSelector selector, Object arg) throws ClientException, InterruptedException, ServerException, TimeoutException {
        return this.send(message, selector, arg, this.sendMessageTimeoutMillis);
    }

    public SendResult send(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, MessageQueueSelector selector, Object arg, long timeoutMillis) throws ClientException, ServerException, InterruptedException, TimeoutException {
        this.preconditionCheck(message);
        ListenableFuture<SendResult> future = this.send0(message, selector, arg, this.maxAttempts);
        Futures.withTimeout(future, timeoutMillis, TimeUnit.MILLISECONDS, this.getScheduler());
        try {
            return (SendResult)future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw this.onExecutionException(e);
        }
    }

    public Transaction prepare(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message) throws ServerException, InterruptedException, ClientException, TimeoutException {
        MessageImpl messageImpl = MessageImplAccessor.getMessageImpl(message);
        com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.SystemAttribute systemAttribute = messageImpl.getSystemAttribute();
        systemAttribute.setMessageType(MessageType.TRANSACTION);
        systemAttribute.setOrphanedTransactionRecoveryPeriodMillis(this.transactionResolveDelayMillis);
        SendResult sendResult = this.send(message);
        return new TransactionImpl(sendResult, message, this);
    }

    public void commit(Endpoints endpoints, MessageExt messageExt, String transactionId) throws ClientException, ServerException, InterruptedException, TimeoutException {
        this.endTransaction(endpoints, messageExt, transactionId, TransactionResolution.COMMIT);
    }

    public void rollback(Endpoints endpoints, MessageExt messageExt, String transactionId) throws ClientException, ServerException, InterruptedException, TimeoutException {
        this.endTransaction(endpoints, messageExt, transactionId, TransactionResolution.ROLLBACK);
    }

    private void endTransaction(Endpoints endpoints, final MessageExt messageExt, String transactionId, TransactionResolution resolution) throws ClientException, ServerException, InterruptedException, TimeoutException {
        Metadata metadata;
        try {
            metadata = this.sign();
        }
        catch (Throwable t2) {
            throw new ClientException(ErrorCode.SIGNATURE_FAILURE, t2);
        }
        String messageId = messageExt.getMsgId();
        EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder().setMessageId(messageId).setTransactionId(transactionId).setGroup(this.getPbGroup());
        switch (resolution) {
            case COMMIT: {
                builder.setResolution(EndTransactionRequest.TransactionResolution.COMMIT);
                break;
            }
            default: {
                builder.setResolution(EndTransactionRequest.TransactionResolution.ROLLBACK);
            }
        }
        EndTransactionRequest request = builder.build();
        MessageHookPoint preHookPoint = TransactionResolution.COMMIT.equals((Object)resolution) ? MessageHookPoint.PRE_COMMIT_MESSAGE : MessageHookPoint.PRE_ROLLBACK_MESSAGE;
        final MessageHookPoint postHookPoint = TransactionResolution.COMMIT.equals((Object)resolution) ? MessageHookPoint.POST_COMMIT_MESSAGE : MessageHookPoint.POST_ROLLBACK_MESSAGE;
        String topic = messageExt.getTopic();
        final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setTopic(topic).build();
        this.intercept(preHookPoint, messageExt, preContext);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        ListenableFuture<EndTransactionResponse> future = this.clientManager.endTransaction(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        Futures.addCallback(future, new FutureCallback<EndTransactionResponse>(){

            @Override
            public void onSuccess(EndTransactionResponse response) {
                Code code = Code.forNumber(response.getCommon().getStatus().getCode());
                MessageHookPointStatus status = Code.OK.equals(code) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR;
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setDuration(duration).setStatus(status).build();
                ProducerImpl.this.intercept(postHookPoint, messageExt, postContext);
            }

            @Override
            public void onFailure(Throwable t2) {
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext postContext = preContext.toBuilder().setDuration(duration).setStatus(MessageHookPointStatus.ERROR).setThrowable(t2).build();
                ProducerImpl.this.intercept(postHookPoint, messageExt, postContext);
            }
        }, MoreExecutors.directExecutor());
        try {
            EndTransactionResponse response = (EndTransactionResponse)future.get(this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
            Status status = response.getCommon().getStatus();
            Code code = Code.forNumber(status.getCode());
            if (!Code.OK.equals(code)) {
                log.error("Failed to end transaction, clientId={}, namespace={}, topic={}, messageId={}, transactionId={}, resolution={}, code={}, status message=[{}]", new Object[]{this.id, this.namespace, topic, messageId, transactionId, resolution, code, status.getMessage()});
                throw new ServerException(ErrorCode.OTHER, status.getMessage());
            }
        }
        catch (ExecutionException e) {
            throw this.onExecutionException(e);
        }
    }

    @Override
    public void recoverOrphanedTransaction(final Endpoints endpoints, final RecoverOrphanedTransactionCommand command) {
        Future future;
        MessageExt messageExt;
        Message message = command.getOrphanedTransactionalMessage();
        final String messageId = message.getSystemAttribute().getMessageId();
        if (null == this.transactionChecker) {
            log.error("No transaction checker registered, ignore it, messageId={}, clientId={}", (Object)messageId, (Object)this.id);
            return;
        }
        try {
            MessageImpl messageImpl = MessageImplAccessor.wrapMessageImpl(message);
            messageExt = new MessageExt(messageImpl);
        }
        catch (Throwable t2) {
            log.error("[Bug] Failed to decode message while recovering orphaned transaction, messageId={}, clientId={}", messageId, this.id, t2);
            return;
        }
        try {
            ListeningExecutorService commandService = MoreExecutors.listeningDecorator(this.commandExecutor);
            Callable<TransactionResolution> task = () -> this.transactionChecker.check(messageExt);
            future = commandService.submit(task);
        }
        catch (Throwable t3) {
            SettableFuture future0 = SettableFuture.create();
            future0.setException(t3);
            future = future0;
        }
        Futures.addCallback(future, new FutureCallback<TransactionResolution>(){

            @Override
            public void onSuccess(TransactionResolution resolution) {
                try {
                    if (null == resolution || TransactionResolution.UNKNOWN.equals((Object)resolution)) {
                        return;
                    }
                    ProducerImpl.this.endTransaction(endpoints, messageExt, command.getTransactionId(), resolution);
                }
                catch (Throwable t2) {
                    log.error("Exception raised while check and end transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, command.getTransactionId(), endpoints, ProducerImpl.this.id, t2);
                }
            }

            @Override
            public void onFailure(Throwable t2) {
                log.error("Exception raised while recover orphaned transaction, messageId={}, endpoints={}, clientId={}", messageId, endpoints, ProducerImpl.this.id, t2);
            }
        }, MoreExecutors.directExecutor());
    }

    @Override
    public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
        this.sendingRouteDataCache.put(topic, new SendingTopicRouteData(topicRouteData));
    }

    @Override
    public NotifyClientTerminationRequest wrapNotifyClientTerminationRequest() {
        return NotifyClientTerminationRequest.newBuilder().setClientId(this.id).setProducerGroup(this.getPbGroup()).build();
    }

    private ListenableFuture<SendingTopicRouteData> getSendingTopicRouteData(String topic) {
        SettableFuture<SendingTopicRouteData> future0 = SettableFuture.create();
        SendingTopicRouteData cachedSendingRouteData = (SendingTopicRouteData)this.sendingRouteDataCache.get(topic);
        if (null != cachedSendingRouteData) {
            future0.set(cachedSendingRouteData);
            return future0;
        }
        ListenableFuture<TopicRouteData> future = this.getRouteData(topic);
        return Futures.transform(future, topicRouteData -> {
            SendingTopicRouteData sendingRouteData = new SendingTopicRouteData((TopicRouteData)topicRouteData);
            this.sendingRouteDataCache.put(topic, sendingRouteData);
            return sendingRouteData;
        }, MoreExecutors.directExecutor());
    }

    private ListenableFuture<SendResult> send0(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, int maxAttempts) {
        ListenableFuture<SendingTopicRouteData> future = this.getSendingTopicRouteData(message.getTopic());
        return Futures.transformAsync(future, sendingRouteData -> {
            List<Partition> candidates = this.takePartitions((SendingTopicRouteData)sendingRouteData, maxAttempts);
            return this.send0(message, candidates, maxAttempts);
        }, MoreExecutors.directExecutor());
    }

    private ListenableFuture<SendResult> send0(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, MessageQueueSelector selector, Object arg, int maxAttempts) {
        MessageImpl messageImpl = MessageImplAccessor.getMessageImpl(message);
        com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.protocol.SystemAttribute systemAttribute = messageImpl.getSystemAttribute();
        systemAttribute.setMessageType(MessageType.FIFO);
        ListenableFuture<Partition> future0 = this.selectPartition(message, selector, arg);
        return Futures.transformAsync(future0, partition -> {
            ArrayList<Partition> candidates = new ArrayList<Partition>();
            candidates.add((Partition)partition);
            return this.send0(message, candidates, maxAttempts);
        }, MoreExecutors.directExecutor());
    }

    private ListenableFuture<SendResult> send0(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, List<Partition> candidates, int maxAttempts) {
        SettableFuture<SendResult> future = SettableFuture.create();
        int delayTimeLevel = message.getDelayTimeLevel();
        long deliveryTimestamp = message.getDelayTimeMillis();
        if (delayTimeLevel > 0 || deliveryTimestamp > 0L) {
            MessageImplAccessor.getMessageImpl(message).getSystemAttribute().setMessageType(MessageType.DELAY);
        }
        this.send0(future, candidates, message, 1, maxAttempts);
        return future;
    }

    private void send0(final SettableFuture<SendResult> future, final List<Partition> candidates, final com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, final int attempt, final int maxAttempts) {
        Metadata metadata;
        final String topic = message.getTopic();
        final String msgId = message.getMsgId();
        if (future.isCancelled()) {
            log.error("No need for sending because of timeout, namespace={}, topic={}, messageId={}, maxAttempts={}, attempt={}, clientId={}", this.namespace, topic, msgId, maxAttempts, attempt, this.id);
            return;
        }
        try {
            metadata = this.sign();
        }
        catch (Throwable t2) {
            future.setException(t2);
            return;
        }
        Partition partition = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
        final Endpoints endpoints = partition.getBroker().getEndpoints();
        final MessageInterceptorContext preContext = MessageInterceptorContext.builder().setAttempt(attempt).setTopic(topic).build();
        this.intercept(MessageHookPoint.PRE_SEND_MESSAGE, message.getMessageExt(), preContext);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        SendMessageRequest request = this.wrapSendMessageRequest(message, partition);
        ListenableFuture<SendMessageResponse> responseFuture = this.clientManager.sendMessage(endpoints, metadata, request, this.ioTimeoutMillis, TimeUnit.MILLISECONDS);
        ListenableFuture attemptFuture = Futures.transformAsync(responseFuture, response -> {
            SettableFuture<SendResult> future0 = SettableFuture.create();
            SendResult sendResult = ProducerImpl.processSendResponse(endpoints, response);
            future0.set(sendResult);
            return future0;
        }, MoreExecutors.directExecutor());
        Futures.addCallback(attemptFuture, new FutureCallback<SendResult>(){

            @Override
            public void onSuccess(SendResult sendResult) {
                future.set(sendResult);
                if (1 < attempt) {
                    log.info("Resend message successfully, namespace={}, topic={}, messageId={}, maxAttempts={}, attempt={}, endpoints={}, clientId={}", ProducerImpl.this.namespace, topic, msgId, maxAttempts, attempt, endpoints, ProducerImpl.this.id);
                }
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext context = preContext.toBuilder().setDuration(duration).setStatus(MessageHookPointStatus.OK).build();
                ProducerImpl.this.intercept(MessageHookPoint.POST_SEND_MESSAGE, message.getMessageExt(), context);
            }

            @Override
            public void onFailure(Throwable t2) {
                long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                MessageInterceptorContext context = preContext.toBuilder().setDuration(duration).setStatus(MessageHookPointStatus.ERROR).setThrowable(t2).build();
                ProducerImpl.this.intercept(MessageHookPoint.POST_SEND_MESSAGE, message.getMessageExt(), context);
                ProducerImpl.this.isolateEndpoints(endpoints);
                if (attempt >= maxAttempts) {
                    future.setException(t2);
                    log.error("Failed to send message finally, run out of attempt times, maxAttempts={}, attempt={}, namespace={}, topic={}, messageId={}, endpoints={}, clientId={}", maxAttempts, attempt, ProducerImpl.this.namespace, topic, msgId, endpoints, ProducerImpl.this.id, t2);
                    return;
                }
                log.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, attempt={}, namespace={}, topic={}, messageId={}, endpoints={}, clientId={}", maxAttempts, attempt, ProducerImpl.this.namespace, topic, msgId, endpoints, ProducerImpl.this.id, t2);
                ProducerImpl.this.send0(future, candidates, message, 1 + attempt, maxAttempts);
            }
        }, MoreExecutors.directExecutor());
    }

    private List<Partition> takePartitions(SendingTopicRouteData sendingRouteData, int maxAttempts) throws ClientException {
        return sendingRouteData.takePartitions(this.isolatedEndpointsSet, maxAttempts);
    }

    private ListenableFuture<Partition> selectPartition(com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.Message message, MessageQueueSelector selector, Object arg) {
        String topic = message.getTopic();
        ListenableFuture<SendingTopicRouteData> future = this.getSendingTopicRouteData(topic);
        return Futures.transformAsync(future, sendingRouteData -> {
            if (sendingRouteData.isEmpty()) {
                log.warn("No available sending route for selector, namespace={}, topic={}, clientId={}", this.namespace, topic, this.id);
                throw new ClientException(ErrorCode.NO_PERMISSION);
            }
            MessageQueue mq = selector.select(sendingRouteData.getMessageQueues(), message, arg);
            SettableFuture<Partition> future0 = SettableFuture.create();
            future0.set(mq.getPartition());
            return future0;
        }, MoreExecutors.directExecutor());
    }

    @Override
    public HeartbeatRequest wrapHeartbeatRequest() {
        ProducerData producerData = ProducerData.newBuilder().setGroup(this.getPbGroup()).build();
        return HeartbeatRequest.newBuilder().setClientId(this.id).setProducerData(producerData).build();
    }

    @Override
    public void doStats() {
    }

    @Override
    public PollCommandRequest wrapPollCommandRequest() {
        PollCommandRequest.Builder builder = PollCommandRequest.newBuilder().setClientId(this.id).setProducerGroup(this.getPbGroup());
        for (String topic : this.sendingRouteDataCache.keySet()) {
            Resource topicResource = Resource.newBuilder().setResourceNamespace(this.namespace).setName(topic).build();
            builder.addTopics(topicResource);
        }
        return builder.build();
    }

    public static SendResult processSendResponse(Endpoints endpoints, SendMessageResponse response) throws ServerException {
        Status status = response.getCommon().getStatus();
        Code code = Code.forNumber(status.getCode());
        if (Code.OK.equals(code)) {
            return new SendResult(endpoints, response.getMessageId(), response.getTransactionId());
        }
        log.debug("Response indicates failure of sending message, code={}, status message=[{}]", (Object)code, (Object)status.getMessage());
        throw new ServerException(ErrorCode.OTHER, status.getMessage());
    }

    public ClientException onExecutionException(ExecutionException e) throws ServerException {
        Throwable cause = e.getCause();
        if (cause instanceof ClientException) {
            return (ClientException)cause;
        }
        if (cause instanceof ServerException) {
            throw (ServerException)cause;
        }
        if (null != cause) {
            return new ClientException(ErrorCode.OTHER, cause);
        }
        return new ClientException(ErrorCode.OTHER, (Throwable)e);
    }

    public void setTransactionChecker(TransactionChecker checker) {
        this.transactionChecker = Preconditions.checkNotNull(checker, "checker");
    }

    public int getMaxAttempts() {
        return this.maxAttempts;
    }

    public long getSendMessageTimeoutMillis() {
        return this.sendMessageTimeoutMillis;
    }

    public long getTransactionRecoverDelayMillis() {
        return this.transactionResolveDelayMillis;
    }

    public TransactionChecker getTransactionChecker() {
        return this.transactionChecker;
    }

    public void setMaxAttempts(int maxAttempts) {
        Preconditions.checkArgument(maxAttempts > 0, "Must be positive");
        this.maxAttempts = maxAttempts;
    }

    public void setSendMessageTimeoutMillis(long sendMessageTimeoutMillis) {
        Preconditions.checkArgument(sendMessageTimeoutMillis > 0L, "Must be positive");
        this.sendMessageTimeoutMillis = sendMessageTimeoutMillis;
    }

    public void setTransactionRecoverDelayMillis(long transactionResolveDelayMillis) {
        Preconditions.checkArgument(transactionResolveDelayMillis > 0L, "Must be positive");
        this.transactionResolveDelayMillis = transactionResolveDelayMillis;
    }
}

