package com.taobao.metamorphosis.client.producer;

import com.taobao.gecko.core.command.ResponseCommand;
import com.taobao.gecko.core.util.ConcurrentHashSet;
import com.taobao.gecko.core.util.OpaqueGenerator;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.SingleRequestCallBackListener;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.MessageAccessor;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.RemotingClientWrapper;
import com.taobao.metamorphosis.client.transaction.TransactionContext;
import com.taobao.metamorphosis.client.transaction.TransactionSession;
import com.taobao.metamorphosis.cluster.Partition;
import com.taobao.metamorphosis.exception.InvalidMessageException;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.exception.MetaOpeartionTimeoutException;
import com.taobao.metamorphosis.exception.TransactionInProgressException;
import com.taobao.metamorphosis.network.BooleanCommand;
import com.taobao.metamorphosis.network.PutCommand;
import com.taobao.metamorphosis.transaction.TransactionId;
import com.taobao.metamorphosis.utils.CheckSum;
import com.taobao.metamorphosis.utils.LongSequenceGenerator;
import com.taobao.metamorphosis.utils.MessageFlagUtils;
import com.taobao.metamorphosis.utils.MessageUtils;
import com.taobao.metamorphosis.utils.MetaStatLog;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.transaction.xa.XAException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/client/producer/SimpleMessageProducer.class */
/**
 * Message producer that picks a partition and broker through {@link ProducerZooKeeper} and
 * sends messages over a {@link RemotingClientWrapper}, either synchronously or with a callback.
 *
 * <p>Local transactions are tracked per thread: the current {@link TransactionContext} and the
 * server that received the first message of the transaction are both held in
 * {@link ThreadLocal}s, so {@link #beginTransaction()}, the sends and {@link #commit()} /
 * {@link #rollback()} must happen on the same thread.
 */
public class SimpleMessageProducer implements MessageProducer, TransactionSession {
    private static final Log log = LogFactory.getLog(SimpleMessageProducer.class);

    /** Timeout, in milliseconds, used by the {@code sendMessage} overloads that take no timeout. */
    protected static final long DEFAULT_OP_TIMEOUT = 3000L;

    /**
     * Elapsed send time, in milliseconds, above which a send is additionally counted under the
     * {@code cli_put_timeout} statistic. Read once from the system property
     * {@code meta.send.timeout.threshold} (default {@code 200}).
     */
    private static final int TIMEOUT_THRESHOLD =
            Integer.parseInt(System.getProperty("meta.send.timeout.threshold", "200"));

    /** Maximum number of send attempts made by {@link #sendMessageToServer}. */
    private static final int MAX_RETRY = 1;

    /** Separator between the fields of a successful put response. */
    static final Pattern RESULT_SPLITER = Pattern.compile(" ");

    /** Response code of a successful put. */
    private static final int CODE_SUCCESS = 200;
    /** Response code that is logged at debug level and reported as a failed send. */
    private static final int CODE_FORBIDDEN = 403;

    // Positions of the fields inside a successful put response.
    private static final int RESULT_MESSAGE_ID_INDEX = 0;
    private static final int RESULT_PARTITION_INDEX = 1;
    private static final int RESULT_OFFSET_INDEX = 2;

    private final MetaMessageSessionFactory messageSessionFactory;
    protected final RemotingClientWrapper remotingClient;
    protected final PartitionSelector partitionSelector;
    protected final ProducerZooKeeper producerZooKeeper;
    protected final String sessionId;
    private volatile boolean shutdown;

    /** Transaction timeout handed to every new {@link TransactionContext}; never negative. */
    protected volatile int transactionTimeout = 0;
    /** Topics registered through {@link #publish} or {@link #setDefaultTopic}. */
    private final ConcurrentHashSet<String> publishedTopics = new ConcurrentHashSet<>();
    /** Request timeout, in milliseconds, handed to every new {@link TransactionContext}. */
    protected long transactionRequestTimeoutInMills = 5000;
    /** Per-thread record of the server that accepted the first message of the transaction. */
    protected final ThreadLocal<LastSentInfo> lastSentInfo = new ThreadLocal<>();
    /** Per-thread transaction; {@code null} when the thread has no transaction in progress. */
    protected final ThreadLocal<TransactionContext> transactionContext = new ThreadLocal<>();
    private final LongSequenceGenerator localTxIdGenerator = new LongSequenceGenerator();

    /** Remembers which server a transaction's messages are being sent to. */
    public static class LastSentInfo {
        final String serverUrl;

