本文共 4376 字,大约阅读时间需要 14 分钟。
Executor是一个用来执行提交的任务(Runnable)的对象。这个接口提供了一种将任务的提交和任务如何去执行解耦机制
先来查看Executor的接口定义:
public interface Executor { //在未来某个时间点执行给定的command任务,任务可能在一个新的线程、线程池、调用者线程中执行,这些取决与具体的Executor实现 void execute(Runnable command);}
Executor是更常用的方式,而不是使用传统类似new Thread(new(RunnableTask())).start()来创建线程,使用Executor我们可以使用以下的方式来创建线程:
Executor executor = anExecutor;executor.execute(new RunnableTask1());executor.execute(new RunnableTask2());...
当然,Executor接口并没有很严格地要求任务的执行是异步的,在一些简单的场景中,能够直接在调用的线程中运行提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
但是,我们通常会使用另外一个额外的线程来异步执行这些任务:
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
这里也可以通过一个任务的执行去触发下一个任务的执行:
class SerialExecutor implements Executor { final Queuetasks = new ArrayDeque (); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
ExecutorService是Executor接口的扩展,提供一系列的方法来管理任务的结束状态和产生一个Future来追踪一个或多个异步任务的执行过程
接口定义:
public interface ExecutorService extends Executor{ }
一个ExecutorService能够被关闭,之后会导致无法继续添加新的任务。这里提供了两种关闭的方法,一种是shutdown允许继续执行之前已经提交的相关任务,全部执行完之后才会关闭;另一种是shutdownNow会阻止准备被接受的任务,同时会停止当前正在执行的任务。因此不用的ExecutorService应该关闭掉以有效的回收资源。
对于ExecutorService中的submit方法是基于对Executor中的execute方法的一个扩展,能够创建一个Future用来取消任务的执行或者是等待任务的执行结束。另外对于很多个任务的执行invokeAny和invokeAll是也很常用的方法,前者用来等待至少一个任务执行完成,后者等待所有的任务执行完成。
常用的方法:
Future submit(Runnable task);
Executors提供了针对 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable这些类的工厂和辅助方法
核心的方法:
//创建一个线程池来执行队列中的任务,并用给定的线程工厂来创建必要的线程public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }
对应的底层ThreadPoolExecutor实现代码:
//以下参数部分是可以缺省public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }//corePoolSize:池中的核心线程数,即使空闲也不会释放(除非设置allowCoreThreadTimeOut)//maximumPoolSize:池中能够拥有的最大线程数//keepAliveTime:当超过核心线程数的线程空闲该时长后会被释放//unit:上述时长的单位//workQueue:队列持有将会被执行的任务,这些任务(Runnable)是通过execute方法提交到队列中//threadFactory:用来创建池中线程的工厂
//单线程执行提交的任务,因为只有一个线程,所以不会在同一时刻执行超过一个任务数public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
//线程资源被创建好之后能够缓存60秒,用来重复使用来执行后续的任务,使用于大量短时的异步任务,但是线程数是没有限定的,因此需要谨慎使用public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
//线程池用来定期执行队列中的任务public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }//底层实现与之前不同的在于队列的实现:public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }//DelayedWorkQueue为延时队列,也是实现了BlockingQueue
转载地址:http://pergi.baihongyu.com/