Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[help wanted]关于集群并发流控server端等待队列设计 #1666

Closed
yunfeiyanggzq opened this issue Aug 13, 2020 · 13 comments
Closed

[help wanted]关于集群并发流控server端等待队列设计 #1666

yunfeiyanggzq opened this issue Aug 13, 2020 · 13 comments
Labels
area/cluster-flow Issues or PRs related to cluster flow control kind/discussion For further discussion kind/feature Category issues or prs related to feature request.

Comments

@yunfeiyanggzq
Copy link
Contributor

yunfeiyanggzq commented Aug 13, 2020

Issue Description

Type: bug report or feature request

Describe what happened (or what feature you want)

希望大家提出宝贵意见完善方案
我们打算使用token分发的形式进行集群并发流控,详情可见 #1629 如果一个请求被block,我们希望将这个请求加入到队列等待一段时间再进行请求token,且希望结果是一定通过的(等待超时除外)。
目前的设计是在processor中判断如果请求被block了,就将请求打包加入一个阻塞队列中,然后另外设置一个线程去不断的取出请求尝试获取token,直到pas(或者超时退出),然后写入到netty。在客户端则通过netty的异步调用future获得结果
processor代码:

    @Override
    public ClusterResponse processRequest(ChannelHandlerContext ctx, ClusterRequest<ConcurrentFlowAcquireRequestData> request) {

        TokenService tokenService = TokenServiceProvider.getService();
        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = true;
        String clientAddress = getRemoteAddress(ctx);
        TokenResult result = tokenService.requestConcurrentToken(clientAddress, flowId, count);
        // 满足要求的话接入队列,判断条件还没有完善下来,可以忽略
        if (result.getStatus() == TokenResultStatus.BLOCKED && prioritized) {
           // 相关信息打包加入等待队列
            Queue.addRequestToWaitQueue(new RequestObject(ctx, clientAddress, request));
            return null;
        }
        return toResponse(result, request);
    }

队列代码:

public class Queue {
    private static BlockingQueue<RequestObject> blockingQueue = new ArrayBlockingQueue<>(100);

    private static volatile boolean FLAG = true;

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static ExecutorService consumerPool = Executors.newFixedThreadPool(1);

    static {
        tryToConsumeRequestInQueue();
    }

    public static void addRequestToWaitQueue(final RequestObject requestObject) {
        try {
            // 加入请求到阻塞队列
            blockingQueue.offer(requestObject, 2L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void tryToConsumeRequestInQueue() {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                while (FLAG) {
                    TokenResult tokenResult = null;
                    RequestObject res = null;
                    try {
                        // 如果队列有请求,取出处理
                        res = blockingQueue.poll(2L, TimeUnit.SECONDS);
                        if (res == null) {
                            continue;
                        }
                        if (System.currentTimeMillis() - res.getCreatTime() > 1000) {
                            sendResponse(res.getCtx(), new TokenResult(TokenResultStatus.BLOCKED), res.getRequest());
                            continue;
                        }
                        // 获取相关信息
                        long flowId = res.getRequest().getData().getFlowId();
                        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId);
                        int acquire = res.getRequest().getData().getCount();
                        AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
                        if (nowCalls == null) {
                            sendResponse(res.getCtx(), new TokenResult(TokenResultStatus.FAIL), res.getRequest());
                            continue;
                        }
                        // 加锁
                        synchronized (nowCalls) {
                            // 等待直到能够满足并发需求
                            // TODO:可以设置循超时时间,超过时间直接退出
                            while (nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)) {
                                Thread.sleep(2);
                            }
                            nowCalls.getAndAdd(acquire);
                        }
                        TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquire, res.getAddress());
                        TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
                        tokenResult = new TokenResult(TokenResultStatus.OK);
                        tokenResult.setTokenId(node.getTokenId());
                        // 通过netty发送响应值
                        sendResponse(res.getCtx(), tokenResult, res.getRequest());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumerPool.execute(task);
    }

    private static ClusterResponse<ConcurrentFlowAcquireResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
                new ConcurrentFlowAcquireResponseData().setTokenId(result.getTokenId())
        );
    }

