合肥培训网站建设郑州专业网站建设公司
本文主要介绍了几种限流方法:Guava RateLimiter、简单计数、滑窗计数、信号量、令牌桶,漏桶算法和nginx限流等等
1、引入guava集成的工具
pom.xml 文件
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>23.0</version></dependency>
demo代码实现
package com.znkeji.zn_wifi_carck.guava;import com.google.common.util.concurrent.RateLimiter;public class GuavaRateLimiterTest {//比如每秒生产10个令牌,相当于每100ms生产1个令牌//RateLimiter是Guava库中的一个限流器// 基于PPS进行限流//基于PPS限流的同时提供热启动private RateLimiter rateLimiter=RateLimiter.create(10);/*** 模拟执行业务方法*/public void exeBiz(){if (rateLimiter.tryAcquire(1)){///返回boolean 尝试获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话try {Thread.sleep(50);}catch (Exception e){e.printStackTrace();}System.out.println("线程"+Thread.currentThread().getName()+":执行业务逻辑");}else {System.out.println("线程"+Thread.currentThread().getName()+":被限流了");}}public static void main(String[] args) throws InterruptedException {GuavaRateLimiterTest guavaRateLimiterTest = new GuavaRateLimiterTest();System.out.println("线程开始等待");Thread.sleep(500); //等待500ms,让limiter生产一些令牌System.out.println("线程睡了0.5秒");//模拟瞬间生产100个线程请求for (int i=0;i<100;i++){new Thread(guavaRateLimiterTest::exeBiz).start();}}}
2.令牌桶算法
package com.znkeji.zn_wifi_carck.guava;import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 令牌桶算法:一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌,如有剩余容量则添加,没有则放弃。如果有请求进来,则需要先从桶里获取令牌,当桶里没有令牌可取时,则拒绝任务。** 令牌桶的优点是:可以改变添加令牌的速率,一旦提高速率,则可以处理突发流量。*/
public class TokenBucket {/*** 定义的桶*/public class Bucket {//容量int capacity;//速率,每秒放多少int rateCount;//目前token个数AtomicInteger curCount = new AtomicInteger(0);public Bucket(int capacity, int rateCount) {this.capacity = capacity;this.rateCount = rateCount;}public void put() {if (curCount.get() < capacity) {System.out.println("目前数量==" + curCount.get() + ", 我还可以继续放");curCount.addAndGet(rateCount);}}public boolean get() {if (curCount.get() >= 1) {curCount.decrementAndGet();return true;}return false;}}// @Testpublic void testTokenBucket() throws InterruptedException {Bucket bucket = new Bucket(5, 2);//固定线程,固定的速率往桶里放数据,比如每秒N个ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1);scheduledCheck.scheduleAtFixedRate(() -> {bucket.put();}, 0, 1, TimeUnit.SECONDS);//先等待一会儿,让桶里放点tokenThread.sleep(6000);//模拟瞬间10个线程进来拿tokenfor (int i = 0; i < 10; i++) {new Thread(() -> {if (bucket.get()) {System.out.println(Thread.currentThread() + "获取到了资源");} else {System.out.println(Thread.currentThread() + "被拒绝");}}).start();}//等待,往桶里放tokenThread.sleep(3000);//继续瞬间10个线程进来拿tokenfor (int i = 0; i < 10; i++) {new Thread(() -> {if (bucket.get()) {System.out.println(Thread.currentThread() + "获取到了资源");} else {System.out.println(Thread.currentThread() + "被拒绝");}}).start();}}
}
3、滑窗计数器
package com.znkeji.zn_wifi_carck.guava;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;/*** 滑窗计数器 打个比方,某接口每秒允许100个请求,设置一个滑窗,窗口中有10个格子,每个格子占100ms,每100ms移动一次。滑动窗口的格子划分的越多,滑动窗口的滚动就越平滑,限流的统计就会越精确。*/
public class SliderWindowRateLimiter implements Runnable {//每秒允许的最大访问数private final long maxVisitPerSecond;//将每秒时间划分N个块private final int block;//每个块存储的数量private final AtomicLong[] countPerBlock;//滑动窗口划到了哪个块儿,可以理解为滑动窗口的起始下标位置private volatile int index;//目前总的数量private AtomicLong allCount;/*** 构造函数** @param block,每秒钟划分N个窗口* @param maxVisitPerSecond 每秒最大访问数量*/public SliderWindowRateLimiter(int block,long maxVisitPerSecond){this.block=block;this.maxVisitPerSecond=maxVisitPerSecond;countPerBlock= new AtomicLong[block];for (int i=0;i<block;i++){countPerBlock[i]=new AtomicLong();}allCount=new AtomicLong(0);}/*** 判断是否超过最大允许数量** @return*/public boolean isOverLimit() {return currentQPS() > maxVisitPerSecond;}/*** 获取目前总的访问数** @return*/public long currentQPS() {return allCount.get();}/*** 请求访问进来,判断是否可以执行业务逻辑*/public void visit(){countPerBlock[index].incrementAndGet();allCount.incrementAndGet();if (isOverLimit()){System.out.println(Thread.currentThread().getName()+"被限流了,currentQPS:"+currentQPS());}else {System.out.println(Thread.currentThread().getName()+"执行业务逻辑,currentQPS:"+currentQPS());}}/*** 定时执行器,* 每N毫秒滑块移动一次,然后再设置下新滑块的初始化数字0,然后新的请求会落到新的滑块上* 同时总数减掉新滑块上的数字,并且重置新的滑块上的数量*/@Overridepublic void run() {index=(index+1)%block;long val= countPerBlock[block].getAndSet(0);allCount.addAndGet(-val);}public static void main(String[] args) {SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100);//固定的速率移动滑块ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS);//模拟不同速度的请求new Thread(() -> {while (true) {sliderWindowRateLimiter.visit();try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}).start();//模拟不同速度的请求new Thread(() -> {while (true) {sliderWindowRateLimiter.visit();try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}
4、信号量
package com.znkeji.zn_wifi_carck.guava;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;/*** 利用Semaphore,每隔固定速率,释放Semaphore的资源。线程获取到资源,则执行业务代码。*/
public class SemaphoreOne {private static Semaphore semaphore = new Semaphore(10);public static void bizMethod() throws InterruptedException {if (!semaphore.tryAcquire()) {System.out.println(Thread.currentThread().getName() + "被拒绝");return;}System.out.println(Thread.currentThread().getName() + "执行业务逻辑");Thread.sleep(500);//模拟处理业务逻辑需要1秒semaphore.release();}public static void main(String[] args) {Timer timer = new Timer();timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {semaphore.release(10);System.out.println("释放所有锁");}}, 1000, 1000);for (int i = 0; i < 10000; i++) {try {Thread.sleep(10);//模拟每隔10ms就有1个请求进来} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> {try {SemaphoreOne.bizMethod();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}
}