1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
//corePoolSize –核心线程数 //maximumPoolSize –线程池线程最大数量 //keepAliveTime – 空闲线程存活时间 //unit – keepAliveTime的单位 //workQueue – 队列 //threadFactory – executor创建线程的工厂 //handler – 拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ ... }
|
任务提交优先级
代码展示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| ThreadPoolExecutor.class
public void execute(Runnable command) { if (command == null) throw new NullPointerException();
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get(); }
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command);
else if (workerCountOf(recheck) == 0) addWorker(null, false); }
else if (!addWorker(command, false))
reject(command); }
|
结论:核心线程→队列→其他线程→拒绝策略
任务执行优先级
代码展示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| ThreadPoolExecutor.class final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {
//while (task != null || (task = getTask()) != null) //重点:task实际上就是线程池添加进来的任务。 //先执行自己的task,然后置空,再通过getTask()获取阻塞队列的任务,达到线程复用 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(),STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(),STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 线程池添加Runable其实只是调用run方法并没有真的启动这个Runable 线程 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
结论:先执行核心线程和非核心线程Worker的任务,然后再执行BlockQueue的任务。
线程池如何复用线程
见上面任务执行优先级中
1 2 3 4
| while (task != null || (task = getTask()) != null) { ... } getTask方法实际上就是不断从workQueue获取task,不断循环,实现了线程复用 就不点进去细看了
|
BlockingQueue阻塞队列
平衡生产者消费者的处理速度
解耦生产者消费者
自动管理了多线程之间的自动等待和唤醒
1 2 3 4 5 6 7 8 9 10 11
| interface BlockingQueue{ put(E e); take(); offer(E e); offer(E e, long timeout, TimeUnit unit); 指定时间内插入失败会返回false pull(long timeout, TimeUnit unit); 取出队列首对象。指定时间内,队列有对象立即并取出,如果超出时间还没数据可取出 返回失败 }
|
直接提交:Sync
无界队列:Linked
有界队列:Array
拒绝策略
待补充
阿里巴巴为何不推荐使用Executors去创建线程池?
- newFixedThreadPool,singleThreadExecutor:采用了无界队列,主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM
- cachedThreadPool 的maximumPoolSize为Integer.MAX_VALUE,可能会创建非常多的线程,甚至OOM
阿里巴巴推荐什么?自定义线程,避免Integer.MAX_VALUE及无界队列
1 2 3 4 5
| ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
|