线程池并不神秘!
ThreadPoolExecutor其实是实现了ExecutorService接口的具体实现。而ExecutorService继承了Executor接口,在Executor接口中定义了一系列execute(Runnable command)接口。ExecutorService二级接口中有添加了更多的功能接口如shutdown(); submit(Callable<T> task);
ThreadPoolExecutor类中有一系列重要的属性变量:(可暂时略过,不过看完后面的逻辑,相信你会回来找的)
//控制线程池的生命周期的参数 volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; //用来存放任务的一个队列,队列里的任务会转接给woker变量里的线程处理 private final BlockingQueueworkQueue; //针对操作ThreadPoolExecutor对象成员变量的锁。 private final ReentrantLock mainLock = new ReentrantLock(); //用来支持awaitTermination的Wait condition private final Condition termination = mainLock.newCondition(); //包含线程池中所有的worker threads. 当持有mainLock锁的时候才有访问权限 private final HashSet workers = new HashSet (); /** * 线程空闲等待超时时间,以毫秒记。 * 当线程池线程数量大于corePoolSize或者allowCoreThreadTimeOut为true时使用。 */ private volatile long keepAliveTime; /** * false: 即使线程处于空闲状态,仍然存活(不会使用keepAliveTime去判断是否结束线程) * true: 如果线程处于空闲状态,则按照keepAliveTime判断结束线程 */ private volatile boolean allowCoreThreadTimeOut; //线程池常驻线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int corePoolSize; // 线程池最大线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int maximumPoolSize; // 线程池当前线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int poolSize; //线程池数量饱和或线程池关闭时会调用的处理类 private volatile RejectedExecutionHandler handler; //线程池添加新线程的工厂类。线程池中所有的线程都由工厂类生成。 private volatile ThreadFactory threadFactory; //线程池达到的最大大小. private int largestPoolSize; //线程池完成任务计数器,worker threads中止时会更新. private long completedTaskCount; //线程池拒绝执行task的默认处理类 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
线程池执行任务核心入口方法execute()具体代码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated }}
过程如下:
- 执行的任务command为null时,抛出异常
- poolSize < corePoolSize时,表示线程池的数量小于线程池预期的常驻线程数量,调用addIfUnderCorePoolSize方法:1)生成新的thread对象;2)执行线程任务
-
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
addThread()并不是threadFactory.newThread()那么简单。woker对象初始化的时候意味着线程池里有新线程的生成,这个时候传入的Runnable实现类被标记为首个任务。1)生成worker对象;2)生成新的线程;3)新线程指派给worker对象,并将woker对象放入阻塞队列中;4)增加线程池当前线程数量,修改线程池到达最大数量
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w);//注意此时新线程的run实现由Woker提供 if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
线程池内线程具体执行任务的代码:1)取出开始的首任务执行 2)遍历从workQueue中取任务执行
/** * Main run loop */ public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task);//里边还会递归调用run方法 task = null; } } finally { workerDone(this); } } }
线程池如何保证线程一直运行呢?getTask()时阻塞队列的take()实现会让线程在无任务时进入阻塞状态(RetrantLock和Condition实现)
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll();//取出并删除队列顶部一个元素,失败时并不会阻塞而是返回false else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take();//取出并删除队列顶部一个元素,如果workQueue无任务则当前线程进入阻塞状态 if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
- poolSize >= corePoolSize时,此时不会创建新的线程,而是把这些任务放到待工作队列workQueue
workQueue.offer(command)
- poolSize >= corePoolSize且工作队列workQueue满员时(对于无界队列貌似不会发生):1)生成新的thread对象;2)执行线程任务
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING)//需要线程池线程数量小于最大数 t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
- 如果仍然往一个工作负荷满载的线程池指派任务的话,那么线程池会使用拒绝执行策略来处理。具体的策略类有很多可以选择,甚至可以自己实现。
void reject(Runnable command) { handler.rejectedExecution(command, this); }