Java线程池原理及实现

线程池能充分利用资源提升应用性能,是一组被重复利用的线程。

核心参数

JDK线程池核心参数:

public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
}

核心线程数、最大线程数、线程空闲时间、keepAliveTime时间单位、缓冲队列、线程创建者、拒绝策略。

拒绝策略:

AbortPolicy

线程池默认策略。若队列已满丢弃新添加的任务并抛出RejectedExecutionException异常。

DiscardPolicy

若队列已满直接丢掉新进任务不抛出任何异常。

DiscardOldestPolicy

若队列已满,丢弃最早入队的任务尝试加入新进任务。

CallerRunsPolicy

若任务加入失败,主线程会自行执行任务,不使用线程池执行。

自定义

实现RejectedExecutionHandler接口,自行定义处理方式。

线程实现思路

固定数量的线程与队列配合。

使用Runnable包装任务,并其加入队列,调用notify方法唤醒wait状态的线程,线程从队列取出任务执行。

若没有空闲线程,任务存放在队列中。

示例代码

package com.putdns.demo;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {

  public static void main(String... strings) {
    ThreadPool threadPool = new ThreadPool(10);
    for (int i = 0; i < 10; i++) {
      // Task t = new Task();
      // 因为创建内部类对象需要已知外部类对象,
      // 在此就不用threadPool.new Task()的方式;直接用lambda的写法
      threadPool.execute(() -> {
        System.out.println("business");
      });
    }
  }

  /**
   * 线程池基本结构
   */

  // 线程数量
  private int tpc = 5;
  // (线程)执行单元
  private final Worker[] worker;
  // 任务队列
  private final LinkedBlockingQueue<Runnable> queue;

  // 构造线程池
  public ThreadPool(int thread) {
    queue = new LinkedBlockingQueue();
    worker = new Worker[thread];
    tpc = thread;

    for (int i = 0; i < tpc; i++) {
      worker[i] = new Worker();
      // 启动所有线程并处于等待状态
      worker[i].start();
    }

  }

  // 加入新任务
  public void execute(Runnable task) {
    synchronized (queue) {
      // 把任务加入队列
      queue.add(task);
      // 然后通知在队列上等待的线程
      queue.notify();
    }
  }

  // 任务执行单元
  class Worker extends Thread {

    @Override
    public void run() {
      Runnable t;
      while (true) {
        synchronized (queue) {
          while (queue.isEmpty()) {
            try {
              // 队列任务为空,则wait
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
          // 被唤醒后从队列中取出一个任务
          t = queue.poll();
        }
        try {
          // 执行任务
          t.run();
        } catch (Throwable ex) {
          ex.printStackTrace();
        }
      }
    }

  }

  // 业务逻辑处理单元
  class Task implements Runnable {
    @Override
    public void run() {
      System.out.println("business");
    }

  }

}