    private static void sendResponse(ChannelHandlerContext ctx, TokenResult result, ClusterRequest<ConcurrentFlowAcquireRequestData> request) {
        ClusterResponse<ConcurrentFlowAcquireResponseData> response = toResponse(result, request);
        ctx.writeAndFlush(response);
    }
}

存在问题

  • 可能server通过这样的方式发放了token,然而client却因为超时而接收不到token
  • 队列的大小如何确定
  • 如果队列满了采用什么机制,拒绝新的或者删除旧的?
  • 还不太会设计client端代码(异步获得结果),我太菜了,学习中,同步的方式比较容易超时
@yunfeiyanggzq
Copy link
Contributor Author

yunfeiyanggzq commented Aug 13, 2020

@sczyh30 please cc

@Lin-Liang
Copy link

hey @yunfeiyanggzq 这个队列是一个先进先出队列? 从吞吐量考虑非公平队列的性能要好一点。

if (System.currentTimeMillis() - res.getCreatTime() > 1000) {
                            sendResponse(res.getCtx(), new TokenResult(TokenResultStatus.BLOCKED), res.getRequest());
                            continue;
                        }

这里应该是把报获取token超时?

  synchronized (nowCalls) {
                            // 等待直到能够满足并发需求
                            // TODO:可以设置循超时时间,超过时间直接退出
                            while (nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)) {
                                Thread.sleep(2);
                            }
                            nowCalls.getAndAdd(acquire);
                        }

这里如果是加锁 + 超时时间,性能影响可能会很大。
并且nowCalls已经是一个原子类,多线程环境下用cas操作可以保证线程安全。另外用重试次数可能比超时时间更好。

- 可能server通过这样的方式发放了token,然而client却因为超时而接收不到token
这种情况,超时时间应该是server端统一指定会比较好。

@yunfeiyanggzq
Copy link
Contributor Author

nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)这个判断,如果高并发的情况下不加锁,如果100个线程通过了这个判断,都还没有运行到 nowCalls.getAndAdd(acquire);,会导致nowCalls.get()依然是0,导致并发控制失败

hey @yunfeiyanggzq 这个队列是一个先进先出队列? 从吞吐量考虑非公平队列的性能要好一点。

if (System.currentTimeMillis() - res.getCreatTime() > 1000) {
                            sendResponse(res.getCtx(), new TokenResult(TokenResultStatus.BLOCKED), res.getRequest());
                            continue;
                        }

这里应该是把报获取token超时?

  synchronized (nowCalls) {
                            // 等待直到能够满足并发需求
                            // TODO:可以设置循超时时间,超过时间直接退出
                            while (nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)) {
                                Thread.sleep(2);
                            }
                            nowCalls.getAndAdd(acquire);
                        }

这里如果是加锁 + 超时时间,性能影响可能会很大。
并且nowCalls已经是一个原子类,多线程环境下用cas操作可以保证线程安全。另外用重试次数可能比超时时间更好。

- 可能server通过这样的方式发放了token,然而client却因为超时而接收不到token
这种情况,超时时间应该是server端统一指定会比较好。

nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)这个判断,如果高并发的情况下不加锁,如果100个线程通过了这个判断,都还没有运行到 nowCalls.getAndAdd(acquire);,会导致nowCalls.get()依然是0,导致并发控制失败

@yunfeiyanggzq
Copy link
Contributor Author

yunfeiyanggzq commented Aug 13, 2020

@Lin-Liang 非常感谢您的建议,ArrayBlockingQueue是一个先进先出的队列,就是公平队列,我会再调研一下非公平的,看性能上的区别。这里应该是把报获取token超时?这里我觉得应该报阻塞,因为进入到队列等待的已经被阻塞过了,所以如果进入到队列又获取失败了应该报block。这里如果是加锁 + 超时时间,性能影响可能会很大。这也是我比较担心的,尝试次数比较可能是个好的建议,我会实验一下。

@sczyh30 sczyh30 added area/cluster-flow Issues or PRs related to cluster flow control kind/discussion For further discussion kind/feature Category issues or prs related to feature request. labels Aug 13, 2020
@Lin-Liang
Copy link

nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)这个判断,如果高并发的情况下不加锁,如果100个线程通过了这个判断,都还没有运行到 nowCalls.getAndAdd(acquire);,会导致nowCalls.get()依然是0,导致并发控制失败

