保护性暂停模式和Join()原理
07 Feb 2020 -
2 minute read
同步模式——保护性暂停Guarded Suspension
定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
实现
class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 条件不满足则等待,避免虚假唤醒
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
如果是带超时的GuardedObject
class GuardedObject {
private Object response;
public Object get(long timeout) {
synchronized (this) {
// started time
long begin = System.currentTimeMills();
// passed time
long passedTime = 0;
while (response == null) {
if (passedTime >= timeout) {
//经历的时间超过了最大等待时间
break;
}
try {
// 关键! 这里写timeout行不行 不行 要考虑虚假唤醒的情况
// 要记录之前已经等过的时间
this.wait(timeout - passenTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求经历时间
passedTime = System.currentTimeMills() - begin;
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
this.notifyAll();
}
}
}
Join() 原理
源码
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
// isAlive() 判断线程是否还存活
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
// 保护性暂停
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
小测试:
public class JoinTest {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
Thread.sleep(15000);
System.out.println("t1 finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
Thread t2 = new Thread(() -> {
try {
Thread.sleep(1000);
// 在这里等待t1结束
t1.join();
System.out.println("t2 finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2");
t1.start();
t2.start();
System.out.println("hahahha");
}
}
当我在t2线程里调用t1.join的时候,意味着t2将加入t1的wait set里面去,那么是谁来notify它的呢
void JavaThread::run() {
...
thread_main_inner();
}
void JavaThread::thread_main_inner() {
...
this->exit(false);
delete this;
}
void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
...
// Notify waiters on thread object. This has to be done after exit() is called
// on the thread (if the thread is the last thread in a daemon ThreadGroup the
// group should have the destroyed bit set before waiters are notified).
ensure_join(this);
...
}
static void ensure_join(JavaThread* thread) {
// We do not need to grap the Threads_lock, since we are operating on ourself.
Handle threadObj(thread, thread->threadObj());
assert(threadObj.not_null(), "java thread object must exist");
ObjectLocker lock(threadObj, thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
// Thread is exiting. So set thread_status field in java.lang.Thread class to TERMINATED.
java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
// Clear the native thread instance - this makes isAlive return false and allows the join()
// to complete once we've done the notify_all below
java_lang_Thread::set_thread(threadObj(), NULL);
lock.notify_all(thread);
// Ignore pending exception (ThreadDeath), since we are exiting anyway
thread->clear_pending_exception();
}
t2实际上是进入了t1这个对象的monitor里的wait set里等待,如果t1正在运行的这个线程已经结束了,它会自行notifyAll来释放锁对象。
保护性暂停——多个GuardedObject
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
如果一个消费者对应一个生产者,这往往不能满足我们现实需求,于是我们就有了生产、消费者模式。