        public LastSentInfo(String serverUrl) {
            this.serverUrl = serverUrl;
        }
    }

    /**
     * Creates a producer bound to the given collaborators.
     *
     * @param metaMessageSessionFactory factory that owns this producer; notified on shutdown
     * @param remotingClientWrapper client used to talk to the servers
     * @param partitionSelector strategy passed to the partition lookup
     * @param producerZooKeeper source of topic, partition and broker information
     * @param sessionId identifier returned by {@link #getSessionId()}
     */
    public SimpleMessageProducer(MetaMessageSessionFactory metaMessageSessionFactory,
            RemotingClientWrapper remotingClientWrapper, PartitionSelector partitionSelector,
            ProducerZooKeeper producerZooKeeper, String sessionId) {
        this.sessionId = sessionId;
        this.messageSessionFactory = metaMessageSessionFactory;
        this.remotingClient = remotingClientWrapper;
        this.partitionSelector = partitionSelector;
        this.producerZooKeeper = producerZooKeeper;
    }

    /**
     * Sets the request timeout used by transactions begun after this call.
     *
     * @param time timeout value
     * @param timeUnit unit of {@code time}
     * @throws IllegalArgumentException if {@code timeUnit} is {@code null}
     */
    @Override
    public void setTransactionRequestTimeout(long time, TimeUnit timeUnit) {
        if (timeUnit == null) {
            throw new IllegalArgumentException("Invalid time unit");
        }
        this.transactionRequestTimeoutInMills = TimeUnit.MILLISECONDS.convert(time, timeUnit);
    }

    long getTransactionRequestTimeoutInMills() {
        return this.transactionRequestTimeoutInMills;
    }

    /** Returns the session factory this producer was created with. */
    public MetaMessageSessionFactory getParent() {
        return this.messageSessionFactory;
    }

    @Override
    public PartitionSelector getPartitionSelector() {
        return this.partitionSelector;
    }

    /** Always returns {@code false}. */
    @Override
    @Deprecated
    public boolean isOrdered() {
        return false;
    }

    /**
     * Registers {@code topic} with the {@link ProducerZooKeeper} unless it was already registered
     * through this producer.
     *
     * @throws IllegalStateException if the producer has been shut down
     * @throws IllegalArgumentException if {@code topic} is blank
     */
    @Override
    public void publish(String topic) {
        checkState();
        checkTopic(topic);
        if (!this.publishedTopics.contains(topic)) {
            this.producerZooKeeper.publishTopic(topic, this);
            this.publishedTopics.add(topic);
        }
    }

    /**
     * Registers {@code topic} as the default topic with the {@link ProducerZooKeeper} unless the
     * topic was already registered through this producer.
     */
    @Override
    public void setDefaultTopic(String topic) {
        if (!this.publishedTopics.contains(topic)) {
            this.producerZooKeeper.setDefaultTopic(topic, this);
            this.publishedTopics.add(topic);
        }
    }

    private void checkTopic(String topic) {
        if (StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("Blank topic:" + topic);
        }
    }

    /**
     * Validates the producer state and the message, then sends the message synchronously.
     *
     * @param message message to send; must have a non-blank topic and non-null data
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the outcome reported by the server
     * @throws IllegalStateException if the producer has been shut down
     * @throws MetaClientException if the message is invalid or the send fails
     * @throws InterruptedException if the calling thread is interrupted while sending
     */
    @Override
    public SendResult sendMessage(Message message, long timeout, TimeUnit timeUnit)
            throws MetaClientException, InterruptedException {
        checkState();
        checkMessage(message);
        return sendMessageToServer(message, timeout, timeUnit);
    }

    /**
     * Encodes the message once and sends it, making at most {@link #MAX_RETRY} attempts. Send
     * statistics are recorded whether the call returns or throws.
     *
     * @param message message to send
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the result of the last attempt, which may be unsuccessful
     * @throws MetaOpeartionTimeoutException if an attempt fails after the timeout has elapsed
     * @throws MetaClientException if the send fails for another reason
     * @throws InterruptedException if the calling thread is interrupted while sending
     */
    public SendResult sendMessageToServer(Message message, long timeout, TimeUnit timeUnit)
            throws MetaClientException, InterruptedException, MetaOpeartionTimeoutException {
        final long startMillis = System.currentTimeMillis();
        final long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
        final byte[] payload = MessageUtils.encodePayload(message);
        SendResult sendResult = null;
        int retries = 0;
        try {
            for (int attempt = 0; attempt < MAX_RETRY; attempt++) {
                sendResult = send0(message, payload, timeout, timeUnit);
                if (sendResult.isSuccess()) {
                    break;
                }
                if (System.currentTimeMillis() - startMillis >= timeoutMillis) {
                    throw new MetaOpeartionTimeoutException(
                            "Send message timeout in " + timeoutMillis + " mills");
                }
                retries++;
            }
            return sendResult;
        } finally {
            recordSendStats(message.getTopic(), System.currentTimeMillis() - startMillis, retries);
        }
    }

