163study

Java 線程同步原理探析

現如今,服務器性能日益增長,并發(concurrency)編程已經“深入人心”,但由于馮諾依式計算機“指令存儲,順序執行”的特性,使得編寫跨越時間維度的并發程序異常困難,所以現代編程語言都對并發編程提供了一定程度的支持,像 Golang 里面的?Goroutines、Clojure 里面的?STM(Software Transactional Memory)、Erlang 里面的?Actor

Java 對于并發編程的解決方案是多線程(Multi-threaded programming),而且 Java 中的線程 與 native 線程一一對應,多線程也是早期操作系統支持并發的方案之一(其他方案:多進程、IO多路復用)。

本文著重介紹 Java 中線程同步的原理、實現機制,更側重操作系統層面,部分原理參考?openjdk 源碼。閱讀本文需要對 CyclicBarrier、CountDownLatch 有基本的使用經驗。

JUC

在 Java 1.5 版本中,引入?JUC?并發編程輔助包,很大程度上降低了并發編程的門檻,JUC 里面主要包括:

  • 線程調度的 Executors
  • 緩沖任務的 Queues
  • 超時相關的 TimeUnit
  • 并發集合(如 ConcurrentHashMap)
  • 線程同步類(Synchronizers,如 CountDownLatch )

個人認為其中最重要也是最核心的是線程同步這一塊,因為并發編程的難點就在于如何保證「共享區域(專業術語:臨界區,Critical Section)的訪問時序問題」。

AbstractQueuedSynchronizer

JUC 提供的同步類主要有如下幾種:

  • Semaphore?is a classic concurrency tool.
  • CountDownLatch?is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold.
  • A?CyclicBarrier?is a resettable multiway synchronization point useful in some styles of parallel programming.
  • A?Phaser?provides a more flexible form of barrier that may be used to control phased computation among multiple threads.
  • An?Exchanger?allows two threads to exchange objects at a rendezvous(約會) point, and is useful in several pipeline designs.

通過閱讀其源碼可以發現,其實現都基于?AbstractQueuedSynchronizer?這個抽象類(一般簡寫 AQS),正如其 javadoc 開頭所說:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

也就是說,AQS 通過維護內部的 FIFO 隊列和具備原子更新的整型 state 這兩個屬性來實現各種鎖機制,包括:是否公平,是否可重入,是否共享,是否可中斷(interrupt),并在這基礎上,提供了更方便實用的同步類,也就是一開始提及的 Latch、Barrier 等。

這里暫時不去介紹 AQS 實現細節與如何基于 AQS 實現各種同步類(挖個坑),感興趣的可以移步美團的一篇文章《不可不說的Java“鎖”事》?第六部分“獨享鎖 VS 共享鎖”。

在學習 Java 線程同步這一塊時,對我來說困擾最大的是「線程喚醒」,試想一個已經 wait/sleep/block 的線程,是如何響應 interrupt 的呢?當調用 Object.wait() 或 lock.lock() 時,JVM 究竟做了什么事情能夠在調用 Object.notify 或 lock.unlock 時重新激活相應線程?

帶著上面的問題,我們從源碼中尋找答案。

Java 如何實現堵塞、通知

wait/notify

public final native void wait(long timeout) throws InterruptedException;
public final native void notify();

在 JDK 源碼中,上述兩個方法均用 native 實現(即 cpp 代碼),追蹤相關代碼

// java.base/share/native/libjava/Object.c
static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

通過上面的 cpp 代碼,我們大概能猜出 JVM 是使用 monitor 來實現的 wait/notify 機制,至于這里的 monitor 是何種機制,這里暫時跳過,接著看 lock 相關實現

lock/unlock

LockSupport 是用來實現堵塞語義模型的基礎輔助類,主要有兩個方法:park 與 unpark。(在英文中,park 除了“公園”含義外,還有“停車”的意思)

