package com.taobao.metamorphosis.client.extension.spring;

import com.taobao.gecko.core.util.StringUtils;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.RejectConsumptionHandler;
import com.taobao.metamorphosis.exception.MetaClientException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/taobao/metamorphosis/client/extension/spring/MessageListenerContainer.class */
public class MessageListenerContainer implements InitializingBean, DisposableBean {
    private MessageBodyConverter<?> messageBodyConverter;
    private static final Log log = LogFactory.getLog(MessageListenerContainer.class);
    private volatile MessageConsumer sharedConsumer;
    private MessageSessionFactory messageSessionFactory;
    private MetaqTopic defaultTopic;
    private DefaultMessageListener<?> defaultMessageListener;
    private RejectConsumptionHandler rejectConsumptionHandler;
    private Map<MetaqTopic, ? extends DefaultMessageListener<?>> subscribers = new HashMap();
    private boolean shareConsumer = false;
    protected final CopyOnWriteArraySet<MessageConsumer> consumers = new CopyOnWriteArraySet<>();

    public MetaqTopic getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(MetaqTopic metaqTopic) {
        this.defaultTopic = metaqTopic;
    }

    public RejectConsumptionHandler getRejectConsumptionHandler() {
        return this.rejectConsumptionHandler;
    }

    public void setRejectConsumptionHandler(RejectConsumptionHandler rejectConsumptionHandler) {
        this.rejectConsumptionHandler = rejectConsumptionHandler;
    }

    public DefaultMessageListener<?> getDefaultMessageListener() {
        return this.defaultMessageListener;
    }

    public void setDefaultMessageListener(DefaultMessageListener<?> defaultMessageListener) {
        this.defaultMessageListener = defaultMessageListener;
    }

    protected MessageConsumer getMessageConsumer(MetaqTopic metaqTopic) throws MetaClientException {
        MessageConsumer messageConsumer0 = getMessageConsumer0(metaqTopic);
        if (this.rejectConsumptionHandler != null) {
            messageConsumer0.setRejectConsumptionHandler(this.rejectConsumptionHandler);
        }
        return messageConsumer0;
    }

    private MessageConsumer getMessageConsumer0(MetaqTopic metaqTopic) throws MetaClientException {
        if (!this.shareConsumer) {
            if (this.defaultMessageListener != null || this.defaultTopic != null) {
                throw new IllegalStateException("You can't provide default topic or message listener when not sharing consumer.");
            }
            MessageConsumer createConsumer = this.messageSessionFactory.createConsumer(metaqTopic.getConsumerConfig());
            this.consumers.add(createConsumer);
            return createConsumer;
        }
        if (this.sharedConsumer == null) {
            if (this.defaultTopic == null) {
                throw new IllegalArgumentException("Please provide default topic when sharing consumer.");
            }
            synchronized (this) {
                if (this.sharedConsumer == null) {
                    this.sharedConsumer = this.messageSessionFactory.createConsumer(this.defaultTopic.getConsumerConfig());
                    if (!StringUtils.isBlank(this.defaultTopic.getTopic())) {
                        this.sharedConsumer.subscribe(this.defaultTopic.getTopic(), this.defaultTopic.getMaxBufferSize(), this.defaultMessageListener);
                    }
                    this.consumers.add(this.sharedConsumer);
                }
            }
        }
        return this.sharedConsumer;
    }

    public void destroy() throws Exception {
        if (this.sharedConsumer != null) {
            shutdownConsumer(this.sharedConsumer);
            this.sharedConsumer = null;
        }
        Iterator<MessageConsumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            shutdownConsumer(it.next());
        }
        this.consumers.clear();
    }

    private void shutdownConsumer(MessageConsumer messageConsumer) {
        try {
            messageConsumer.shutdown();
        } catch (MetaClientException e) {
            log.error("Shutdown consumer failed", e);
        }
    }

    public boolean isShareConsumer() {
        return this.shareConsumer;
    }

    public void setShareConsumer(boolean z) {
        this.shareConsumer = z;
    }

    public MessageSessionFactory getMessageSessionFactory() {
        return this.messageSessionFactory;
    }

    public void setMessageSessionFactory(MessageSessionFactory messageSessionFactory) {
        this.messageSessionFactory = messageSessionFactory;
    }

    public void afterPropertiesSet() throws Exception {
        log.info("Start to initialize message listener container.");
        if (this.subscribers != null) {
            HashSet hashSet = new HashSet();
            for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {
                MetaqTopic key = entry.getKey();
                DefaultMessageListener<?> value = entry.getValue();
                if (key == null) {
                    throw new IllegalArgumentException("Topic is null");
                }
                if (StringUtils.isBlank(key.getTopic())) {
                    throw new IllegalArgumentException("Blank topic");
                }
                MessageConsumer messageConsumer = getMessageConsumer(key);
                if (messageConsumer == null) {
                    throw new IllegalStateException("Get or create consumer failed");
                }
                log.info("Subscribe topic=" + key.getTopic() + " with group=" + key.getGroup());
                if (value.getMessageBodyConverter() == null) {
                    value.setMessageBodyConverter(this.messageBodyConverter);
                }
                messageConsumer.subscribe(key.getTopic(), key.getMaxBufferSize(), value);
                hashSet.add(messageConsumer);
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                ((MessageConsumer) it.next()).completeSubscribe();
            }
        }
        log.info("Initialize message listener container successfully.");
    }

    public MessageBodyConverter<?> getMessageBodyConverter() {
        return this.messageBodyConverter;
    }

    public Map<MetaqTopic, ? extends DefaultMessageListener<?>> getSubscribers() {
        return this.subscribers;
    }

    public void setSubscribers(Map map) {
        this.subscribers = map;
    }

    public void setMessageBodyConverter(MessageBodyConverter<?> messageBodyConverter) {
        this.messageBodyConverter = messageBodyConverter;
    }
}
