/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.extension.spring;

import com.taobao.gecko.core.util.StringUtils;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.RejectConsumptionHandler;
import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MessageBodyConverter;
import com.taobao.metamorphosis.client.extension.spring.MetaqTopic;
import com.taobao.metamorphosis.exception.MetaClientException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Spring lifecycle bean that creates {@link MessageConsumer}s from a
 * {@link MessageSessionFactory} and subscribes the configured listeners to
 * their topics.
 *
 * <p>On {@link #afterPropertiesSet()} every entry of the {@code subscribers}
 * map is subscribed; on {@link #destroy()} every consumer created by this
 * container is shut down.
 *
 * <p>Two modes are supported, selected by {@code shareConsumer}:
 * <ul>
 * <li>{@code false} (default): one consumer is created per subscribed topic,
 * using that topic's consumer config. {@code defaultTopic} and
 * {@code defaultMessageListener} must not be set.</li>
 * <li>{@code true}: a single consumer is created lazily from
 * {@code defaultTopic}'s consumer config and reused for every topic.
 * {@code defaultTopic} is then mandatory.</li>
 * </ul>
 */
public class MessageListenerContainer implements InitializingBean, DisposableBean {
    private static final Log log = LogFactory.getLog(MessageListenerContainer.class);

    /** Fallback converter handed to listeners that do not define their own. */
    private MessageBodyConverter<?> messageBodyConverter;
    /** Topic-to-listener bindings subscribed in {@link #afterPropertiesSet()}. */
    private Map<MetaqTopic, ? extends DefaultMessageListener<?>> subscribers =
            new HashMap<MetaqTopic, DefaultMessageListener<?>>();
    /** Whether all topics go through one shared consumer. */
    private boolean shareConsumer = false;
    /** Lazily created shared consumer; only published once fully set up. */
    private volatile MessageConsumer sharedConsumer;
    private MessageSessionFactory messageSessionFactory;
    /** Supplies the consumer config (and optional topic) of the shared consumer. */
    private MetaqTopic defaultTopic;
    /** Listener subscribed to {@code defaultTopic} when that topic name is not blank. */
    private DefaultMessageListener<?> defaultMessageListener;
    /** Optional handler installed on every consumer returned by {@link #getMessageConsumer(MetaqTopic)}. */
    private RejectConsumptionHandler rejectConsumptionHandler;
    /** Every consumer created by this container, so that {@link #destroy()} can shut them down. */
    protected final CopyOnWriteArraySet<MessageConsumer> consumers = new CopyOnWriteArraySet<MessageConsumer>();

    public MetaqTopic getDefaultTopic() {
        return this.defaultTopic;
    }

    public void setDefaultTopic(MetaqTopic defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public RejectConsumptionHandler getRejectConsumptionHandler() {
        return this.rejectConsumptionHandler;
    }

    public void setRejectConsumptionHandler(RejectConsumptionHandler rejectConsumptionHandler) {
        this.rejectConsumptionHandler = rejectConsumptionHandler;
    }

    public DefaultMessageListener<?> getDefaultMessageListener() {
        return this.defaultMessageListener;
    }

    public void setDefaultMessageListener(DefaultMessageListener<?> defaultMessageListener) {
        this.defaultMessageListener = defaultMessageListener;
    }

    /**
     * Returns the consumer to use for {@code topic}, with the configured
     * reject-consumption handler (if any) installed on it.
     *
     * @param topic the topic about to be subscribed; its consumer config is
     *              used only when consumers are not shared
     * @return the shared consumer, or a newly created dedicated one
     * @throws MetaClientException if creating or subscribing the consumer fails
     * @throws IllegalArgumentException if sharing is enabled without a default topic
     * @throws IllegalStateException if sharing is disabled but a default topic
     *                               or default listener was configured
     */
    protected MessageConsumer getMessageConsumer(MetaqTopic topic) throws MetaClientException {
        MessageConsumer consumer = this.getMessageConsumer0(topic);
        if (this.rejectConsumptionHandler != null) {
            consumer.setRejectConsumptionHandler(this.rejectConsumptionHandler);
        }
        return consumer;
    }

    /**
     * Creates a dedicated consumer for {@code topic}, or lazily creates and
     * returns the shared consumer when {@code shareConsumer} is enabled.
     */
    private MessageConsumer getMessageConsumer0(MetaqTopic topic) throws MetaClientException {
        if (!this.shareConsumer) {
            if (this.defaultMessageListener != null || this.defaultTopic != null) {
                throw new IllegalStateException("You can't provide default topic or message listener when not sharing consumer.");
            }
            MessageConsumer consumer = this.messageSessionFactory.createConsumer(topic.getConsumerConfig());
            this.consumers.add(consumer);
            return consumer;
        }

        MessageConsumer shared = this.sharedConsumer;
        if (shared != null) {
            return shared;
        }
        if (this.defaultTopic == null) {
            throw new IllegalArgumentException("Please provide default topic when sharing consumer.");
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have finished the setup meanwhile.
            shared = this.sharedConsumer;
            if (shared == null) {
                shared = this.messageSessionFactory.createConsumer(this.defaultTopic.getConsumerConfig());
                // Register before subscribing so destroy() still releases the
                // consumer if the subscription below throws.
                this.consumers.add(shared);
                if (!StringUtils.isBlank(this.defaultTopic.getTopic())) {
                    shared.subscribe(this.defaultTopic.getTopic(), this.defaultTopic.getMaxBufferSize(), this.defaultMessageListener);
                }
                // Publish last: readers of the volatile field must never observe
                // a consumer that is not yet subscribed and registered.
                this.sharedConsumer = shared;
            }
        }
        return shared;
    }

    /**
     * Shuts down every consumer created by this container, each exactly once,
     * and forgets them. Failures of individual consumers are logged and do
     * not prevent the remaining consumers from being shut down.
     */
    public void destroy() throws Exception {
        MessageConsumer shared = this.sharedConsumer;
        this.sharedConsumer = null;
        // The shared consumer is normally registered in 'consumers' and is shut
        // down by the loop below; handle it separately only if it is not there,
        // otherwise it would be shut down twice.
        if (shared != null && !this.consumers.contains(shared)) {
            this.shutdownConsumer(shared);
        }
        for (MessageConsumer consumer : this.consumers) {
            this.shutdownConsumer(consumer);
        }
        this.consumers.clear();
    }

    /** Shuts down one consumer, logging instead of propagating any failure. */
    private void shutdownConsumer(MessageConsumer consumer) {
        try {
            consumer.shutdown();
        }
        catch (MetaClientException e) {
            log.error("Shutdown consumer failed", e);
        }
        catch (RuntimeException e) {
            // Best effort: one misbehaving consumer must not keep the others open.
            log.error("Shutdown consumer failed", e);
        }
    }

    public boolean isShareConsumer() {
        return this.shareConsumer;
    }

    public void setShareConsumer(boolean shareConsumer) {
        this.shareConsumer = shareConsumer;
    }

    public MessageSessionFactory getMessageSessionFactory() {
        return this.messageSessionFactory;
    }

    public void setMessageSessionFactory(MessageSessionFactory messageSessionFactory) {
        this.messageSessionFactory = messageSessionFactory;
    }

    /**
     * Subscribes every configured listener to its topic and then calls
     * {@code completeSubscribe()} once on each distinct consumer involved.
     * Listeners without their own message body converter receive this
     * container's converter.
     *
     * @throws IllegalArgumentException if a topic key is null, has a blank
     *                                  topic name, or is mapped to a null listener
     * @throws IllegalStateException if no consumer could be obtained for a topic
     * @throws Exception if creating a consumer or subscribing fails
     */
    public void afterPropertiesSet() throws Exception {
        log.info("Start to initialize message listener container.");
        if (this.subscribers != null) {
            // A shared consumer shows up for many topics; the set makes sure
            // completeSubscribe() is invoked only once per consumer.
            HashSet<MessageConsumer> pending = new HashSet<MessageConsumer>();
            for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {
                MetaqTopic topic = entry.getKey();
                DefaultMessageListener<?> listener = entry.getValue();
                if (topic == null) {
                    throw new IllegalArgumentException("Topic is null");
                }
                if (StringUtils.isBlank(topic.getTopic())) {
                    throw new IllegalArgumentException("Blank topic");
                }
                if (listener == null) {
                    // Fail before a consumer is created for an unusable entry.
                    throw new IllegalArgumentException("Listener is null for topic=" + topic.getTopic());
                }
                MessageConsumer consumer = this.getMessageConsumer(topic);
                if (consumer == null) {
                    throw new IllegalStateException("Get or create consumer failed");
                }
                log.info("Subscribe topic=" + topic.getTopic() + " with group=" + topic.getGroup());
                if (listener.getMessageBodyConverter() == null) {
                    listener.setMessageBodyConverter(this.messageBodyConverter);
                }
                consumer.subscribe(topic.getTopic(), topic.getMaxBufferSize(), listener);
                pending.add(consumer);
            }
            for (MessageConsumer consumer : pending) {
                consumer.completeSubscribe();
            }
        }
        log.info("Initialize message listener container successfully.");
    }

    public MessageBodyConverter<?> getMessageBodyConverter() {
        return this.messageBodyConverter;
    }

    public Map<MetaqTopic, ? extends DefaultMessageListener<?>> getSubscribers() {
        return this.subscribers;
    }

    /**
     * Sets the topic-to-listener bindings subscribed during initialization.
     *
     * @param subscribers the bindings; {@code null} disables subscription in
     *                    {@link #afterPropertiesSet()}
     */
    public void setSubscribers(Map<MetaqTopic, ? extends DefaultMessageListener<?>> subscribers) {
        this.subscribers = subscribers;
    }

    public void setMessageBodyConverter(MessageBodyConverter<?> messageBodyConverter) {
        this.messageBodyConverter = messageBodyConverter;
    }
}

