限流器 2023-02-17 15:45 限流器 > 参考https://www.cnblogs.com/yaochunhui/p/14135456.html 限流,指的是限制流量,就是一个接口,不能太过频繁的被外部访问。因此我们需要对请求做一个限制。 ### 1、固定窗口限流器 设定固定的时间周期,在这个周期内只能有固定的请求数访问,多出来的就被限制不能访问。 #### 代码: ##### pom ```xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project> ``` ##### 抽象限流器: ```java package com.example.demo.core; import java.util.concurrent.TimeUnit; /** * @author: HanXu * on 2022/2/23 * Class description: 抽象限流器 * 限流器的可变对象只有: * 1、限流标志limited 使用volatile修饰 * 2、计数器counter,因为要自增,所以需要使用AtomicInteger,子类实现此属性 */ public abstract class CounterLimit { /** * 允许访问的次数 */ protected int limitCount; /** * 规定时间内 */ protected long limitTime; /** * 时间单位,默认是秒 */ protected TimeUnit timeUnit; /** * 是否已限流?true 是 false 否 */ protected volatile boolean limited; /** * 尝试访问(计数器+1) * @return true 正常访问 false 已限流,不允许访问 */ public abstract boolean tryCount(); } ``` ##### 固定窗口限流器: > 一个线程维护计数器counter:以固定时间周期归零counter和limitCount limitTime的值;开放一个方法,逻辑是判断是否能访问,counter达到limitCount则不能访问 ```java package com.example.demo.core; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author: HanXu * on 2022/2/23 * Class description: 固定窗口限流器 */ @Slf4j public class FixedWindowsLimiter extends CounterLimit { /** * 计数器 */ private AtomicInteger counter = new AtomicInteger(0); public FixedWindowsLimiter(int limitCount, long limitTime) { this(limitCount, limitTime, TimeUnit.SECONDS); } public FixedWindowsLimiter(int limitCount, long limitTime, TimeUnit timeUnit) { this.limitCount = limitCount; this.limitTime = limitTime; this.timeUnit = timeUnit; //开启线程维护计数器:以limitTime时间为周期,对limitCount清零 new Thread(new CounterLimitThread()).start(); } @Override public boolean tryCount() { //为什么要放在死循环里?因为counter.compareAndSet的操作可能失败(被其他线程抢先执行了),所以要再执行一次。 while (true) { //如果已限流,则返回false不可访问 if (limited) { return false; } else { //如果没有限流,则判断当前计数器与limitCount的大小 int currentCount = counter.get(); if (currentCount == limitCount) { //计数器已经自增到limitCount了,要开始限流了 limited = true; return false; } else { //这里需要检测是否设置成功,没有成功的话,说明被其他线程访问了 if (counter.compareAndSet(currentCount, currentCount + 1)) { return true; } } } } } class CounterLimitThread implements Runnable { @Override public void run() { while (true) { try { //睡眠limitTime时间 timeUnit.sleep(limitTime); //醒来后重置限流标志 limited = false; //重置计数器(不能比较再设置) // counter.compareAndSet(limitCount, 0); counter.set(0); } catch (Exception e) { e.printStackTrace(); } } } } } ``` ##### 测试类: ```java package com.example.demo.controller; import com.example.demo.core.CounterLimit; import com.example.demo.core.FixedWindowsLimiter; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author: HanXu * on 2022/2/23 * Class description: 测试类 */ @Slf4j @RestController public class TestController { /** * 固定窗口限流器 */ private CounterLimit fixedWindowsLimiter = new FixedWindowsLimiter(10, 10); @RequestMapping("/hello1") public String hello1() { if (!fixedWindowsLimiter.tryCount()) { log.info("不能访问,限流"); return "限流"; } log.info("正常访问"); // todo something return "hello world"; } } ``` ##### 如何测试? 启动项目后,不停访问`http://localhost:8080/hello1`,观察页面,前10次都是` hello world `,第11次出现`限流`。(这些请求的时间间隔需要在10秒钟之内,且需要以启动项目时为周期) #### 原理 我们创建`FixedWindowsLimiter`时,指定了limitCount和limitTime,即指定时间内可访问的次数。`fixedWindowsLimiter`对象创建完毕后,会开启一个线程维护计数器`new Thread(new CounterLimitThread()).start();`。 ##### 维护线程任务 不停的循环,循环内部先睡眠limitTime时间(以此作为周期);醒来后,重置限流标志limited和计数器counter。 注意这里不能使用`counter.compareAndSet(limitCount, 0);`,虽然原文是这么些的,实际上这是错误的。这样会导致某些请求状态下,两个limitTime周期内只能访问一个limitCount次数。 比如:第一个10秒周期内的前3秒,我访问了8次接口;然后一直到第10秒维护线程去维护counter时,使用`counter.compareAndSet(limitCount, 0);`,发现counter是8,而limitCount是10,因此就无法设置counter为0。于是维护线程再次睡眠。睡眠过程有10秒,这10秒是第二个周期,这个过程中如果我再次访问,只能访问2次(counter是8,自增到10就限流了),所以是错误的。 正确的方式是到周期后直接设置为0,使用`counter.set(0);`。 ##### 尝试访问 每次请求访问时,先调用tryCount(),确定能否访问。tryCount()内部,判断,如果已经限流,则直接返回。 没有限流,就判断当前计数器与limitCount的大小,如果相等,则代表此周期时间内从当前请求开始不能访问(限流),就将limited置为true,并返回。 如果不相等,则CAS方式自增计数器,自增成功,则返回能访问;自增失败,则重新循环本方法。 注意:这个方法内部要是一个while(true)死循环,目的是防止高并发情况下许多请求同时访问,同时CAS操作,如果某一个请求先设置成功,那么其他请求必然设置失败,就要重新执行tryCount()内部流程。例如有12个请求同时访问,那么多次循环后只有10个请求能够访问,最后的2个请求则每次都没有自增成功计数器counter的值,所以直到第11次循环,发现当前时间周期内已经不能访问了,就被限流了,返回false。 #### 缺陷 固定窗口限流器由于固定了时间窗口,所以必然会产生限流不均匀的问题:  这会导致在短时间内有超出limitCount的请求访问次数。 ### 2、滑动窗口计数器 #### 原理:  #### 代码: ```java public abstract class CounterLimit { /** 单位时间限制数 */ protected int limitCount; /** 限制时间 */ protected long limitTime; /** 时间单位,默认为秒 */ protected TimeUnit timeUnit; /** 当前是否为受限状态 */ protected volatile boolean limited; /** * 尝试将计数器加1,返回为true表示能够正常访问接口,false表示访问受限 * @return */ protected abstract boolean tryCount(); } public class SlidingWindowCounterLimit extends CounterLimit { private static Logger logger = LoggerFactory.getLogger(SlidingWindowCounterLimit.class); /** * 格子分布 */ private AtomicInteger[] gridDistribution; /** * 当前时间在计数分布的索引 */ private volatile int currentIndex; /** * 当前时间之前的滑动窗口计数 */ private int preTotalCount; /** * 格子数 */ private int gridNumber; /** * 是否正在执行状态重置 */ private volatile boolean resetting; public SlidingWindowCounterLimit(int gridNumber, int limitCount, long limitTime) { this(gridNumber, limitCount, limitTime, TimeUnit.SECONDS); } public SlidingWindowCounterLimit(int gridNumber, int limitCount, long limitTime, TimeUnit timeUnit) { if (gridNumber <= limitTime) throw new RuntimeException("无法完成限流,gridNumber必须大于limitTime,gridNumber = " + gridNumber + ",limitTime = " + limitTime); this.gridNumber = gridNumber; this.limitCount = limitCount; this.limitTime = limitTime; this.timeUnit = timeUnit; gridDistribution = new AtomicInteger[gridNumber]; for (int i = 0; i < gridNumber; i++) { gridDistribution[i] = new AtomicInteger(0); } new Thread(new CounterResetThread()).start(); } public boolean tryCount() { while (true) { if (limited) { return false; } else { int currentGridCount = gridDistribution[currentIndex].get(); if (preTotalCount + currentGridCount == limitCount) { logger.info("限流:{}", LocalDateTime.now().toString()); limited = true; return false; } if (!resetting && gridDistribution[currentIndex].compareAndSet(currentGridCount, currentGridCount + 1)) return true; } } } /** * 单开线程维护格子,及每格的请求数 */ class CounterResetThread implements Runnable { @Override public void run() { while (true) { try { timeUnit.sleep(1); // 停止1个时间单位 int indexToReset = currentIndex - limitCount - 1; // 要重置计数的格子索引 if (indexToReset < 0) indexToReset += gridNumber; resetting = true; // 防止在更新状态时,用户访问接口将当前格子的访问量 + 1 preTotalCount = preTotalCount - gridDistribution[indexToReset].get() + gridDistribution[currentIndex++].get(); // 重置当前时间之前的滑动窗口计数 if (currentIndex == gridNumber) currentIndex = 0; if (preTotalCount + gridDistribution[currentIndex].get() < limitCount) limited = false; // 修改当前状态为不受限 resetting = false; logger.info("当前格子:{},重置格子:{},重置格子访问量:{},前窗口格子总数:{}", currentIndex, indexToReset, gridDistribution[indexToReset].get(), preTotalCount); gridDistribution[indexToReset].set(0); } catch (InterruptedException e) { e.printStackTrace(); } } } } } ``` #### 使用: ```java @Bean public SlidingWindowCounterLimit slidingWindowCounterLimit() { return new SlidingWindowCounterLimit(20, 10, 10); } @Resource private SlidingWindowCounterLimit slidingWindowCounterLimit; @RequestMapping("/limit") public String limit() { if (slidingWindowCounterLimit.tryCount()) { log.info("pass"); return "pass"; } else { log.info("error"); return "error"; } } ``` ### 3、令牌桶 #### 原理:  #### 代码: ```java package xyz.riun.img.utis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author: HanXu * on 2024/7/16 * Class description: 令牌桶限流 */ public class TokenBucketLimit { private static Logger logger = LoggerFactory.getLogger(TokenBucketLimit.class); /** * 给定时间生成令牌数 */ private int genNumber; /** * 生成令牌所花费的时间 */ private int genTime; /** * 时间单位,默认为秒 */ private TimeUnit timeUnit; /** * 最大令牌数 */ private int maxNumber; /** * 已存储的令牌数 */ private AtomicInteger storedNumber; public TokenBucketLimit(int genNumber, int genTime, int maxNumber) { this(genNumber, genTime, TimeUnit.SECONDS, maxNumber); } public TokenBucketLimit(int genNumber, int genTime, TimeUnit timeUnit, int maxNumber) { this.genNumber = genNumber; this.genTime = genTime; this.timeUnit = timeUnit; this.maxNumber = maxNumber; this.storedNumber = new AtomicInteger(0); new Thread(new TokenGenerateThread()).start(); } /** * 从桶中取出令牌,能取到则可以访问;取不到则限流 * @return */ public boolean tryAcquire() { while (true) { int currentStoredNumber = storedNumber.get(); if (currentStoredNumber == 0) { logger.info("限流:{}", LocalDateTime.now().toString()); return false; } if (storedNumber.compareAndSet(currentStoredNumber, currentStoredNumber - 1)) { return true; } } } /** * 单开线程,以一定速率向桶中添加令牌 */ class TokenGenerateThread implements Runnable { @Override public void run() { while (true) { if (storedNumber.get() == maxNumber) { logger.info("当前令牌数已满"); try { timeUnit.sleep(genTime); } catch (InterruptedException e) { e.printStackTrace(); } } else { int old = storedNumber.get(); int newValue = old + genNumber; if (newValue > maxNumber) newValue = maxNumber; storedNumber.compareAndSet(old, newValue); logger.info("生成令牌数:{},当前令牌数:{}", genNumber, newValue); try { timeUnit.sleep(genTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } ``` ### 4、漏桶 #### 原理:  #### 代码: ```java package xyz.riun.img.utis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author: HanXu * on 2024/7/16 * Class description: 漏桶限流 */ public class LeakyBucketLimit { private static Logger logger = LoggerFactory.getLogger(LeakyBucketLimit.class); /** * 桶最大容量 */ private int maxNumber; /** * 时间单位,默认为秒 */ private TimeUnit timeUnit; /** * 单位时间内泄露的数量 */ private int leakNumber; /** * 泄露的单位时间 */ private int leakTime; /** * 桶中剩余数量 */ private AtomicInteger remainingNumber; public LeakyBucketLimit(int leakNumber, int leakTime, int maxNumber) { this(leakNumber, leakTime, TimeUnit.SECONDS, maxNumber); } public LeakyBucketLimit(int leakNumber, int leakTime, TimeUnit timeUnit, int maxNumber) { this.leakNumber = leakNumber; this.leakTime = leakTime; this.timeUnit = timeUnit; this.maxNumber = maxNumber; this.remainingNumber = new AtomicInteger(0); new Thread(new LeakThread()).start(); } /** * 把请求放在桶中 * @return */ public boolean tryAcquire() { while (true) { int currentStoredNumber = remainingNumber.get(); if (currentStoredNumber == maxNumber) { logger.info("限流:{}", LocalDateTime.now().toString()); return false; } if (remainingNumber.compareAndSet(currentStoredNumber, currentStoredNumber + 1)) { return true; } } } /** * 单开线程,将桶中的请求以一定速率漏出 */ class LeakThread implements Runnable { @Override public void run() { while (true) { if (remainingNumber.get() == 0) { logger.info("当前桶已空"); try { timeUnit.sleep(leakTime); } catch (InterruptedException e) { e.printStackTrace(); } } else { int old = remainingNumber.get(); int newValue = old - leakNumber; if (newValue < 0) newValue = 0; remainingNumber.compareAndSet(old, newValue); logger.info("泄露:{},当前:{}", leakNumber, newValue); try { timeUnit.sleep(leakTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } ``` ### 5、Google guava的RateLimiter --END--
发表评论