计数器模式,    服务接口的流量控制策略

2020-01-04 22:39栏目:美高梅网上游戏
TAG:

主要是依靠 redis + lua 来实现限流器, 使用 lua 的原因是将多条命令合并在一起作为一个原子操作, 无需过多考虑并发.

  • 业务背景介绍 
    对于web应用的限流,光看标题,似乎过于抽象,难以理解,那我们还是以具体的某一个应用场景来引入这个话题吧。 
    在日常生活中,我们肯定收到过不少不少这样的短信,“双11约吗?,千款….”,“您有幸获得唱读卡,赶快戳链接…”。这种类型的短信是属于推广性质的短信。为什么我要说这个呢?听我慢慢道来。 
    一般而言,对于推广营销类短信,它们针对某一群体(譬如注册会员)进行定点推送,有时这个群体的成员量比较大,譬如京东的会员,可以达到千万级别。因此相应的,发送推广短信的量也会增大。然而,要完成这些短信发送,我们是需要调用服务商的接口来完成的。倘若一次发送的量在200万条,而我们的服务商接口每秒能处理的短信发送量有限,只能达到200条每秒。那么这个时候就会产生问题了,我们如何能控制好程序发送短信时的速度昵?于是限流这个功能就得加上了
  • 生产环境背景 
    1、服务商接口所能提供的服务上限是400条/s 
    2、业务方调用短信发送接口的速度未知,QPS可能达到800/s,1200/s,或者更高 
    3、当服务商接口访问频率超过400/s时,超过的量将拒绝服务,多出的信息将会丢失 
    4、线上为多节点布置,但调用的是同一个服务商接口
  • 需求分析 
    1、鉴于业务方对短信发送接口的调用频率未知,而服务商的接口服务有上限,为保证服务的可用性,业务层需要对接口调用方的流量进行限制—–接口限流
  • 需求设计 
    方案一、在提供给业务方的Controller层进行控制。 
    1、使用guava提供工具库里的RateLimiter类(内部采用令牌捅算法实现)进行限流

计数器模式

一.总体介绍

很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

    也就是面对大流量时,如何进行流量控制?

    服务接口的流量控制策略:分流、降级、限流等。本文讨论  限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

 实际场景中常用的限流策略:

    1.Nginx前端限流
         按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

    2.业务应用系统限流
        1、客户端限流
        2、服务端限流

    3.数据库限流
        红线区,力保数据库

原理

二.常用的限流算法

常见的限流算法有:令牌桶、漏桶。 计数器也可以进行粗暴限流实现。

<!--核心代码片段-->
private RateLimiter rateLimiter = RateLimiter.create(400);//400表示每秒允许处理的量是400
 if(rateLimiter.tryAcquire()) {
   //短信发送逻辑可以在此处

 }

计数器算法是指在一段窗口时间内允许通过的固定数量的请求, 比如10次/秒, 500次/30秒.

2.1 令牌桶(单机)

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小

图片 1

流程:
1.所有的流量在放行之前需要获取一定量的 token;
2.所有的 token 存放在一个 bucket(桶)当中,每 1/r 秒,都会往这个 bucket 当中加入一个 token;
3.bucket 有最大容量(capacity or limit),在 bucket 中的 token 数量等于最大容量,而且没有 token 消耗时,新的额外的 token 会被抛弃。

这种实现方法有几个优势:
1.避免了给每一个 Bucket 设置一个定时器这种笨办法,
2.数据结构需要的内存量很小,只需要储存 Bucket 中剩余的 Token 量以及上次补充 Token 的时间戳就可以了;
3.只有在用户访问的时候,才会计算 Token 补充量,对于系统的计算资源占用量也较小。

Guava 库当中也有一个 RateLimiter,其作用也是 用来进行限流,于是阅读了 RateLimiter 的源代码,查看一些 Google 的人是如何实现 Token Bucket 算法的。

private void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      storedPermits = min(maxPermits,
          storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
      nextFreeTicketMicros = nowMicros;
    }
}

