Java中线程等待和唤醒
2023-06-18 11:02:00

Java中线程等待和唤醒

本文主要是对Java中线程等待、唤醒相关的内容进行总结。

Java 线程状态变迁图

线程的生命周期和状态

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:

  • NEW: 初始状态,线程被创建出来但没有被调用 start()
  • RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
  • BLOCKED:阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
  • TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
  • TERMINATED:终止状态,表示该线程已经运行完毕。

在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态(图源:HowToDoInJavaopen in new windowJava Thread Life Cycle and Thread Statesopen in new window),所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。

图片

线程进入等待状态,即线程因为某种原因放弃了CPU使用权,阻塞也分为几种情况:

  • 等待阻塞:运行的线程执行wait方法,JVM会把当前线程放入到等待队列
  • 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么JVM会把当前的线程放入到锁池中
  • 其他阻塞:运行的线程执行Thread.sleep或者join方法,或者发出了I/O请求时,JVM会把当前线程设置为阻塞状态,当sleep结束join线程终止、I/O处理完毕则线程恢复’

让线程等待和唤醒的使用方法

方式1: wait/notify

使用 Object 中的 wait() 方法让线程等待,使用 Object 中的 notify() 方法唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class WaitNotifyTest {
public static void main(String[] args) {
Object lock = new Object();

new Thread(() -> {
System.out.println("线程A等待获取lock锁");
synchronized (lock) {
try {
System.out.println("线程A获取了lock锁");
Thread.sleep(1000);
System.out.println("线程A将要运行lock.wait()方法进行等待");
lock.wait();
System.out.println("线程A等待结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
).start();

new Thread(() -> {
System.out.println("线程B等待获取lock锁");
synchronized (lock) {
System.out.println("线程B获取了lock锁");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程B将要运行lock.notify()方法进行通知");
lock.notify();
System.out.println("线程B结束!");
}
}
).start();
}
}

上面这段代码的输出为:

1
2
3
4
5
6
7
8
9
10
线程A等待获取lock锁
线程A获取了lock锁
线程B等待获取lock锁
线程A将要运行lock.wait()方法进行等待
线程B获取了lock锁
线程B将要运行lock.notify()方法进行通知
线程B结束!
线程A等待结束

进程已结束,退出代码0

注意:此种方式必须使用同一把锁并且必须包含在synchronized代码块中,如果未使用synchronized包裹,则会报错。

下面了解一下Object对象的wait和notify方法,源码分析内容可见参考资料3。

方法名称描述
notify()通知一个在对象上等待的线程,使其从wait()返回,而返回的前提是该线程获取到了对象的锁(举例为上述代码中最后两条输出相关的部分)
notifyAll()通知所有等待在该对象上的线程。
wait()调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意, 调用wait()方法后,会释放对象的锁
wait(long)超时等待一段时间,这里的参数是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回。
wait(long, int)对于超时时间更细粒度的控制,可以达到毫秒。

方式2: Condition

使用 JUC 包中 Condition 的 await() 方法让线程等待,使用 signal() 方法唤醒线程。

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"synchronized关键字捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

举三个例子:

示例1是通过Object的wait(), notify()来演示线程的休眠/唤醒功能。
示例2是通过Condition的await(), signal()来演示线程的休眠/唤醒功能。
示例3是通过Condition的高级功能。

示例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class WaitTest1 {
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
ta.wait(); // 等待
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
System.out.println(Thread.currentThread().getName()+" wakup others");
notify(); // 唤醒“当前对象上的等待线程”
}
}
}
}

示例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest1 {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
lock.lock(); // 获取锁
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
condition.await(); // 等待
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
lock.lock(); // 获取锁
try {
System.out.println(Thread.currentThread().getName()+" wakup others");
condition.signal(); // 唤醒“condition所在锁上的其它线程”
} finally {
lock.unlock(); // 释放锁
}
}
}
}

运行结果

1
2
3
4
main start ta
main block
ta wakup others
main continue

通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系:

1
2
3
4
              Object      Condition  
休眠 wait await
唤醒个线程 notify signal
唤醒所有线程 notifyAll signalAll

Condition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。