// LockSupport.java
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
// Unsafe.java
    /**
     * Unblocks the given thread blocked on {@code park}, or, if it is
     * not blocked, causes the subsequent call to {@code park} not to
     * block.  Note: this operation is "unsafe" solely because the
     * caller must somehow ensure that the thread has not been
     * destroyed. Nothing special is usually required to ensure this
     * when called from Java (in which there will ordinarily be a live
     * reference to the thread) but this is not nearly-automatically
     * so when calling from native code.
     *
     * @param thread the thread to unpark.
     */
    @HotSpotIntrinsicCandidate
    public native void unpark(Object thread);

    /**
     * Blocks current thread, returning when a balancing
     * {@code unpark} occurs, or a balancing {@code unpark} has
     * already occurred, or the thread is interrupted, or, if not
     * absolute and time is not zero, the given time nanoseconds have
     * elapsed, or if absolute, the given deadline in milliseconds
     * since Epoch has passed, or spuriously (i.e., returning for no
     * "reason"). Note: This operation is in the Unsafe class only
     * because {@code unpark} is, so it would be strange to place it
     * elsewhere.
     */
    @HotSpotIntrinsicCandidate
    public native void park(boolean isAbsolute, long time);

// hotspot/share/prims/unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    post_thread_park_event(&event, thread->current_park_blocker(), time);
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

通過上述 unsafe.cpp 可以看到每個 thread 都會有一個 Parker 對象,所以我們需要查看 parker 對象的定義

// hotspot/share/runtime/park.hpp
class Parker : public os::PlatformParker
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

// hotspot/os/posix/os_posix.hpp
class PlatformParker : public CHeapObj<mtInternal> {
 protected:
  enum {
    REL_INDEX = 0,
    ABS_INDEX = 1
  };
  int _cur_index;  // which cond is in use: -1, 0, 1
  pthread_mutex_t _mutex[1];
  pthread_cond_t  _cond[2]; // one for relative times and one for absolute
  ...
};

看到這里大概就能知道 park 是使用?pthread_mutex_t?與?pthread_cond_t?實現。好了,到目前為止,就引出了 Java 中與堵塞相關的實現,不難想象,都是依賴底層操作系統的功能。

OS 支持的同步原語

Semaphore

并發編程領域的先鋒人物 Edsger Dijkstra(沒錯,也是最短路徑算法的作者)在 1965 年首次提出了信號量( Semaphores) 這一概念來解決線程同步的問題。信號量是一種特殊的變量類型,為非負整數,只有兩個特殊操作PV:

  • P(s) 如果 s!=0,將 s-1;否則將當前線程掛起,直到 s 變為非零
  • V(s) 將 s+1,如果有線程堵塞在 P 操作等待 s 變成非零,那么 V 操作會重啟這些線程中的任意一個

注:Dijkstra 為荷蘭人,名字 P 和 V 來源于荷蘭單詞 Proberen(測試)和Verhogen(增加),為方便理解,后文會用 Wait 與 Signal 來表示。

struct semaphore {
     int val;
     thread_list waiting;  // List of threads waiting for semaphore
}
wait(semaphore Sem):    // Wait until > 0 then decrement
  // 這里用的是 while 而不是 if
  // 這是因為在 wait 過程中,其他線程還可能繼續調用 wait
  while (Sem.val <= 0) {
    add this thread to Sem.waiting;
    block(this thread);
  }
  Sem.val = Sem.val - 1;
return;

signal(semaphore Sem):// Increment value and wake up next thread
     Sem.val = Sem.val + 1;
     if (Sem.waiting is nonempty) {
         remove a thread T from Sem.waiting;
         wakeup(T);
     }

有兩點注意事項:

  1. wait 中的「測試和減 1 操作」,signal 中的「加 1 操作」需要保證原子性。一般來說是使用硬件支持的?read-modify-write 原語,比如 test-and-set/fetch-and-add/compare-and-swap,除了硬件支持外,還可以用?busy wait?的軟件方式來模擬。
  2. signal 中沒有定義重新啟動的線程順序,也即多個線程在等待同一信號量時,無法預測重啟哪一個線程

使用場景

信號量為控制并發程序的執行提供了強有力工具,這里列舉兩個場景:

互斥

信號量提供了了一種很方便的方法來保證對共享變量的互斥訪問,基本思想是

將每個共享變量(或一組相關的共享變量)與一個信號量 s (初始化為1)聯系起來,然后用 wait/signal 操作將相應的臨界區包圍起來。