在 resync 方法中的这句代码 storedPermits = min(maxPermits, storedPermits+ (nowMicros - nextFreeTicketMicros)/stableIntervalMicros); 就是 RateLimiter 中计算 Token 数量的方法。没有使用计时器,而是使用时间戳的方式计算。这个做法给足了 信息。我们可以在 Bucket 中存放现在的 Token 数量,然后存储上一次补充 Token 的时间戳,当用户下一次请求获取一个 Token 的时候, 根据此时的时间戳,计算从上一个时间戳开始,到现在的这个时间点所补充的所有 Token 数量,加入到 Bucket 当中。

通过使用RateLimiter简单模拟一个实现:

package com.niepeng.goldcode.common.ratelimit;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.RateLimiter;
import com.niepeng.goldcode.util.DateUtil;

/**
 * 介绍文档:google的ratelimiter文档翻译
 * http://ifeve.com/guava-ratelimiter/
 * 
 * @author niepeng
 *
 */
public class ApiCallDemo {

    private int permitsPerSecond = 10; // 每秒10个许可
    private int threadNum = 3;

    public static void main(String[] args) {
        new ApiCallDemo().call();
    }

    private void call() {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);
        for (int i = 0; i < threadNum; i++) {
            executor.execute(new ApiCallTask(rateLimiter));
        }
        executor.shutdown();
    }
}

class ApiCallTask implements Runnable {

    private RateLimiter rateLimiter;
    private boolean runing = true;

