目录

JUC - 共享模型之线程池

# 线程池

# 池化技术

程序的运行,其本质上,是对系统资源(CPU、内存、磁盘、网络等等)的使用。如何高效的使用这些资源是我们编程优化演进的一个方向。今天说的线程池就是一种对 CPU 利用的优化手段。

通过学习线程池原理,明白所有池化技术的基本设计思路。遇到其他相似问题可以解决。

前面提到一个名词:池化技术,那么到底什么是池化技术呢?

池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。

在编程领域,比较典型的池化技术有:线程池、连接池、内存池、对象池等

我们通过创建一个线程对象,并且实现 Runnable 接口就可以实现一个简单的线程。可以利用上多核 CPU。当一个任务结束,当前线程就接收。

但很多时候,我们不止会执行一个任务。如果每次都是如此的创建线程 -> 执行任务 -> 销毁线程,会造成很大的性能开销。

那能否一个线程创建后,执行完一个任务后,又去执行另一个任务,而不是销毁。这就是线程池。

这也就是池化技术的思想,通过预先创建好多个线程,放在池中,这样可以在需要使用线程的时候直接获取,避免多次重复创建、销毁带来的开销。

# 为什么使用线程池

10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。

线程池的优势:

线程池做的工作主要是:控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用,控制最大并发数,管理线程。

  • 第一:降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗

  • 第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行

  • 第三:提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控

# 自定义线程池

image-20220515235222246

如图,先创建一些线程放入线程池(Thread Pool)里,防止访问量剧增,频繁的新增线程导致效率问题。线程处理任务(task),如果任务很多,而线程池的线程不能一次性处理完,所以将剩下的任务 task 放入阻塞队列(Blocking Queue),等线程处理完原来的任务,就来阻塞队列拿任务来处理。

下面开始自定义线程池,也可以看视频进行思路学习,视频有 8 个:P200 - P208,这里提供 P200:https://www.bilibili.com/video/BV16J411h7Rd?p=200

步骤1:自定义拒绝策略接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}
1
2
3
4

步骤2:自定义任务队列

