package com.qcloud.cmq.client.consumer;

import com.qcloud.cmq.client.client.ThreadGroupFactory;
import com.qcloud.cmq.client.common.LogHelper;
import com.qcloud.cmq.client.common.RemoteHelper;
import com.qcloud.cmq.client.common.RequestIdHelper;
import com.qcloud.cmq.client.exception.MQClientException;
import com.qcloud.cmq.client.netty.CommunicationMode;
import com.qcloud.cmq.client.netty.RemoteException;
import com.qcloud.cmq.client.protocol.Cmq;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;

/* loaded from: input_file:com/qcloud/cmq/client/consumer/SubscribeService.class */
/**
 * Pulls messages for a single queue in the background and hands them to a {@link MessageListener}.
 *
 * <p>Two executors cooperate:
 * <ul>
 *   <li>a single-threaded scheduler that issues asynchronous batch-receive requests, once per
 *       {@link #PULL_PERIOD_MS} and additionally right after each successfully consumed batch;</li>
 *   <li>a single-threaded consume executor that runs the listener on each received batch and
 *       then asynchronously deletes the messages the listener reported.</li>
 * </ul>
 *
 * <p>A new pull is only issued while fewer than {@link #MAX_FLIGHT_PULL_REQUESTS} pulls are
 * awaiting a response and fewer than {@link #MAX_QUEUED_CONSUME_REQUESTS} batches are waiting to
 * be consumed.
 */
public class SubscribeService {
    private static final Logger logger = LogHelper.getLog();

    /** No new pull is issued while this many pull requests are still awaiting a response. */
    private static final int MAX_FLIGHT_PULL_REQUESTS = 16;

    /** No new pull is issued while this many consume tasks are waiting in the executor queue. */
    private static final int MAX_QUEUED_CONSUME_REQUESTS = 16;

    /** Period of the background pull task, in milliseconds. */
    private static final long PULL_PERIOD_MS = 1000L;

    /** Return code of a batch receive whose messages are handed to the listener. */
    private static final int RETURN_CODE_OK = 0;

    /**
     * Return code that is neither consumed nor logged.
     * NOTE(review): presumably "no message available" for a long poll - confirm against the CMQ error codes.
     */
    private static final int RETURN_CODE_IGNORED = 10200;

    private final String queue;
    private final MessageListener listener;
    private final Cmq.CMQProto.Builder pullRequestBuilder;
    private final ConsumerImpl consumer;

    /** Single thread that decides when to pull; every counter check-and-increment happens on it. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "PullMessageScheduledThread");
        }
    });

    /** Number of pull requests that were issued and have not completed or failed yet. */
    private final AtomicInteger flightPullRequest = new AtomicInteger();

    /** Backlog of received batches waiting for the consume thread. */
    private final BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    private final ThreadPoolExecutor consumeExecutor = new ThreadPoolExecutor(1, 1, 60000, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadGroupFactory("ConsumeMessageThread_"));

    /**
     * Consume task for one received batch: invokes the listener, deletes the messages the
     * listener returned handles for, and then requests the next pull.
     */
    class ConsumeRequest implements Runnable {
        private final List<Message> msgList;

        ConsumeRequest(List<Message> list) {
            this.msgList = list;
        }

        @Override
        public void run() {
            try {
                // The listener gets a read-only view and returns the handles to delete.
                List<Long> handlesToDelete = listener.consumeMessage(queue, Collections.unmodifiableList(this.msgList));
                if (handlesToDelete != null && !handlesToDelete.isEmpty()) {
                    consumer.getConsumer().batchDeleteMsg(queue, handlesToDelete, new BatchDeleteCallback() {
                        @Override
                        public void onSuccess(BatchDeleteResult batchDeleteResult) {
                            logger.debug("delete msg success, result{}", batchDeleteResult);
                        }

                        @Override
                        public void onException(Throwable th) {
                            logger.debug("delete msg failed", th);
                        }
                    });
                }
                // Only reached when consuming did not throw; after a failure the next pull is
                // left to the periodic task.
                submitPullRequest();
            } catch (Throwable th) {
                logger.warn("consumeMessage exception: {} queue: {}", RemoteHelper.exceptionSimpleDesc(th), queue);
            }
        }
    }

    /**
     * Creates the service; nothing is scheduled until {@link #start()} is called.
     *
     * @param str             name of the queue to pull from
     * @param messageListener listener invoked with every received batch
     * @param builder         request builder reused for every pull; only its sequence number is updated per request
     * @param consumerImpl    owning consumer, used for routing, configuration and the network client
     */
    public SubscribeService(String str, MessageListener messageListener, Cmq.CMQProto.Builder builder, ConsumerImpl consumerImpl) {
        this.queue = str;
        this.listener = messageListener;
        this.pullRequestBuilder = builder;
        this.consumer = consumerImpl;
    }