    public ApiCallTask(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @Override
    public void run() {
        while (runing) {
            rateLimiter.acquire(); // or rateLimiter.tryAcquire()
            getData();
        }
    }

    // 模拟调用合作伙伴API接口
    private void getData() {
        System.out.println(DateUtil.format(new Date()) + ", " +Thread.currentThread().getName() + " runing!");
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2、使用Java自带delayqueue的延迟队列实现(编码过程相对麻烦,此处省略代码)

如果设置的时间粒度越细, 那么限流会更平滑.

2.2 漏桶(单机)

漏桶算法强制一个常量的输出速率而不管输入数据流的突发性

图片 2

流程:

到达的数据包(网络层的PDU)被放置在底部具有漏孔的桶中(数据包缓存);
漏桶最多可以排队b个字节,漏桶的这个尺寸受限于有效的系统内存。如果数据包到达的时候漏桶已经满了,那么数据包应被丢弃;
数据包从漏桶中漏出,以常量速率(r字节/秒)注入网络,因此平滑了突发流量。

3、使用Redis实现,存储两个key,一个用于计时,一个用于计数。请求每调用一次,计数器增加1,若在计时器时间内计数器未超过阈值,则可以处理任务

实现

2.3 计数器(单机或统一缓存系统如:redis)

限流某个接口的总并发/请求数

如果接口可能会有突发访问情况,但又担心访问量太大造成崩溃,如抢购业务;这个时候就需要限制这个接口的总并发/请求数总请求数了;因为粒度比较细,可以为每个接口都设置相应的阀值。可以使用Java中的AtomicLong进行限流。

try {
    if(atomic.incrementAndGet() > 限流数) {
        //拒绝请求
    }
    //处理请求
} finally {
    atomic.decrementAndGet();
}

当然直接使用redis:

try {
    if(shardedJedis.incr(key) > 限流数) {
        //拒绝请求
    }
    //处理请求
} finally {
 shardedJedis.decr(key);
}
 if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {            cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
}       if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {
    LOGGER.info("调用频率过快");
}
//短信发送逻辑

所使用的 Lua 脚本

2.4 对比

令牌桶和漏桶对比:   

1.令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;   
2.漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;  
3.令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;  
4.漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;  
5.令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;    6.两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。

方案二、在短信发送至服务商时做限流处理 
方案三、同时使用方案一和方案二

-- 计数器限流-- 此处支持的最小单位时间是秒, 若将 expire 改成 pexpire 则可支持毫秒粒度.-- KEYS[1] string 限流的key-- ARGV[1] int 限流数-- ARGV[2] int 单位时间(秒)local cnt = tonumber(redis.call("incr", KEYS[1]))if (cnt == 1) then -- cnt 值为1说明之前不存在该值, 因此需要设置其过期时间 redis.call("expire", KEYS[1], tonumber(ARGV[2]))elseif (cnt  tonumber(ARGV[1])) then return -1end return cnt

三.分布式限流

  • 可行性分析 
    最快捷且有效的方式是使用RateLimiter实现,但是这很容易踩到一个坑,单节点模式下,使用RateLimiter进行限流一点问题都没有。但是…线上是分布式系统,布署了多个节点,而且多个节点最终调用的是同一个短信服务商接口。虽然我们对单个节点能做到将QPS限制在400/s,但是多节点条件下,如果每个节点均是400/s,那么到服务商那边的总请求就是节点数x400/s,于是限流效果失效。使用该方案对单节点的阈值控制是难以适应分布式环境的,至少目前我还没想到更为合适的方式。 
    对于第二种,使用delayqueue方式。其实主要存在两个问题,1:短信系统本身就用了一层消息队列,有用kafka,或者rabitmq,如果再加一层延迟队列,从设计上来说是不太合适的。2:实现delayqueue的过程相对较麻烦,耗时可能比较长,而且达不到精准限流的效果 
    对于第三种,使用redis进行限流,其很好地解决了分布式环境下多实例所导致的并发问题。因为使用redis设置的计时器和计数器均是全局唯一的,不管多少个节点,它们使用的都是同样的计时器和计数器,因此可以做到非常精准的流控。同时,这种方案编码并不复杂,可能需要的代码不超过10行。

  • 实施方案 
    根据可行性分析可知,整个系统采取redis限流处理是成本最低且最高效的。 
    具体实现

    1、在Controller层设置两个全局key,一个用于计数,另一个用于计时

返回 -1 表示超过限流, 否则返回当前单位时间已通过的请求数

3.1方案一:redis存储可用数量和上一次放入token的时间

public boolean access(String userId) {
        String key = genKey(userId);
        Map<String, String> counter = jedis.hgetAll(key);
        if (counter.size() == 0) {
            TokenBucket tokenBucket = new TokenBucket(System.currentTimeMillis(), limit - 1);
            jedis.hmset(key, tokenBucket.toHash());
            return true;
        } 
        
        TokenBucket tokenBucket = TokenBucket.fromHash(counter);
        long lastRefillTime = tokenBucket.getLastRefillTime();
        /*
         * 桶中需要补充数量
         *  1.过了整个周期了,需要补到最大值
         *  2.如果到了至少补充一个的周期了,那么需要补充部分,否则不补充
         */
        long currentTokensRemaining;
        long refillTime = System.currentTimeMillis();
        long intervalSinceLast = refillTime - lastRefillTime;
        if(intervalSinceLast > intervalInMills) {
            currentTokensRemaining = limit;
        } else {
            long grantedTokens = (long) (intervalSinceLast / intervalPerPermit);
            if(grantedTokens < 1) {
                refillTime = lastRefillTime;
            }
            currentTokensRemaining = Math.min(grantedTokens + tokenBucket.getTokensRemaining(), limit);
        }
        
        tokenBucket.setLastRefillTime(refillTime);
        if (currentTokensRemaining == 0) {
            tokenBucket.setTokensRemaining(currentTokensRemaining);
            jedis.hmset(key, tokenBucket.toHash());
            return false;
        } else {
            tokenBucket.setTokensRemaining(currentTokensRemaining - 1);
            jedis.hmset(key, tokenBucket.toHash());
            return true;
        }
    }

上面的方法是最初的实现方法,对于每一个 Token Bucket,在 Redis 上面,使用一个 Hash 进行表示,一个 Token Bucket 有 lastRefillTime 表示最后一次补充 Token 的时间,tokensRemaining 则表示 Bucket 中的剩余 Token 数量,access() 方法大致的步骤为:

1.当一个请求 Token进入 access() 方法后,先计算计算该请求的 Token Bucket 的 key;
2.如果这个 Token Bucket 在 Redis 中不存在,那么就新建一个 Token Bucket,然后设置该 Bucket 的 Token 数量为最大值减一(去掉了这次请求获取的 Token)。 在初始化 Token Bucket 的时候将 Token 数量设置为最大值这一点在后面还有讨论;
3.如果这个 Token Bucket 在 Redis 中存在,而且其上一次加入 Token 的时间到现在时间的时间间隔大于 Token Bucket 的 interval,那么也将 Bucket 的 Token 值重置为最大值减一;
4.如果 Token Bucket 上次加入 Token 的时间到现在时间的时间间隔没有大于 interval,那么就计算这次需要补充的 Token 数量,将补充过后的 Token 数量更新到 Token Bucket 中。

完整代码详见:

这个方法在单线程的条件下面,可以比较好地满足需求,但是在多线程的条件下面会有问题,考虑用方案二(方案一做铺垫)。

key 可以但不限于以下的情况

3.2方案二,redis+lua

分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使使用redis+lua或者技术进行实现,通过这两种技术可以实现的高并发和高性能。

根据方案一改版的redis+lua化:

其中核心部分access方法通过lua脚本实现,通过来实现原子化操作:

--[[
  A lua rate limiter script run in redis
  use token bucket algorithm.
  Algorithm explaination
  1. key, use this key to find the token bucket in redis
  2. there're several args should be passed in:
       intervalPerPermit, time interval in millis between two token permits;
       refillTime, timestamp when running this lua script;
       limit, the capacity limit of the token bucket;
       interval, the time interval in millis of the token bucket;
]] --
local key, intervalPerPermit, refillTime, burstTokens = KEYS[1], tonumber(ARGV[1]), tonumber(ARGV[2]), tonumber(ARGV[3])
local limit, interval = tonumber(ARGV[4]), tonumber(ARGV[5])
local bucket = redis.call('hgetall', key)

local currentTokens

if table.maxn(bucket) == 0 then
    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
    currentTokens = burstTokens
    redis.call('hset', key, 'lastRefillTime', refillTime)
elseif table.maxn(bucket) == 4 then
    -- if bucket exists, first we try to refill the token bucket

    local lastRefillTime, tokensRemaining = tonumber(bucket[2]), tonumber(bucket[4])

    if refillTime > lastRefillTime then
        -- if refillTime larger than lastRefillTime, we should refill the token buckets

        -- calculate the interval between refillTime and lastRefillTime
        -- if the result is bigger than the interval of the token bucket,
        -- refill the tokens to capacity limit;
        -- else calculate how much tokens should be refilled
        local intervalSinceLast = refillTime - lastRefillTime
        if intervalSinceLast > interval then
            currentTokens = burstTokens
            redis.call('hset', key, 'lastRefillTime', refillTime)
        else
            local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
            if grantedTokens > 0 then
                -- ajust lastRefillTime, we want shift left the refill time.
                local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
                redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
            end
            currentTokens = math.min(grantedTokens + tokensRemaining, limit)
        end
    else
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        currentTokens = tokensRemaining
    end
end

assert(currentTokens >= 0)

if currentTokens == 0 then
    -- we didn't consume any keys
    redis.call('hset', key, 'tokensRemaining', currentTokens)
    return 0
else
    redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
    return 1
end

全部代码查看:

使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后可以改造为限流总并发/请求数和限制总资源数。Lua本身就是一种编程语言,也可以使用它实现复杂的令牌桶或漏桶算法。

local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1])        --限流大小
local current = tonumber(redis.call("INCRBY", key, "1")) --请求数+1
if current > limit then --如果超出限流大小
   return 0
elseif current == 1 then  --只有第一次访问需要设置2秒的过期时间
   redis.call("expire", key,"2")
end
return 1

如上操作因是在一个lua脚本中,又因Redis是单线程模型,因此是线程安全的。如上方式有一个缺点就是当达到限流大小后还是会递增的,可以改造成如下方式实现:

local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1])        --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --如果超出限流大小
   return 0
