package com.hazelcast.client.proxy;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.config.ClientReliableTopicConfig;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.MemberImpl;
import com.hazelcast.client.spi.ClientProxy;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.ILogger;
import com.hazelcast.monitor.LocalTopicStats;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.topic.ReliableMessageListener;
import com.hazelcast.topic.TopicOverloadException;
import com.hazelcast.topic.TopicOverloadPolicy;
import com.hazelcast.topic.impl.reliable.ReliableMessageListenerAdapter;
import com.hazelcast.topic.impl.reliable.ReliableTopicMessage;
import com.hazelcast.topic.impl.reliable.ReliableTopicService;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.UuidUtil;
import com.hazelcast.version.MemberVersion;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-all-3.8.3.jar:com/hazelcast/client/proxy/ClientReliableTopicProxy.class */
public class ClientReliableTopicProxy<E> extends ClientProxy implements ITopic<E> {
    private static final int MAX_BACKOFF = 2000;
    private static final int INITIAL_BACKOFF_MS = 100;
    private final ILogger logger;
    private final ConcurrentMap<String, ClientReliableTopicProxy<E>.MessageRunner> runnersMap;
    private final Ringbuffer ringbuffer;
    private final SerializationService serializationService;
    private final ClientReliableTopicConfig config;
    private final Executor executor;
    private final TopicOverloadPolicy overloadPolicy;

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-all-3.8.3.jar:com/hazelcast/client/proxy/ClientReliableTopicProxy$MessageRunner.class */
    class MessageRunner implements ExecutionCallback<ReadResultSet<ReliableTopicMessage>> {
        final ReliableMessageListener<E> listener;
        private final String id;
        private long sequence;
        private volatile boolean cancelled;

        public MessageRunner(String str, ReliableMessageListener<E> reliableMessageListener) {
            this.id = str;
            this.listener = reliableMessageListener;
            long retrieveInitialSequence = reliableMessageListener.retrieveInitialSequence();
            this.sequence = retrieveInitialSequence == -1 ? ClientReliableTopicProxy.this.ringbuffer.tailSequence() + 1 : retrieveInitialSequence;
        }

        void next() {
            if (this.cancelled) {
                return;
            }
            ClientReliableTopicProxy.this.ringbuffer.readManyAsync(this.sequence, 1, ClientReliableTopicProxy.this.config.getReadBatchSize(), null).andThen(this, ClientReliableTopicProxy.this.executor);
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public void onResponse(ReadResultSet<ReliableTopicMessage> readResultSet) {
            for (ReliableTopicMessage reliableTopicMessage : readResultSet) {
                if (this.cancelled) {
                    return;
                }
                try {
                    this.listener.storeSequence(this.sequence);
                    process(reliableTopicMessage);
                } catch (Throwable th) {
                    if (terminate(th)) {
                        cancel();
                        return;
                    }
                }
                this.sequence++;
            }
            next();
        }

        private void process(ReliableTopicMessage reliableTopicMessage) throws Throwable {
            this.listener.onMessage(toMessage(reliableTopicMessage));
        }

        private Message<E> toMessage(ReliableTopicMessage reliableTopicMessage) {
            MemberImpl memberImpl = null;
            if (reliableTopicMessage.getPublisherAddress() != null) {
                memberImpl = new MemberImpl(reliableTopicMessage.getPublisherAddress(), MemberVersion.UNKNOWN);
            }
            return new Message<>(ClientReliableTopicProxy.this.name, ClientReliableTopicProxy.this.serializationService.toObject(reliableTopicMessage.getPayload()), reliableTopicMessage.getPublishTime(), memberImpl);
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public void onFailure(Throwable th) {
            if (this.cancelled) {
                return;
            }
            RuntimeException peel = ExceptionUtil.peel(th);
            if (peel instanceof StaleSequenceException) {
                long headSequence = ClientReliableTopicProxy.this.ringbuffer.headSequence();
                if (this.listener.isLossTolerant()) {
                    if (ClientReliableTopicProxy.this.logger.isFinestEnabled()) {
                        ClientReliableTopicProxy.this.logger.finest("MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + " ran into a stale sequence. Jumping from oldSequence: " + this.sequence + " to sequence: " + headSequence);
                    }
                    this.sequence = headSequence;
                    next();
                    return;
                }
                ClientReliableTopicProxy.this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: The listener was too slow or the retention period of the message has been violated. head: " + headSequence + " sequence:" + this.sequence);
            } else if (peel instanceof HazelcastInstanceNotActiveException) {
                if (ClientReliableTopicProxy.this.logger.isFinestEnabled()) {
                    ClientReliableTopicProxy.this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ".  Reason: HazelcastInstance is shutting down");
                }
            } else if (peel instanceof DistributedObjectDestroyedException) {
                if (ClientReliableTopicProxy.this.logger.isFinestEnabled()) {
                    ClientReliableTopicProxy.this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: Topic is destroyed");
                }
            } else if (!(peel instanceof HazelcastClientNotActiveException) && !(peel instanceof RejectedExecutionException)) {
                ClientReliableTopicProxy.this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: Unhandled exception, message: " + peel.getMessage(), peel);
            } else if (ClientReliableTopicProxy.this.logger.isFinestEnabled()) {
                ClientReliableTopicProxy.this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: HazelcastClient is shutting down");
            }
            cancel();
        }

        void cancel() {
            this.cancelled = true;
            ClientReliableTopicProxy.this.runnersMap.remove(this.id);
        }

        private boolean terminate(Throwable th) {
            if (this.cancelled) {
                return true;
            }
            try {
                boolean isTerminal = this.listener.isTerminal(th);
                if (isTerminal) {
                    ClientReliableTopicProxy.this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: Unhandled exception, message: " + th.getMessage(), th);
                } else if (ClientReliableTopicProxy.this.logger.isFinestEnabled()) {
                    ClientReliableTopicProxy.this.logger.finest("MessageListener " + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + " ran into an exception: message:" + th.getMessage(), th);
                }
                return isTerminal;
            } catch (Throwable th2) {
                ClientReliableTopicProxy.this.logger.warning("Terminating messageListener:" + this.listener + " on topic: " + ClientReliableTopicProxy.this.name + ". Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", th2);
                return true;
            }
        }
    }

    public ClientReliableTopicProxy(String str, HazelcastClientInstanceImpl hazelcastClientInstanceImpl) {
        super(ReliableTopicService.SERVICE_NAME, str);
        this.runnersMap = new ConcurrentHashMap();
        this.ringbuffer = hazelcastClientInstanceImpl.getRingbuffer(RingbufferService.TOPIC_RB_PREFIX + str);
        this.serializationService = hazelcastClientInstanceImpl.getSerializationService();
        this.config = hazelcastClientInstanceImpl.getClientConfig().getReliableTopicConfig(str);
        this.executor = getExecutor(this.config, hazelcastClientInstanceImpl);
        this.overloadPolicy = this.config.getTopicOverloadPolicy();
        this.logger = hazelcastClientInstanceImpl.getLoggingService().getLogger(getClass());
    }

    private Executor getExecutor(ClientReliableTopicConfig clientReliableTopicConfig, HazelcastClientInstanceImpl hazelcastClientInstanceImpl) {
        Executor executor = clientReliableTopicConfig.getExecutor();
        if (executor == null) {
            executor = hazelcastClientInstanceImpl.getClientExecutionService().getUserExecutor();
        }
        return executor;
    }

    @Override // com.hazelcast.core.ITopic
    public void publish(E e) {
        try {
            ReliableTopicMessage reliableTopicMessage = new ReliableTopicMessage(this.serializationService.toData(e), null);
            switch (this.overloadPolicy) {
                case ERROR:
                    addOrFail(reliableTopicMessage);
                    break;
                case DISCARD_OLDEST:
                    addOrOverwrite(reliableTopicMessage);
                    break;
                case DISCARD_NEWEST:
                    this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.FAIL).get();
                    break;
                case BLOCK:
                    addWithBackoff(reliableTopicMessage);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + this.overloadPolicy);
            }
        } catch (Exception e2) {
            throw ((RuntimeException) ExceptionUtil.peel(e2, null, "Failed to publish message: " + e + " to topic:" + getName()));
        }
    }