    /** Records elapsed time, slow-send and retry statistics for one send call. */
    private static void recordSendStats(String topic, long elapsedMillis, int retries) {
        MetaStatLog.addStatValue2((String) null, "cli_put_time", topic, elapsedMillis);
        if (elapsedMillis > TIMEOUT_THRESHOLD) {
            MetaStatLog.addStatValue2((String) null, "cli_put_timeout", topic, elapsedMillis);
        }
        if (retries > 0) {
            MetaStatLog.addStatValue2((String) null, "cli_put_retry", topic, retries);
        }
    }

    private Partition selectPartition(Message message) throws MetaClientException {
        return this.producerZooKeeper.selectPartition(message.getTopic(), message, this.partitionSelector);
    }

    /** Returns the calling thread's transaction, failing if none has been begun. */
    private TransactionContext getTx() throws MetaClientException {
        final TransactionContext current = this.transactionContext.get();
        if (current == null) {
            throw new MetaClientException("There is no transaction begun");
        }
        return current;
    }

    /**
     * Detaches the calling thread's transaction and forgets the last-sent server.
     *
     * @param context expected to be the calling thread's current transaction (checked only when
     *        assertions are enabled)
     */
    @Override
    public void removeContext(TransactionContext context) {
        assert this.transactionContext.get() == context;
        this.transactionContext.remove();
        resetLastSentInfo();
    }

    @Override
    public String getSessionId() {
        return this.sessionId;
    }

    /**
     * Sets the timeout passed to transactions begun after this call.
     *
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    @Override
    public void setTransactionTimeout(int timeout) throws MetaClientException {
        if (timeout < 0) {
            throw new IllegalArgumentException("Illegal transaction timeout value");
        }
        this.transactionTimeout = timeout;
    }

    @Override
    public int getTransactionTimeout() throws MetaClientException {
        return this.transactionTimeout;
    }

    /**
     * Starts a transaction for the calling thread.
     *
     * @throws TransactionInProgressException if this thread already has a transaction
     */
    @Override
    public void beginTransaction() throws MetaClientException {
        if (this.transactionContext.get() != null) {
            throw new TransactionInProgressException("A transaction has begun");
        }
        this.transactionContext.set(new TransactionContext(this.remotingClient, null, this,
                this.localTxIdGenerator, this.transactionTimeout, this.transactionRequestTimeoutInMills));
    }

    /**
     * Binds the current transaction to {@code serverUrl} and begins it, unless the transaction
     * already has an id.
     */
    protected void beforeSendMessageFirstTime(String serverUrl) throws MetaClientException, XAException {
        final TransactionContext tx = getTx();
        if (tx.getTransactionId() == null) {
            tx.setServerUrl(serverUrl);
            tx.begin();
        }
    }

    /** Remembers {@code serverUrl} for the current transaction if none is remembered yet. */
    public void logLastSentInfo(String serverUrl) {
        if (isInTransaction() && this.lastSentInfo.get() == null) {
            this.lastSentInfo.set(new LastSentInfo(serverUrl));
        }
    }

    /** Returns the current transaction's id, or {@code null} outside a transaction. */
    protected TransactionId getTransactionId() throws MetaClientException {
        return isInTransaction() ? getTx().getTransactionId() : null;
    }

    protected boolean isInTransaction() {
        return this.transactionContext.get() != null;
    }

    /**
     * Commits the calling thread's transaction. The thread's transaction state is cleared whether
     * or not the commit succeeds.
     *
     * @throws MetaClientException if no transaction has begun or the commit fails
     */
    @Override
    public void commit() throws MetaClientException {
        try {
            getTx().commit();
        } finally {
            resetLastSentInfo();
            this.transactionContext.remove();
        }
    }

