package com.taobao.metamorphosis.client.extension.spring;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/taobao/metamorphosis/client/extension/spring/DefaultMessageListener.class */
/**
 * Base {@link MessageListener} that optionally converts the raw message payload with a
 * {@link MessageBodyConverter} and hands the result to {@link #onReceiveMessages(MetaqMessage)}.
 *
 * <p>The class also takes part in the Spring bean lifecycle: {@link #afterPropertiesSet()} creates a
 * fixed-size thread pool when {@code processThreads} is positive, and {@link #destroy()} shuts the
 * held executor down.
 *
 * @param <T> type of the converted message body delivered to {@link #onReceiveMessages(MetaqMessage)}
 */
public abstract class DefaultMessageListener<T> implements MessageListener, InitializingBean, DisposableBean {
    static final Log log = LogFactory.getLog(DefaultMessageListener.class);

    /** Converter applied to the raw payload; {@code null} means the body is delivered as {@code null}. */
    private MessageBodyConverter<?> messageBodyConverter;

    /** Size of the pool created by {@link #afterPropertiesSet()}; no pool is created unless this is positive. */
    private int processThreads = -1;

    /** Executor returned by {@link #getExecutor()}; {@code null} until one is created or supplied. */
    private ExecutorService executor;

    /**
     * Replaces the executor returned by {@link #getExecutor()}.
     *
     * <p>Note that {@link #destroy()} shuts down whichever executor is held at that time, including
     * one supplied through this method, and that {@link #afterPropertiesSet()} overwrites it when
     * {@code processThreads} is positive.
     *
     * @param executorService the executor to use, may be {@code null}
     */
    protected void setExecutor(ExecutorService executorService) {
        this.executor = executorService;
    }

    /**
     * @return the configured number of processing threads, {@code -1} if never configured
     */
    public int getProcessThreads() {
        return this.processThreads;
    }

    /**
     * @return the converter used to decode message payloads, or {@code null} if none is set
     */
    public MessageBodyConverter<?> getMessageBodyConverter() {
        return this.messageBodyConverter;
    }

    /**
     * @param messageBodyConverter converter used to decode message payloads; {@code null} disables
     *        conversion so that listeners receive a {@code null} body
     */
    public void setMessageBodyConverter(MessageBodyConverter<?> messageBodyConverter) {
        this.messageBodyConverter = messageBodyConverter;
    }

    /**
     * Sets the size of the thread pool created by {@link #afterPropertiesSet()}.
     *
     * @param i number of threads; {@code 0} is accepted and results in no pool being created
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public void setProcessThreads(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("Invalid processThreads value:" + i);
        }
        this.processThreads = i;
    }

    /**
     * Converts the payload (when a converter is configured) and dispatches the message to
     * {@link #onReceiveMessages(MetaqMessage)}.
     *
     * <p>Without a converter the listener is invoked directly with a {@code null} body and any
     * exception it throws propagates to the caller. With a converter, a failure in either the
     * conversion or the listener is logged and the message is marked rollback-only.
     *
     * @param message the raw message to process
     * @throws InterruptedException declared by the {@link MessageListener} contract
     */
    @Override // com.taobao.metamorphosis.client.consumer.MessageListener
    public void recieveMessages(Message message) throws InterruptedException {
        final MessageBodyConverter<?> converter = this.messageBodyConverter;
        if (converter == null) {
            onReceiveMessages(new MetaqMessage<T>(message, null));
            return;
        }

        T body;
        try {
            // The converter is held as MessageBodyConverter<?>, so its result must be cast to T.
            // The cast is unchecked: a converter producing another type is only detected later,
            // when the listener actually uses the body.
            @SuppressWarnings("unchecked")
            final T converted = (T) converter.fromByteArray(message.getData());
            body = converted;
        } catch (Exception e) {
            log.error("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is " + message.getTopic(), e);
            message.setRollbackOnly();
            return;
        }

        try {
            onReceiveMessages(new MetaqMessage<T>(message, body));
        } catch (Exception e) {
            // Still rolled back as before, but no longer reported as a conversion failure.
            log.error("Process message failed,msg id is " + message.getId() + " and topic is " + message.getTopic(), e);
            message.setRollbackOnly();
        }
    }

    /**
     * Handles a message whose body has already been converted.
     *
     * @param metaqMessage wrapper around the raw message and its converted body; the body is
     *        {@code null} when no converter is configured
     */
    public abstract void onReceiveMessages(MetaqMessage<T> metaqMessage);

    /**
     * Creates a fixed-size thread pool of {@code processThreads} threads when that value is
     * positive, replacing any executor that was set before; otherwise leaves the executor untouched.
     */
    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() throws Exception {
        if (this.processThreads > 0) {
            this.executor = Executors.newFixedThreadPool(this.processThreads);
        }
    }

    /**
     * Initiates an orderly shutdown of the held executor, if any, and clears the reference so that
     * {@link #getExecutor()} returns {@code null} afterwards.
     */
    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() throws Exception {
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
    }

    /**
     * @return the executor currently held by this listener, or {@code null} when none was created
     *         or supplied
     */
    @Override // com.taobao.metamorphosis.client.consumer.MessageListener
    public Executor getExecutor() {
        return this.executor;
    }
}
