本文共 13087 字,大约阅读时间需要 43 分钟。
简述:在上一篇博客当中介绍了java当中提供的线程池的内容,这篇文章主要是分析下实现线程池的源码ThreadPoolExecutor。以下内容基于JDK1.8
关于ThreadPoolExecutor的基本参数我们在上篇文章当中已经介绍过了,现在先来看下ThreadPoolExecutor的继承关系体系
我们从上往下看下,每个类或者接口主要是提供了哪些方法
Executor这个接口里面只有一个execute(Runnable command)接口
接下来ExecutorService接口
这里面的接口在原来基础上,可以看到提供了更丰富的对线程的操作,以及线程执行结果的获取
AbstractExecutorService是个抽象类,主要是对ExecutorService接口方法的一些实现。
基础属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //这个状态位是来记录workerCount和runState的值,高三位是运行状态,低29位是workerCount的值 private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //容量 2^29 -1 // 表示线程池可以接受新的任务,并且处理阻塞队列当中的任务 private static final int RUNNING = -1 << COUNT_BITS; // 不接受新的任务,但是会去处理阻塞队列当中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新的任务,不处理队列当中的任务,中止正在运行中的任务 private static final int STOP = 1 << COUNT_BITS; // 所有的任务都中止了,workCount变为0,并且会去执行terminated方法 private static final int TIDYING = 2 << COUNT_BITS; // 在TIDYING状态的基础上,执行完了terminated方法 private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueueworkQueue; // 阻塞队列 private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet workers = new HashSet (); //存放执行的任务 private final Condition termination = mainLock.newCondition(); //终止条件//线程池执行过得最大线程数 private int largestPoolSize; //已经完成的线程数 private long completedTaskCount; //创建线程的线程工厂private volatile ThreadFactory threadFactory; //拒绝策略 private volatile RejectedExecutionHandler handler; //线程存活时间 private volatile long keepAliveTime; //是否允许核心线程存活private volatile boolean allowCoreThreadTimeOut; //核心线程数 private volatile int corePoolSize; //最大线程数 private volatile int maximumPoolSize;
准备工作:
我们先来看下Worker的具体实现,Worker的内部属性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; //构造的是,基于当前work对象创建的线程 Runnable firstTask; //用户提交过来的任务 volatile long completedTasks; //完成的任务数//tryAcquire 直接修改任务状态,从0改为1,设置AQS中的线程 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } Worker(Runnable firstTask) { setState(-1); // 将AQS当中的状态位设置为-1 ,0 表示未加锁,1 表示加锁 this.firstTask = firstTask; //提交进来的任务 this.thread = getThreadFactory().newThread(this); //基于当前worker创建一个线程 } public void run() { runWorker(this); } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }}
接下来我们来看下提交线程时,都会执行哪些操作
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** step1: 如果当前线程池当中的数量小于核心线程数,那么会尝试去创建一个新的线程,每次添加worker都会检查 runState和workerCount,防止在不应该添加任务时增加了线程,返回false step2: 如果线程成功添加到队列当中去了,我们需要再次检查我们是否需要增加一个线程(可能在我们上次检查过后线程死亡了) 或者是在线程进入这个方法时,线程shutDown了,所以我们再次检查状态 如果线程池状态stop,需要把加入到队列中的线程回滚出来,或者也有可能是没有线程再重新创建线程去执行 step3: 如果不能讲一个任务添加到队列中去,会尝试去创建一个新的线程 如果创建线程失败了,可能是线程池shutdown也可能是队列满了,需要执行拒绝策略 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //判断当前线程数是否大于核心线程数 if (addWorker(command, true)) // 小于核心线程数,添加到核心池当中,如果添加成功,则直接返回 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //如果线程池状态是running,那么将任务添加到队列中 int recheck = ctl.get(); //再次获取到线程池状态 if (! isRunning(recheck) && remove(command)) //如果线城池不是running状态,将之前添加的任务从队列中移除 reject(command); //执行决绝 else if (workerCountOf(recheck) == 0) //worker数量等于0 addWorker(null, false); //添加worker } else if (!addWorker(command, false)) //如果添加worker失败 reject(command); //执行拒绝策略 }
当我们初始化好一个线程池的时候,workerCountOf(c)返回0,假设是小于核心线程数的,我们来看下是如何处理新来的任务addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //是一个很大的负数,初始化时 -536870912 int rs = runStateOf(c); // 初始化的时候,-536870912 // 当前rs的状态小于0, if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 获取线程池中线程数量 if (wc >= CAPACITY || 如果wc大于capacity,或者wc大于核心线程数,或者大于最大线程数直接返回false wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //将ctl的值加一,跳出当前for循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //创建一个工作线程 final Thread t = w.thread; //获取到创建的工作线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取runStateOf的值,前面ctl已经增加1 ,这里获取到rs的值和上面是一样的 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || //如果当前rs值小于0,或者 rs值和0相同并且firstTask = null (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //在workers集合当中添加当前worker int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //记录最大执行的线程数 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //如果添加worker成功,将线程启动 t.start(); //执行worker的run方法,run方法调用的runWorker方法 workerStarted = true; //将workerStarted的值修改为ture,并且返回 } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
在addWorker方法的执行流程
1、原子更新ctl,将workerCouter增加1
2、创建一个worker,在workerSet当中添加新创建的worker
3、如果超过了largestPoolSize,就更新
4、执行runWorker方法
5、如果添加worker失败,将worker从workerSet中移除,原子更新workerCount的值
前面已经分析过runWoker方法,会获取work当中的提交过来的任务,如果不为null就执行,如果为null, 尝试从缓存队列中获取任务执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //获取用户提交过来的任务 w.firstTask = null; //将worker当中firstTask清空 w.unlock(); // 将AQS的状态位改为0,无锁状态 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // worker加锁,状态设置成1 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || //当前线程状态高于stop (Thread.interrupted() && //当前调用线程是否中断 runStateAtLeast(ctl.get(), STOP))) && //再次判断线程状态高于stop !wt.isInterrupted()) // 执行线程是否中断 wt.interrupt(); try { beforeExecute(wt, task); //执行回调方法,用户自定义的 Throwable thrown = null; try { task.run(); //执行用户的任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //执行回调方法,用户自定义 } } finally { task = null; w.completedTasks++; //任务完成数量加1 w.unlock(); //将AQS的状态位设置为0, } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //最后执行线程退出的回调方法 } }
接下来看下getTask方法
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //获取当前线程池状态 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); //如果线程池处于非运行状态,或者队列已经空了,workerCount减一,返回null return null; } int wc = workerCountOf(c); //获取线程池中workerCount值 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //判断是否需要关闭掉线程 if ((wc > maximumPoolSize || (timed && timedOut)) //如果workerCount大于最大线程数 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 将workerCount减一 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待指定时间 workQueue.take(); //直接从队列当中获取 if (r != null) return r; //如果不为null直接返回 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
接下来我们看下当阻塞队列当中没有任务,或者线程池shutDown之后都会调用的processWorkerExist方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //线程池完成的任务数加等 每个worker线程执行完成的任务数 workers.remove(w); //将执行完成的worker从workerSet当中移除 } finally { mainLock.unlock(); } tryTerminate(); //尝试去停止线程 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); //添加worker } }
这个方法主要是来记录当前线程池中执行完成的任务数,中止一些空闲线程
接下来看下shutDown方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //查看每个worker线程的访问权限 advanceRunState(SHUTDOWN); //将runState的状态修改为shutdown interruptIdleWorkers(); // 暂停所有空闲线程 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); //尝试中止线程池 }
接下来看下tryTerminate的方法实现
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //如果状态时shutdown,但是阻塞队列不为null直接返回 return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); //当前还有正在运行的线程,将空闲线程停掉 ,直接返回 return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //将runSate改为TIDING try { terminated(); //执行用户重写的terminated方法 } finally { ctl.set(ctlOf(TERMINATED, 0)); //执行结束后将ctl中的runState改为TERMINATED termination.signalAll(); //唤醒等待在termination上的线程 } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
总结:线程池是如何进行线程的复用,当我们在使用线程池的时候,一开始线程池当中没有线程,用户提交过来任务,会创建worker线程来执行用户提交过来的任务,当线程数量达到了核心线程数之后,会把提交过来的任务放入到阻塞队列中去,这时候,之前创建的worker线程在执行完第一次提交过来的任务之后,会去阻塞队列当中获取任务,使用worker线程去执行,而不是新的线程(这里涉及到线程中调用run方法,是在当前线程中执行,调用start方法是新创建线程执行)。
以上是关于ThreadPoolExecutor的核心代码分析,如有问题欢迎指正~