package com.taobao.metamorphosis.storm.spout;

import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import com.taobao.gecko.core.util.LinkedTransferQueue;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/metamorphosis/storm/spout/MetaSpout.class */
public class MetaSpout extends BaseRichSpout {
    private static final long serialVersionUID = 4382748324382L;
    public static final String FETCH_MAX_SIZE = "meta.fetch.max_size";
    public static final String TOPIC = "meta.topic";
    public static final int DEFAULT_MAX_SIZE = 131072;
    private transient MessageConsumer messageConsumer;
    private transient MessageSessionFactory sessionFactory;
    private final MetaClientConfig metaClientConfig;
    private final ConsumerConfig consumerConfig;
    static final Log log = LogFactory.getLog(MetaSpout.class);
    private final Scheme scheme;
    public static final long WAIT_FOR_NEXT_MESSAGE = 1;
    private transient ConcurrentHashMap<Long, MetaMessageWrapper> id2wrapperMap;
    private transient SpoutOutputCollector collector;
    private transient LinkedTransferQueue<MetaMessageWrapper> messageQueue;

    public MetaSpout(MetaClientConfig metaClientConfig, ConsumerConfig consumerConfig, Scheme scheme) {
        this.metaClientConfig = metaClientConfig;
        this.consumerConfig = consumerConfig;
        this.scheme = scheme;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        String str = (String) map.get(TOPIC);
        if (str == null) {
            throw new IllegalArgumentException("meta.topic is null");
        }
        Integer num = (Integer) map.get(FETCH_MAX_SIZE);
        if (num == null) {
            log.warn("Using default FETCH_MAX_SIZE");
            num = Integer.valueOf(DEFAULT_MAX_SIZE);
        }
        this.id2wrapperMap = new ConcurrentHashMap<>();
        this.messageQueue = new LinkedTransferQueue<>();
        try {
            this.collector = spoutOutputCollector;
            setUpMeta(str, num);
        } catch (MetaClientException e) {
            log.error("Setup meta consumer failed", e);
        }
    }

    private void setUpMeta(String str, Integer num) throws MetaClientException {
        this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
        this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
        this.messageConsumer.subscribe(str, num.intValue(), new MessageListener() { // from class: com.taobao.metamorphosis.storm.spout.MetaSpout.1
            public void recieveMessages(Message message) {
                MetaMessageWrapper metaMessageWrapper = new MetaMessageWrapper(message);
                MetaSpout.this.id2wrapperMap.put(Long.valueOf(message.getId()), metaMessageWrapper);
                MetaSpout.this.messageQueue.offer(metaMessageWrapper);
                try {
                    metaMessageWrapper.latch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (metaMessageWrapper.success) {
                    return;
                }
                message.setRollbackOnly();
            }

            public Executor getExecutor() {
                return null;
            }
        }).completeSubscribe();
    }

    public void close() {
        try {
            this.messageConsumer.shutdown();
        } catch (MetaClientException e) {
            log.error("Shutdown consumer failed", e);
        }
        try {
            this.sessionFactory.shutdown();
        } catch (MetaClientException e2) {
            log.error("Shutdown session factory failed", e2);
        }
    }

    public void nextTuple() {
        if (this.messageConsumer != null) {
            try {
                MetaMessageWrapper metaMessageWrapper = (MetaMessageWrapper) this.messageQueue.poll(1L, TimeUnit.MILLISECONDS);
                if (metaMessageWrapper == null) {
                    return;
                }
                Message message = metaMessageWrapper.message;
                this.collector.emit(this.scheme.deserialize(message.getData()), Long.valueOf(message.getId()));
            } catch (InterruptedException e) {
            }
        }
    }

    public void ack(Object obj) {
        if (!(obj instanceof Long)) {
            log.warn(String.format("don't know how to ack(%s: %s)", obj.getClass().getName(), obj));
            return;
        }
        MetaMessageWrapper remove = this.id2wrapperMap.remove(Long.valueOf(((Long) obj).longValue()));
        if (remove == null) {
            log.warn(String.format("don't know how to ack(%s: %s)", obj.getClass().getName(), obj));
        } else {
            remove.success = true;
            remove.latch.countDown();
        }
    }

    public void fail(Object obj) {
        if (!(obj instanceof Long)) {
            log.warn(String.format("don't know how to reject(%s: %s)", obj.getClass().getName(), obj));
            return;
        }
        MetaMessageWrapper remove = this.id2wrapperMap.remove(Long.valueOf(((Long) obj).longValue()));
        if (remove == null) {
            log.warn(String.format("don't know how to reject(%s: %s)", obj.getClass().getName(), obj));
        } else {
            remove.success = false;
            remove.latch.countDown();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.scheme.getOutputFields());
    }

    public boolean isDistributed() {
        return true;
    }
}
