保护性暂停模式和Join()原理

同步模式——保护性暂停Guarded Suspension

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

image-20210122153417077

实现

class GuardedObject {
 	private Object response;
 	private final Object lock = new Object();
 	public Object get() {
		synchronized (lock) {
			// 条件不满足则等待,避免虚假唤醒
			while (response == null) {
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return response;
		}
 	}
 	public void complete(Object response) {
		synchronized (lock) {
			// 条件满足,通知等待线程
			this.response = response;
			lock.notifyAll();
			}
 		}
	}

如果是带超时的GuardedObject

class GuardedObject {
 	private Object response;
 	public Object get(long timeout) {
		synchronized (this) {
			// started time
            long begin = System.currentTimeMills();
            // passed time
            long passedTime = 0;
			while (response == null) {
                if (passedTime >= timeout) {
                    //经历的时间超过了最大等待时间
                    break;
                }
				try {
                    // 关键! 这里写timeout行不行 不行 要考虑虚假唤醒的情况
                    // 要记录之前已经等过的时间
					this.wait(timeout - passenTime);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
                // 求经历时间
                passedTime = System.currentTimeMills() - begin;
			}
			return response;
		}
 	}
 	public void complete(Object response) {
		synchronized (lock) {
			// 条件满足,通知等待线程
			this.response = response;
			this.notifyAll();
			}
 		}
	}

Join() 原理

源码

    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            // isAlive() 判断线程是否还存活
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                // 保护性暂停
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

小测试:

public class JoinTest {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                Thread.sleep(15000);
                System.out.println("t1 finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1");
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
                // 在这里等待t1结束
                t1.join();
                System.out.println("t2 finished");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2");
        t1.start();
        t2.start();
        System.out.println("hahahha");
    }
}

当我在t2线程里调用t1.join的时候,意味着t2将加入t1的wait set里面去,那么是谁来notify它的呢

void JavaThread::run() {
  ...
  thread_main_inner();
}

void JavaThread::thread_main_inner() {
  ...
  this->exit(false);
  delete this;
}

void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
  ...
  // Notify waiters on thread object. This has to be done after exit() is called
  // on the thread (if the thread is the last thread in a daemon ThreadGroup the
  // group should have the destroyed bit set before waiters are notified).
  ensure_join(this);
  ...
}

static void ensure_join(JavaThread* thread) {
  // We do not need to grap the Threads_lock, since we are operating on ourself.
  Handle threadObj(thread, thread->threadObj());
  assert(threadObj.not_null(), "java thread object must exist");
  ObjectLocker lock(threadObj, thread);
  // Ignore pending exception (ThreadDeath), since we are exiting anyway
  thread->clear_pending_exception();
  // Thread is exiting. So set thread_status field in  java.lang.Thread class to TERMINATED.
  java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
  // Clear the native thread instance - this makes isAlive return false and allows the join() 
  // to complete once we've done the notify_all below
  java_lang_Thread::set_thread(threadObj(), NULL);
  lock.notify_all(thread);
  // Ignore pending exception (ThreadDeath), since we are exiting anyway
  thread->clear_pending_exception();
}

t2实际上是进入了t1这个对象的monitor里的wait set里等待,如果t1正在运行的这个线程已经结束了,它会自行notifyAll来释放锁对象。

保护性暂停——多个GuardedObject

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

image-20210122171716366

如果一个消费者对应一个生产者,这往往不能满足我们现实需求,于是我们就有了生产、消费者模式。

Categories:

Concurrent   Java