二元信號量也被稱為互斥鎖(mutex,mutual exclusve, 也稱為 binary semaphore),wait 操作相當于加鎖,signal 相當于解鎖。
一個被用作一組可用資源的計數器的信號量稱為計數信號量(counting semaphore)

調度共享資源

除了互斥外,信號量的另一個重要作用是調度對共享資源的訪問,比較經典的案例是生產者消費者,偽代碼如下:

emptySem = N
fullSem = 0
// Producer
while(whatever) {
    locally generate item
    wait(emptySem)
    fill empty buffer with item
    signal(fullSem)
}
// Consumer
while(whatever) {
    wait(fullSem)
    get item from full buffer
    signal(emptySem)
    use item
}

POSIX 實現

POSIX 標準中有定義信號量相關的邏輯,在?semaphore.h?中,為 sem_t 類型,相關 API:

// Intialize: 
sem_init(&theSem, 0, initialVal);
// Wait: 
sem_wait(&theSem);
// Signal: 
sem_post(&theSem);
// Get the current value of the semaphore:       
sem_getvalue(&theSem, &result);

信號量主要有兩個缺點:

  • Lack of structure,在設計大型系統時,很難保證 wait/signal 能以正確的順序成對出現,順序與成對缺一不可,否則就會出現死鎖!
  • Global visiblity,一旦程序出現死鎖,整個程序都需要去檢查

解決上述兩個缺點的新方案是監控器(monitor)

Monitors

C. A. R. Hoare?在 1974 年的論文?Monitors: an operating system structuring concept?首次提出了「監控器」概念,它提供了對信號量互斥和調度能力的更高級別的抽象,使用起來更加方便,一般形式如下:

monitor1 . . . monitorM
process1 . . . processN

我們可以認為監控器是這么一個對象:

  • 所有訪問同一監控器的線程通過條件變量(condition variables)間接通信
  • 某一個時刻,只能有一個線程訪問監控器

Condition variables

上面提到監控器通過條件變量(簡寫 cv)來協調線程間的通信,那么條件變量是什么呢?它其實是一個 FIFO 的隊列,用來保存那些因等待某些條件成立而被堵塞的線程,對于一個條件變量 c 來說,會關聯一個斷言(assertion) P。線程在等待 P 成立的過程中,該線程不會鎖住該監控器,這樣其他線程就能夠進入監控器,修改監控器狀態;在 P 成立時,其他線程會通知堵塞的線程,因此條件變量上主要有三個操作:

  1. wait(cv, m)?等待 cv 成立,m 表示與監控器關聯的一 mutex 鎖
  2. signal(cv)?也稱為?notify(cv)?用來通知 cv 成立,這時會喚醒等待的線程中的一個執行。根據喚醒策略,監控器分為兩類:Hoare vs. Mesa,后面會介紹
  3. broadcast(cv)?也稱為?notifyAll(cv)?喚醒所有等待 cv 成立的線程
POSIX 實現

在 pthreads 中,條件變量的類型是?pthread_cond_t,主要有如下幾個方法:

// initialize
pthread_cond_init() 
pthread_cond_wait(&theCV, &someLock);
pthread_cond_signal(&theCV);
pthread_cond_broadcast(&theCV);

使用方式

在 pthreads 中,所有使用條件變量的地方都必須用一個 mutex 鎖起來,這是為什么呢?看下面一個例子:

pthread_mutex_t myLock;
pthread_cond_t myCV;
int count = 0;

// Thread A
pthread_mutex_lock(&myLock);
while(count < 0) {
    pthread_cond_wait(&myCV, &myLock);
}
pthread_mutex_unlock(&myLock);

// Thread B

pthread_mutex_lock(&myLock);
count ++;
while(count == 10) {
    pthread_cond_signal(&myCV);
}
pthread_mutex_unlock(&myLock);

如果沒有鎖,那么

  • 線程 A 可能會在其他線程將 count 賦值為10后繼續等待
  • 線程 B 無法保證加一操作與測試 count 是否為零 的原子性

