概述

在大型项目中,在代码中直接创建线程是不允许的,如果需要使用多线性则必须通过线程池了创建,因而了解线程池的使用规范以及底层实现是非常有必要的。

使用

预想的用法

但说到线程池,我们可能首先会类比连接池这类池化资源,会以为线程池是通过 acquire() 来获取资源,通过 release() 来释放资源,就像下边这样:

//采用一般意义上池化资源的设计方法
class ThreadPool{
  // 获取空闲线程
  Thread acquire() {
  }
  // 释放线程
  void release(Thread t){
  }
} 
//期望的使用
ThreadPool pool;
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{
  //具体业务逻辑
  ......
});

也就是说,我们在使用的时候直接从 pool 中获取一个线程实例 T1,然后直接向 T1 里边传业务逻辑就行了。但实际在使用的时候发现 Thread 类中根本没有类似于 execute(Runnable task) 这样的公共方法,因而线程池是没有办法按照池化的思想来设计的。

实际的用法

业界当前在设计线程池时,普遍采用的都是生产者-消费者模式,类似下面这种形式:

//简化的线程池,仅用来说明工作原理
class MyThreadPool{
  //利用阻塞队列实现生产者-消费者模式
  BlockingQueue<Runnable> workQueue;
  //保存内部工作线程
  List<WorkerThread> threads 
    = new ArrayList<>();
  // 构造方法
  MyThreadPool(int poolSize, 
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // 创建工作线程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任务
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作线程负责消费任务,并执行任务
  class WorkerThread extends Thread{
    public void run() {
      //循环取任务并执行
      while(true){
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}

/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// 创建线程池  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任务  
pool.execute(()->{
    System.out.println("hello");
});

在 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务。也就是说在使用线程池时,我们编写的代码充当的角色类似于生产者,只要往线程池中放任务即可,线程池自己会获取任务并执行。

底层实现

当然实际线程池的功能比我们前面写的要强大的多,实现也有多种。但无论哪种实现实际上都是在 Executor框架 之下的,什么是 Executor 框架呢?

Executor 框架

实际上它是线程执行规范的抽象,它有三个模块组成:

1. 对任务的抽象(Runnable/Callable)

所要执行的任务都要实现 Runnable 或者 Callable 接口,重写其内部对应的方法,这样才能被线程池接受执行。

其中 Runnable 接口自 Java1.0 以来一直存在,但 Callable 接口仅在 Java1.5 中引入,目的是为了能够返回任务执行之后的结果或者异常的抛出。

具体到代码层面上:

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}
//---------------------------------------//
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

我们可以看到实现 Ruable 接口的类需要重写 run() 方法,该方法不返回任何结果;而实现 Callable 接口的类则中重写的方法是 call(),该方法会返回一个泛型 V,通过这个返回值我们就可以获取到任务执行的结果。

当然两者也是进行相互转化的,通过工具Executors 类可以实现 Runnable 对象和 Callable 对象直接的转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

2. 对任务执行的抽象(Executor)

Executor 接口非常简单内部只有一个 void execute(Runnable command) 也就是说所有要接受任务(Runnable)的线程池都必须实现该接口,那 Callable 接口怎么办呢?

我们先看下边这张类关系图:

image.png

475 x 421

从类的继承关系图上可以看到,Executor 下边还有一个子接口 ExecutorService,所有的线程池实现类都实现了该接口,我们看这个接口的结构图:

image.png

559 x 284

可以看到 ExecutorService 接口在实现了 Executor接口 的前提下还定义了许多新的方法,其中 submit(Callable) 就是用来提交 Callable 类型任务的。

3. 异步计算结果的抽象(Future)

此处为什么要强调异步呢?主要原因在于当你提交一个任务到线程池中之后,任务的执行是异步的,不会同步返回给主线程。Future 内部也比较简单,定义了常用对结果操作的方法:

image.png

323 x 147

  • get() 可以阻塞主线程,并获取任务的执行结果
  • get(long,TimeUnit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
  • cancel(boolean) 可以取消任务的执行

Executor 的执行流程

通过前面 Executor框架 的说明,我们可以看一下下面这张流程示意图:

image.png

561 x 380

  • 主线程可以创建 Ruaable 类型或者 Callable 类型的任务
  • 然后通过 submit() 方法或者 execut() 方法可以将任务放到 ExecutorService 中执行
  • 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现 Future 接口的对象
  • 最后可以通过 Future.get() 方法来阻塞主线程等待任务执行完成获取任务的返回结果。(如果通过 submit(Runnable) 方法获取到的 Future 对象,其 get() 结果为 null