package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.ResourceManager;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.MessageType;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.rpc.netty.NettyPoolKey;
import io.seata.core.rpc.processor.client.ClientHeartbeatProcessor;
import io.seata.core.rpc.processor.client.ClientOnResponseProcessor;
import io.seata.core.rpc.processor.client.RmBranchCommitProcessor;
import io.seata.core.rpc.processor.client.RmBranchRollbackProcessor;
import io.seata.core.rpc.processor.client.RmUndoLogProcessor;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/seata/core/rpc/netty/RmNettyRemotingClient.class */
/**
 * Netty remoting client used with the {@code RMROLE} transaction role.
 *
 * <p>The class is a lazily created singleton (see {@link #getInstance()}). It registers the
 * processors for the message types handled on this side, keeps the application id /
 * transaction service group used to build {@link RegisterRMRequest}s, and sends those
 * register requests over the channels exposed by the client channel manager.
 */
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyRemotingClient.class);

    /** Keep-alive (in seconds) handed to the dispatch thread pool. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;

    /** Capacity of the dispatch thread pool's work queue. */
    private static final int MAX_QUEUE_SIZE = 20000;

    /** Singleton holder; reset to {@code null} by {@link #destroy()}. */
    private static volatile RmNettyRemotingClient instance;

    /** Ensures the one-time part of {@link #init()} runs only once until {@link #destroy()}. */
    private final AtomicBoolean initialized;

    private ResourceManager resourceManager;
    private String applicationId;
    private String transactionServiceGroup;

    /**
     * Registers the message processors and, on the first call only, runs the parent
     * initialization. If a resource manager with at least one managed resource and a
     * non-blank transaction service group are already present, a reconnect for that group
     * is triggered right away.
     *
     * <p>Note that {@code registerProcessor()} runs on every call, not only the first one.
     */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient, io.seata.core.rpc.netty.AbstractNettyRemoting
    public void init() {
        registerProcessor();
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        super.init();
        boolean hasManagedResources = this.resourceManager != null
                && !this.resourceManager.getManagedResources().isEmpty();
        if (hasManagedResources && StringUtils.isNotBlank(this.transactionServiceGroup)) {
            getClientChannelManager().reconnect(this.transactionServiceGroup);
        }
    }

    /**
     * Creates the client and wires the batch-send switch.
     *
     * <p>The initial value of {@code enableClientBatchSendRequest} is read from
     * {@code transport.enableRmClientBatchSendRequest}, falling back to
     * {@code transport.enableClientBatchSendRequest} (default {@code true}). A configuration
     * listener keeps the flag in sync with later non-blank changes of the RM-specific key.
     *
     * @param nettyClientConfig  client configuration passed to the parent
     * @param eventExecutorGroup event executor group passed to the parent (may be {@code null})
     * @param threadPoolExecutor message executor passed to the parent
     */
    private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor threadPoolExecutor) {
        super(nettyClientConfig, eventExecutorGroup, threadPoolExecutor, NettyPoolKey.TransactionRole.RMROLE);
        this.initialized = new AtomicBoolean(false);
        this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean("transport.enableRmClientBatchSendRequest", ConfigurationFactory.getInstance().getBoolean("transport.enableClientBatchSendRequest", true));
        ConfigurationCache.addConfigListener("transport.enableRmClientBatchSendRequest", new ConfigurationChangeListener[]{new ConfigurationChangeListener() { // from class: io.seata.core.rpc.netty.RmNettyRemotingClient.1
            public void onChangeEvent(ConfigurationChangeEvent configurationChangeEvent) {
                String dataId = configurationChangeEvent.getDataId();
                String newValue = configurationChangeEvent.getNewValue();
                // Ignore events for other keys and blank values so the flag is never reset by accident.
                if ("transport.enableRmClientBatchSendRequest".equals(dataId) && StringUtils.isNotBlank(newValue)) {
                    RmNettyRemotingClient.this.enableClientBatchSendRequest = Boolean.parseBoolean(newValue);
                }
            }
        }});
    }

    /**
     * Returns the singleton after storing the given identifiers on it.
     *
     * @param str  application id
     * @param str2 transaction service group
     * @return the shared client instance
     */
    public static RmNettyRemotingClient getInstance(String str, String str2) {
        RmNettyRemotingClient client = getInstance();
        client.setApplicationId(str);
        client.setTransactionServiceGroup(str2);
        return client;
    }

    /**
     * Returns the singleton, creating it on first use.
     *
     * <p>The instance is built with a fresh {@link NettyClientConfig}, a {@code null} event
     * executor group and a fixed-size thread pool (core == max == client worker threads) backed
     * by a bounded queue of {@link #MAX_QUEUE_SIZE} entries with a caller-runs rejection policy.
     *
     * @return the shared client instance
     */
    public static RmNettyRemotingClient getInstance() {
        if (instance == null) {
            synchronized (RmNettyRemotingClient.class) {
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    int workerThreads = nettyClientConfig.getClientWorkerThreads();
                    ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            workerThreads,
                            workerThreads,
                            KEEP_ALIVE_TIME,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                            new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(), nettyClientConfig.getClientWorkerThreads()),
                            new ThreadPoolExecutor.CallerRunsPolicy());
                    instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

    /**
     * Sets the application id used when building register requests.
     *
     * @param str application id
     */
    public void setApplicationId(String str) {
        this.applicationId = str;
    }

    /**
     * Sets the transaction service group used for reconnects and register requests.
     *
     * @param str transaction service group
     */
    public void setTransactionServiceGroup(String str) {
        this.transactionServiceGroup = str;
    }

    /**
     * Sets the resource manager whose managed resources are announced on registration.
     *
     * @param resourceManager resource manager to use
     */
    public void setResourceManager(ResourceManager resourceManager) {
        this.resourceManager = resourceManager;
    }

    /**
     * Handles a successful register response: logs both versions, registers the channel with
     * the client channel manager and, when the request carried resource ids that differ from
     * the currently merged resource keys, sends a follow-up register message with the current
     * keys.
     *
     * @param str             key under which the channel is registered
     * @param channel         channel the registration succeeded on
     * @param obj             response, cast to {@link RegisterRMResponse}
     * @param abstractMessage request, cast to {@link RegisterRMRequest}
     */
    @Override // io.seata.core.rpc.RemotingClient
    public void onRegisterMsgSuccess(String str, Channel channel, Object obj, AbstractMessage abstractMessage) {
        RegisterRMRequest request = (RegisterRMRequest) abstractMessage;
        RegisterRMResponse response = (RegisterRMResponse) obj;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", request.getVersion(), response.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(str, channel);
        String mergedResourceKeys = getMergedResourceKeys();
        String requestedResourceIds = request.getResourceIds();
        // Re-send only when the original request announced resources that are now out of date.
        if (requestedResourceIds != null && !requestedResourceIds.equals(mergedResourceKeys)) {
            sendRegisterMessage(str, channel, mergedResourceKeys);
        }
    }

    /**
     * Handles a failed register response by throwing.
     *
     * @param str             unused here
     * @param channel         channel the registration failed on
     * @param obj             response, cast to {@link RegisterRMResponse}
     * @param abstractMessage request, cast to {@link RegisterRMRequest}
     * @throws FrameworkException always, carrying both versions, the error message and the channel
     */
    @Override // io.seata.core.rpc.RemotingClient
    public void onRegisterMsgFail(String str, Channel channel, Object obj, AbstractMessage abstractMessage) {
        RegisterRMRequest request = (RegisterRMRequest) abstractMessage;
        RegisterRMResponse response = (RegisterRMResponse) obj;
        throw new FrameworkException(String.format("register RM failed. client version: %s,server version: %s, errorMsg: %s, channel: %s", request.getVersion(), response.getVersion(), response.getMsg(), channel));
    }

    /**
     * Announces a resource id on every known channel.
     *
     * <p>Does nothing when the transaction service group is blank. When no channel is known yet,
     * a reconnect for the group is triggered instead of sending anything directly.
     *
     * @param str  not used by this method
     * @param str2 resource id to register
     */
    public void registerResource(String str, String str2) {
        if (StringUtils.isBlank(this.transactionServiceGroup)) {
            return;
        }
        // Use one reference so the map that is locked is the same one that is iterated.
        Map<String, Channel> channels = getClientChannelManager().getChannels();
        if (channels.isEmpty()) {
            getClientChannelManager().reconnect(this.transactionServiceGroup);
            return;
        }
        synchronized (channels) {
            for (Map.Entry<String, Channel> entry : channels.entrySet()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("will register resourceId:{}", str2);
                }
                sendRegisterMessage(entry.getKey(), entry.getValue(), str2);
            }
        }
    }

    /**
     * Sends an asynchronous {@link RegisterRMRequest} for the given resource ids.
     *
     * <p>A {@link FrameworkException} is handled locally: if it reports a non-writable channel and
     * a channel key is available, the channel is released; any other failure is logged as an error.
     *
     * @param str     key of the channel in the client channel manager (may be {@code null})
     * @param channel channel to send on
     * @param str2    resource ids to put into the request (may be {@code null})
     */
    public void sendRegisterMessage(String str, Channel channel, String str2) {
        RegisterRMRequest request = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
        request.setResourceIds(str2);
        try {
            super.sendAsyncRequest(channel, request);
        } catch (FrameworkException e) {
            boolean releasable = e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && str != null;
            if (!releasable) {
                LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, str2, e);
                return;
            }
            getClientChannelManager().releaseChannel(channel, str);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove not writable channel:{}", channel);
            }
        }
    }

    /**
     * Joins the ids of all managed resources with commas.
     *
     * @return the comma-separated resource ids, or {@code null} when no resource manager has
     *         been set or it manages no resources
     */
    public String getMergedResourceKeys() {
        // init() treats a missing resource manager as a legal state, so do the same here
        // instead of failing with a NullPointerException.
        if (this.resourceManager == null) {
            return null;
        }
        Set<String> resourceIds = this.resourceManager.getManagedResources().keySet();
        if (resourceIds.isEmpty()) {
            return null;
        }
        return String.join(",", resourceIds);
    }

    /**
     * Runs the parent teardown, then resets the initialization flag and drops the singleton
     * reference so a later {@link #getInstance()} creates a new client.
     */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient, io.seata.core.rpc.netty.AbstractNettyRemoting, io.seata.core.rpc.Disposable
    public void destroy() {
        super.destroy();
        this.initialized.set(false);
        instance = null;
    }

    /**
     * Supplies the function that builds a pool key for a given address string.
     *
     * @return a function producing an {@code RMROLE} {@link NettyPoolKey} whose register request
     *         carries the currently merged resource keys
     */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return str -> {
            String mergedResourceKeys = getMergedResourceKeys();
            if (mergedResourceKeys != null && LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :{}", mergedResourceKeys);
            }
            RegisterRMRequest request = new RegisterRMRequest(this.applicationId, this.transactionServiceGroup);
            request.setResourceIds(mergedResourceKeys);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, str, request);
        };
    }

    /** @return the configured transaction service group */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient
    protected String getTransactionServiceGroup() {
        return this.transactionServiceGroup;
    }

    /** @return the current value of the batch-send switch */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient
    public boolean isEnableClientBatchSendRequest() {
        return this.enableClientBatchSendRequest;
    }

    /** @return the request timeout reported by {@link NettyClientConfig#getRpcRmRequestTimeout()} */
    @Override // io.seata.core.rpc.netty.AbstractNettyRemotingClient
    public long getRpcRequestTimeout() {
        return NettyClientConfig.getRpcRmRequestTimeout();
    }

    /**
     * Registers the processors for every message type this client handles.
     *
     * <p>Processors registered with {@code this.messageExecutor} are given that executor; the
     * response and heartbeat processors are registered with a {@code null} executor.
     *
     * <p>NOTE(review): the numeric literals below appear to be inlined {@link MessageType}
     * constants (3/5 look like branch commit/rollback, 60/12/14/22 like result types) —
     * confirm against {@code MessageType} and replace the literals with the named constants.
     */
    private void registerProcessor() {
        super.registerProcessor(3, new RmBranchCommitProcessor(getTransactionMessageHandler(), this), this.messageExecutor);
        super.registerProcessor(5, new RmBranchRollbackProcessor(getTransactionMessageHandler(), this), this.messageExecutor);
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, new RmUndoLogProcessor(getTransactionMessageHandler()), this.messageExecutor);

        // One shared processor instance serves all response-type messages.
        ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(this.mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(60, onResponseProcessor, null);
        super.registerProcessor(12, onResponseProcessor, null);
        super.registerProcessor(14, onResponseProcessor, null);
        super.registerProcessor(22, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);

        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, new ClientHeartbeatProcessor(), null);
    }
}