    /**
     * Rolls back the calling thread's transaction. The thread's transaction state is cleared
     * whether or not the rollback succeeds.
     *
     * @throws MetaClientException if no transaction has begun or the rollback fails
     */
    @Override
    public void rollback() throws MetaClientException {
        try {
            getTx().rollback();
        } finally {
            resetLastSentInfo();
            this.transactionContext.remove();
        }
    }

    protected void resetLastSentInfo() {
        this.lastSentInfo.remove();
    }

    /**
     * Performs a single send attempt.
     *
     * <p>Inside a transaction that has already sent a message, the partition is chosen on the
     * server remembered in {@link #lastSentInfo}; otherwise partition and broker are selected
     * freshly, and the transaction (if any) is begun on that broker before the put is issued.
     */
    private SendResult send0(Message message, byte[] encodedData, long timeout, TimeUnit timeUnit)
            throws InterruptedException, MetaClientException {
        try {
            final String topic = message.getTopic();
            Partition partition = null;
            String serverUrl = null;
            if (isInTransaction()) {
                final LastSentInfo previous = this.lastSentInfo.get();
                if (previous != null) {
                    serverUrl = previous.serverUrl;
                    partition = this.producerZooKeeper.selectPartition(topic, message, this.partitionSelector, serverUrl);
                    if (partition == null) {
                        throw new MetaClientException("There is no partitions in `" + serverUrl + "` to send message with topic `" + topic + "` in a transaction");
                    }
                }
            }
            if (partition == null) {
                partition = selectPartition(message);
            }
            if (partition == null) {
                throw new MetaClientException("There is no aviable partition for topic " + topic + ",maybe you don't publish it at first?");
            }
            if (serverUrl == null) {
                serverUrl = this.producerZooKeeper.selectBroker(topic, partition);
            }
            if (serverUrl == null) {
                throw new MetaClientException("There is no aviable server right now for topic " + topic + " and partition " + partition + ",maybe you don't publish it at first?");
            }
            if (isInTransaction() && this.lastSentInfo.get() == null) {
                beforeSendMessageFirstTime(serverUrl);
            }
            final PutCommand putCommand = new PutCommand(topic, partition.getPartition(), encodedData,
                    MessageFlagUtils.getFlag(message), CheckSum.crc32(encodedData), getTransactionId(),
                    Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            final BooleanCommand response =
                    invokeToGroup(serverUrl, partition, putCommand, message, timeout, timeUnit);
            return genSendResult(message, partition, serverUrl, response);
        } catch (MetaClientException | InterruptedException e) {
            // Already the types callers expect; pass through untouched.
            throw e;
        } catch (TimeoutException e) {
            throw new MetaOpeartionTimeoutException("Send message timeout in " + TimeUnit.MILLISECONDS.convert(timeout, timeUnit) + " mills");
        } catch (Exception e) {
            // Must stay last: it would otherwise shadow the more specific handlers above.
            throw new MetaClientException("send message failed", e);
        }
    }

    /**
     * Translates a server response into a {@link SendResult}.
     *
     * <p>On success the response text is split on single spaces: the first field becomes the
     * message id, the second the partition number, and the third is passed to the
     * {@link SendResult} (presumably the offset). The message is updated with the id and
     * partition, and {@code serverUrl} is remembered for the current transaction.
     *
     * @param message message that was sent; mutated on success
     * @param partition partition the message was sent to
     * @param serverUrl server the message was sent to
     * @param response response received from the server
     * @return a successful result, or a failed one carrying {@code "403"} or the server's error text
     */
    public SendResult genSendResult(Message message, Partition partition, String serverUrl,
            BooleanCommand response) {
        final String resultText = response.getErrorMsg();
        switch (response.getCode()) {
            case CODE_SUCCESS: {
                final String[] fields = RESULT_SPLITER.split(resultText);
                MessageAccessor.setId(message, Long.parseLong(fields[RESULT_MESSAGE_ID_INDEX]));
                final Partition serverPartition = new Partition(partition.getBrokerId(),
                        Integer.parseInt(fields[RESULT_PARTITION_INDEX]));
                MessageAccessor.setPartition(message, serverPartition);
                logLastSentInfo(serverUrl);
                return new SendResult(true, serverPartition, Long.parseLong(fields[RESULT_OFFSET_INDEX]), null);
            }
            case CODE_FORBIDDEN:
                if (log.isDebugEnabled()) {
                    log.debug(resultText);
                }
                return new SendResult(false, null, -1L, String.valueOf(CODE_FORBIDDEN));
            default:
                return new SendResult(false, null, -1L, resultText);
        }
    }

    /**
     * Sends the put command to {@code serverUrl} and waits for the response. The
     * {@code partition} and {@code message} arguments are unused here.
     */
    protected BooleanCommand invokeToGroup(String serverUrl, Partition partition, PutCommand putCommand,
            Message message, long timeout, TimeUnit timeUnit)
            throws InterruptedException, TimeoutException, NotifyRemotingException {
        return this.remotingClient.invokeToGroup(serverUrl, putCommand, timeout, timeUnit);
    }

    /**
     * @throws IllegalStateException if the producer has been shut down
     */
    public void checkState() {
        if (this.shutdown) {
            throw new IllegalStateException("Producer has been shutdown");
        }
    }

    /**
     * @throws InvalidMessageException if the message is {@code null}, has a blank topic, or has
     *         {@code null} data
     */
    public void checkMessage(Message message) throws MetaClientException {
        if (message == null) {
            throw new InvalidMessageException("Null message");
        }
        if (StringUtils.isBlank(message.getTopic())) {
            throw new InvalidMessageException("Blank topic");
        }
        if (message.getData() == null) {
            throw new InvalidMessageException("Null data");
        }
    }

    /**
     * Sends the message asynchronously. Nothing is thrown to the caller: validation, selection
     * and transport failures are all reported through
     * {@code sendMessageCallback.onException}, and the server's answer through
     * {@code sendMessageCallback.onMessageSent}.
     *
     * @param message message to send
     * @param sendMessageCallback receiver of the result or the failure
     * @param timeout maximum time to wait for the response
     * @param timeUnit unit of {@code timeout}
     */
    @Override
    public void sendMessage(final Message message, final SendMessageCallback sendMessageCallback,
            long timeout, TimeUnit timeUnit) {
        try {
            // Same validation as the synchronous path; failures are routed to the callback.
            checkState();
            checkMessage(message);
            final String topic = message.getTopic();
            final Partition partition = selectPartition(message);
            if (partition == null) {
                throw new MetaClientException("There is no aviable partition for topic " + topic + ",maybe you don't publish it at first?");
            }
            final String serverUrl = this.producerZooKeeper.selectBroker(topic, partition);
            if (serverUrl == null) {
                throw new MetaClientException("There is no aviable server right now for topic " + topic + " and partition " + partition + ",maybe you don't publish it at first?");
            }
            final int flag = MessageFlagUtils.getFlag(message);
            final byte[] payload = MessageUtils.encodePayload(message);
            final PutCommand putCommand = new PutCommand(topic, partition.getPartition(), payload, flag,
                    CheckSum.crc32(payload), getTransactionId(),
                    Integer.valueOf(OpaqueGenerator.getNextOpaque()));
            this.remotingClient.sendToGroup(serverUrl, putCommand, new SingleRequestCallBackListener() {
                public void onResponse(ResponseCommand responseCommand, Connection connection) {
                    sendMessageCallback.onMessageSent(SimpleMessageProducer.this.genSendResult(
                            message, partition, serverUrl, (BooleanCommand) responseCommand));
                }

                public void onException(Exception exc) {
                    sendMessageCallback.onException(exc);
                }

                public ThreadPoolExecutor getExecutor() {
                    return null;
                }
            }, timeout, timeUnit);
        } catch (Throwable th) {
            sendMessageCallback.onException(th);
        }
    }

    /** Sends asynchronously with a timeout of {@link #DEFAULT_OP_TIMEOUT} milliseconds. */
    @Override
    public void sendMessage(Message message, SendMessageCallback sendMessageCallback) {
        sendMessage(message, sendMessageCallback, DEFAULT_OP_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /** Sends synchronously with a timeout of {@link #DEFAULT_OP_TIMEOUT} milliseconds. */
    @Override
    public SendResult sendMessage(Message message) throws MetaClientException, InterruptedException {
        return sendMessage(message, DEFAULT_OP_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Unpublishes every registered topic, marks the producer as shut down and removes it from
     * its session factory. Calling it again after a completed shutdown does nothing.
     */
    @Override
    public synchronized void shutdown() throws MetaClientException {
        if (this.shutdown) {
            return;
        }
        for (String topic : this.publishedTopics) {
            this.producerZooKeeper.unPublishTopic(topic, this);
        }
        this.shutdown = true;
        this.publishedTopics.clear();
        this.messageSessionFactory.removeChild(this);
    }
}
