/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusCreateSessionOptions;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.ServiceBusSession;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;

/**
 * A {@link ReactorSession} specialisation that implements {@link ServiceBusSession}.
 *
 * <p>It builds the link properties (server timeout, client identifier, entity type, transfer destination) used
 * when sender and receiver links are requested, and, when the session was created with distributed transaction
 * support, obtains the transaction coordinator before a link is created.</p>
 */
class ServiceBusReactorSession extends ReactorSession implements ServiceBusSession {
    static final Symbol SESSION_FILTER = Symbol.getSymbol("com.microsoft:session-filter");
    static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol("com.microsoft:locked-until-utc");
    private static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol("com.microsoft:timeout");
    private static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol("com.microsoft:entity-type");
    private static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY =
        Symbol.getSymbol("com.microsoft:transfer-destination-address");
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReactorSession.class);

    private final AmqpRetryPolicy retryPolicy;
    private final TokenManagerProvider tokenManagerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final AmqpConnection amqpConnection;
    private final AmqpRetryOptions retryOptions;
    private final boolean distributedTransactionsSupport;

    /**
     * Creates the session. Every argument except {@code createOptions} is forwarded to the {@link ReactorSession}
     * constructor; the connection, retry options, token manager provider and CBS node supplier are also kept
     * locally for later link creation.
     *
     * @param amqpConnection Connection handed to receivers created by this session.
     * @param session Underlying proton-j session.
     * @param sessionHandler Handler for the session.
     * @param sessionName Name of the session.
     * @param provider Reactor provider.
     * @param handlerProvider Provider of reactor handlers.
     * @param cbsNodeSupplier Supplies the claims-based-security node used to obtain token managers.
     * @param tokenManagerProvider Creates token managers for entities.
     * @param messageSerializer Serializer for messages.
     * @param retryOptions Retry options; a retry policy is derived from them via {@link RetryUtil}.
     * @param createOptions Session options; read for the distributed transaction flag.
     */
    ServiceBusReactorSession(
        AmqpConnection amqpConnection,
        Session session,
        SessionHandler sessionHandler,
        String sessionName,
        ReactorProvider provider,
        ReactorHandlerProvider handlerProvider,
        Mono<ClaimsBasedSecurityNode> cbsNodeSupplier,
        TokenManagerProvider tokenManagerProvider,
        MessageSerializer messageSerializer,
        AmqpRetryOptions retryOptions,
        ServiceBusCreateSessionOptions createOptions) {
        super(amqpConnection, session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier,
            tokenManagerProvider, messageSerializer, retryOptions);

        this.amqpConnection = amqpConnection;
        this.retryOptions = retryOptions;
        this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
        this.tokenManagerProvider = tokenManagerProvider;
        this.cbsNodeSupplier = cbsNodeSupplier;
        this.distributedTransactionsSupport = createOptions.isDistributedTransactionsSupported();
    }

    /**
     * Creates a receive link with an empty source filter.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
        String clientIdentifier) {
        final Map<Symbol, Object> emptyFilter = new HashMap<>();

        return this.createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, emptyFilter,
            clientIdentifier);
    }

    /**
     * Creates a receive link whose source filter carries {@code sessionId} under {@link #SESSION_FILTER}.
     * The value is stored as given, including {@code null}.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
        String clientIdentifier, String sessionId) {
        final Map<Symbol, Object> sessionFilter = new HashMap<>();
        sessionFilter.put(SESSION_FILTER, sessionId);

        return this.createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, sessionFilter,
            clientIdentifier);
    }

    /**
     * Creates a send link, optionally one that transfers through another entity.
     *
     * <p>When {@code transferEntityPath} is non-empty it is added to the link properties, and a token manager
     * for that path is authorized (and closed once authorization terminates) before the link Mono is chained.</p>
     *
     * @param linkName Name of the link.
     * @param entityPath Entity the link targets.
     * @param timeout Client timeout; the adjusted server timeout derived from it is sent as a link property.
     * @param retry Retry policy.
     * @param transferEntityPath Optional transfer destination; {@code null} or empty means none.
     * @param clientIdentifier Identifier stored in the link properties.
     * @return A Mono that yields the send link.
     * @throws NullPointerException if {@code entityPath}, {@code timeout} or {@code retry} is null.
     */
    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, String transferEntityPath, String clientIdentifier) {
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");

        final Map<Symbol, Object> properties
            = buildLinkProperties(timeout, AmqpConstants.CLIENT_IDENTIFIER, clientIdentifier);

        // Plain sender: nothing else to authorize.
        if (CoreUtils.isNullOrEmpty(transferEntityPath)) {
            LOGGER.atVerbose()
                .addKeyValue("linkName", linkName)
                .addKeyValue("entityPath", entityPath)
                .log("Get or create sender link.");

            return this.createProducer(linkName, entityPath, timeout, retry, properties);
        }

        properties.put(LINK_TRANSFER_DESTINATION_PROPERTY, transferEntityPath);
        LOGGER.atVerbose()
            .addKeyValue("linkName", linkName)
            .addKeyValue("entityPath", entityPath)
            .addKeyValue("transferEntityPath", transferEntityPath)
            .log("Get or create sender link.");

        final TokenManager transferTokenManager
            = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, transferEntityPath);

        // The token manager is closed as soon as the authorize Mono terminates, whatever the signal.
        return transferTokenManager.authorize()
            .doFinally(signal -> transferTokenManager.close())
            .then(this.createProducer(linkName, entityPath, timeout, retry, properties));
    }

    /**
     * Creates a send link without additional link properties ({@code null} is passed for them).
     */
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry) {
        return this.createProducer(linkName, entityPath, timeout, retry, null);
    }

    /**
     * Delegates send link creation to the superclass. With distributed transaction support enabled, the
     * delegation happens only after the transaction coordinator Mono emits.
     */
    protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
        AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
        if (!this.distributedTransactionsSupport) {
            return super.createProducer(linkName, entityPath, timeout, retry, linkProperties);
        }

        return this.getOrCreateTransactionCoordinator()
            .flatMap(ignored -> super.createProducer(linkName, entityPath, timeout, retry, linkProperties));
    }

    /**
     * Builds the receiver for a receive link: a {@link ServiceBusReactorReceiver} bound to this session's
     * connection, try timeout and retry policy.
     */
    protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
        ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ServiceBusReactorReceiver(
            this.amqpConnection,
            entityPath,
            receiver,
            receiveLinkHandler,
            tokenManager,
            reactorProvider,
            this.retryOptions.getTryTimeout(),
            this.retryPolicy);
    }

    /**
     * Shared implementation behind both public {@code createConsumer} overloads.
     *
     * <p>Validates the arguments, assembles the link properties, maps the receive mode onto sender/receiver
     * settle modes and asks the superclass for the link, casting the result to {@link ServiceBusReceiveLink}.
     * With distributed transaction support enabled, the link is requested only after the transaction
     * coordinator Mono emits.</p>
     *
     * @throws NullPointerException if {@code linkName}, {@code entityPath}, {@code timeout}, {@code retry} or
     *     {@code receiveMode} is null.
     */
    private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
        MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode,
        Map<Symbol, Object> filter, String clientIdentifier) {
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");
        Objects.requireNonNull(receiveMode, "'receiveMode' cannot be null.");

        final Map<Symbol, Object> properties
            = buildLinkProperties(timeout, AmqpConstants.CLIENT_RECEIVER_IDENTIFIER, clientIdentifier);
        if (entityType != null) {
            properties.put(ENTITY_TYPE_PROPERTY, entityType.getValue());
        }

        final SenderSettleMode senderMode;
        final ReceiverSettleMode receiverMode;
        if (receiveMode == ServiceBusReceiveMode.PEEK_LOCK) {
            senderMode = SenderSettleMode.UNSETTLED;
            receiverMode = ReceiverSettleMode.SECOND;
        } else if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
            senderMode = SenderSettleMode.SETTLED;
            receiverMode = ReceiverSettleMode.FIRST;
        } else {
            return Mono.error(new RuntimeException("ReceiveMode is not supported: " + receiveMode));
        }

        if (!this.distributedTransactionsSupport) {
            return this.createConsumer(linkName, entityPath, timeout, retry, filter, properties, null,
                senderMode, receiverMode).cast(ServiceBusReceiveLink.class);
        }

        return this.getOrCreateTransactionCoordinator()
            .flatMap(ignored -> this.createConsumer(linkName, entityPath, timeout, retry, filter, properties,
                null, senderMode, receiverMode).cast(ServiceBusReceiveLink.class));
    }

    /**
     * Starts a mutable link property map holding the adjusted server timeout in milliseconds and the client
     * identifier. The identifier is stored as given, even when it is {@code null}.
     *
     * @param timeout Client-side timeout from which the server timeout is derived.
     * @param identifierKey Property key under which {@code clientIdentifier} is stored.
     * @param clientIdentifier Identifier value.
     * @return A new map containing the two entries.
     */
    private static Map<Symbol, Object> buildLinkProperties(Duration timeout, Symbol identifierKey,
        String clientIdentifier) {
        final Duration serverTimeout = MessageUtils.adjustServerTimeout(timeout);

        final Map<Symbol, Object> properties = new HashMap<>();
        properties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
        properties.put(identifierKey, clientIdentifier);
        return properties;
    }
}