else  --请求数+1,并设置2秒过期
   redis.call("INCRBY", key,"1")
   redis.call("expire", key,"2")
   return 1
end

Java中判断是否需要限流的代码:

public static boolean acquire() throws Exception {
    String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    String key = "ip:" + System.currentTimeMillis()/ 1000; //此处将当前时间戳取秒数
    Stringlimit = "3"; //限流大小
    return (Long)jedis.eval(luaScript,Lists.newArrayList(key), Lists.newArrayList(limit)) == 1;
}

因为Redis的限制(Lua中有写操作不能使用带随机性质的读操作,如TIME)不能在Redis Lua中使用TIME获取时间戳,因此只好从应用获取然后传入,在某些极端情况下(机器时钟不准的情况下),限流会存在一些小问题。

另外按照方案一的实现,本人对lua脚本不熟悉,参考toys的实现:

参考文章:

private static final String API_WEB_TIME_KEY = "time_key";

    private static final String API_WEB_COUNTER_KEY = "counter_key";

ip + 接口 user_id + 接口

3.3方案三,nginx+lua

local locks = require "resty.lock"

local function acquire()
    local lock =locks:new("locks")
    local elapsed, err =lock:lock("limit_key") --互斥锁
    local limit_counter =ngx.shared.limit_counter --计数器

    local key = "ip:" ..os.time()
    local limit = 5 --限流大小
    local current =limit_counter:get(key)

    if current ~= nil and current + 1> limit then --如果超出限流大小
       lock:unlock()
       return 0
    end
    if current == nil then
       limit_counter:set(key, 1, 1) --第一次需要设置过期时间,设置key的值为1,过期时间为1秒
    else
        limit_counter:incr(key, 1) --第二次开始加1即可
    end
    lock:unlock()
    return 1