    /** Schedules the periodic pull task on the scheduler thread, starting immediately. */
    private void startScheduleTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                logger.debug("schedule flightPullRequest:{}, size:{}, active: {}", new Object[]{flightPullRequest.get(), consumeRequestQueue.size(), consumeExecutor.getActiveCount()});
                pullIfBelowLimits();
            }
        }, 0L, PULL_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Asks the scheduler thread to attempt one additional pull as soon as possible.
     *
     * <p>Declared public in this source (decompiler note: originally private); visibility is
     * left unchanged.
     */
    public void submitPullRequest() {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                logger.debug("submit flightPullRequest:{}, size:{}", flightPullRequest.get(), consumeRequestQueue.size());
                pullIfBelowLimits();
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Issues one pull unless the in-flight or backlog limit is reached. Takes an in-flight slot
     * before delegating to {@link #pullImmediately()}, which is responsible for giving it back.
     * Both callers run on the single scheduler thread, so check and increment do not interleave.
     */
    private void pullIfBelowLimits() {
        if (this.flightPullRequest.get() >= MAX_FLIGHT_PULL_REQUESTS
                || this.consumeRequestQueue.size() >= MAX_QUEUED_CONSUME_REQUESTS) {
            return;
        }
        this.flightPullRequest.incrementAndGet();
        pullImmediately();
    }

    /**
     * Sends one asynchronous batch-receive request for the queue.
     *
     * <p>The caller must already have incremented the in-flight counter. The slot is released
     * exactly once: by the callback when the request completes, or here when the request cannot
     * be issued at all. Without the latter, every synchronous failure would leak a slot and
     * pulling would stop for good once the limit was reached.
     *
     * <p>Runtime exceptions are caught as well, because an exception escaping the periodic task
     * would suppress all of its subsequent executions.
     *
     * <p>Declared public in this source (decompiler note: originally private); visibility is
     * left unchanged.
     */
    public void pullImmediately() {
        final AtomicBoolean slotReleased = new AtomicBoolean(false);
        try {
            // NOTE(review): m44build looks like a decompiler-assigned name for the builder's
            // build method; kept exactly as the rest of this source tree spells it.
            // Timeout = request timeout plus the long-polling wait converted to milliseconds.
            this.consumer.getMQClientInstance().getCMQClient().batchReceiveMessage(
                    this.consumer.getQueueRoute(this.queue),
                    this.pullRequestBuilder.setSeqno(RequestIdHelper.getNextSeqNo()).m44build(),
                    this.consumer.getConsumer().getRequestTimeoutMS() + (this.consumer.getConsumer().getPollingWaitSeconds() * 1000),
                    CommunicationMode.ASYNC,
                    new BatchReceiveCallback() {
                        @Override
                        public void onSuccess(BatchReceiveResult batchReceiveResult) {
                            releasePullSlot(slotReleased);
                            if (batchReceiveResult.getReturnCode() == RETURN_CODE_OK) {
                                consumeExecutor.submit(new ConsumeRequest(batchReceiveResult.getMessageList()));
                            } else if (batchReceiveResult.getReturnCode() != RETURN_CODE_IGNORED) {
                                logger.info("pull message error:" + batchReceiveResult.getErrorMessage() + ", errorCode:" + batchReceiveResult.getReturnCode());
                            }
                        }

                        @Override
                        public void onException(Throwable th) {
                            releasePullSlot(slotReleased);
                            logger.info("pull message error :" + th.getMessage());
                        }
                    });
        } catch (MQClientException e) {
            onPullNotIssued(slotReleased, e, true);
        } catch (RemoteException e) {
            onPullNotIssued(slotReleased, e, true);
        } catch (InterruptedException e) {
            onPullNotIssued(slotReleased, e, true);
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Unexpected failure: not necessarily route related, so no route refresh is requested.
            onPullNotIssued(slotReleased, e, false);
        }
    }

    /**
     * Gives back the in-flight slot of one pull request. The flag guarantees a single decrement
     * even if both the callback and the synchronous failure path run for the same request.
     */
    private void releasePullSlot(AtomicBoolean slotReleased) {
        if (slotReleased.compareAndSet(false, true)) {
            this.flightPullRequest.decrementAndGet();
        }
    }

    /** Common handling for a pull that failed synchronously: free the slot, log, optionally refresh the route. */
    private void onPullNotIssued(AtomicBoolean slotReleased, Exception cause, boolean refreshRoute) {
        releasePullSlot(slotReleased);
        logger.error("pull message error", cause);
        if (refreshRoute) {
            this.consumer.setNeedUpdateRoute();
        }
    }

    /** Starts the periodic pulling of messages. */
    public void start() {
        startScheduleTask();
    }

    /**
     * Stops scheduling new pulls and stops accepting new consume tasks. Both executors are shut
     * down in an orderly way (already submitted tasks still run); this method does not wait for
     * them to finish.
     */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();
        logger.info("shutdown pull service for queue {} success.", this.queue);
    }
}
