/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.pulsarmq;

import com.alibaba.fastjson2.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.common.utils.MQUtil;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CanalMQConnector} implementation that consumes Canal messages from an Apache Pulsar topic.
 *
 * <p>Typical usage is {@link #connect()} followed by {@link #subscribe()}, then repeated
 * {@code getListWithoutAck}/{@code getFlatListWithoutAck} calls, each followed by {@link #ack()} or
 * {@link #rollback()}, and finally {@link #disconnect()}.
 *
 * <p>Only one un-acknowledged batch may be outstanding at a time: requesting a new batch while the
 * previous one has been neither acknowledged nor rolled back raises a {@link CanalClientException}.
 */
public class PulsarMQCanalConnector implements CanalMQConnector {
    private static final Logger logger = LoggerFactory.getLogger(PulsarMQCanalConnector.class);

    /** Defaults applied by the five-argument constructor. */
    private static final int DEFAULT_BATCH_SIZE = 30;
    private static final int DEFAULT_GET_BATCH_TIMEOUT_SECONDS = 30;
    private static final int DEFAULT_BATCH_PROCESS_TIMEOUT_SECONDS = 60;
    private static final int DEFAULT_REDELIVERY_DELAY_SECONDS = 60;
    private static final int DEFAULT_ACK_TIMEOUT_SECONDS = 30;
    private static final boolean DEFAULT_IS_RETRY = true;
    private static final boolean DEFAULT_IS_RETRY_DLQ_UPPER_CASE = false;
    private static final int DEFAULT_MAX_REDELIVERY_COUNT = 128;

    /** The batch handed out by the last fetch that has not been acked or rolled back yet. */
    private volatile Messages<byte[]> lastGetBatchMessage;
    private PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;
    private final boolean isFlatMessage;
    private final String topic;
    private final String serviceUrl;
    private final String roleToken;
    private final String subscriptName;
    private final int batchSize;
    private final int getBatchTimeoutSeconds;
    // Stored for configuration completeness; not read anywhere in this class.
    private final int batchProcessTimeoutSeconds;
    private final int redeliveryDelaySeconds;
    private final int ackTimeoutSeconds;
    private final boolean isRetry;
    private final boolean isRetryDLQUpperCase;
    private final int maxRedeliveryCount;
    private boolean connected = false;

    /**
     * Creates a connector using the default batching, retry and timeout settings.
     *
     * @param isFlatMessage whether payloads are JSON {@link FlatMessage}s rather than serialized
     *     {@link Message}s
     * @param serviceUrl Pulsar service URL
     * @param roleToken token passed to Pulsar token authentication
     * @param topic topic name, or a topic pattern
     * @param subscriptName subscription name; must not be empty
     * @throws IllegalArgumentException if {@code subscriptName} is empty
     */
    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic, String subscriptName) {
        this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,
            DEFAULT_BATCH_SIZE, DEFAULT_GET_BATCH_TIMEOUT_SECONDS, DEFAULT_BATCH_PROCESS_TIMEOUT_SECONDS,
            DEFAULT_REDELIVERY_DELAY_SECONDS, DEFAULT_ACK_TIMEOUT_SECONDS, DEFAULT_IS_RETRY,
            DEFAULT_IS_RETRY_DLQ_UPPER_CASE, DEFAULT_MAX_REDELIVERY_COUNT);
    }

    /**
     * Creates a fully configured connector.
     *
     * @param isFlatMessage whether payloads are JSON {@link FlatMessage}s rather than serialized
     *     {@link Message}s
     * @param serviceUrl Pulsar service URL
     * @param roleToken token passed to Pulsar token authentication
     * @param topic topic name, or a topic pattern
     * @param subscriptName subscription name; must not be empty
     * @param batchSize maximum number of messages per batch receive
     * @param getBatchTimeoutSeconds timeout of a batch receive, in seconds
     * @param batchProcessTimeoutSeconds stored but not used by this class
     * @param redeliveryDelaySeconds redelivery delay after a negative acknowledgement, in seconds
     * @param ackTimeoutSeconds consumer acknowledgement timeout, in seconds
     * @param isRetry whether to enable the retry / dead-letter policy
     * @param isRetryDLQUpperCase whether the retry/DLQ topic suffixes are upper case
     *     ({@code -RETRY}/{@code -DLQ}) instead of lower case ({@code -retry}/{@code -dlq})
     * @param maxRedeliveryCount maximum redelivery count of the dead-letter policy
     * @throws IllegalArgumentException if {@code subscriptName} is empty
     */
    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic, String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds, int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase, int maxRedeliveryCount) {
        if (StringUtils.isEmpty(subscriptName)) {
            throw new IllegalArgumentException("Pulsar Consumer subscriptName required");
        }
        this.isFlatMessage = isFlatMessage;
        this.serviceUrl = serviceUrl;
        this.roleToken = roleToken;
        this.topic = topic;
        this.subscriptName = subscriptName;
        this.batchSize = batchSize;
        this.getBatchTimeoutSeconds = getBatchTimeoutSeconds;
        this.batchProcessTimeoutSeconds = batchProcessTimeoutSeconds;
        this.redeliveryDelaySeconds = redeliveryDelaySeconds;
        this.ackTimeoutSeconds = ackTimeoutSeconds;
        this.isRetry = isRetry;
        this.isRetryDLQUpperCase = isRetryDLQUpperCase;
        this.maxRedeliveryCount = maxRedeliveryCount;
    }

    /**
     * Builds the Pulsar client for the configured service URL using token authentication.
     *
     * @throws CanalClientException if the client cannot be built
     */
    @Override
    public void connect() throws CanalClientException {
        try {
            this.pulsarClient = PulsarClient.builder()
                .serviceUrl(this.serviceUrl)
                .authentication(AuthenticationFactory.token(this.roleToken))
                .build();
        } catch (PulsarClientException e) {
            throw new CanalClientException(e.getMessage(), e);
        }
    }

    /**
     * Closes the consumer (if still connected) and the client. Close failures are logged, not
     * propagated, so that both resources are always attempted.
     */
    @Override
    public void disconnect() throws CanalClientException {
        try {
            if (this.consumer != null && this.consumer.isConnected()) {
                this.consumer.close();
            }
        } catch (PulsarClientException e) {
            logger.error("close pulsar consumer error", e);
        }
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } catch (PulsarClientException e) {
            logger.error("close pulsar client error", e);
        }
        // A batch fetched through the closed consumer can no longer be acknowledged; forget it so a
        // later reconnect is not rejected by the "one outstanding batch" guard.
        this.lastGetBatchMessage = null;
        this.connected = false;
    }

    /** Returns whether {@link #subscribe(String)} has succeeded and no disconnect happened since. */
    @Override
    public boolean checkValid() throws CanalClientException {
        return this.connected;
    }

    /**
     * Creates the Pulsar consumer for the configured topic. Does nothing if already subscribed.
     *
     * @param filter ignored by this implementation
     * @throws CanalClientException if {@link #connect()} has not been called or subscribing fails
     */
    @Override
    public synchronized void subscribe(String filter) throws CanalClientException {
        if (this.connected) {
            return;
        }
        if (this.pulsarClient == null) {
            throw new CanalClientException("pulsar client is not connected, call connect() before subscribe()");
        }
        ConsumerBuilder<byte[]> builder = this.pulsarClient.newConsumer();
        if (MQUtil.isPatternTopic(this.topic)) {
            builder.topicsPattern(this.topic);
        } else {
            builder.topic(this.topic);
        }
        builder.subscriptionType(SubscriptionType.Failover);
        builder.negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
            .subscriptionName(this.subscriptName);
        if (this.isRetry) {
            DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder =
                DeadLetterPolicy.builder().maxRedeliverCount(this.maxRedeliveryCount);
            // Explicit retry/DLQ topic names are only derived when the topic is not a pattern.
            // NOTE(review): this tests isPatternTag while the topic selection above tests
            // isPatternTopic - confirm against MQUtil that the difference is intentional.
            if (!MQUtil.isPatternTag(this.topic)) {
                String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
                dlqBuilder.retryLetterTopic(retryTopic);
                String dlqTopic = this.topic + (this.isRetryDLQUpperCase ? "-DLQ" : "-dlq");
                dlqBuilder.deadLetterTopic(dlqTopic);
            }
            builder.enableRetry(true).deadLetterPolicy(dlqBuilder.build());
        }
        builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
        builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
            .maxNumMessages(this.batchSize)
            .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
            .build());
        try {
            this.consumer = builder.subscribe();
        } catch (PulsarClientException e) {
            throw new CanalClientException(e.getMessage(), e);
        }
        this.connected = true;
    }

    /** Subscribes without a filter; see {@link #subscribe(String)}. */
    @Override
    public void subscribe() throws CanalClientException {
        this.subscribe(null);
    }

    /**
     * Unsubscribes the consumer, if one was created.
     *
     * @throws CanalClientException if Pulsar reports an error
     */
    @Override
    public void unsubscribe() throws CanalClientException {
        try {
            if (this.consumer != null) {
                this.consumer.unsubscribe();
            }
        } catch (PulsarClientException e) {
            throw new CanalClientException(e.getMessage(), e);
        }
    }

    /**
     * Fetches one batch of {@link Message}s and acknowledges it immediately when it is non-empty.
     * The {@code timeout}/{@code unit} arguments are not used; the batch receive policy configured
     * at subscribe time governs the wait.
     */
    @Override
    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<Message> messages = this.getListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    /**
     * Fetches one batch of {@link Message}s without acknowledging it. The {@code timeout}/
     * {@code unit} arguments are not used.
     */
    @Override
    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return this.getListWithoutAck();
    }

    /**
     * Fetches one batch of {@link FlatMessage}s and acknowledges it immediately when it is
     * non-empty. The {@code timeout}/{@code unit} arguments are not used.
     */
    @Override
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = this.getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    /**
     * Fetches one batch of {@link FlatMessage}s without acknowledging it. The {@code timeout}/
     * {@code unit} arguments are not used.
     */
    @Override
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return this.getListWithoutAck();
    }

    /**
     * Receives one batch from Pulsar and decodes every payload, as {@link FlatMessage} when the
     * connector is in flat-message mode and as {@link Message} otherwise.
     *
     * <p>The received batch is remembered until {@link #ack()} or {@link #rollback()} is called;
     * this also holds when decoding fails, so the caller can roll the batch back.
     *
     * @return the decoded messages; empty when nothing was received
     * @throws CanalClientException if a previous batch is still outstanding, or if receiving or
     *     decoding fails
     */
    // The element type is selected by isFlatMessage at runtime, so the cast to T cannot be checked
    // statically; callers are responsible for using the accessor matching the configured mode.
    @SuppressWarnings("unchecked")
    private <T> List<T> getListWithoutAck() {
        if (this.lastGetBatchMessage != null) {
            throw new CanalClientException("mq get/ack not support concurrent & async ack");
        }
        List<T> messageList = new ArrayList<>();
        Messages<byte[]> batch;
        try {
            batch = this.consumer.batchReceive();
        } catch (PulsarClientException e) {
            logger.error("Receiver Pulsar MQ message error", e);
            throw new CanalClientException(e);
        }
        if (batch == null || batch.size() < 1) {
            return messageList;
        }
        // Record the batch before decoding so that ack()/rollback() can act on it afterwards.
        this.lastGetBatchMessage = batch;
        for (org.apache.pulsar.client.api.Message<byte[]> msgExt : batch) {
            byte[] data = msgExt.getData();
            if (data == null) {
                logger.warn("Received message data is null");
                continue;
            }
            try {
                if (this.isFlatMessage) {
                    FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                    messageList.add((T) flatMessage);
                } else {
                    Message message = CanalMessageDeserializer.deserializer(data);
                    messageList.add((T) message);
                }
            } catch (Exception ex) {
                logger.error("Add message error", ex);
                throw new CanalClientException(ex);
            }
        }
        return messageList;
    }

    /**
     * Acknowledges the outstanding batch, if any. When acknowledging fails the failure is logged and
     * the batch is negatively acknowledged instead; in every case the batch is cleared afterwards.
     */
    @Override
    public void ack() throws CanalClientException {
        Messages<byte[]> batch = this.lastGetBatchMessage;
        try {
            if (batch != null) {
                this.consumer.acknowledge(batch);
            }
        } catch (Exception e) {
            // Best effort: do not fail the caller, but make the failure visible and ask Pulsar
            // to redeliver the batch.
            logger.error("ack pulsar batch message error, negative acknowledge instead", e);
            this.consumer.negativeAcknowledge(batch);
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Negatively acknowledges the outstanding batch, if any, and clears it. */
    @Override
    public void rollback() throws CanalClientException {
        Messages<byte[]> batch = this.lastGetBatchMessage;
        try {
            if (batch != null) {
                this.consumer.negativeAcknowledge(batch);
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message get(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public void ack(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public void rollback(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }
}