end
ngx.print(acquire())

实现中我们需要使用lua-resty-lock互斥锁模块来解决原子性问题(在实际工程中使用时请考虑获取锁的超时问题),并使用ngx.shared.DICT共享字典来实现计数器。如果需要限流则返回0,否则返回1。使用时需要先定义两个共享字典(分别用来存放锁和计数器数据):

http {

    ……

    lua_shared_dict locks 10m;

    lua_shared_dict limit_counter 10m;

}

有人会纠结如果应用并发量非常大那么redis或者nginx是不是能抗得住;不过这个问题要从多方面考虑:你的流量是不是真的有这么大,是不是可以通过一致性哈希将分布式限流进行分片,是不是可以当并发量太大降级为应用级限流;对策非常多,可以根据实际情况调节;像在京东使用Redis+Lua来限流抢购流量,一般流量是没有问题的。

参考:

2、对时间key的存在与否进行判断,并对计数器是否超过阈值进行判断

优点

四.应用级限流(tomcat)

对于一个应用系统来说一定会有极限并发/请求数,即总有一个TPS/QPS阀值,如果超了阀值则系统就会不响应用户请求或响应的非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。

如果你使用过Tomcat,其Connector 其中一种配置有如下几个参数:
acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;
maxConnections: 瞬时最大连接数,超出的会排队等待;
maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。
详细的配置请参考官方文档。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都会有类似的限制连接数的配置。

参考:

if(!cacheDao.hasKey(API_WEB_TIME_KEY)) {

            cacheDao.putToValue(API_WEB_TIME_KEY,0,(long)1, TimeUnit.SECONDS);
            cacheDao.putToValue(API_WEB_COUNTER_KEY,0,(long)2, TimeUnit.SECONDS);//时间到就重新初始化为

        }

        if(cacheDao.hasKey(API_WEB_TIME_KEY)&&cacheDao.incrBy(API_WEB_COUNTER_KEY,(long)1) > (long)400) {


            LOGGER.info("调用频率过快");

        }
         //短信发送逻辑

实现简单

实施结果 
可以达到非常精准的流控,截图会在后续的过程中贴出来。欢迎有疑问的小伙伴们在评论区提出问题,我看到后尽量抽时间回答的

缺点

粒度不够细的情况下, 会出现在同一个窗口时间内出现双倍请求数

 

注意

一、场景描述                                                                                                

尽量保持时间粒度精细

     很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

场景分析

    也就是面对大流量时,如何进行流量控制?

eg. 1000/3s 的限流

    服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

极端情况1:

     实际场景中常用的限流策略:

第1秒请求数 10

  • Nginx前端限流

第2秒请求数 10

         按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

第3秒请求数 980

  • 业务应用系统限流

第4秒请求数 900

        1、客户端限流

第5秒请求数 100

        2、服务端限流

第6秒请求数 0

  • 数据库限流

此时注意第3~5秒内的总请求数高达 1980

        红线区,力保数据库

极端情况2:

二、常用的限流算法                                                                                       

版权声明:本文由美高梅开户送58元官网发布于美高梅网上游戏,转载请注明出处:计数器模式,    服务接口的流量控制策略