例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,“读线程"需要等待。 如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程”,而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。

看看下面的示例3,可能对这个概念有更深刻的理解。

示例3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[5];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while (count == items.length)
notFull.await();
// 将x添加到缓冲中
items[putptr] = x;
// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if (++putptr == items.length) putptr = 0;
// 将“缓冲”数量+1
++count;
// 唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal();

// 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
}

public Object take() throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while (count == 0)
notEmpty.await();
// 将x从缓冲中取出
Object x = items[takeptr];
// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if (++takeptr == items.length) takeptr = 0;
// 将“缓冲”数量-1
--count;
// 唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal();

// 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}

public class ConditionTest2 {
private static BoundedBuffer bb = new BoundedBuffer();

public static void main(String[] args) {
// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
for (int i=0; i<10; i++) {
new PutThread("p"+i, i).start();
new TakeThread("t"+i).start();
}
}

static class PutThread extends Thread {
private int num;
public PutThread(String name, int num) {
super(name);
this.num = num;
}
public void run() {
try {
Thread.sleep(1); // 线程休眠1ms
bb.put(num); // 向BoundedBuffer中写入数据
} catch (InterruptedException e) {
}
}
}

static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}
public void run() {
try {
Thread.sleep(10); // 线程休眠1ms
Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据
} catch (InterruptedException e) {
}
}
}
}

总结一下方式1方式2的区别:

  1. 方式1 可以使用任意对象作为锁,方式2 需创建一个Lock对象
  2. Object#wait -> Condition#await 两者的返回条件不同,wait方法需要锁对象调用notif方法,而await需要condition对象调用signal,而且必须是调用await 的同一个condition。

方式3: LockSupport

LockSupport 类可以阻塞当前线程以及唤醒指定被阻塞的线程。LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。
因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。

LockSupport 类使用了一种名为 Permit ( 许可) 的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),可以把许可堪称是一种 (0,1)信号量(Semaphore), 但与 Semaphore 不同的是,许可的累加上限是 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Thread a = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " \t ======= 进入锁");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t ======== 被唤醒");
}, "A");
a.start();

TimeUnit.SECONDS.sleep(3);

Thread b = new Thread(() -> {
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName() + "\t ======== 通知了");
}, "A");
b.start();

试验结论:

1、支持无锁的情况调用,执行线程的阻塞;

2、支持先 unpark , 然后 park 操作依然有效。

LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。

LockSupport 是一个线程阻塞工具类, 所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。

归根结底, LockSupport 调用 Unsafe 的 native 代码

LockSupport 提供 park() 和 unpark() 方法实现阻塞吓成和解除线程阻塞的过程。

LockSupport 和每个使用它的线程都有一个许可(permit)关联。permit 相当于 1, 0 的开关,默认是0,

调用一次 unpark 就加 1 变成 1。

调用一次 park 会消费 permit , 也就是将 1 变成 0, 同时 park 立即返回。

如果再次调用 park 就会变成阻塞(因为 permit 为 0 了会阻塞在这里,直到 permit 变为 1),这时候调用 unpark 会把 permit 设置为 1。每个线程都有一个相关的 permit, permit 最多只有一个, 重复调用 unpark 也不会累积凭证。

形象的理解

线程阻塞需要消耗凭证(permit), 这个凭证最多只有 1个

当调用 park 方法时

  • 如果有凭证,则会直接消耗掉这个凭证然后正常退出。
  • 如果无凭证,就必须阻塞等待凭证可用。

而 unpark 则相反,它会增加一个凭证,但凭证最多只能有 1 个,累加无效。

参考资料

  1. 线程的几种状态你真的了解么 (qq.com)
  2. Java并发常见面试题总结(上) | JavaGuide(Java面试 + 学习指南)
  3. Java并发编程之Object.wait()/notify()详解_java object wait_DivineH的博客-CSDN博客
  4. Java多线程系列–“基础篇”05之 线程等待与唤醒 - 如果天空不死 - 博客园 (cnblogs.com)
  5. Java多线程系列目录(共43篇) - 如果天空不死 - 博客园 (cnblogs.com)
  6. LockSupport 原理解析 - 掘金 (juejin.cn)