class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    // 5. 容量
    private int capcity;
    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }
    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }
    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if(queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else { // 有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

步骤3:自定义线程池

class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;
    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // taskQueue.put(task);
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, 
                      RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }
        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
            // while(task != null || (task = taskQueue.take()) != null) {
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                workers.remove(this);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

步骤4:测试

public static void main(String[] args) {
    ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
        // 1. 死等
        // queue.put(task);
        // 2) 带超时等待
         queue.offer(task, 1500, TimeUnit.MILLISECONDS);
        // 3) 让调用者放弃任务执行
        // System.out.println("放弃" + task);
        // 4) 让调用者抛出异常
        // throw new RuntimeException("任务执行失败 " + task);
        // 5) 让调用者自己执行任务
        // task.run();
    });
    for (int i = 0; i < 4; i++) {
        int j = i;
        threadPool.execute(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(j);
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# ThreadPoolExecutor 介绍

image-20220517174014913

# 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,3 + 29 = 32 正好是 int 的长度,所以 ThreadPoolExecutor 只有一个 int 类型来存储线程池的数据。

状态名 高 3 位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 正在处理任务
SHUTDOWN 000 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,111 的第一个 1 代表负数。

  • 线程池的 shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN 状态
  • 线程池的 shutdownNow() 方法,将线程池由 RUNNING 或 SHUTDOWN 状态转换为 STOP 状态
  • SHUTDOWN 状态和 STOP 状态先会转变为 TIDYING 状态,最终都会变为 TERMINATED

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
1
2
3
4
5

# 构造方法

ThreadPoolExecutor 继承 AbstractExecutorService,而 AbstractExecutorService 实现了 ExecutorService 接口。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
1
2
3
4
5
6
7
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目(最大线程数目 = 核心线程数目 + 救急(空闲)线程数目
  • keepAliveTime 救急(空闲)线程的生存时间
  • unit 救急(空闲)线程的时间单位
  • workQueue 阻塞队列
  • threadFactory 线程工厂(可以为线程创建时起个好名字)
  • handler 拒绝策略

下面对这些参数进行讲解。

# 工作方式

image-20220517175439381

corePoolSize(核心线程数目)、maximumPoolSize(最大线程数目)

当调用线程池 execute() 方法添加一个任务时,线程池会做如下判断:

  • 如果有空闲线程,则直接执行该任务
  • 如果没有空闲线程,且当前运行的线程数少于 corePoolSize,则创建新的线程执行该任务
  • 如果没有空闲线程,且当前的线程数等于 corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程
  • 如果没有空闲线程,且阻塞队列已满,如果队列选择了 有界队列,同时池中的线程数小于 maximumPoolSize ,则会创建 maximumPoolSize - corePoolSize 数目的线程,这些线程叫做 救急线程
  • 如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于 maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务

handler(拒绝策略)

JDK 提供了 4 种拒绝策略实现:

  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最旧的任务,最先提交而没有的任务取而代之

通过 ThreadPoolExecutor.XXX 来调用,如 ThreadPoolExecutor.AbortPolicy

最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。

其它著名框架也提供了实现:

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

image-20220517175609203

当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

KeepAliveTime(空闲线程的存活时间)、unit(keepAliveTime 的单位)

当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

注:如果线程池设置了 allowCoreThreadTimeout 参数为 true(默认 false),那么当空闲线程超过 keepaliveTime 后直接停掉。(不会判断线程数是否大于 corePoolSize)即:最终线程数会变为 0。

workQueue(任务队列)

ThreadPoolExecutor 线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue 和 ArrayBlockingQueue。

有界队列:

  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列

  • ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

无界队列:

  • LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)

  • PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现 compareTo() 方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。

注意:keepAliveTime 和 maximumPoolSize 及 BlockingQueue 的类型均有关系。如果 BlockingQueue 是无界的,那么永远不会触发 maximumPoolSize,自然 keepAliveTime 也就没有了意义。

threadFactory(指定创建线程的工厂)

如果不指定线程工厂时,ThreadPoolExecutor 会使用 ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 pool-XXX-thread- 的线程名(XXX 为创建线程时顺序序号),且创建的线程都是非守护进程。

# 常用方法

除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

// 设置 allowCoreThreadTimeout 
public void allowCoreThreadTimeOut(boolean value);
// 设置 空闲线程的存活时间
public void setKeepAliveTime(long time, TimeUnit unit);
// 设置 最大线程数目
public void setMaximumPoolSize(int maximumPoolSize);
// 设置 核心线程数目
public void setCorePoolSize(int corePoolSize);
// 设置 线程工厂
public void setThreadFactory(ThreadFactory threadFactory);
// 设置 拒绝策略
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
1
2
3
4
5
6
7
8
9
10
11
12

此外,还有一些方法:

  • getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的 coreSize 大小
  • getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的 coreSize 大小
  • getLargestPoolSize():记录了曾经出现的最大线程个数(水位线)
  • getPoolSize():线程池中当前线程的数量
  • getActiveCount():Returns the approximate(近似)number of threads that are actively executing tasks
  • prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize
  • prestartCoreThread():会启动一个核心线程(同上)
  • allowCoreThreadTimeOut(true):允许核心线程在 KeepAliveTime 时间后,退出

# Executors 类

根据 ThreadPoolExecutor 的构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,更加快捷。

Executors 类的底层实现便是 ThreadPoolExecutor 的构造方法,Executors 工厂方法有:

  • Executors.newFixedThreadPool(int n):创建一个固定大小为 n 的线程池
  • Executors.newCachedThreadPool():无界线程池,可以进行自动线程回收
  • Executors.newSingleThreadExecutor():只创建一个线程,且线程一直存在
  • Executors.newScheduledThreadPool(int n):创建一个固定大小为 n 的任务调度线程池,具体看 任务调度线程池

它们均为大多数使用场景预定义了设置。不过在阿里 Java 文档中说明,尽量不要用该类创建线程池。

# newFixedThreadPool

newFixedThreadPool 内部调用了如下代码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
1
2
3
4
5

特点:

  • 核心线程数 == 最大线程数,因为没有救急线程被创建,因此也无需超时时间,因为第一个参数 nThreads 等于第二个参数
  • 阻塞队列是无界的,可以放任意数量的任务
public static void main(String[] args) {
    ExecutorService pool = ExecutorsnewFixedThreadPool(2, new ThreadFactory() {

    });

    pool.execute(() -> {
        log.info("{}", 1);
    });

    pool.execute(() -> {
        log.info("{}", 2);
    });

    pool.execute(() -> {
        log.info("{}", 3);
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

输出:

2022-05-17 21:53:32,876 [pool-1-thread-1] TestThreadPoolExecutor - 1
2022-05-17 21:53:32,876 [pool-1-thread-1] TestThreadPoolExecutor - 3
2022-05-17 21:53:32,876 [pool-1-thread-2] TestThreadPoolExecutor - 2
1
2
3

评价:适用于任务量已知,相对耗时的任务。

ThreadFactory

上面提到,ThreadFactory 可以用来修改线程的名字,默认的线程名字是:pool-x-thread-x。我们可以调用 JDK 提供的线程工厂类,也可以自己自定义线程名:

public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
        AtomicInteger count = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "myPool_" + count.getAndIncrement());
        }
    });

    pool.execute(() -> {
        log.info("{}", 1);
    });

    pool.execute(() -> {
        log.info("{}", 2);
    });

    pool.execute(() -> {
        log.info("{}", 3);
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

输出:

2022-05-17 21:55:36,028 [myPool_2] TestThreadPoolExecutor - 2
2022-05-17 21:55:36,028 [myPool_2] TestThreadPoolExecutor - 3
2022-05-17 21:55:36,028 [myPool_1] TestThreadPoolExecutor - 1
1
2
3

# newCachedThreadPool

newCachedThreadPool 内部调用了如下代码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
1
2
3
4
5

特点:

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现,特点是:它没有容量,没有线程来取,它是放不进去的(一手交钱、一手交货)

当往 SynchronousQueue 队列 put 数据时,它是 put 不进去的,只有当另一个线程来 take 获取,然后该队列才允许 put 进去,接着把数据给 take 的线程。

代码演示:

public static void main(String[] args) {
    SynchronousQueue<Integer> integers = new SynchronousQueue<>();
    new Thread(() -> {
        try {
            log.debug("putting {} ", 1);
            integers.put(1);
            log.debug("{} putted...", 1);
            
            log.debug("putting...{} ", 2);
            integers.put(2);
            log.debug("{} putted...", 2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t1").start();
    
    sleep(1);
    
    new Thread(() -> {
        try {
            log.debug("taking {}", 1);
            integers.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t2").start();
    
    sleep(1);
    
    new Thread(() -> {
        try {
            log.debug("taking {}", 2);
            integers.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    },"t3").start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

输出:

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
1
2
3
4
5
6

可以看到,integers.put(1) 前打印 putting 1,但是并没有立即打印 1 putted...,而是等 integers.take() 后才打印,说明没有 take 来告诉队列取数据,队列是不会事先运行数据进去。

评价:整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1 分钟后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。

# newSingleThreadExecutor

newSingleThreadExecutor 核心线程和总线程数都是一个,内部调用了如下代码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
1
2
3
4
5
6

可以看到,newSingleThreadExecutor 并没有立即返回 ThreadPoolExecutor,而是将 ThreadPoolExecutor 封装到 FinalizableDelegatedExecutorService 里再返回,那么什么是 FinalizableDelegatedExecutorService 呢?

FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。什么是特有方法?

如下常用方法无法被调用,因为 FinalizableDelegatedExecutorService 里没有这些方法。

// 设置 allowCoreThreadTimeout 
public void allowCoreThreadTimeOut(boolean value);
// 设置 空闲线程的存活时间
public void setKeepAliveTime(long time, TimeUnit unit);
// 设置 最大线程数目
public void setMaximumPoolSize(int maximumPoolSize);
// 设置 核心线程数目
public void setCorePoolSize(int corePoolSize);
// 设置 线程工厂
public void setThreadFactory(ThreadFactory threadFactory);
// 设置 拒绝策略
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
1
2
3
4
5
6
7
8
9
10
11
12

使用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

与 Thread 创建一个线程区别

我们可以直接通过 new Thread(() - > { }).start(); 来启动一个线程,那么两者区别是什么?

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池即使当前线程失败了,还会新建一个新的线程,保证线程池最终都会有一个 完好无损 的线程
public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();

    pool.execute(() -> {
        log.info("{}", 1);
        int i = 1 / 0;
    });

    pool.execute(() -> {
        log.info("{}", 2);
    });

    pool.execute(() -> {
        log.info("{}", 3);
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

输出:

2022-05-17 22:00:01,932 [pool-1-thread-1] TestThreadPoolExecutor - 1
2022-05-17 22:00:01,933 [pool-1-thread-2] TestThreadPoolExecutor - 2
2022-05-17 22:00:01,933 [pool-1-thread-2] TestThreadPoolExecutor - 3
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at cn.youngkbt.test.TestThreadPoolExecutor.lambda$test02$3(TestThreadPoolExecutor.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
1
2
3
4
5
6
7
8

可以看到,当线程执行第一个任务中出现 1 / 0 异常,线程池也会立马创建新的线程去执行剩下的任务。

与 newFixedThreadPool 区别

newFixedThreadPool 也能创建一个线程,那么两者区别是什么?

  • Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改
  • Executors.newFixedThreadPool(1) 初始时为 1,以后还可以修改
    • newFixedThreadPool 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
    • newSingleThreadExecutor 对外暴露的是 FinalizableDelegatedExecutorService,无法调用 setCorePoolSize 等方法进行修改

可以看 FinalizableDelegatedExecutorService 源码就知道哪些方法没有。

# ExecutorService 接口

该接口是真正的线程池接口。上面的 ThreadPoolExecutor 以及下面的 ScheduledThreadPoolExecutor 都是该接口的实现类。

# 提交任务

// 执行任务
void execute(Runnable command);

// 提交 Runnable 任务到线程池,返回 Future 对象,由于 Runnable 没有返回值,也就是说调用 Future 对象 get() 方法返回 null
Future<?> submit(Runnable task);

// 提交 Callable 任务到线程池,返回 Future 对象,调用 Future 对象 get() 方法可以获取 Callable 的返回值
<T> Future<T> submit(Callable<T> task);

// 提交 Runnable 任务到线程池,返回 Future 对象,调用 Future 对象 get() 方法可以获取 Runnable 的参数值
<T> Future<T> submit(Runnable task,T result);

// tasks 是个集合,提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间,超时后返回没有执行的任务集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务不管在不在运行,都直接取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

submit 方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    
    Future<String> future = pool.submit(() -> {
        log.debug("running");
        Thread.sleep(1000);
        return "ok";
    });
	
    log.debug("{}", future.get());
    log.debug("aa");
}
1
2
3
4
5
6
7
8
9
10
11
12

输出:

2022-05-17 22:16:05,655 [pool-1-thread-1] TestThreadPoolExecutor - running
2022-05-17 22:16:06,659 [main] TestThreadPoolExecutor - ok
2022-05-17 22:16:06,659 [main] TestThreadPoolExecutor - aa
1
2
3

可以看到,future 调用了 get() 方法,就会阻塞线程直到返回结果,才往下执行,打印 aa。

get() 方法可以加有两个参数,分别是超时时间和时间单位,如 future.get(900,TimeUnit.MILLISECONDS)

  • 如果 get 方法被打断,进入 InterruptedException 异常
  • 如果线程执行过程(call、run 方法)中抛出异常,进入 ExecutionException 异常
  • 如果 get 方法超时,进入 TimeoutException 异常

注意:如果任务内部发送了异常,则不会打印异常,而是将异常封装给 Future,有 Future 调用 get 方法获取异常。

public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    
    Future<String> future = pool.submit(() -> {
        int i = 1 / 0;
        return true;
    });
	
    log.debug("{}", future.get());
}
1
2
3
4
5
6
7
8
9
10

输出:

java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at cn.youngkbt.test.TestThreadPoolExecutor.test03(TestThreadPoolExecutor.java:70)
	at cn.youngkbt.test.TestThreadPoolExecutor.main(TestThreadPoolExecutor.java:19)
Caused by: java.lang.ArithmeticException: / by zero
	at cn.youngkbt.test.TestThreadPoolExecutor.lambda$test03$6(TestThreadPoolExecutor.java:66)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
1
2
3
4
5
6
7
8
9
10
11

如果没有调用 get,则不会输出异常信息。

invokeAll 方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    
    List<Future<String>> futures = pool.invokeAll(Arrays.asList(
        // 任务 1
        () -> {
            log.debug("begin");
            Thread.sleep(1000);
            return "1";
        },
        // 任务 2
        () -> {
            log.debug("begin");
            Thread.sleep(500);
            return "2";
        },
        // 任务 3
        () -> {
            log.debug("begin");
            Thread.sleep(2000);
            return "3";
        }
    ));
	// 打印返回值
    futures.forEach( f ->  {
        try {
            log.debug("{}", f.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

输出:

2022-05-17 22:19:51,977 [pool-1-thread-1] TestThreadPoolExecutor - begin
2022-05-17 22:19:52,980 [pool-1-thread-1] TestThreadPoolExecutor - begin
2022-05-17 22:19:53,493 [pool-1-thread-1] TestThreadPoolExecutor - begin
2022-05-17 22:19:55,509 [main] TestThreadPoolExecutor - 1
2022-05-17 22:19:55,509 [main] TestThreadPoolExecutor - 2
2022-05-17 22:19:55,509 [main] TestThreadPoolExecutor - 3
1
2
3
4
5
6

invokeAny 方法

只从头到尾执行一个线程,虽然它依然执行传过来的多个线程,但是一旦一个线程先结束,则其他线程立即结束。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);

    String result = pool.invokeAny(Arrays.asList(
        () -> {
            log.debug("begin 1");
            Thread.sleep(1000);
            log.debug("end 1");
            return "1";
        },
        () -> {
            log.debug("begin 2");
            Thread.sleep(500);
            log.debug("end 2");
            return "2";
        },
        () -> {
            log.debug("begin 3");
            Thread.sleep(2000);
            log.debug("end 3");
            return "3";
        }
    ));
    log.debug("{}", result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

输出:

2022-05-17 22:22:40,186 [pool-1-thread-1] TestThreadPoolExecutor - begin 1
2022-05-17 22:22:40,186 [pool-1-thread-2] TestThreadPoolExecutor - begin 2
2022-05-17 22:22:40,700 [pool-1-thread-2] TestThreadPoolExecutor - end 2
2022-05-17 22:22:40,700 [pool-1-thread-2] TestThreadPoolExecutor - begin 3
2022-05-17 22:22:40,700 [main] TestThreadPoolExecutor - 2
1
2
3
4
5

可以看到,两个线程分别执两个任务,因为第二个任务只睡眠 500 毫秒,所以很快返回结果,而一旦任务完成,则其他的任务直接被停止。

# 关闭线程池

shutdown 方法,将线程池状态变为 SHUTDOWN

  • 不会接收新任务
  • 但已提交任务会执行完
  • 此方法不会阻塞调用线程的执行
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

shutdownNow 方法,将线程池状态变为 STOP

  • 不会接收新任务
  • 会将队列中的任务返回
  • 并用 interrupt 的方式中断正在执行的任务
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终结
    tryTerminate();
    return tasks;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

shutdown 方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);

    Future<Integer> result1 = pool.submit(() -> {
        log.debug("task 1 running...");
        Thread.sleep(1000);
        log.debug("task 1 finish...");
        return 1;
    });

    Future<Integer> result2 = pool.submit(() -> {
        log.debug("task 2 running...");
        Thread.sleep(1000);
        log.debug("task 2 finish...");
        return 2;
    });

    Future<Integer> result3 = pool.submit(() -> {
        log.debug("task 3 running...");
        Thread.sleep(1000);
        log.debug("task 3 finish...");
        return 3;
    });

    log.debug("shutdown");

    // 关闭线程池的所有线程,未完成的任务继续完成,完成后自动结束
    pool.shutdown();
    log.info("aa");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

输出:

2022-05-17 22:28:38,804 [main] TestThreadPoolExecutor - shutdown
2022-05-17 22:28:38,804 [pool-1-thread-2] TestThreadPoolExecutor - task 2 running...
2022-05-17 22:28:38,804 [pool-1-thread-1] TestThreadPoolExecutor - task 1 running...
2022-05-17 22:28:38,804 [main] TestThreadPoolExecutor - aa
2022-05-17 22:28:39,815 [pool-1-thread-2] TestThreadPoolExecutor - task 2 finish...
2022-05-17 22:28:39,815 [pool-1-thread-1] TestThreadPoolExecutor - task 1 finish...
2022-05-17 22:28:39,815 [pool-1-thread-2] TestThreadPoolExecutor - task 3 running...
2022-05-17 22:28:40,829 [pool-1-thread-2] TestThreadPoolExecutor - task 3 finish...
1
2
3
4
5
6
7
8

可以看到,执行了 shutdown 方法,并不会阻塞在那里,而是继续往下执行,打印 aa。

如果想让 shutdown 方法调用后阻塞在原地,则使用 awaitTermination 方法,如下:

pool.shutdown();
pool.awaitTermination(3, TimeUnit.SECONDS);
log.info("aa");
1
2
3

这样 aa 将在最后打印,但是 awaitTermination 方法需要指定超时时间,这样无法判断线程什么时候执行完,所以 awaitTermination 一般用于善后工作。

shutdownNow 方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);

    Future<Integer> result1 = pool.submit(() -> {
        log.debug("task 1 running...");
        Thread.sleep(1000);
        log.debug("task 1 finish...");
        return 1;
    });

    Future<Integer> result2 = pool.submit(() -> {
        log.debug("task 2 running...");
        Thread.sleep(1000);
        log.debug("task 2 finish...");
        return 2;
    });

    Future<Integer> result3 = pool.submit(() -> {
        log.debug("task 3 running...");
        Thread.sleep(1000);
        log.debug("task 3 finish...");
        return 3;
    });

    log.debug("shutdownNow");

    // 立即关闭线程池的所有线程,把未完成的任务存入集合并返回
    List<Runnable> runnables = pool.shutdownNow();
    log.debug("other.... {}" , runnables);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

输出:

2022-05-17 22:27:42,852 [pool-1-thread-1] TestThreadPoolExecutor - task 1 running...
2022-05-17 22:27:42,853 [main] TestThreadPoolExecutor - shutdownNow
2022-05-17 22:27:42,853 [pool-1-thread-2] TestThreadPoolExecutor - task 2 running...
2022-05-17 22:27:42,853 [main] TestThreadPoolExecutor - other.... [java.util.concurrent.FutureTask@22927a81]
1
2
3
4

调用了 shutdownNow 方法,无论线程是否在执行任务,都会马上停止执行,并将未完成的任务以集合形式返回,交给用户自行处理。

# 其他方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
1
2
3
4
5
6
7
8

# 线程是否越多越好?

一个计算为主的程序(专业一点称为 CPU 密集型程序)。多线程跑的时候,可以充分利用起所有的 CPU 核心,比如说 4 个核心的 CPU,开 4 个线程的时候,可以同时跑 4 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU 核心数量反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。

如果是一个磁盘或网络为主的程序(IO 密集型)。一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候 CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起 CPU 来是慢到令人发指的。所以开多线程,比 方说多线程网络传输,多线程往不同的目录写文件,等等。此时线程数等于 IO 任务数是最佳的。

# 两个接口

# Callable 接口

95eef01f3a292df5cb9047105febf76635a87341

JDK1.5 以后创建线程可以通过一下方式:

  • 继承 Thread 类,实现 void run() 方法
  • 实现 Runnable 接口,实现 void run() 方法
  • 实现 Callable 接口,实现 V call() Throws Exception 方法

Callable 和 Runnale 接口区别:

  • Callable 可以抛出异常,和 Future、FutureTask 配合可以用来获取异步执行的结果
  • Runnable 没有返回结果,异常只能内部消化

执行 Callable 的线程的方法可以通过以下两种方式:

  • 借助 FutureTask,使用 Thread 的 start 方法来执行
  • 加入到线程池中,使用线程池的 execute 或 submit 执行
  • 注:Callable 无法直接使用 Thread 来执行

我们都知道,Callable 带有返回值的,如果我们不需要返回值,却又想用 Callable 该如何做?直接 return null

# Future 接口

Future 设计的初衷:对将来某个时刻会发生的结果进行建模。

当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。

它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在 Future 中出发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要等待耗时的操作完成。

Future 的优点:比更底层的 Thread 更易用。要使用 Future,通常只需要将耗时的操作封装在一个 Callable 对象中,再将它提交给 ExecutorService。

为了让程序更加高效,让 CPU 最大效率的工作,我们会采用异步编程。首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎 Future 粉墨登场。然而 Future 提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。所以,为了满足 Future 的某些遗憾,强大的 CompletableFuture 随着 Java8 一起来了。

Future 是用来获取异步计算结果的接口,常用方法:

  • boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true
  • boolean isCancelled():如果在任务正常完成前将其取消,则返回 true
  • boolean isDone():如果任务已完成,则返回 true,可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true
  • V get():获取异步结果,此方法会一直阻塞等到计算完成
  • V get(long timeout, TimeUnit unit):获取异步结果,此方法会在指定时间内一直阻塞等到计算完成,超时后会抛出超时异常

通过方法分析我们也知道实际上 Future 提供了 3 种功能:

  • 能够中断执行中的任务
  • 判断任务是否执行完成
  • 获取任务执行完成后的结果

但是 Future 只是一个接口,我们无法直接创建对象,因此就需要其实现类 FutureTask。

FutureTask 类

FutureTask 类的实现:

public class FutureTask<V> implements RunnableFuture<V> {
// ...
}
 
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
1
2
3
4
5
6
7
8
9
10
11

FutureTask 实现了 Runnable、Future 两个接口。由于 FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。并且还可以直接通过 get() 函数获取执行结果,该函数会阻塞,直到结果返回。因此 FutureTask 既是 Future、Runnable,又是包装了 Callable(如果是 Runnable 最终也会被转换为 Callable),它是这两者的合体。

FutureTask 的构造函数:

public FutureTask(Callable<V> callable) {

}

public FutureTask(Runnable runnable, V result) {

}
1
2
3
4
5
6
7

FutureTask 包装过的 Callable 在 Thread、线程池上执行:

public static void main(String[] args) {
    int a = 1;
    int b = 2;
    Callable<Integer> callable = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return a + b;
        }
    };
    // 通过 futureTask 来执行 Callable
    FutureTask<Integer> futureTask = new FutureTask<>(callable);

    // 1.使用 Thread 执行线程
    new Thread(futureTask).start();
    try {
        Integer integer = futureTask.get();
        System.out.println(integer); // 输出 3
    } catch (InterruptedException e | ExecutionException e) {
        e.printStackTrace();
    }

    // 2.使用线程池执行线程
    Executors.newFixedThreadPool(1).submit(futureTask);
    threadPool.shutdown();
    try {
        Integer integer = futureTask.get();
        System.out.println(integer); // 输出 3
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

FutureTask 包装过的 Runnable 在 Thread、线程池上执行:

public static void main(String[] args) {
    Person p = new Person(0, "person");
    // RunnableTask 是自己写的类,继承 Runnable
    RunnableTask runnableTask = new RunnableTask(p);

    // 创建 futureTask 来执行 Runnable
    FutureTask<Person> futureTask = new FutureTask<>(runnableTask, p);

    // 1.使用 Thread 执行线程
    new Thread(futureTask).start();
    try {
        Person x = futureTask.get();
        System.out.println(x);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } 

    // 2.使用线程池执行线程
    threadPool.submit(futureTask);
    threadPool.shutdown();
    try {
        Person y = futureTask.get();
        System.out.println(y);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 异步模式之工作线程

# 定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message,即一个线程处理一个任务)

注意,不同任务类型应该使用不同的线程池,这样能够避免 饥饿,并能提升效率。

例如,如果一个餐馆的工人既要招呼客人(任务类型 A),又要到后厨做菜(任务类型 B)显然效率不咋地,分成服务员(线程池 A)与厨师(线程池 B)更为合理,当然你能想到更细致的分工。

# 饥饿

固定大小线程池会有饥饿现象:

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:没啥说的,做就是了
  • 比如工人 A 处理了点餐任务,接下来它要等着工人B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,这时没人做饭了,产生饥饿
public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();
    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        
        /*
        executorService.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        */
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

输出:

17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐 ...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
1
2
3

当 23 - 35 注释取消后,可能的输出

17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐 ... 
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐 ...
// 阻塞
1
2
3

这是因为线程池只创建了 2 个线程,而这 2 个线程都去点餐了,内部的做菜没有线程去执行,所以产生饥饿问题,类似于死锁的特性,但是又不是死锁。

解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:

public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();
    
    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }
    
    public static void main(String[] args) {
        // 2 个线程池,分别是处理点餐和做菜的
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookPool = Executors.newFixedThreadPool(1);
        
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

输出:

17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁
1
2
3
4
5
6

# 创建多少线程池合适

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存

# CPU 密集型运算

通常采用 CPU 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费。

# I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下:

  • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU 计算时间 + 等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 CPU 被 100% 利用,套用公式:

  • 4 * 100% * 100% / 50% = 8

例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 CPU 被 100% 利用,套用公式:

  • 4 * 100% * 100% / 10% = 40

计算时间和等待时间利用专门的工具进行计算。

# 正确处理执行任务异常

在执行任务时,代码出现了异常是不会体现出来的,我们要么主动捕捉异常,要么使用 Future 来 get 异常。

方法 1:主动捉异常

public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.submit(() -> {
        try {
            log.debug("task1");
            int i = 1 / 0;
        } catch (Exception e) {
            log.error("error:", e);
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11

输出:

21:59:04.558 c.TestTimer [pool-1-thread-1] - task1 
21:59:04.562 c.TestTimer [pool-1-thread-1] - error: 
java.lang.ArithmeticException: / by zero 
 at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28) 
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748) 
1
2
3
4
5
6
7
8
9

方法 2:使用 Future

public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Boolean> f = pool.submit(() -> {
        log.debug("task1");
        int i = 1 / 0;
        return true;
    });
    log.debug("result:{}", f.get());
}
1
2
3
4
5
6
7
8
9

输出:

21:54:58.208 c.TestTimer [pool-1-thread-1] - task1 
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.ArithmeticException: / by zero 
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
 at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
 at cn.itcast.n8.TestTimer.main(TestTimer.java:31) 
Caused by: java.lang.ArithmeticException: / by zero 
 at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28) 
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748) 
1
2
3
4
5
6
7
8
9
10
11
12

# 任务调度线程池

# Timer

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

public static void main(String[] args) {
    Timer timer = new Timer();
    TimerTask task1 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 1");
            sleep(2);
        }
    };
    TimerTask task2 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 2");
        }
    };
    // 使用 timer 添加两个任务,希望它们都在 1s 后执行
    // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务 1』的延时,影响了『任务 2』的执行
    timer.schedule(task1, 1000);
    timer.schedule(task2, 1000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

输出:

20:46:09.444 c.TestTimer [main] - start... 
20:46:10.447 c.TestTimer [Timer-0] - task 1 
20:46:12.448 c.TestTimer [Timer-0] - task 2
1
2
3

上面 task1 任务里睡眠了 2s,导致计划 1s 后执行 task2,因为 task1 没结束,所以最终等了 3s(2s task1 任务的睡眠 + 1s 延迟) 才执行 task2。

# ScheduledExecutorService

任务调度线程池解决了这个问题,JDK 提供了 ScheduledExecutorService 类。

使用 ScheduledExecutorService 改写:

public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    
    // 添加两个任务,希望它们都在 1s 后执行
    executor.schedule(() -> {
        System.out.println("任务1,执行时间:" + new Date());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) { }
    }, 1000, TimeUnit.MILLISECONDS);

    executor.schedule(() -> {
        System.out.println("任务2,执行时间:" + new Date());
    }, 1000, TimeUnit.MILLISECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

输出:

任务1,执行时间:Thu Jan 03 12:45:17 CST 2019 
任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
1
2

可以看到,任务调度线程池有两个线程,所以可以分别同时执行两个任务。但是如果 只有一个线程,就等价于 Timer,即任务 1 依然睡眠 2s + 延迟 1s = 3s,后,才执行任务 2。

所以 ScheduledExecutorService 任务调度线程池需要多个线程才能解决这种等待问题。

# 常用 API

scheduleAtFixedRate

每隔一段时间反复执行任务。四个参数:

  • 第一个参数:执行的任务
  • 第二个参数:延迟时间
  • 第三个参数:延迟时间后,每隔多少时间执行一次任务
  • 第四个参数:时间单位

例子:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    
    log.debug("start...");
    // 1s 后,每个 1s 执行一次任务
    pool.scheduleAtFixedRate(() -> {
        log.debug("running...");
    }, 1, 1, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9

输出:

21:45:43.167 c.TestTimer [main] - start... 
21:45:44.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:45.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:46.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:47.215 c.TestTimer [pool-1-thread-1] - running... 
// ...
1
2
3
4
5
6

任务执行时间超过了间隔时间呢?即任务里睡眠了一段时间,那么结果如何?

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    log.debug("start...");
    
    pool.scheduleAtFixedRate(() -> {
        log.debug("running...");
        sleep(2);
    }, 1, 1, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

21:44:30.311 c.TestTimer [main] - start... 
21:44:31.360 c.TestTimer [pool-1-thread-1] - running... 
21:44:33.361 c.TestTimer [pool-1-thread-1] - running... 
21:44:35.362 c.TestTimer [pool-1-thread-1] - running... 
21:44:37.362 c.TestTimer [pool-1-thread-1] - running... 
// ...
1
2
3
4
5
6

如果任务里出现了睡眠时间,那么最终的间隔时间 = Math.max(指定间隔时间, 任务执行时间)。

如上面,间隔时间为 1s,而任务执行时间花了 2s,所以最大值是 2s,最终的间隔时间就是 2s。

scheduleWithFixedDelay

每隔一段时间反复执行任务,比较 scheduleAtFixedRate,最终的间隔时间 = 指定间隔时间 + 任务执行时间

比较四个参数:

public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    log.debug("start...");
    pool.scheduleWithFixedDelay(()-> {
        log.debug("running...");
        sleep(2);
    }, 1, 1, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是上一个任务结束 + 延时,也就是下一个任务开始,所以间隔都是 3s

21:40:55.078 c.TestTimer [main] - start... 
21:40:56.140 c.TestTimer [pool-1-thread-1] - running... 
21:40:59.143 c.TestTimer [pool-1-thread-1] - running... 
21:41:02.145 c.TestTimer [pool-1-thread-1] - running... 
21:41:05.147 c.TestTimer [pool-1-thread-1] - running... 
// ...
1
2
3
4
5
6

即原本指定每隔 1s 执行一次,但是任务执行花了 2s,所以最终的执行间隔时间就是 2 + 1 = 3s。

评价:整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务。

两个方法区别

参数一样,但是执行任务的间隔不一样:

  • scheduleAtFixedRate 的最终执行间隔时间 = Math.max(指定间隔时间, 任务执行时间),谁时间大取谁
  • scheduleWithFixedDelay 的最终执行间隔时间 = 指定间隔时间 + 任务执行时间

# 应用

如何让每周四 18:00:00 定时执行任务?

public static void main(String[] args) {
    // 获得当前时间
    LocalDateTime now = LocalDateTime.now();
    // 获取本周四 18:00:00.000
    LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);

    // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
    if(now.compareTo(thursday) >= 0) {
        // 加一周
        thursday = thursday.plusWeeks(1);
    }
    // 计算当前时间到下周四的时间差,即延时执行时间
    long initialDelay = Duration.between(now, thursday).toMillis();
    // 计算间隔时间,即 1 周的毫秒值
    long oneWeek = 7 * 24 * 3600 * 1000;
    
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    System.out.println("开始时间:" + new Date());
    
    executor.scheduleAtFixedRate(() -> {
        System.out.println("执行时间:" + new Date());
    }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# Tomcat 线程池

Tomcat 在哪里用到了线程池呢?

image-20220518010850018

Tomcat 线程池流程:

当一个 Socket 连接请求过来:

  • LimitLatch 类用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 类只负责「接收新的 Socket 连接」
  • Poller 类只负责监听 Socket Channel 是否有「可读的 I/O 事件」
  • 一旦可读,封装一个任务对象(SocketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责「处理请求」

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同:

  • 如果总线程数达到 maximumPoolSize
    • 这时不会立刻抛 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        // 调用 JDK 的处理方法
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        // 如果抛出异常,则先获取队列,并尝试将任务再次加入队列
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                // 加入失败,抛出异常
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                Thread.interrupted();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if ( parent.isShutdown() ) 
        throw new RejectedExecutionException(
        "Executor not running, can't force a command into the queue"
    );
    return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task 
    is rejected
}
1
2
3
4
5
6
7
8

配置

我们在 Tomcat 的 server.xml 经常见到:

<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
          maxThreads="150" minSpareThreads="4"/>

<Connector port="8080" protocol="HTTP/1.1"
           connectionTimeout="20000"
           redirectPort="8443" />

<Connector executor="tomcatThreadPool"
           port="8080" protocol="HTTP/1.1"
           connectionTimeout="20000"
           redirectPort="8443" />
1
2
3
4
5
6
7
8
9
10
11

Connector 配置

配置项 默认值 说明
acceptorThreadCount 1 acceptor 线程数量
pollerThreadCount 1 poller 线程数量
minSpareThreads 10 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
executor - Executor 名称,用来引用下面的 Executor

如果 Connector 里配置了 executor,则优先以 executor 的配置使用。

Executor 线程配置

配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否守护线程
minSpareThreads 25 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即 1 分钟
maxQueueSize Integer.MAX_VALUE 队列长度
prestartminSpareThreads false 核心线程是否在服务器启动时启动

我们可以看到,Tocmat 的 maxQueueSize 默认值是 Integer.MAX_VALUE,可以看作是无界队列,那么是不是就没有救急线程呢?

救急线程创建区别

回顾 JDK 的救急线程产生条件:当队列 Queue 满了,才创建救急线程,来处理无法进入队列的任务。

但是 Tomcat 的队列是无界的,所以 Tomcat 修改了救急线程的创建条件:

  • 当任务大于核心线程数,就马上创建 maxThreads - minSpareThreads(最大线程数 - 核心线程数)个救急线程处理剩下的任务,如果任务超过了最大线程数,才放入队列里

image-20220518152342295

# Fork/Join

# 概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 CPU 密集型运算。

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

Fork/Join 默认会创建与 CPU 核心数大小相同的线程池。

# 工作窃取

另外,Fork/Join 有一个工作窃取的概念。简单理解,就是一个工作线程下会维护一个包含多个子任务的双端队列。而对于每个工作线程来说,会从头部到尾部依次执行任务。这时,总会有一些线程执行的速度较快,很快就把所有任务消耗完了。那这个时候怎么办呢,总不能空等着吧,多浪费资源啊。

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

JUC-00000029

那么为什么需要使用工作窃取算法呢?

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

于是,先做完任务的工作线程会从其他未完成任务的线程尾部依次获取任务去执行。这样就可以充分利用 CPU 的资源。这个非常好理解,就比如有个妹子程序员做任务比较慢,那么其他程序猿就可以帮她分担一些任务,这简直是双赢的局面啊,妹子开心了,你也开心了。

# ForkJoinPool

WorkQueue 是一个 ForkJoinPool 中的内部类,它是线程池中线程的工作队列的一个封装,支持任务窃取。

什么叫线程的任务窃取呢?就是说你和你的一个伙伴一起吃水果,你的那份吃完了,他那份没吃完,那你就偷偷的拿了他的一些水果吃了。存在执行 2 个任务的子线程,这里要讲成存在 A,B 两个了。

WorkQueue 在执行任务,A 的任务执行完了,B 的任务没执行完,那么 A 的 WorkQueue 就从 B 的 WorkQueue 的 ForkJoinTask 数组中拿走了一部分尾部的任务来执行,可以合理的提高运行和计算效率。

每个线程都有一个 WorkQueue,而 WorkQueue 中有执行任务的线程(ForkJoinWorkerThread owner),还有这个线程需要处理的任务(ForkJoinTask<?>[] array)。那么这个新提交的任务就是加到 Array 中。

# ForkJoinTask

ForkJoinTask 代表运行在 ForkJoinPool 中的任务。

主要方法:

  • fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务
  • join() 当任务完成的时候返回计算结果。
  • invoke() 开始执行任务,如果必要,等待计算完成。

子类: Recursive :递归

  • RecursiveAction 一个递归无结果的 ForkJoinTask(没有返回值)
  • RecursiveTask 一个递归有结果的 ForkJoinTask(有返回值)

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)。

继承后,就有 fork 方法来将任务进行拆分,创建线程执行拆分的任务,join 方法就获取执行任务后的返回值。

# 示例

例如下面定义了一个对 1 ~ n 之间的整数求和的任务。

@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {
    int n;
    public AddTask1(int n) {
        this.n = n;
    }
    
    @Override
    protected Integer compute() {
        // 如果 n 已经为 1,可以求得结果了
        if (n == 1) {
            log.debug("join() {}", n);
            return n;
        }

        AddTask1 t1 = new AddTask1(n - 1);
        // fork 方法,将任务进行拆分,创建线程执行
        t1.fork();
        log.debug("fork() {} + {}", n, t1);

        // join 方法返回结果结果
        int result = n + t1.join();
        log.debug("join() {} + {} = {}", n, t1, result);
        return result;
    }
    
    @Override
    public String toString() {
        return "{" + n + '}';
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

然后提交给 ForkJoinPool 来执行

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(4);
    System.out.println(pool.invoke(new AddTask1(5)));
}
1
2
3
4

结果:

[ForkJoinPool-1-worker-0] - fork() 2 + {1} 
[ForkJoinPool-1-worker-1] - fork() 5 + {4} 
[ForkJoinPool-1-worker-0] - join() 1 
[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3 
[ForkJoinPool-1-worker-2] - fork() 4 + {3} 
[ForkJoinPool-1-worker-3] - fork() 3 + {2} 
[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6 
[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10 
[ForkJoinPool-1-worker-1] - join() 5 + {4} = 15 
15
1
2
3
4
5
6
7
8
9
10

用图来表示:

image-20220518154934234

# 改进

手动将任务继续拆分。

class AddTask3 extends RecursiveTask<Integer> {

    int begin;
    int end;
    public AddTask3(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }
    @Override
    public String toString() {
        return "{" + begin + "," + end + '}';
    }
    @Override
    protected Integer compute() {
        // 5, 5
        if (begin == end) {
            log.debug("join() {}", begin);
            return begin;
        }
        // 4, 5
        if (end - begin == 1) {
            log.debug("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }

        int mid = (end + begin) / 2;
        
        AddTask3 t1 = new AddTask3(begin, mid);
        t1.fork();
        
        AddTask3 t2 = new AddTask3(mid + 1, end);
        t2.fork();
        
        log.debug("fork() {} + {} = ?", t1, t2);
        int result = t1.join() + t2.join();
        log.debug("join() {} + {} = {}", t1, t2, result);
        return result;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

然后提交给 ForkJoinPool 来执行

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(4);
    System.out.println(pool.invoke(new AddTask3(1, 10)));
}
1
2
3
4

结果

[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3 
[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9 
[ForkJoinPool-1-worker-0] - join() 3 
[ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ? 
[ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ? 
[ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6 
[ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15 
15 
1
2
3
4
5
6
7
8

用图来表示

image-20220518160556249

打个比方,假设一个酒店有 400 个房间,一共有 4 名清洁工,每个工人每天可以打扫 100 个房间,这样,4 个工人满负荷工作时,400 个房间全部打扫完正好需要 1 天。

Fork/Join 的工作模式就像这样:首先,工人甲被分配了 400 个房间的任务,他一看任务太多了自己一个人不行,所以先把 400 个房间拆成两个 200,然后叫来乙,把其中一个 200 分给乙。紧接着,甲和乙再发现 200 也是个大任务,于是甲继续把 200 分成两个 100,并把其中一个 100 分给丙,类似的,乙会把其中一个 100 分给丁,这样,最终 4 个人每人分到 100 个房间,并发执行正好是 1天。

更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>