    private Long addOrOverwrite(ReliableTopicMessage reliableTopicMessage) throws Exception {
        return this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.OVERWRITE).get();
    }

    private void addOrFail(ReliableTopicMessage reliableTopicMessage) throws Exception {
        if (this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.FAIL).get().longValue() == -1) {
            throw new TopicOverloadException("Failed to publish message: " + reliableTopicMessage + " on topic:" + this.name);
        }
    }

    private void addWithBackoff(ReliableTopicMessage reliableTopicMessage) throws Exception {
        long j = 100;
        while (this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.FAIL).get().longValue() == -1) {
            TimeUnit.MILLISECONDS.sleep(j);
            j *= 2;
            if (j > 2000) {
                j = 2000;
            }
        }
    }

    @Override // com.hazelcast.core.ITopic
    public String addMessageListener(MessageListener<E> messageListener) {
        Preconditions.checkNotNull(messageListener, "listener can't be null");
        String newUnsecureUuidString = UuidUtil.newUnsecureUuidString();
        ClientReliableTopicProxy<E>.MessageRunner messageRunner = new MessageRunner(newUnsecureUuidString, toReliableMessageListener(messageListener));
        this.runnersMap.put(newUnsecureUuidString, messageRunner);
        messageRunner.next();
        return newUnsecureUuidString;
    }

    private ReliableMessageListener<E> toReliableMessageListener(MessageListener<E> messageListener) {
        return messageListener instanceof ReliableMessageListener ? (ReliableMessageListener) messageListener : new ReliableMessageListenerAdapter(messageListener);
    }

    @Override // com.hazelcast.core.ITopic
    public boolean removeMessageListener(String str) {
        Preconditions.checkNotNull(str, "registrationId can't be null");
        ClientReliableTopicProxy<E>.MessageRunner messageRunner = this.runnersMap.get(str);
        if (messageRunner == null) {
            return false;
        }
        messageRunner.cancel();
        return true;
    }

    @Override // com.hazelcast.core.ITopic
    public LocalTopicStats getLocalTopicStats() {
        throw new UnsupportedOperationException("Locality is ambiguous for client!!!");
    }

    public Ringbuffer getRingbuffer() {
        return this.ringbuffer;
    }

    public String toString() {
        return "ITopic{name='" + this.name + "'}";
    }
}
