目录

JUC - 共享模式之锁基础

# Monitor 概念

# Java 对象头

以 32 位虚拟机为例。

普通对象

image-20220509170854982

数组对象

image-20220509170904523

其中对象标记 Mark Word 结构为:

32 位虚拟机 Mark Word

  • biased_lock:0 代表没加锁,二进制的结尾用 01 表示
  • biased_lock:1 代表成功加锁,二进制的结尾用 01 表示
  • ptr_to_lock_record:30 代表轻量级锁,二进制的结尾用 00 表示
  • ptr_to_lock_record:30 代表重量级锁,二进制的结尾用 10 表示

image-20220509171038115

64 位虚拟机 Mark Word

  • biased_lock:0 代表没加锁,二进制的结尾用 01 表示
  • biased_lock:1 代表成功加锁,二进制的结尾用 01 表示
  • ptr_to_lock_record:62 代表轻量级锁,二进制的结尾用 00 表示
  • ptr_to_lock_record:62 代表重量级锁,二进制的结尾用 10 表示

image-20220509171248707

参考资料:https://stackoverflow.com/questions/26357186/what-is-in-java-object-header

# Monitor 锁

Monitor 被翻译为 监视器管程,也就是经常说到的

每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。

Monitor 结构如下:

image-20220509171454348

Owner 是主人,即谁拥有锁。EntryList 代表其他的等待线程,也就是非 Owner 线程。

  • 刚开始 Monitor 中 Owner 为 null
  • 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor 中只能有一个 Owner
  • 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj) 就会进入 EntryList,形成 BLOCKED 状态
  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的
  • 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲 wait & notify 时会分析

注意:

  • synchronized 必须是进入同一个对象的 Monitor 才有上述的效果
  • 不加 synchronized 的对象不会关联监视器,不遵从以上规则

# synchronized 原理

static final object lock = new object();
static int counter = 0;

public static void main( String[] args ) {
    synchronized (lock) {
        counter++;
    }
}
1
2
3
4
5
6
7
8

对应的字节码:

