/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.common.register;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.register.AbstractRegister;
import com.aizuda.snailjob.server.common.register.RegisterContext;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
 * Register implementation for nodes of type {@link NodeTypeEnum#CLIENT}.
 *
 * <p>Incoming registrations are not handled inline: {@link #doRegister} only places the node
 * into a bounded in-memory deque ({@link #QUEUE}). The inner {@link RefreshNodeSchedule}
 * periodically drains that deque, updates the local caches, pulls the client nodes known to
 * the other server nodes, and hands the combined list to {@code refreshExpireAt} (inherited
 * from {@link AbstractRegister}).
 */
@Component(ClientRegister.BEAN_NAME)
public class ClientRegister
extends AbstractRegister {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClientRegister.class);
    public static final String BEAN_NAME = "clientRegister";
    /** Number of seconds a client registration stays valid after being refreshed. */
    public static final int DELAY_TIME = 30;
    /** Bounded buffer of client nodes waiting to be refreshed; offers fail once it holds 1000 entries. */
    protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(1000);
    /** Upper bound of additional nodes drained from {@link #QUEUE} per {@link #getExpireNodes()} call. */
    private static final int MAX_DRAIN_PER_POLL = 256;
    @Autowired
    @Lazy
    private RefreshNodeSchedule refreshNodeSchedule;

    /**
     * @param type node type code to test
     * @return {@code true} when {@code type} equals the code returned by {@link #getNodeType()}
     */
    @Override
    public boolean supports(int type) {
        return this.getNodeType().equals(type);
    }

    /** No pre-processing is required for client registrations. */
    @Override
    protected void beforeProcessor(RegisterContext context) {
    }

    /**
     * @return the current time plus {@link #DELAY_TIME} seconds
     */
    @Override
    protected LocalDateTime getExpireAt() {
        return LocalDateTime.now().plusSeconds(DELAY_TIME);
    }

    /**
     * Enqueues the node for the next refresh cycle.
     *
     * <p>Requests whose URI is {@code /beat} are placed at the head of the deque, all other
     * requests at the tail.
     *
     * @param context    registration request context; only its URI is read here
     * @param serverNode node to enqueue
     * @return {@code false} when the deque is full and the node was not accepted
     */
    @Override
    protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
        if ("/beat".equals(context.getUri())) {
            return QUEUE.offerFirst(serverNode);
        }
        return QUEUE.offerLast(serverNode);
    }

    /** No post-processing is required for client registrations. */
    @Override
    protected void afterProcessor(ServerNode serverNode) {
    }

    /**
     * @return the type code of {@link NodeTypeEnum#CLIENT}
     */
    @Override
    protected Integer getNodeType() {
        return NodeTypeEnum.CLIENT.getType();
    }

    /** Starts the periodic refresh task. */
    @Override
    public void start() {
        this.refreshNodeSchedule.startScheduler();
    }

    /** Nothing is released here. */
    @Override
    public void close() {
    }

    /**
     * Removes a batch of queued nodes from {@link #QUEUE}.
     *
     * <p>Despite the name, no expiry check is made: the head of the deque plus up to
     * {@value #MAX_DRAIN_PER_POLL} further entries are taken as they are.
     *
     * @return the drained nodes, or {@code null} when the deque was empty
     */
    public static List<ServerNode> getExpireNodes() {
        ServerNode head = QUEUE.poll();
        if (Objects.isNull(head)) {
            // Kept as null (not an empty list) because callers outside this file may test for it.
            return null;
        }
        List<ServerNode> drained = Lists.newArrayList(head);
        QUEUE.drainTo(drained, MAX_DRAIN_PER_POLL);
        return drained;
    }

    /**
     * Drains a batch of queued nodes and writes each one into the local caches with a fresh
     * expiry of now + {@link #DELAY_TIME} seconds.
     *
     * @return the nodes that were refreshed, or {@code null} when nothing was queued
     */
    public static List<ServerNode> refreshLocalCache() {
        List<ServerNode> queuedNodes = ClientRegister.getExpireNodes();
        if (Objects.nonNull(queuedNodes)) {
            for (ServerNode serverNode : queuedNodes) {
                serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME));
                CacheRegisterTable.addOrUpdate(serverNode);
                CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId());
            }
        }
        return queuedNodes;
    }

    /**
     * Periodic task that collects client nodes registered on this server and on the other
     * server nodes and passes them to {@code refreshExpireAt}.
     */
    @Component
    public class RefreshNodeSchedule
    extends AbstractSchedule {
        /** Pool used for the RPC calls to other server nodes; created in {@link #startScheduler()}. */
        private ThreadPoolExecutor refreshNodePool;

        /**
         * Runs one refresh cycle. Any exception is logged and swallowed so that a failing
         * cycle does not propagate to the scheduler.
         */
        @Override
        protected void doExecute() {
            try {
                List<ServerNode> serverNodes = ClientRegister.this.serverNodeMapper.selectList(
                        new LambdaQueryWrapper<ServerNode>()
                                .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()));
                // Exclude this server itself; Objects.equals keeps a row with a null host id
                // from aborting the whole cycle with a NullPointerException.
                List<ServerNode> remoteServers = StreamUtils.filter(serverNodes,
                        serverNode -> !Objects.equals(serverNode.getHostId(), ServerRegister.CURRENT_CID));

                List<ServerNode> waitRefreshDBClientNodes = new ArrayList<>();
                List<ServerNode> refreshCache = ClientRegister.refreshLocalCache();
                if (CollUtil.isNotEmpty(refreshCache)) {
                    waitRefreshDBClientNodes.addAll(refreshCache);
                }
                if (!remoteServers.isEmpty()) {
                    List<ServerNode> allClientList = this.pullRemoteNodeClientRegisterInfo(remoteServers);
                    if (CollUtil.isNotEmpty(allClientList)) {
                        waitRefreshDBClientNodes.addAll(allClientList);
                    }
                }
                if (CollUtil.isEmpty(waitRefreshDBClientNodes)) {
                    SnailJobLog.LOCAL.debug("clientNodes is empty");
                    return;
                }
                SnailJobLog.LOCAL.debug("start refresh client nodes\uff1a{}", waitRefreshDBClientNodes);
                // NOTE(review): refreshExpireAt is inherited and not visible here; the local
                // variable name suggests it persists the new expiry to the database - confirm.
                ClientRegister.this.refreshExpireAt(waitRefreshDBClientNodes);
            } catch (Exception e) {
                SnailJobLog.LOCAL.error("refresh \u5931\u8d25", e);
            }
        }

        /**
         * Asks every given server node, in parallel, for its client registration info and
         * merges the answers.
         *
         * <p>Each answer is awaited for at most one second. A node that fails, times out or
         * returns an unparsable payload contributes nothing to the result.
         *
         * @param serverNodes server nodes to query
         * @return the distinct client nodes reported by the queried servers; never {@code null}
         */
        private List<ServerNode> pullRemoteNodeClientRegisterInfo(List<ServerNode> serverNodes) {
            if (CollUtil.isEmpty(serverNodes)) {
                return Lists.newArrayList();
            }
            List<Future<String>> futures = new ArrayList<>(serverNodes.size());
            for (ServerNode serverNode : serverNodes) {
                Future<String> pending = this.refreshNodePool.submit(() -> this.pullFrom(serverNode));
                futures.add(pending);
            }
            return futures.stream()
                    .map(this::awaitNodes)
                    .flatMap(Collection::stream)
                    .distinct()
                    .toList();
        }

        /**
         * Performs the RPC call to a single server node.
         *
         * @param serverNode server node to query
         * @return the payload returned by the remote node, or an empty string when the call failed
         */
        private String pullFrom(ServerNode serverNode) {
            try {
                RegisterNodeInfo nodeInfo = new RegisterNodeInfo();
                nodeInfo.setHostId(serverNode.getHostId());
                nodeInfo.setGroupName(serverNode.getGroupName());
                nodeInfo.setNamespaceId(serverNode.getNamespaceId());
                nodeInfo.setHostPort(serverNode.getHostPort());
                nodeInfo.setHostIp(serverNode.getHostIp());
                CommonRpcClient serverRpcClient = this.buildRpcClient(nodeInfo);
                Result<String> regNodesAndFlush = serverRpcClient.pullRemoteNodeClientRegisterInfo(new PullRemoteNodeClientRegisterInfoDTO());
                return regNodesAndFlush.getData();
            } catch (Exception e) {
                // Best effort: an unreachable peer must not fail the cycle, but it should be visible.
                SnailJobLog.LOCAL.error("pull client register info failed. hostId:[{}] hostIp:[{}] hostPort:[{}]",
                        serverNode.getHostId(), serverNode.getHostIp(), serverNode.getHostPort(), e);
                return "";
            }
        }

        /**
         * Waits up to one second for a pending pull and parses its JSON payload.
         *
         * @param future pending result of {@link #pullFrom(ServerNode)}
         * @return the parsed nodes, or an empty list when the payload is absent, blank,
         *         unparsable or not available in time
         */
        private List<ServerNode> awaitNodes(Future<String> future) {
            try {
                String jsonString = future.get(1L, TimeUnit.SECONDS);
                if (Objects.isNull(jsonString) || jsonString.isBlank()) {
                    // A blank payload is what pullFrom returns after a failed call; nothing to parse.
                    return new ArrayList<>();
                }
                List<ServerNode> nodes = JsonUtil.parseList(jsonString, ServerNode.class);
                if (Objects.isNull(nodes)) {
                    return new ArrayList<>();
                }
                return nodes;
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the flag so the owning thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                SnailJobLog.LOCAL.error("wait for remote client register info failed", e);
                return new ArrayList<>();
            }
        }

        /**
         * Builds an RPC client for the given node with fail-retry enabled and three retries.
         *
         * @param registerNodeInfo target node of the client
         * @return the RPC client
         */
        private CommonRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
            int maxRetryTimes = 3;
            return RequestBuilder.newBuilder()
                    .nodeInfo(registerNodeInfo)
                    .failRetry(true)
                    .retryTimes(maxRetryTimes)
                    .client(CommonRpcClient.class)
                    .build();
        }

        // NOTE(review): the three values below are consumed by AbstractSchedule, which is not
        // visible here; presumably they configure a distributed lock around execute() - confirm.

        /** @return the lock name {@code registerNode} */
        @Override
        public String lockName() {
            return "registerNode";
        }

        /** @return the ISO-8601 duration {@code PT10S} */
        @Override
        public String lockAtMost() {
            return "PT10S";
        }

        /** @return the ISO-8601 duration {@code PT5S} */
        @Override
        public String lockAtLeast() {
            return "PT5S";
        }

        /**
         * Creates the RPC thread pool (4 core / 8 max threads, 1 s keep-alive, idle core
         * threads allowed to time out, 1000-slot work queue) and schedules {@code execute}
         * to run immediately and then with a fixed delay of five seconds.
         */
        public void startScheduler() {
            this.refreshNodePool = new ThreadPoolExecutor(4, 8, 1L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000));
            this.refreshNodePool.allowCoreThreadTimeOut(true);
            this.taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
        }
    }
}