這里的關鍵點是,在進行條件變量的 wait 時,會釋放該鎖,以保證其他線程能夠將之喚醒。不過需要注意的是,在線程 B 通知(signal) myCV 時,線程 A 無法立刻恢復執行,這是因為 myLock 這個鎖還被線程 B 持有,只有在線程 B?unlock(&myLock)?后,線程 A 才可恢復。總結一下:

  1. wait 時會釋放鎖
  2. signal 會喚醒等待同一 cv 的線程
  3. 被喚醒的線程需要重新獲取鎖,然后才能從 wait 中返回

Hoare vs. Mesa 監控器語義

在上面條件變量中,我們提到 signal 在調用時,會去喚醒等待同一 cv 的線程,根據喚醒策略的不同,監控器也分為兩類:

  • Hoare 監控器(1974),最早的監控器實現,在調用 signal 后,會立刻運行等待的線程,這時調用 signal 的線程會被堵塞(因為鎖被等待線程占有了)
  • Mesa 監控器(Xerox PARC, 1980),signal 會把等待的線程重新放回到監控的 ready 隊列中,同時調用 signal 的線程繼續執行。這種方式是現如今 pthreads/Java/C# 采用的

這兩類監控器的關鍵區別在于等待線程被喚醒時,需要重新檢查 P 是否成立。

監控器工作示意圖

上圖表示藍色的線程在調用監控器的 get 方式時,數據為空,因此開始等待 emptyFull 條件;緊接著,紅色線程調用監控器的 set 方法改變 emptyFull 條件,這時

  • 按照 Hoare 思路,藍色線程會立刻執行,并且紅色線程堵塞
  • 按照 Mesa 思路,紅色線程會繼續執行,藍色線程會重新與綠色線程競爭與監控器關聯的鎖

Java 中的監控器

在 Java 中,每個對象都是一個監控器(因此具備一個 lock 與 cv),調用對象 o 的 synchronized 方法 m 時,會首先去獲取 o 的鎖,除此之外,還可以調用 o 的 wait/notify/notify 方法進行并發控制

Big Picture

操作系統并發相關 API 概括圖

Interruptible

通過介紹操作系統支持的同步原語,我們知道了 park/unpark、wait/notify 其實就是利用信號量(?pthread_mutex_t)、條件變量(?pthread_cond_t)實現的,其實監控器也可以用信號量來實現。在查看 AQS 中,發現有這么一個屬性:

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

也就是說,在小于 1000 納秒時,await 條件變量 P 時,會使用一個循環來代替條件變量的堵塞與喚醒,這是由于堵塞與喚醒本身的操作開銷可能就遠大于 await 的 timeout。相關代碼:

// AQS 的 doAcquireNanos 方法節選
for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
        return false;
    if (shouldParkAfterFailedAcquire(p, node) &&
        nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
        throw new InterruptedException();
}

在 JUC 提供的高級同步類中,acquire 對應 park,release 對應 unpark,interrupt 其實就是個布爾的 flag 位,在 unpark 被喚醒時,檢查該 flag ,如果為 true,則會拋出我們熟悉的 InterruptedException。

Selector.select()?響應中斷異常的邏輯有些特別,因為對于這類堵塞 IO 操作來說,沒有條件變量的堵塞喚醒機制,我們可以再看下 Thread.interrupt 的實現

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

OpenJDK 使用了這么一個技巧來實現堵塞 IO 的中斷喚醒:在一個線程被堵塞時,會關聯一個 Interruptible 對象。
對于 Selector 來說,在開始時,會關聯這么一個Interruptible 對象

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (closed)
                            return;
                        closed = true;
                        interrupted = target;
                        try {
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

當調用 interrupt 方式時,會關閉該 channel,這樣就會關閉掉這個堵塞線程,可見為了實現這個功能,代價也是比較大的。LockSupport.park 中采用了類似技巧。

總結

也許基于多線程的并發編程不是最好的(可能是最復雜的,Clojure 大法好 :-),但卻是最悠久的。
即便我們自己不去寫往往也需要閱讀別人的多線程代碼,而且能夠寫出“正確”(who knows?)的多線程程序往往也是區分 senior 與 junior 程序員的標志,希望這篇文章能幫助大家理解 Java 是如何實現線程控制,有疑問歡迎留言指出,謝謝!

參考

發表我的評論

取消評論
表情 插代碼

Hi,您需要填寫昵稱和郵箱!

  • 必填項
  • 必填項
22选5今晚开奖公告