博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java concurrent之ThreadPoolExecutor线程池
阅读量:6606 次
发布时间:2019-06-24

本文共 5750 字,大约阅读时间需要 19 分钟。

  hot3.png

线程池并不神秘!

ThreadPoolExecutor其实是实现了ExecutorService接口的具体实现。而ExecutorService继承了Executor接口,在Executor接口中定义了一系列execute(Runnable command)接口。ExecutorService二级接口中有添加了更多的功能接口如shutdown(); submit(Callable<T> task)

ThreadPoolExecutor类中有一系列重要的属性变量:(可暂时略过,不过看完后面的逻辑,相信你会回来找的)

//控制线程池的生命周期的参数    volatile int runState;    static final int RUNNING    = 0;    static final int SHUTDOWN   = 1;    static final int STOP       = 2;    static final int TERMINATED = 3;    //用来存放任务的一个队列,队列里的任务会转接给woker变量里的线程处理    private final BlockingQueue
workQueue; //针对操作ThreadPoolExecutor对象成员变量的锁。 private final ReentrantLock mainLock = new ReentrantLock(); //用来支持awaitTermination的Wait condition private final Condition termination = mainLock.newCondition(); //包含线程池中所有的worker threads. 当持有mainLock锁的时候才有访问权限 private final HashSet
workers = new HashSet
(); /** * 线程空闲等待超时时间,以毫秒记。 * 当线程池线程数量大于corePoolSize或者allowCoreThreadTimeOut为true时使用。 */ private volatile long keepAliveTime; /** * false: 即使线程处于空闲状态,仍然存活(不会使用keepAliveTime去判断是否结束线程) * true: 如果线程处于空闲状态,则按照keepAliveTime判断结束线程 */ private volatile boolean allowCoreThreadTimeOut; //线程池常驻线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int corePoolSize; // 线程池最大线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int maximumPoolSize; // 线程池当前线程数量,只有获得mainLock锁时才能修改,但是对所有多线程读操作实时可见. private volatile int poolSize; //线程池数量饱和或线程池关闭时会调用的处理类 private volatile RejectedExecutionHandler handler; //线程池添加新线程的工厂类。线程池中所有的线程都由工厂类生成。 private volatile ThreadFactory threadFactory; //线程池达到的最大大小. private int largestPoolSize; //线程池完成任务计数器,worker threads中止时会更新. private long completedTaskCount; //线程池拒绝执行task的默认处理类 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

线程池执行任务核心入口方法execute()具体代码如下:

public void execute(Runnable command) {	if (command == null)		throw new NullPointerException();	if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {		if (runState == RUNNING && workQueue.offer(command)) {			if (runState != RUNNING || poolSize == 0)				ensureQueuedTaskHandled(command);		}		else if (!addIfUnderMaximumPoolSize(command))			reject(command); // is shutdown or saturated	}}

过程如下:

  1. 执行的任务commandnull时,抛出异常
  2. poolSize < corePoolSize时,表示线程池的数量小于线程池预期的常驻线程数量,调用addIfUnderCorePoolSize方法:1)生成新的thread对象;2)执行线程任务
  3. private boolean addIfUnderCorePoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < corePoolSize && runState == RUNNING)                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        if (t == null)            return false;        t.start();        return true;    }

    addThread()并不是threadFactory.newThread()那么简单。woker对象初始化的时候意味着线程池里有新线程的生成,这个时候传入的Runnable实现类被标记为首个任务。1)生成worker对象;2)生成新的线程;3)新线程指派给worker对象,并将woker对象放入阻塞队列中;4)增加线程池当前线程数量,修改线程池到达最大数量

    private Thread addThread(Runnable firstTask) {        Worker w = new Worker(firstTask);        Thread t = threadFactory.newThread(w);//注意此时新线程的run实现由Woker提供        if (t != null) {            w.thread = t;            workers.add(w);            int nt = ++poolSize;            if (nt > largestPoolSize)                largestPoolSize = nt;        }        return t;    }

    线程池内线程具体执行任务的代码:1)取出开始的首任务执行 2)遍历从workQueue中取任务执行

    /**         * Main run loop         */        public void run() {            try {                Runnable task = firstTask;                firstTask = null;                while (task != null || (task = getTask()) != null) {                    runTask(task);//里边还会递归调用run方法                    task = null;                }            } finally {                workerDone(this);            }        }    }

    线程池如何保证线程一直运行呢?getTask()时阻塞队列的take()实现会让线程在无任务时进入阻塞状态(RetrantLockCondition实现)

    Runnable getTask() {        for (;;) {            try {                int state = runState;                if (state > SHUTDOWN)                    return null;                Runnable r;                if (state == SHUTDOWN)  // Help drain queue                    r = workQueue.poll();//取出并删除队列顶部一个元素,失败时并不会阻塞而是返回false                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);                else                    r = workQueue.take();//取出并删除队列顶部一个元素,如果workQueue无任务则当前线程进入阻塞状态                if (r != null)                    return r;                if (workerCanExit()) {                    if (runState >= SHUTDOWN) // Wake up others                        interruptIdleWorkers();                    return null;                }                // Else retry            } catch (InterruptedException ie) {                // On interruption, re-check runState            }        }    }
  4. poolSize >= corePoolSize时,此时不会创建新的线程,而是把这些任务放到待工作队列workQueue
    workQueue.offer(command)
  5. poolSize >= corePoolSize且工作队列workQueue满员时(对于无界队列貌似不会发生):1)生成新的thread对象;2)执行线程任务
    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < maximumPoolSize && runState == RUNNING)//需要线程池线程数量小于最大数                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        if (t == null)            return false;        t.start();        return true;    }
  6. 如果仍然往一个工作负荷满载的线程池指派任务的话,那么线程池会使用拒绝执行策略来处理。具体的策略类有很多可以选择,甚至可以自己实现。
    void reject(Runnable command) {        handler.rejectedExecution(command, this);    }

     

转载于:https://my.oschina.net/athhu/blog/714591

你可能感兴趣的文章
netty与marshalling简单使用
查看>>
优化带来降权得问题九个问题
查看>>
深入了解NTFS for Mac的界面中的五个勾选项 如启用聚关灯搜索
查看>>
ffmpeg遇到inttypes.h找不到
查看>>
mpsl *** 配置
查看>>
Spring data redis pubsub 简单接入
查看>>
IT技术人,不可有傲气,但须有傲骨
查看>>
如何选择适合自己的存储
查看>>
从Exchange 2007升级到Exchange 2010
查看>>
AWS SDK Python
查看>>
局域网通信工具 飞秋
查看>>
等差数列
查看>>
System Center Operation Manager 2012(八)DELL_MD_3200 Monintoring
查看>>
操作系统(四)---MS-DOS微软磁盘操作系统
查看>>
ajax提交表单
查看>>
FTP服务
查看>>
我的友情链接
查看>>
xcode8运行后后台打印网络相关的日志
查看>>
python语言中函数的传参与基本练习
查看>>
Java集合框架面试题
查看>>