/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.metamorphosis.client.extension.spring;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.client.extension.spring.MessageBodyConverter;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

public abstract class DefaultMessageListener<T>
implements MessageListener,
InitializingBean,
DisposableBean {
    private MessageBodyConverter<?> messageBodyConverter;
    static final Log log = LogFactory.getLog(DefaultMessageListener.class);
    private int processThreads = -1;
    private ExecutorService executor;

    protected void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    public int getProcessThreads() {
        return this.processThreads;
    }

    public MessageBodyConverter<?> getMessageBodyConverter() {
        return this.messageBodyConverter;
    }

    public void setMessageBodyConverter(MessageBodyConverter<?> messageBodyConverter) {
        this.messageBodyConverter = messageBodyConverter;
    }

    public void setProcessThreads(int processThreads) {
        if (processThreads < 0) {
            throw new IllegalArgumentException("Invalid processThreads value:" + processThreads);
        }
        this.processThreads = processThreads;
    }

    @Override
    public void recieveMessages(Message message) throws InterruptedException {
        if (this.messageBodyConverter != null) {
            try {
                Object body = this.messageBodyConverter.fromByteArray(message.getData());
                this.onReceiveMessages(new MetaqMessage(message, body));
            }
            catch (Exception e) {
                log.error((Object)("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is " + message.getTopic()), (Throwable)e);
                message.setRollbackOnly();
            }
        } else {
            this.onReceiveMessages(new MetaqMessage<Object>(message, null));
        }
    }

    public abstract void onReceiveMessages(MetaqMessage<T> var1);

    public void afterPropertiesSet() throws Exception {
        if (this.processThreads > 0) {
            this.executor = Executors.newFixedThreadPool(this.processThreads);
        }
    }

    public void destroy() throws Exception {
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
    }

    @Override
    public Executor getExecutor() {
        return this.executor;
    }
}