hey @yunfeiyanggzq 这个队列是一个先进先出队列? 从吞吐量考虑非公平队列的性能要好一点。

if (System.currentTimeMillis() - res.getCreatTime() > 1000) {
                            sendResponse(res.getCtx(), new TokenResult(TokenResultStatus.BLOCKED), res.getRequest());
                            continue;
                        }

这里应该是把报获取token超时?

  synchronized (nowCalls) {
                            // 等待直到能够满足并发需求
                            // TODO:可以设置循超时时间,超过时间直接退出
                            while (nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)) {
                                Thread.sleep(2);
                            }
                            nowCalls.getAndAdd(acquire);
                        }

这里如果是加锁 + 超时时间,性能影响可能会很大。
并且nowCalls已经是一个原子类,多线程环境下用cas操作可以保证线程安全。另外用重试次数可能比超时时间更好。
- 可能server通过这样的方式发放了token,然而client却因为超时而接收不到token
这种情况,超时时间应该是server端统一指定会比较好。

nowCalls.get() + acquire > ConcurrentClusterFlowChecker.calcGlobalThreshold(rule)这个判断,如果高并发的情况下不加锁,如果100个线程通过了这个判断,都还没有运行到 nowCalls.getAndAdd(acquire);,会导致nowCalls.get()依然是0,导致并发控制失败

AtomicInteger nowCalls = new AtomicInteger(0); nowCalls.compareAndSet(0,1);

@yunfeiyanggzq
Copy link
Contributor Author

yunfeiyanggzq commented Aug 13, 2020

@Lin-Liang 我的理解是对原子类进行操作是可以保证原子性的,但是( 判断能够通过+对原子类操作)这个操作是不能单纯使用原子类保证这个过程的原子性

@yunfeiyanggzq
Copy link
Contributor Author

yunfeiyanggzq commented Aug 13, 2020

确实在锁里循环等待会导致极大的影响性能,甚至导致死锁,可能次数会好一些

@Lin-Liang
Copy link

@Lin-Liang 我的理解是对原子类进行操作是可以保证原子性的,但是( 判断能够通过+对原子类操作)这个操作是不能单纯使用原子类保证这个过程的原子性

判断通过之后,原子性就已经取决于原子类操作了

@jasonjoo2010
Copy link
Collaborator

无边界的设计(无超时、无数量上限等)适用性非常有限,且有状态,这种方案我个人不是很认同。

实现上,建议看看是否有可能改造sentinel-cluster使之高抽象地方便地支持多种实现,这样未来可以做更多的独立实现,包括自动成组和其它HA方案。

@yunfeiyanggzq
Copy link
Contributor Author

无边界的设计(无超时、无数量上限等)适用性非常有限,且有状态,这种方案我个人不是很认同。

实现上,建议看看是否有可能改造sentinel-cluster使之高抽象地方便地支持多种实现,这样未来可以做更多的独立实现,包括自动成组和其它HA方案。

是的,需要数量上限和超时的队列,目前集群并发控制使用的框架是宿何前辈之前的
图片
不知道您有什么比较好的建议

@Lin-Liang
Copy link

这是为了解决什么问题?没有等待队列会怎样?

@yunfeiyanggzq
Copy link
Contributor Author

这是为了解决什么问题?没有等待队列会怎样?

没有这个队列请求被判断为block后直接返回给client,我们希望用这种方式让请求稍微等待一段时间去获取(能够应对突发的请求,类似于削峰),用这种方式能够在保证并发控制的情况下让更多的请求通过,当然这是可配置的,用户可以选择是否采用这种策略

@yunfeiyanggzq
Copy link
Contributor Author

yunfeiyanggzq commented Aug 19, 2020

目前的进展 https://github.com/yunfeiyanggzq/Sentinel/tree/pr1/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/queue , 基本功能已经完成,主要思路是采用生产者消费者模式,输出结果的方式有两种,一种是通过netty发放给client,一种是通过future的方式发放给server(server嵌入模式)。整个实现可见 #1629 欢迎大家帮忙参考。感谢大家的帮助!!!!

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
area/cluster-flow Issues or PRs related to cluster flow control kind/discussion For further discussion kind/feature Category issues or prs related to feature request.
Projects
None yet
Development

No branches or pull requests

4 participants