public static void main(java.lang.string[]);
    descriptor: ([Ljava/lang/string;)v
    flags: AcC_PUBLIC, Acc_STATIC
    code:
        stack=2, locals=3, args_size=1
            e: getstatic	#2      // <- lock 引用 (synchronized 开始)
            3: dup
            4: astore_1				// lock 引用 -> slot 1
            5: monitorenter 		// 将 lock 对象 Markword 置为 Monitor 指针
            6: getstatic#3			// <- i
            9: iconst_1				// 准备常数 1
            10: iadd				// +1
            11: putstatic	#3		// -> i
            14: aload_1				// <- lock 引用
            15: monitorexitT		// 将 lock 对象 MarkWord 重置,唤醒 EntryList
            16: goto		24
            19: astore_2			// e -> slot 2
            I2o: aload_1			// <- lock 引用
            21: monitorexit			// 将 lock 对象 Markword 重置,唤醒 EntryList
            22: aload_2				// <- slot 2 (e)
            23 : athrow				// throw e
            24: return
    Exception table :
    from  to  target type
    6     16   19     any
    19    22   19     any
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

注意:方法级别的 synchronized 不会在字节码指令中有所体现。

# synchronized 原理进阶

# 小故事

故事角色:

  • 老王 - JVM
  • 小南和小女 - 线程
  • 房间 - 对象
  • 房间门上 - 防盗锁 - Monitor
  • 房间门上 - 小南书包 - 轻量级锁
  • 房间门上 - 刻上小南大名 - 偏向锁
  • 批量重刻名 - 一个类的偏向锁撤销到达 20 阈值
  • 不能刻名字 - 批量撤销该类对象的偏向锁,设置该类不可偏向

小南要使用房间保证计算不被其它人干扰(原子性),最初,他用的是防盗锁,当上下文切换时,锁住门。这样,即使他离开了,别人也进不了门,他的工作就是安全的。

但是,很多情况下没人跟他来竞争房间的使用权。小女是要用房间,但使用的时间上是错开的,小南白天用,小女晚上用。每次上锁太麻烦了,有没有更简单的办法呢?

小南和小女商量了一下,约定不锁门了,而是谁用房间,谁把自己的书包挂在门口,但他们的书包样式都一样,因此每次进门前得翻翻书包,看课本是谁的,如果是自己的,那么就可以进门,这样省的上锁解锁了。万一书包不是自己的,那么就在门外等,并通知对方下次用锁门的方式。

后来,小女回老家了,很长一段时间都不会用这个房间。小南每次还是挂书包,翻书包,虽然比锁门省事了,但仍然觉得麻烦。

于是,小南干脆在门上刻上了自己的名字:【小南专属房间,其它人勿用】,下次来用房间时,只要名字还在,那么说明没人打扰,还是可以安全地使用房间。如果这期间有其它人要用这个房间,那么由使用者将小南刻的名字擦掉,升级为挂书包的方式。

同学们都放假回老家了,小南就膨胀了,在 20 个房间刻上了自己的名字,想进哪个进哪个。后来他自己放假回老家了,这时小女回来了(她也要用这些房间),结果就是得一个个地擦掉小南刻的名字,升级为挂书包的方式。老王觉得这成本有点高,提出了一种批量重刻名的方法,他让小女不用挂书包了,可以直接在门上刻上自己的名字。

后来,刻名的现象越来越频繁,老王受不了了:算了,这些房间都不能刻名了,只能挂书包。

# 重量级锁

重量级锁就是 Monitor 锁。

# 轻量级锁

轻量级锁概念:多个线程在不同时间使用锁,注意是不同时间,也就是线程 1 在使用期间,线程 2 不会出现,当线程 1 使用完释放后,线程 2 才出现,去获取。如果线程 1 在使用期间,线程 2 出现来抢锁,那么就变成重量级锁(虽然不可能抢到,但是一旦发送竞争,就变成重量级锁)。

轻量级锁的使用场景:如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以使用轻量级锁来优化。

轻量级锁对使用者是透明的,即语法仍然是 synchronized。

static final Object obj = new Object();
public static void method1() {
    synchronized(obj) {
        // 同步块 A
        method2();
    }
}
public static void method2() {
    synchronized(obj) {
        // 同步块 B
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word。

image-20220509180137968

让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录。

image-20220509180215783

如果 cas 替换成功,对象头中存储了锁记录地址和状态 00,表示由该线程给对象加锁,这时图示如下(不是图演示的交换,而是对象头记录锁记录地址):

image-20220509180234684

如果 cas 失败,有两种情况:

  • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
  • 如果是自己执行了 synchronized 锁重入(对一个对象可以重复加锁,即可以多次调用 synchronized(锁)),那么再添加一条 Lock Record 作为重入的计数

image-20220509180312315

当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一。

image-20220509180234684

当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头。

  • 成功,则解锁成功
  • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

# 锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

static Object obj = new Object();
public static void method1() {
    synchronized( obj ) {
        // 同步块
    }
}
1
2
3
4
5
6

当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁。

image-20220509181325425

这时 Thread-1 加轻量级锁失败,进入锁膨胀流程。

  • 即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
  • 然后自己进入 Monitor 的 EntryList,形成 BLOCKED 状态

image-20220509181418627

当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程。

# 自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

自旋重试成功的情况:

线程 1(core 1 上) 对象 Mark 线程 2(core 2 上)
- 10(重量锁) -
访问同步块,获取 monitor 10(重量锁)重量锁指针 -
成功(加锁) 10(重量锁)重量锁指针 -
执行同步块 10(重量锁)重量锁指针 -
执行同步块 10(重量锁)重量锁指针 访问同步块,获取 monitor
执行同步块 10(重量锁)重量锁指针 自旋重试
执行完毕 10(重量锁)重量锁指针 自旋重试
成功(解锁) 01(无锁) 自旋重试
- 10(重量锁)重量锁指针 成功(加锁)
- 10(重量锁)重量锁指针 执行同步块
- ... ...

如上,线程 1 加锁后,线程 2 访问发现被锁住,并没有立即进入阻塞状态,而是不断自旋重试获取锁,后面发现正好线程 1 释放锁,所以拿到锁。

自旋重试失败的情况:

程 1(core 1 上) 对象 Mark 线程 2(core 2 上)
- 10(重量锁) -
访问同步块,获取 monitor 10(重量锁)重量锁指针 -
成功(加锁) 10(重量锁)重量锁指针 -
执行同步块 10(重量锁)重量锁指针 -
执行同步块 10(重量锁)重量锁指针 访问同步块,获取 monitor
执行同步块 10(重量锁)重量锁指针 自旋重试
执行同步块 10(重量锁)重量锁指针 自旋重试
执行同步块 10(重量锁)重量锁指针 自旋重试
执行同步块 10(重量锁)重量锁指针 阻塞
- ... ...

自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。

在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。

Java 7 之后不能控制是否开启自旋功能。

# 偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入(可以多次执行 synchronized(锁))仍然需要执行 CAS 操作。

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。

例如:

static final Object obj = new Object();
public static void m1() {
    synchronized( obj ) {
        // 同步块 A
        m2();
    }
}
public static void m2() {
    synchronized( obj ) {
        // 同步块 B
        m3();
    }
}
public static void m3() {
    synchronized( obj ) {
        // 同步块 C
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

轻量级锁和偏向锁区别图示:

image-20220509182850742

image-20220509182904165

# 偏向状态

偏向状态视频:https://www.bilibili.com/video/BV16J411h7Rd?p=83,这部分内容笔记不好形容,看视频更容易理解。

回忆一下对象头格式:

  • biased_lock:0 代表没加锁,二进制的结尾用 01 表示
  • biased_lock:1 代表成功加锁,二进制的结尾用 01 表示
  • ptr_to_lock_record:62 代表轻量级锁,二进制的结尾用 00 表示
  • ptr_to_lock_record:62 代表重量级锁,二进制的结尾用 10 表示

image-20220509171248707

一个对象创建时:

  • 如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05 即最后 3 位为 101,这时它的 thread、epoch、age 都为 0
  • 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟
  • 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、age 都为 0,第一次用到 hashcode 时才会赋值

测试偏向锁

代码:

public class TestBiased {
    public static void main(String[] args) throws InterruptedException {
        Dog d = new Dog();
        log.debug( ClassLayout.parseInstance(d).toPrintableSimple( withoutHex: true));
        synchronized (d) {
            log.debug( ClassLayout.parseInstance(d).toPrintableSimple( withoutHex: true));
        }
    }
}
1
2
3
4
5
6
7
8
9

输出:

20:04:36.135 c.TestBiased [main] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001
20:04:40.137 c.TestBiased [main] - 00000000 00000000 00000000 00000000 00000011 01100011 01000000 00000101
1
2

第一个最后的 001 是没加锁的,101 则是加锁的,因为偏向锁是默认是延迟的,所以第一个来不及变成偏向锁,即是 001,所以等待几秒或者加锁,它就会变成 101。

禁用偏向锁

在上面测试代码运行时在添加 VM 参数 -XX:-UseBiasedLocking 禁用偏向锁。

输出:

20:04:36.135 c.TestBiased [main] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001 
20:05:36.135 c.TestBiased [main] - 00000000 00000000 00000000 00000000 00100000 00010100 11110011 10001000 
20:06:36.135 c.TestBiased [main] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001
1
2
3

# 撤销偏向锁 - 调用对象 hashCode

如果在形成偏向锁之前,调用哈希值,则撤销偏向锁的使用。

public class TestBiased {
    public static void main(String[] args) throws InterruptedException {
        Dog d = new Dog();
        d.hashCode(); // 禁用偏向锁
        log.debug( ClassLayout.parseInstance(d).toPrintableSimple( withoutHex: true));
        synchronized (d) {
            log.debug( ClassLayout.parseInstance(d).toPrintableSimple( withoutHex: true));
        }
    }
}
1
2
3
4
5
6
7
8
9
10

输出:

00000000 00000000 00000000 01101010 00000010 01001010 01100111 00000001 
00000000 00000000 00000000 00000000 00100000 11000011 11110011 01101000
00000000 00000000 00000000 01101010 00000010 01001010 01100111 00000001
1
2
3

原因:因为第一次调用哈希值方法时,生成哈希值,而 Mark Word 中存储的是线程 id,即剩余空间不足以存放哈希值,所以不得不把偏向锁的空间去掉,存放哈希值,导致撤销偏向锁。

  • 轻量级锁会在锁记录中记录 hashCode
  • 重量级锁会在 Monitor 中记录 hashCode

在调用 hashCode 后使用偏向锁,记得去掉 -XX:-UseBiasedLocking

# 撤销偏向锁 - 其它线程使用对象

当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁。

注意:没有发生竞争锁,只是当前线程用完锁并释放了锁,其他线程才来使用锁,此时发现线程 ID 不对,进化成轻量级锁。

private static void test2() throws InterruptedException {
    Dog d = new Dog();
    Thread t1 = new Thread(() -> {
        synchronized (d) {
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
        synchronized (TestBiased.class) {
            TestBiased.class.notify();
        }
        // 如果不用 wait/notify 使用 join 必须打开下面的注释
        // 因为:t1 线程不能结束,否则底层线程可能被 jvm 重用作为 t2 线程,底层线程 id 是一样的
        /*
        try {
            System.in.read();
         } catch (IOException e) {
             e.printStackTrace();
         }
         */
    }, "t1");
    t1.start();
    Thread t2 = new Thread(() -> {
        synchronized (TestBiased.class) {
            try {
                TestBiased.class.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        synchronized (d) {
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
        log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
    }, "t2");
    t2.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

输出:

[t1] - 00000000 00000000 00000000 00000000 00011111 01000001 00010000 00000101 
[t2] - 00000000 00000000 00000000 00000000 00011111 01000001 00010000 00000101 
[t2] - 00000000 00000000 00000000 00000000 00011111 10110101 11110000 01000000 
[t2] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001
1
2
3
4

看结尾,前两个 101 都是偏向锁,然后第三个 00 变成轻量级锁。

# 撤销偏向锁 - 调用 wait/notify

wait/notify 属于重量锁的 API,所以一旦调用这两个 API,则变成重量级锁

public static void main(String[] args) throws InterruptedException {
    Dog d = new Dog();
    Thread t1 = new Thread(() -> {
        log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        synchronized (d) {
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
            try {
                d.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
    }, "t1");
    t1.start();
    new Thread(() -> {
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (d) {
            log.debug("notify");
            d.notify();
        }
    }, "t2").start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

输出:

[t1] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000101 
[t1] - 00000000 00000000 00000000 00000000 00011111 10110011 11111000 00000101 
[t2] - notify 
[t1] - 00000000 00000000 00000000 00000000 00011100 11010100 00001101 11001010
1
2
3
4

结尾的 101 代表偏向锁,10 代表重量级锁。

# 批量重偏向

如果对象虽然被多个线程访问,但没有竞争(对象锁被释放后才被访问),这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID。

什么时候有机会重新偏向其他线程?

当撤销偏向锁阈值超过 20 次后,JVM 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至新的加锁线程。

在没有超出 20 次前,处于轻量级锁,超出 20 次后,降为偏向锁。

private static void test3() throws InterruptedException {
    Vector<Dog> list = new Vector<>();
    Thread t1 = new Thread(() -> {
        for (int i = 0; i < 30; i++) {
            Dog d = new Dog();
            list.add(d);
            synchronized (d) {
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
            }
        }
        synchronized (list) {
            list.notify();
        } 
    }, "t1");
    t1.start();

    Thread t2 = new Thread(() -> {
        synchronized (list) {
            try {
                list.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("===============> ");
        for (int i = 0; i < 30; i++) {
            Dog d = list.get(i);
            log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
            synchronized (d) {
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
            }
            log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
    }, "t2");
    t2.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

输出(截取部分):

[t1] - 23 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 24 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 25 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 26 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 27 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 28 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t1] - 29 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t2] - ===============> 
[t2] - 14 00000000 00000000 00000000 00000000 00011111 11110011 11100000 00000101 
[t2] - 14 00000000 00000000 00000000 00000000 00100000 01011000 11110111 00000000
[t2] - 25 00000000 00000000 00000000 00000000 00011111 11110011 11100001 00000101 
1
2
3
4
5
6
7
8
9
10
11

可以发现 t2 撤销偏向锁 25 次后,线程 ID 发送了改变,倒数第二个变成了 11100001。

t2 撤销偏向锁 14 次的 00 是轻量级锁,因为偏向锁偏向 t1,一旦 t2 使用了锁,则变成轻量级锁,但是 t2 一直使用到 20 阈值,则变成偏向锁,偏向 t2 线程。

# 批量撤销

当撤销偏向锁阈值超过 40 次,JVM 会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。

public class Test {
    static Thread t1,t2,t3;
    private static void test4() throws InterruptedException {
        Vector<Dog> list = new Vector<>();
        int loopNumber = 39;
        t1 = new Thread(() -> {
            for (int i = 0; i < loopNumber; i++) {
                Dog d = new Dog();
                list.add(d);
                synchronized (d) {
                    log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
                }
            }
            LockSupport.unpark(t2);
        }, "t1");
        t1.start();
        t2 = new Thread(() -> {
            LockSupport.park();
            log.debug("===============> ");
            for (int i = 0; i < loopNumber; i++) {
                Dog d = list.get(i);
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
                synchronized (d) {
                    log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
                }
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
            }
            LockSupport.unpark(t3);
        }, "t2");
        t2.start();
        t3 = new Thread(() -> {
            LockSupport.park();
            log.debug("===============> ");
            for (int i = 0; i < loopNumber; i++) {
                Dog d = list.get(i);
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
                synchronized (d) {
                    log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
                }
                log.debug(i + "\t" + ClassLayout.parseInstance(d).toPrintableSimple(true));
            }
        }, "t3");
        t3.start();
        t3.join();
        log.debug(ClassLayout.parseInstance(new Dog()).toPrintableSimple(true));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 锁消除 - 同步省略

public class MyBenchmark {
    static int x = 0;
    @Benchmark
    public void a() throws Exception {
        x++;
    }
    @Benchmark
    public void b() throws Exception {
        Object o = new Object();
        synchronized (o) {
            x++;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

JVM 有个强大的 JIT 即时编译器,能实现 同步省略

同步省略:当对局部变量加锁时,且该局部变量不会被其他地方使用,则自动省略、去掉加锁代码,所以 a 方法和 b 方法在最终运行的效率是接近的,即去掉:Object o = new Object();synchronized (o) {} 代码,只执行 x++ 代码。

同步省略具体看:https://notes.youngkbt.cn/java/jvm/heap/#同步省略

# 总结

优先级:偏向锁 -> 轻量级锁 -> 重量级锁。

偏向锁与轻量级锁互相切换:

  • 初始化对象为偏向锁,即第一个线程使用了对象锁,那么该对象锁就是偏向锁,偏向第一个线程
  • 当第一个线程 先释放锁,第二个线程 再使用锁,则变成轻量级锁,当第二个线程使用锁到 20 次(使用 synchronized 20 次),则变成偏向锁,此时偏向锁偏向第二个线程
  • 当第二个线程或者后面的线程 连续 使用锁到 40 次(使用 synchronized 40 次),则取消偏向锁,即整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

偏向锁、轻量级锁与重量级锁切换:

  • 无论对象锁处于偏向锁还是轻量级锁,只要出现 竞争,都会变成重量级锁,即对象锁还没被释放掉,就被另一个线程想使用,则变成重量级锁
  • 调用重量级锁的 API,如 wait、notify 等

所以偏向锁与轻量级锁可以互相切换的原因:线程使用的对象锁 先释放掉,再被其他线程拿到使用,不能出现 竞争行为

# wait & notify

# 小故事 - 为什么需要 wait

由于条件不满足,小南不能继续进行计算,但小南如果一直占用着锁,其它人就得一直阻塞,效率太低。

image-20220509215911235

于是老王单开了一间休息室(调用 wait 方法),让小南到休息室(WaitSet)等着去了,但这时锁释放开,其它人可以由老王随机安排进屋。

直到小 M 将烟送来,大叫一声 [你的烟到了] (调用 notify 方法)。

image-20220509215922304

小南于是可以离开休息室,重新进入竞争锁的队列。

image-20220509220320527

# wait notify 原理

image-20220509171454348

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争

# API 介绍

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法,否则如下直接调用,会报错:

public class Test18 {
    static final Object lock = new Object();
    public static void main(String[] args) {
        try {
            lock.wait();  // 报错,因为没有 synchronized(lock) 加锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10

API 代码:

final static Object obj = new Object();
public static void main(String[] args) {
    new Thread(() -> {
        synchronized (obj) {
            log.debug("执行....");
            try {
                obj.wait(); // 让线程在 obj 上一直等待下去
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    new Thread(() -> {
        synchronized (obj) {
            log.debug("执行....");
            try {
                obj.wait(); // 让线程在obj 上一直等待下去
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("其它代码....");
        }
    }).start();
    // 主线程两秒后执行
    sleep(2);
    log.debug("唤醒 obj 上其它线程");
    synchronized (obj) {
        obj.notify(); // 唤醒obj上一个线程
        // obj.notifyAll(); // 唤醒obj上所有等待线程
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

notify 的一种结果输出:

20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
1
2
3
4

notifyAll 的结果输出:

19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码.... 
1
2
3
4
5

wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到 notify 为止。

wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是在 n 毫秒前被 notify 唤醒。

# wait notify 的正确姿势

开始之前先看看 sleep(long n)wait(long n) 的区别:

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要 和 synchronized 一起使用
  • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁

共同的:

  • 它们的状态都是 TIMED_WAITING

例 1












 


















 




public class Test {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    sleep(2);
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }
    }
    sleep(1);
    new Thread(() -> {
        // 这里能不能加 synchronized (room)?
        hasCigarette = true;
        log.debug("烟到了噢!");
    }, "送烟的").start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

输出:

20:49:49.883 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:49:49.887 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:49:50.882 [送烟的] c.TestCorrectPosture - 烟到了噢!
20:49:51.887 [小南] c.TestCorrectPosture - 有烟没?[true] 
20:49:51.887 [小南] c.TestCorrectPosture - 可以开始干活了
20:49:51.887 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.887 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
20:49:51.888 [其它人] c.TestCorrectPosture - 可以开始干活了
1
2
3
4
5
6
7
8
9
10

小南因为没有烟,所以使用 sleep 方法睡眠了 2 秒,等待 1 秒后送烟过来,然后再醒过来干活,最后其他人干活。

加了 synchronized(room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 线程没加 synchronized 就好像 main 线程是翻窗户进来的。

缺点:小南因为没有烟罢工,导致其他人无法干活,因为 slepp 方法不释放锁,导致其他人无法获取锁。

解决:使用 wait-notify 机制。

例 2













 






















 





public class Test {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                }
            }
        }, "小南").start();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("可以开始干活了");
                }
            }, "其它人").start();
        }
        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasCigarette = true;
                log.debug("烟到了噢!");
                room.notify();
            }
        }, "送烟的").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

输出:

20:51:42.489 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:51:42.493 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:51:42.493 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.493 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:42.494 [其它人] c.TestCorrectPosture - 可以开始干活了
20:51:43.490 [送烟的] c.TestCorrectPosture - 烟到了噢!
20:51:43.490 [小南] c.TestCorrectPosture - 有烟没?[true] 
20:51:43.490 [小南] c.TestCorrectPosture - 可以开始干活了
1
2
3
4
5
6
7
8
9
10

wait-notify 机制解决了其它干活的线程阻塞的问题。但如果有其它线程也在等待条件呢?毕竟 notify 是随机唤醒线程的,例子只有一个。

例 3













 



















 

















 





public class Test {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("有烟没?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("没烟,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小南").start();
        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (!hasTakeout) {
                    log.debug("没外卖,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外卖送到没?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小女").start();
        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("外卖到了噢!");
                room.notify();
            }
        }, "送外卖的").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

输出:

20:53:12.173 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:12.176 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:53:12.176 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:53:12.176 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:53:13.174 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:13.174 [小南] c.TestCorrectPosture - 没干成活... 
1
2
3
4
5
6
7

notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为 虚假唤醒

解决方法:使用 wait-notifyAll 机制。

例 4

将例子 3 代码的 47 到 53 代码换成如下代码:

new Thread(() -> {
    synchronized (room) {
        hasTakeout = true;
        log.debug("外卖到了噢!");
        room.notifyAll();
    }
}, "送外卖的").start();
1
2
3
4
5
6
7

用 notifyAll 仅解决某个线程的唤醒问题,但是也会唤醒其他线程,而其他线程需要 wait 的问题没有得到解决就被唤醒,则浪费了线程。

解决方法:使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了,所以用 while + wait,当其他线程条件不成立,再次 wait。

例 5

将例子 3 代码

if (!hasCigarette) {
    log.debug("没烟,先歇会!");
    try {
        room.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8

改成

while (!hasCigarette) {
    log.debug("没烟,先歇会!");
    try {
        room.wait();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
1
2
3
4
5
6
7
8

输出:

20:58:34.322 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:58:34.326 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:58:34.326 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:58:34.326 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:58:35.323 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
20:58:35.324 [小女] c.TestCorrectPosture - 可以开始干活了
20:58:35.324 [小南] c.TestCorrectPosture - 没烟,先歇会!
1
2
3
4
5
6
7
8

# 总结

有上面 5 个例子得到:

使用 wait-notifyAll 机制时,因为 notifyAll 唤醒所有线程,而有些线程需要 wait 的问题没有得到解决就被唤醒,导致线程浪费,所以利用 while 进行重复判断,也就是线程调用 wait 方法 必须 处于 while 循环里,当线程被唤醒时,重新进入 while 判断,如果问题依然没有得到解决,重新进行 wait。

模板:

// 线程 n
synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 满足条件,不再 wait
}
// 线程 m
synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 满足条件,不再 wait
}
// 完成条件线程
synchronized(lock) {
    // 这里完成了某些线程的条件,然后唤醒所有线程
    lock.notifyAll();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 同步模式之保护性暂停

学完 wait-notify 机制,那么由此衍生的实际开发环境的一个常用模式:同步模式之保护性暂停。

# 定义

保护性暂停:即 Guarded Suspension,用在一个线程等待另一个线程的执行结果。

也就是一个线程需要某些数据,则进行 wait 等待,直到另一个线程获取了这些数据,给该线程并唤醒它。

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject,即关联同一个保护线程
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

image-20220511153837217

# GuardedObject 实现

保护线程代码:

class GuardedObject {
    // 数据
    private Object response;
    // 锁
    private final Object lock = new Object();
    public Object get() {
        synchronized (lock) {
            // 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

一个线程等待另一个线程的执行结果

public static void main(String[] args) {
    GuardedObject guardedObject = new GuardedObject();
    new Thread(() -> {
        try {
            // 子线程执行下载,download 内部执行了的代码是从网上下载一些资源,需要一些时间
            List<String> response = download();
            log.debug("download complete...");
            // 下载的结果放回守护线程里
            guardedObject.complete(response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();
    
    log.debug("waiting...");
    // 主线程阻塞等待,直到第 9 行代码执行成功
    Object response = guardedObject.get();
    log.debug("get response: [{}] lines", ((List<String>) response).size());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

执行结果:

08:42:18.568 [main] c.TestGuardedObject - waiting...
08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines
1
2
3

上面的守护线程类是一个实际常用的类。如有两段代码,第一段代码获取数据,第二段需要数据,则让第二段代码等待,直至第一段代码获取数据成功,接着第二代码才往下执行。

# GuardedObject 超时版

进一步优化代码,如果我们不需要一直等待呢,超出一定时间后,就不再等待。

class GuardedObjectV2 {
    private Object response;
    private final Object lock = new Object();
    public Object get(long millis) {
        synchronized (lock) {
            // 1) 记录最初时间
            long begin = System.currentTimeMillis();
            // 2) 已经经历的时间
            long timePassed = 0;
            while (response == null) {
                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null {}", timePassed, response == null);
            }
            return response;
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

代码里 lock.wait(waitTime); 是防止被唤醒时,依然没有数据,导致进入 while 循环再次 wait 时,此时新一轮 wait 时间要减去前面被唤醒花费的时间,因为开始 waitTime 就等于传过来的时间减 0。

测试代码:

public static void main(String[] args) {
    GuardedObjectV2 v2 = new GuardedObjectV2();
    new Thread(() -> {
        sleep(1);
        v2.complete(null);
        sleep(1);
        v2.complete(Arrays.asList("a", "b", "c"));
    }).start();
    
    // 等待时间超时
    Object response = v2.get(2500);
    // 等待时间不足
	// List<String> lines = v2.get(1500);
    
    if (response != null) {
        log.debug("get response: [{}] lines", ((List<String>) response).size());
    } else {
        log.debug("can't get response");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

输出:

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500
08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...
08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true
08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497
08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...
08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false
08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines
1
2
3
4
5
6
7

# 原理之 join

join 方法内部其实就用到了保护性暂停模式。源码如下:

public final void join() throws InterruptedException {
    join(0);
}

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

发现和保护性暂停模式的 GuardedObject 超时版类似,只不过 join 封装了代码,只能针对整个线程的结束,而自己写的保护性暂停模式代码内部可以针对某个条件进行处理。

上面源码没有 notify 方法,当线程运行完后,自动调用 notify 方法。

# 多任务版 GuardedObject

image-20220511173538431

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦「结果等待者」和「结果生产者」,还能够同时支持多个任务的管理。

新增 id 用来标识 Guarded Object。

class GuardedObject {
    // 标识 Guarded Object
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02
            }
            return response;
        }
    }
    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

中间解耦类:邮箱

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

业务相关类:居民

class People extends Thread{
    @Override
    public void run() {
        // 收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}
1
2
3
4
5
6
7
8
9
10

业务相关类:邮递员

class Postman extends Thread {
    private int id;
    private String mail;
    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }
    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

测试

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 3; i++) {
        new People().start();
    }
    Sleeper.sleep(1);
    for (Integer id : Mailboxes.getIds()) {
        new Postman(id, "内容" + id).start();
    }
}
1
2
3
4
5
6
7
8
9

某次运行结果:

10:35:05.689 c.People [Thread-1] - 开始收信 id:3
10:35:05.689 c.People [Thread-2] - 开始收信 id:1
10:35:05.689 c.People [Thread-0] - 开始收信 id:2
10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3
1
2
3
4
5
6
7
8
9

# 异步模式之生产者/消费者

# 定义

要点:

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

image-20220511170449680

# 实现

class Message {
    private int id;
    private Object message;
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}
class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

应用

public class Test {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);
        // 4 个生产者线程, 下载任务
        for (int i = 0; i < 4; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    log.debug("download...");
                    // download 内部下载一些d
                    List<String> response = Downloader.download();
                    log.debug("try put message({})", id);
                    messageQueue.put(new Message(id, response));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }
        // 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                List<String> response = (List<String>) message.getMessage();
                log.debug("take message({}): [{}] lines", message.getId(), response.size());
            }
        }, "消费者").start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

某次运行结果:

10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>