9、线程池 2022-02-15 19:33 > 以下基于jdk1.8 ### 1、线程工厂 线程工厂就是用来创建线程的,内部把一些东西给包掉(名字啊,管理器啊什么的),让你只需要使用newThread()就能得到一个线程。 `Executors`工具类内部静态类:默认线程工厂。所有线程池如果不指定工厂都是使用的默认线程工厂 ```java /** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } /** * Returns a default thread factory used to create new threads. */ public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } ``` 当需要创建线程时,调用`newThread()`,每次调用threadNumber变量都会自增。也就造成了打印日志时线程名字不同。pool-1-thread-1、pool-1-thread-2 使用: ```java public static void main(String[] args) { ThreadFactory threadFactory = Executors.defaultThreadFactory(); Thread thread = threadFactory.newThread(() -> { log.info("我是runnable的方法"); }); thread.start(); Thread thread2 = threadFactory.newThread(() -> { log.info("我是第二个线程runnable的方法"); }); thread2.start(); } ``` 输出: ```verilog 14:58:21.547 [pool-1-thread-1] INFO com.ppdai.ddp.third.maas5g.web.thread.ThreadPoolDemo1 - 我是runnable的方法 14:58:21.547 [pool-1-thread-2] INFO com.ppdai.ddp.third.maas5g.web.thread.ThreadPoolDemo1 - 我是第二个线程runnable的方法 ``` ### 2、拒绝策略 `ThreadPoolExecutor`内置4种拒绝策略,静态内部类。都实现于`RejectedExecutionHandler`接口: ```java public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } ``` #### ①、丢弃策略 `DiscardPolicy` Discard:丢弃。直接就是一个空的方法实现,到时候主线程执行rejectedExecution(r, e)使用拒绝策略时,把线程任务传进来,好家伙这方法内部啥都不执行。 ```java /** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } ``` #### ②、中止策略 `AbortPolicy` (默认策略) Abort:中止。直接就抛出一个异常:`RejectedExecutionException`(实现于`RuntimeException`)。到时候具体怎么拒绝就看外部怎么接了,我就抛给你一个异常,看你怎么处理吧。 ```java /** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } ``` #### ③、调用者执行策略 ` CallerRunsPolicy ` Caller Runs :谁调用,谁执行。当外部主线程调用rejectedExecution(r, e)执行拒绝策略时,诶,我方法内部就直接执行r.run(),这可是直接调用方法啊。我们都知道,线程执行要调用start()才能是正常的开出一个线程去执行,那你一直接调用run(),那就是普通的方法执行啊,那执行者是谁呢?就是调用者!所以就通过直接执行r.run()达到了谁调用谁执行的目的。这就是它的拒绝策略,诶,我执行不了,给你,你执行。 ```java /** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } ``` #### ④、丢弃最老策略 ` DiscardOldestPolicy ` Discard Oldest:丢弃,最老的。这个拒绝策略处理方法内部就是直接弹出一个最老的线程任务,弹出了就没了,然后直接把当前线程任务加入线程池中。执行的时候还是要走流程,就是看有没有空闲线程啊之类那些步骤的。并不是一加入立即就执行了。只是说丢一个老的,把新的加入进去。 ```java /** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } ``` #### 一图胜千言: ![](http://minio.riun.xyz/riun1/2021-12-16_1IxMhks8PEvobuzMx7.jpg) 其实这个执行策略里右下角那个DiscardOldestPolicy,并不一定会执行。只是说他给了线程任务机会,让他进去了,有可能执行的。左边两个是直接丢弃了。 ### 3、线程池 #### 基本介绍 **核心参数:** - corePoolSize(int) 线程池的核心线程数 - maximumPoolSize(int) 能容纳的最大线程数 - keepAliveTime(long) 空闲线程存活时间 - unit(TimeUnit) 存活的时间单位 - workQueue(BlockingQueue<Runnable>) 存放已提交但未执行的线程任务的队列 - threadFactory(ThreadFactory) 创建线程的工厂 - handler(RejectedExecutionHandler) 无法处理更多线程任务时拒绝策略 **执行流程:** > 线程池创建之后,池中的线程数为零。当加入线程任务时 1、若当前运行线程数<核心线程数,则立即创建线程去执行线程任务; 2、若当前运行线程数≥核心线程数,则将新来的线程任务放入队列中;(一旦有空闲的线程可用,就会从队列中按照FIFO取出线程任务进行执行) 3、若当前运行线程数≥核心线程数,且线程任务等待队列也满了,则判断当前运行线程数与可容纳的最大线程数: - 若当前运行线程数<可容纳最大线程数,则立即继续创建线程去执行当前线程任务。 - 若当前运行线程数≥可容纳最大线程数,则执行拒绝策略,拒绝线程任务。 > 规则: > > 1、 当一个线程完成任务时,它会从队列中取下一个任务来执行 > > 2、当一个线程无事可做超过keepAliveTime时间时,会进行判断: > > - 若当前运行线程数>核心线程数,那么这个线程会被停掉 > > 当线程池的所有任务都被完成时,它会将池中的线程数量缩小并维持到核心线程数大小。 ![]( http://minio.riun.xyz/riun1/2021-08-30_11VyhYjo7pRTIbCRXz.jpg ) #### JDK提供的几种线程池 ##### ①、newSingleThreadExecutor() **单一**线程池:一池一线程:核心线程数和最大线程数都是1,阻塞队列是无界的。single:单一的 ```java ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); /* return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); new LinkedBlockingQueue<Runnable>(): public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } */ ``` ##### ②、newFixedThreadPool() **固定**线程池:一池多线程:核心线程数和最大线程数是固定的,可传入,阻塞队列是无界的。fixed:固定的。 ```java ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4); /* return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>(): public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } */ ``` ##### ③、newCachedThreadPool() **可扩容**线程池:一池多线程:核心线程数是0,最大线程数无限大,阻塞队列容量只有1。这两个特性意味着只要有新任务加入,就不停创建线程执行,达到最大值时,向阻塞队列放一个,就不能放了,必须等待空出一个线程把阻塞队列中的线程执行了,才能继续向阻塞队列中放 ```java ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); /* return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); */ ``` ##### ④、newScheduledThreadPool() **定时**任务线程池:核心线程数固定,可传入;最大线程数无限大。等待时间,延迟队列 ```java ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(12); /* return new ScheduledThreadPoolExecutor(corePoolSize); public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } */ ``` ##### ⑤、newWorkStealingPool() 抢占线程池。给定一个或者由系统自己得出自身机器允许的并行线程数量。 //待补充 #### 为什么不允许使用默认方式创建线程池? 阿里巴巴手册这样规定: ![](http://minio.riun.xyz/riun1/2021-12-16_1IBgZX0FGq7hiXFuE4.jpg) SingleThreadPool和FixedThreadPool(单一和定长):允许的请求队列长度为Integer.MAX_VALUE,可能堆积大量请求,导致OOM。 CachedThreadPool和ScheduledThreadPool(可缓存和定时):允许创建线程数量为Integer.MAX_VALUE,可能创建大量线程,导致OOM。 #### Work ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } ``` #### 工作原理 exectu ### 4、扩展线程池 #### ForkJoin 将大任务拆分成小任务,分别处理,最后汇总返回。 参考: https://www.cnblogs.com/gjmhome/p/14410760.html https://blog.csdn.net/hotdust/article/details/71480762 ![](http://minio.riun.xyz/riun1/2022-01-10_1Sm9AZcawvAtvgFd6T.jpg) 说明: `ForkJoinPool.commonPool()` ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”(Runtime.getRuntime().availableProcessors() - 1)。 ForkJoinTask自己启动时,使用的就是这个静态实例。 ForkJoinPool提交任务并执行: execute(异步,不返回结果) submit(异步,返回结果,返回`ForkJoinTask<T>`。因为`ForkJoinTask`是`Future`,所以可使用其get方法获取返回结果,但是有可能获取的时候还没执行完) invoke(同步,返回结果,返回`T`) 示例: ```java package com.example.demo.core.ForkJoin; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * @author: HanXu * on 2022/1/7 * Class description: Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。 */ public class ForkJoinDemo1 { private static Random random = new Random(0); private static long random() { return random.nextInt(10000); } public static void main(String[] args) { // 创建2000个随机数组成的数组: long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("预期的总和为: " + expectedSum + "\n"); // fork/join处理: ForkJoinTask<Long> task = new SumTask(array, 0, array.length); Long result = ForkJoinPool.commonPool().invoke(task); System.out.println("Fork/join方式计算 总和: " + result); } } class SumTask extends RecursiveTask<Long> { static final int THRESHOLD = 500; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } //计算 @Override protected Long compute() { if (end - start <= THRESHOLD) { // 如果任务足够小,直接计算: long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; } return sum; } else { // 任务太大,一分为二: int middle = (end + start) / 2; System.out.println(String.format("split [%d~%d) ==> [%d~%d), [%d~%d)", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); //invokeAll会并行云心个两个子任务 invokeAll(subtask1, subtask2); //获取子任务的结果 Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); //返回总和 return result; } } } ``` ```java package com.example.demo.core.ForkJoin; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * @author: HanXu * on 2022/1/10 * Class description: ForkJoin处理没有返回值的任务 */ public class ForkJoinDemo2 { private static final CountDownLatch cdl = new CountDownLatch(10); public static void main(String[] args) throws InterruptedException { //任务 CountTask countTask = new CountTask(1, 11, cdl); //线程池 ForkJoinPool forkJoinPool = new ForkJoinPool(); //执行 forkJoinPool.execute(countTask); cdl.await(); } } class CountTask extends RecursiveAction { //分割阈值 private static final int THREAD_HOLD = 2; private int start; private int end; private CountDownLatch cdl; public CountTask(int start, int end, CountDownLatch cdl) { this.start = start; this.end = end; this.cdl = cdl; } @Override protected void compute() { boolean canCompute = (end - start) <= THREAD_HOLD; if (canCompute) { for (int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + "正在处理:" + i + "号任务"); cdl.countDown(); } } else { int middle = (start + end) / 2; CountTask leftCountTask = new CountTask(start, middle, cdl); CountTask rightCountTask = new CountTask(middle, end, cdl); //异步执行两个任务 leftCountTask.fork(); rightCountTask.fork(); } } } ``` --END--
发表评论