越是簡單的東西,在深入瞭解後發現越複雜。想起了曾在初中階段,語文老師給我們解說《論語》的道理,順便給我們提了一句,說老子的無為思想比較消極,學生時代不要太關注。現在有了一定的生活閱歷,再來看老子的《道德經》,發現那才是大智慧,《論語》屬於儒家是講人與人的關係,《道德經》屬於道家講人與自然的關係,效法天之道來行人之道,儒家講入世,仁義禮智信,道家講出世,無為而無不為。老子把道比作水、玄牝(女性的生殖器)、嬰兒、山谷等,高深莫測,卻滋養萬物,源源不斷的化生萬物並滋養之,而不居功,故能天長地久。儒家教我們做聖人,道家教我們修成仙,顯然境界更高。
synchronized
的設計思想就像道家的思想一樣,看著用起來很簡單,但是底層極其複雜,好像永遠看不透一樣。一直想深入寫一篇synchronized
的文章,卻一直不敢動手,直到最近讀了幾遍hotspot原始碼後,才有勇氣寫一些自己的理解。下文就從幾個層面逐步深入,談談對synchronized
的理解,總結精華思想並用到我們自己的專案設計中。
首先通過一張圖來概覽synchronized在各層面的實現細節與原理,並在下面的章節逐一分析
臨界區與臨界資源
一段程式碼塊內如果存在對共用資源的多執行緒讀寫操作,稱這段程式碼塊為臨界區,其共用資源為臨界資源。舉例如下:
@Slf4j
public class SyncDemo {
private static volatile int counter = 0;
public static void increment() {
counter++;
}
public static void decrement() {
counter--;
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
increment();
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
decrement();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
//思考: counter=?
log.info("counter={}", counter);
}
}
以上的結果可能是正數、負數、零。為什麼呢?因為 Java 中對靜態變數的自增、自減並不是原子操作,個執行緒執行過程中會被其他執行緒打斷。
競態條件
多個執行緒在臨界區內執行,由於程式碼的執行序列不同而導致結果無法預測,稱之為發生了競態條件。為了避免臨界區的競態條件發生,有多種手段可以達到目的:
synchronized 同步塊是 Java 提供的一種原子性內建鎖
,Java 中的每個物件都可以把它當作一個同步鎖來使用,這些 Java 內建的使用者看不到的鎖被稱為內建鎖,也叫作監視器鎖,目的就是保證多個執行緒在進入synchronized程式碼段或者方法時,將並行變序列。
使用層面
解決之前的共用問題,只需要在兩個方法前面加上synchronized,就可以得到期望為零的結果。
public static synchronized void increment() {
counter++;
}
public static synchronized void decrement() {
counter--;
}
通過IDEA自帶的工具view->show bytecode with jclasslib,檢視到存取標誌0x0029
通過查詢java 位元組碼手冊,synchronized對應的位元組碼為ACC_SYNCHRONIZED,0x0020,三個修飾符加起來正好是0x0029。
如果加在程式碼塊或者物件上, 對應的位元組碼是monitorenter
與monitorexit
。
同步方法
The Java® Virtual Machine Specification針對同步方法的說明
Method-level synchronization is performed implicitly, as part of method invocation and return. A synchronized method is distinguished in the run-time constant pool’s method_info structure by the ACC_SYNCHRONIZED flag, which is checked by the method invocation instructions. When invoking a method for which ACC_SYNCHRONIZED is set, the executing thread enters a monitor, invokes the method itself, and exits the monitor whether the method invocation completes normally or abruptly. During the time the executing thread owns the monitor, no other thread may enter it. If an exception is thrown during invocation of the synchronized method and the synchronized method does not handle the exception, the monitor for the method is automatically exited before the exception is rethrown out of the synchronized method.
這段話適合好好讀讀,大致含義如下:
同步方法是隱式的。一個同步方法會在執行時常數池中的method_info結構體中存放ACC_SYNCHRONIZED識別符號。當一個執行緒存取方法時,會去檢查是否存在ACC_SYNCHRONIZED標識,如果存在,則先要獲得對應的
monitor鎖
,然後執行方法。當方法執行結束(不管是正常return還是丟擲異常)都會釋放對應的monitor鎖
。如果此時有其他執行緒也想要存取這個方法時,會因得不到monitor鎖而阻塞。當同步方法中丟擲異常且方法內沒有捕獲,則在向外丟擲時會先釋放已獲得的monitor鎖。
同步程式碼塊
同步程式碼塊使用monitorenter和monitorexit兩個指令實現同步, The Java® Virtual Machine Specification中有關於這兩個指令的介紹:
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
If another thread already owns the monitor associated with objectref, the thread blocks until the monitor's entry count is zero, then tries again to gain ownership.
大致含義如下:
每個物件都會與一個monitor相關聯,當某個monitor被擁有之後就會被鎖住,當執行緒執行到monitorenter指令時,就會去嘗試獲得對應的monitor。步驟如下:
1.每個monitor維護著一個記錄著擁有次數的計數器。未被擁有的monitor的該計數器為0,當一個執行緒獲得monitor(執行monitorenter)後,該計數器自增變為 1 。
- 當同一個執行緒再次獲得該monitor的時候,計數器再次自增;
- 當不同執行緒想要獲得該monitor的時候,就會被阻塞。
2.當同一個執行緒釋放 monitor(執行monitorexit指令)的時候,計數器再自減。當計數器為0的時候。monitor將被釋放,其他執行緒便可以獲得monitor。
Monitor(管程/監視器)
monitor又稱監視器,作業系統層面為管程
,管程是指管理共用變數以及對共用變數操作的過程,讓它們支援並行,所以管程是作業系統層面的同步機制。
MESA模型
在管程的發展史上,先後出現過三種不同的管程模型,分別是Hasen模型、Hoare模型和MESA模型。現在正在廣泛使用的是MESA模型。下面我們便介紹MESA模型
管程中引入了條件變數的概念,而且每個條件變數都對應有一個等待佇列。條件變數和等待佇列的作用是解決執行緒之間的同步問題。
synchronized關鍵字和wait()、notify()、notifyAll()這三個方法是Java中實現管程技術的組成部分,設計思想也是來源與作業系統的管程。在應用層面有個經典的等待通知正規化
。
等待方
等待方遵循如下的原則:
synchronized( 物件 ) {
while( 條件不滿足 ) {
物件.wait();
}
對應的處理邏輯
}
通知方
通知方遵循如下的原則:
synchronized( 物件 ) {
改變條件;
物件.notifyAll();
}
喚醒的時間和獲取到鎖繼續執行的時間是不一致的,被喚醒的執行緒再次執行時可能條件又不滿足了,所以迴圈檢驗條件。MESA模型的wait()方法還有一個超時引數,為了避免執行緒進入等待佇列永久阻塞。
Java語言的內建管程synchronized
Java 參考了 MESA 模型,語言內建的管程(synchronized)對 MESA 模型進行了精簡。MESA模型中,條件變數可以有多個,ReentrantLock可以實現多個條件變數以及對應的佇列,Java 語言內建的管程裡只有一個條件變數。模型如下圖所示。
核心態
jdk1.5之前,synchronized是重量級鎖,會直接由作業系統核心態控制,jdk1.6以後,對synchronized做了大量的優化,如鎖粗化(Lock Coarsening)、鎖消除(Lock Elimination)、輕量級鎖(Lightweight Locking)、偏向鎖(Biased Locking)、自適應自旋(Adaptive Spinning)等技術來減少鎖操作的開銷,內建鎖的並行效能已經基本與Lock持平,只有升級到重量級鎖後才會有大的開銷。
等待通知正規化案例
用上面講的等待通知正規化,實現一個資料庫連線池;連線池有獲取連線,釋放連線兩個方法,DBPool模擬一個容器放連線,初始化20個連線,獲取連線的執行緒得到連線池為空時,阻塞等待,喚起釋放連線的執行緒;
import java.sql.Connection;
import java.util.LinkedList;
/**
*類說明:連線池的實現
*/
public class DBPool {
/*容器,存放連線*/
private static LinkedList<Connection> pool = new LinkedList<Connection>();
/*限制了池的大小=20*/
public DBPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
}
/*釋放連線,通知其他的等待連線的執行緒*/
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool){
pool.addLast(connection);
//通知其他等待連線的執行緒
pool.notifyAll();
}
}
}
/*獲取*/
// 在mills內無法獲取到連線,將會返回null 1S
public Connection fetchConnection(long mills)
throws InterruptedException {
synchronized (pool){
//永不超時
if(mills<=0){
while(pool.isEmpty()){
pool.wait();
}
return pool.removeFirst();
}else{
/*超時時刻*/
long future = System.currentTimeMillis()+mills;
/*等待時長*/
long remaining = mills;
while(pool.isEmpty()&&remaining>0){
pool.wait(remaining);
/*喚醒一次,重新計算等待時長*/
remaining = future-System.currentTimeMillis();
}
Connection connection = null;
if(!pool.isEmpty()){
connection = pool.removeFirst();
}
return connection;
}
}
}
}
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
*類說明:
*/
public class SqlConnectImpl implements Connection{
/*拿一個資料庫連線*/
public static final Connection fetchConnection(){
return new SqlConnectImpl();
}
@Override
public void commit() throws SQLException {
SleepTools.ms(70);
}
......
}
測試類:總共50個執行緒,每個執行緒嘗試20次
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
*類說明:
*/
public class DBPoolTest {
static DBPool pool = new DBPool(10);
// 控制器:控制main執行緒將會等待所有Woker結束後才能繼續執行
static CountDownLatch end;
public static void main(String[] args) throws Exception {
// 執行緒數量
int threadCount = 50;
end = new CountDownLatch(threadCount);
int count = 20;//每個執行緒的操作次數
AtomicInteger got = new AtomicInteger();//計數器:統計可以拿到連線的執行緒
AtomicInteger notGot = new AtomicInteger();//計數器:統計沒有拿到連線的執行緒
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new Worker(count, got, notGot),
"worker_"+i);
thread.start();
}
end.await();// main執行緒在此處等待
System.out.println("總共嘗試了: " + (threadCount * count));
System.out.println("拿到連線的次數: " + got);
System.out.println("沒能連線的次數: " + notGot);
}
static class Worker implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public Worker(int count, AtomicInteger got,
AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
while (count > 0) {
try {
// 從執行緒池中獲取連線,如果1000ms內無法獲取到,將會返回null
// 分別統計連線獲取的數量got和未獲取到的數量notGot
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
// PreparedStatement preparedStatement
// = connection.prepareStatement("");
// preparedStatement.execute();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
System.out.println(Thread.currentThread().getName()
+"等待超時!");
}
} catch (Exception ex) {
} finally {
count--;
}
}
end.countDown();
}
}
}
結果:
hotspot原始碼ObjectMonitor.hpp定義了ObjectMonitor的資料結構
ObjectMonitor() {
_header = NULL; //物件頭 markOop
_count = 0;
_waiters = 0,
_recursions = 0; // 鎖的重入次數
_object = NULL; //儲存鎖物件
_owner = NULL; // 標識擁有該monitor的執行緒(當前獲取鎖的執行緒)
_WaitSet = NULL; // 等待執行緒(呼叫wait)組成的雙向迴圈連結串列,_WaitSet是第一個節點
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; //多執行緒競爭鎖會先存到這個單向連結串列中 (FILO棧結構)
FreeNext = NULL ;
_EntryList = NULL ; //存放在進入或重新進入時被阻塞(blocked)的執行緒 (也是存競爭鎖失敗的執行緒)
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
大致執行過程如下如所示:
大量執行緒進入監視器的等待佇列EntryList,只有通過CAS拿到鎖的執行緒,把進入鎖的標識變數_recursions置為1,如果方法是遞迴迴圈呼叫,支援鎖重入,_recursions可以累加,並將_owner置為獲取鎖的執行緒ID,呼叫wait方法後,釋放鎖,再重新進入等待佇列EntryList。
物件的記憶體佈局
上述過程可以看到,加鎖是加在物件頭上的,Hotspot虛擬機器器中,物件在記憶體中儲存的佈局可以分為三塊區域:物件頭(Header)、範例資料
(Instance Data)和對齊填充(Padding)。
物件頭詳解
HotSpot虛擬機器器的物件頭包括:
使用JOL工具檢視記憶體佈局
給大家推薦一個可以檢視普通java物件的內部佈局工具JOL(JAVA OBJECT LAYOUT),使用此工具可以檢視new出來的一個java物件的內部佈局,以及一個普通的java物件佔用多少位元組。引入maven依賴
<!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
<scope>provided</scope>
</dependency>
使用方法:
//檢視物件內部資訊
System.out.println(ClassLayout.parseInstance(obj).toPrintable());
測試
public static void main(String[] args) throws InterruptedException {
Object obj = new Object();
//檢視物件內部資訊
System.out.println(ClassLayout.parseInstance(obj).toPrintable());
}
利用jol檢視64位元系統java物件(空物件),預設開啟指標壓縮,總大小顯示16位元組,前12位元組為物件頭。
關閉指標壓縮後,物件頭為16位元組:XX:UseCompressedOops
Mark Word的結構
Hotspot通過markOop型別實現Mark Word,具體實現位於markOop.hpp檔案中。MarkWord 結構搞得這麼複雜,是因為需要節省記憶體,讓同一個記憶體區域在不
同階段有不同的用處。
// Bit-format of an object header (most significant first, big endian layout below):
//
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// size:32 ------------------------------------------>| (CMS free block)
// PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
// size:64 ----------------------------------------------------->| (CMS free block)
//
// unused:25 hash:31 -->| cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && normal object)
// JavaThread*:54 epoch:2 cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && biased object)
// narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object)
// unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)
......
......
// [JavaThread* | epoch | age | 1 | 01] lock is biased toward given thread
// [0 | epoch | age | 1 | 01] lock is anonymously biased
//
// - the two lock bits are used to describe three states: locked/unlocked and monitor.
//
// [ptr | 00] locked ptr points to real header on stack
// [header | 0 | 01] unlocked regular object header
// [ptr | 10] monitor inflated lock (header is wapped out)
// [ptr | 11] marked used by markSweep to mark an object
// not valid at any other time
32位元JVM下的物件結構描述
64位元JVM下的物件結構描述
Mark Word中鎖標記列舉
enum { locked_value = 0, //00 輕量級鎖
unlocked_value = 1, //001 無鎖
monitor_value = 2, //10 監視器鎖,也叫膨脹鎖,也叫重量級鎖
marked_value = 3, //11 GC標記
biased_lock_pattern = 5 //101 偏向鎖
};
更直觀的理解方式
偏向鎖
在大多數情況下,鎖不僅不存在多執行緒競爭,而且總是由同一執行緒多次獲得,對於沒有鎖競爭的場合,偏向鎖有很好的優化效果。
/***StringBuffer內部同步***/
public synchronized int length() {
return count;
}
//System.out.println 無意識的使用鎖
public void println(String x) {
synchronized (this) {
print(x); newLine();
}
}
當JVM啟用了偏向鎖模式(jdk6預設開啟),新建立物件的Mark Word中的Thread Id為0,說明此時處於可偏向但未偏向任何執行緒,也叫做匿名偏向狀態(anonymously biased)
偏向鎖延遲偏向
偏向鎖模式存在偏向鎖延遲機制:HotSpot 虛擬機器器在啟動後有個 4s 的延遲才會對每個新建的物件開啟偏向鎖模式。因為JVM啟動時會進行一系列的複雜活動,比如裝載設定,系統類初始化等等。在這個過程中會使用大量synchronized關鍵字對物件加鎖,且這些鎖大多數都不是偏向鎖。待啟動完成後再延遲開啟偏向鎖。
//關閉延遲開啟偏向鎖
‐XX:BiasedLockingStartupDelay=0
//禁止偏向鎖
‐XX:‐UseBiasedLocking
//啟用偏向鎖
‐XX:+UseBiasedLocking
@Slf4j
public class LockEscalationDemo{
public static void main(String[] args) throws InterruptedException {
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
Thread.sleep(5000);
log.info("===============延遲5秒後=================");
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
}
}
5s後偏向鎖為可偏向或者匿名偏向狀態,此時ThreadId=0;
偏向鎖狀態跟蹤
@Slf4j
public class LockEscalationDemo{
private final static Logger log = LoggerFactory.getLogger(SyncDemo1.class);
public static void main(String[] args) throws InterruptedException {
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
//HotSpot 虛擬機器器在啟動後有個 4s 的延遲才會對每個新建的物件開啟偏向鎖模式
Thread.sleep(5000);
log.info("===============延遲5秒後=================");
Object obj = new Object();
log.debug(ClassLayout.parseInstance(obj).toPrintable());
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread1").start();
}
}
上圖可以看出,只有一個執行緒時,從匿名偏向到偏向鎖,並在偏向鎖後面帶上了執行緒id。
思考:如果物件呼叫了hashCode,還會開啟偏向鎖模式嗎?
偏向鎖復原之呼叫物件HashCode
匿名偏向後,呼叫物件HashCode,導致偏向鎖復原。因為對於一個物件,其HashCode只會生成一次並儲存,偏向鎖是沒有地方儲存hashcode的。
偏向鎖復原之呼叫wait/notify
偏向鎖狀態執行obj.notify() 會升級為輕量級鎖,呼叫obj.wait(timeout) 會升級為重量級鎖
synchronized (obj) {
obj.notify();
log.debug(Thread.currentThread().getName() + "獲取鎖執行中。。。\n"
+ ClassLayout.parseInstance(obj).toPrintable());
}
synchronized (obj) {
try {
obj.wait(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(Thread.currentThread().getName() + "獲取鎖執行中。。。\n"
+ ClassLayout.parseInstance(obj).toPrintable());
}
輕量級鎖
倘若偏向鎖失敗,虛擬機器器並不會立即升級為重量級鎖,它還會嘗試使用一種稱為輕量級鎖的優化手段,此時Mark Word 的結構也變為輕量級鎖的結構。輕量級鎖所適應的場景是執行緒交替執行同步塊的場合,如果存在同一時間多個執行緒存取同一把鎖的場合,就會導致輕量級鎖膨脹為重量級鎖。
輕量級鎖跟蹤
public class LockEscalationDemo {
public static void main(String[] args) throws InterruptedException {
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
//HotSpot 虛擬機器器在啟動後有個 4s 的延遲才會對每個新建的物件開啟偏向鎖模式
Thread.sleep(4000);
Object obj = new Object();
// 思考: 如果物件呼叫了hashCode,還會開啟偏向鎖模式嗎
obj.hashCode();
//log.debug(ClassLayout.parseInstance(obj).toPrintable());
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread1").start();
Thread.sleep(5000);
log.debug(ClassLayout.parseInstance(obj).toPrintable());
}
}
偏向鎖升級輕量級鎖
模擬兩個執行緒輕度競爭場景
@Slf4j
public class LockEscalationDemo{
private final static Logger log = LoggerFactory.getLogger(SyncDemo1.class);
public static void main(String[] args) throws InterruptedException {
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
//HotSpot 虛擬機器器在啟動後有個 4s 的延遲才會對每個新建的物件開啟偏向鎖模式
Thread.sleep(5000);
log.info("===============延遲5秒後=================");
Object obj = new Object();
log.debug(ClassLayout.parseInstance(obj).toPrintable());
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread1").start();
//控制執行緒競爭時機
Thread.sleep(1);
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread2").start();
}
}
輕量級鎖膨脹為重量級鎖
@Slf4j
public class LockEscalationDemo{
private final static Logger log = LoggerFactory.getLogger(SyncDemo1.class);
public static void main(String[] args) throws InterruptedException {
log.debug(ClassLayout.parseInstance(new Object()).toPrintable());
//HotSpot 虛擬機器器在啟動後有個 4s 的延遲才會對每個新建的物件開啟偏向鎖模式
Thread.sleep(5000);
log.info("===============延遲5秒後=================");
Object obj = new Object();
// 思考: 如果物件呼叫了hashCode,還會開啟偏向鎖模式嗎
// obj.hashCode();
log.debug(ClassLayout.parseInstance(obj).toPrintable());
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
// 思考:偏向鎖執行過程中,呼叫hashcode會發生什麼?
// obj.hashCode();
// obj.notify();
// try {
// obj.wait(50);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread1").start();
//控制執行緒競爭時機
// Thread.sleep(1);
new Thread(new Runnable() {
@Override
public void run() {
log.debug(Thread.currentThread().getName()+"開始執行。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
synchronized (obj){
log.debug(Thread.currentThread().getName()+"獲取鎖執行中。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
log.debug(Thread.currentThread().getName()+"釋放鎖。。。\n"
+ClassLayout.parseInstance(obj).toPrintable());
}
},"thread2").start();
}
}
將上述控制執行緒競爭時機的程式碼注掉,讓執行緒2與執行緒1發生競爭,執行緒2就由原來的偏向鎖升級到重量級鎖。
下面思考幾個問題:
思考1:重量級鎖釋放之後變為無鎖,此時有新的執行緒來呼叫同步塊,會獲取什麼鎖?
通過實驗可以得出,後面的執行緒會獲得輕量級鎖,相當於執行緒競爭不激烈,多個執行緒通過CAS就能輪流獲取鎖,並且釋放。
思考2:為什麼有輕量級鎖還需要重量級鎖?
因為輕量級鎖時通過CAS自旋的方式獲取鎖,該方式消耗CPU資源的,如果鎖的時間長,或者自旋執行緒多,CPU會被大量消耗;而重量級鎖有等待佇列,所有拿不到鎖的進入等待佇列,不需要消耗CPU資源。
思考3:偏向鎖是否一定比輕量級鎖效率高嗎?
不一定,在明確知道會有多執行緒競爭的情況下,偏向鎖肯定會涉及鎖復原,需要暫停執行緒,回到安全點,並檢查執行緒釋放活著,故復原需要消耗效能,這時候直接使用輕量級鎖。
JVM啟動過程,會有很多執行緒競,所以預設情況啟動時不開啟偏向鎖,過一段兒時間再開啟。
鎖升級的狀態圖
無鎖是鎖升級前的一箇中間態,必須要恢復到無鎖才能進行升級,因為需要有拷貝mark word的過程,並且修改指標。
鎖記錄的重入
輕量級鎖在拷貝mark word到執行緒棧Lock Record中時,如果有重入鎖,則線上程棧中繼續壓棧Lock Record記錄,只不過mark word的值為空,等到解鎖後,依次彈出,最終將mard word恢復到物件頭中,如圖所示
鎖升級的具體細節會稍後結合hotspot原始碼進行。
上面談到的鎖升級,一直提到了一個CAS操作,CAS是英文單詞CompareAndSwap的縮寫,中文意思是:比較並替換。CAS需要有3個運算元:記憶體地址V,舊的預期值A,即將要更新的目標值B。
CAS指令執行時,當且僅當記憶體地址V的值與預期值A相等時,將記憶體地址V的值修改為B,否則就什麼都不做。整個比較並替換的操作是一個原子操作。
這個方法是java native方法,在unsafe類中,
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
需要開啟hotspot的unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
可以看到呼叫了Atomic::cmpxchg方法,Atomic::cmpxchg方法引入了組合指令,
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
mp是os::is_MP()的返回結果,os::is_MP()是一個行內函式,用來判斷當前系統是否為多處理器。如果當前系統是多處理器,該函數返回1。否則,返回0。
__asm__代表是組合開始,volatile代表,禁止CPU指令重排,並且讓值修改後,立馬被其他CPU可見,保持資料一致性。
LOCK_IF_MP(mp)會根據mp的值來決定是否為cmpxchg指令新增lock字首。如果通過mp判斷當前系統是多處理器(即mp值為1),則為cmpxchg指令新增lock字首。否則,不加lock字首。
內嵌組合模板
asm ( assembler template
: output operands (optional)
: input operands (optional)
: list of clobbered registers
(optional)
);
這裡涉及操作記憶體,與CPU的暫存器,大致意思就是在先判斷CPU是否多核,如果是多核,禁止CPU級別的指令重排,並且通過Lock字首指令,鎖住並進行比較與交換,然後把最新的值同步到記憶體,其他CPU從記憶體載入資料取最新的值。
上面LOCK_IF_MP在atomic_linux_x86.inline.hpp有詳細宏定義
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
很明顯,帶了一個lock字首的指令,lock 和cmpxchgl是CPU指令,lock指令是個字首,可以修飾其他指令,cmpxchgl即為CAS指令,查閱英特爾操作手冊,在Intel® 64 and IA-32 Architectures Software Developer’s Manual 中的章節LOCK—Assert LOCK# Signal Prefix 中給出LOCK指令的詳細解釋
就是排他的使用共用記憶體。這裡一般有兩種方式,鎖匯流排與鎖cpu的快取行
lock指令會產生匯流排鎖也可能會產生快取鎖,看具體的條件,有下面幾種情況只能使用匯流排鎖
無論是匯流排鎖還是快取鎖這都是CPU在硬體層面上提供的鎖,肯定效率比軟體層面的鎖要高。
偏向鎖批次重偏向&批次復原
從偏向鎖的加鎖解鎖過程中可看出,當只有一個執行緒反覆進入同步塊時,偏向鎖帶來的效能開銷基本可以忽略,但是當有其他執行緒嘗試獲得鎖時,就需要等到safe point時,再將偏向鎖復原為無鎖狀態或升級為輕量級,會消耗一定的效能,所以在多執行緒競爭頻繁的情況下,偏向鎖不僅不能提高效能,還會導致效能下降
。於是,就有了批次重偏向與批次復原的機制。
原理
以class為單位,為每個class維護一個偏向鎖復原計數器,每一次該class的物件發生偏向復原操作時,該計數器+1,當這個值達到重偏向閾值(預設20)時,JVM就認為該class的偏向鎖有問題,因此會進行批次重偏向。
每個class物件會有一個對應的epoch欄位,每個處於偏向鎖狀態物件的Mark Word中也有該欄位,其初始值為建立該物件時class中的epoch的值。每次發生批次重偏向時,就將該值+1,同時遍歷JVM中所有執行緒的棧,找到該class所有正處於加鎖狀態的偏向鎖,將其epoch欄位改為新值。下次獲得鎖時,發現當前物件的epoch值和class的epoch不相等,那就算當前已經偏向了其他執行緒,也不會執行復原操作,而是直接通過CAS操作將其Mark Word的Thread Id 改成當前執行緒Id。
當達到重偏向閾值(預設20)後,假設該class計數器繼續增長,當其達到批次復原的閾值後(預設40),JVM就認為該class的使用場景存在多執行緒競爭,會標記該class為不可偏向,之後,對於該class的鎖,直接走輕量級鎖的邏輯。
應用場景
批次重偏向(bulk rebias)機制是為了解決:一個執行緒建立了大量物件並執行了初始的同步操作,後來另一個執行緒也來將這些物件作為鎖物件進行操作,這樣會導致大量的偏向鎖復原操作。批次復原(bulk revoke)機制是為了解決:在明顯多執行緒競爭劇烈的場景下使用偏向鎖是不合適的。
JVM的預設引數值
設定JVM引數-XX:+PrintFlagsFinal,在專案啟動時即可輸出JVM的預設引數值
intx BiasedLockingBulkRebiasThreshold = 20 //預設偏向鎖批次重偏向閾值
intx BiasedLockingBulkRevokeThreshold = 40 //預設偏向鎖批次復原閾值
我們可以通過-XX:BiasedLockingBulkRebiasThreshold 和 -XX:BiasedLockingBulkRevokeThreshold 來手動設定閾值
測試:批次重偏向
@Slf4j
public class BiasedLockingTest {
private final static Logger log = LoggerFactory.getLogger(BiasedLockingTest.class);
public static void main(String[] args) throws InterruptedException {
//延時產生可偏向物件
Thread.sleep(5000);
// 建立一個list,來存放鎖物件
List<Object> list = new ArrayList<>();
// 執行緒1
new Thread(() -> {
for (int i = 0; i < 50; i++) {
// 新建鎖物件
Object lock = new Object();
synchronized (lock) {
list.add(lock);
}
}
try {
//為了防止JVM執行緒複用,在建立完物件後,保持執行緒thead1狀態為存活
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thead1").start();
//睡眠3s鍾保證執行緒thead1建立物件完成
Thread.sleep(3000);
log.debug("列印thead1,list中第20個物件的物件頭:");
log.debug((ClassLayout.parseInstance(list.get(19)).toPrintable()));
// 執行緒2
new Thread(() -> {
for (int i = 0; i < 40; i++) {
Object obj = list.get(i);
synchronized (obj) {
if(i>=15&&i<=21||i>=38){
log.debug("thread2-第" + (i + 1) + "次加鎖執行中\t"+
ClassLayout.parseInstance(obj).toPrintable());
}
}
if(i==17||i==19){
log.debug("thread2-第" + (i + 1) + "次釋放鎖\t"+
ClassLayout.parseInstance(obj).toPrintable());
}
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thead2").start();
Thread.sleep(3000);
new Thread(() -> {
for (int i = 0; i < 50; i++) {
Object lock =list.get(i);
if(i>=17&&i<=21||i>=35&&i<=41){
log.debug("thread3-第" + (i + 1) + "次準備加鎖\t"+
ClassLayout.parseInstance(lock).toPrintable());
}
synchronized (lock){
if(i>=17&&i<=21||i>=35&&i<=41){
log.debug("thread3-第" + (i + 1) + "次加鎖執行中\t"+
ClassLayout.parseInstance(lock).toPrintable());
}
}
}
},"thread3").start();
Thread.sleep(3000);
log.debug("檢視新建立的物件");
log.debug((ClassLayout.parseInstance(new Object()).toPrintable()));
LockSupport.park();
}
}
當復原偏向鎖閾值超過 20 次後,jvm 會這樣覺得,我是不是偏向錯了
,於是會在給這些物件加鎖時重新偏向至加鎖執行緒,重偏向會重置象 的 Thread ID
測試結果:
thread1: 建立50個偏向執行緒thread1的偏向鎖 1-50 偏向鎖
thread2:
1-18 偏向鎖復原,升級為輕量級鎖 (thread1釋放鎖之後為偏向鎖狀態)
19-40 偏向鎖復原達到閾值(20),執行了批次重偏向 (測試結果在第19就開始批次重偏向了)
測試:批次復原
當復原偏向鎖閾值超過 40 次後,jvm 會認為不該偏向,於是整個類的所有物件都會變為不可偏向的,新建的物件也是不可偏向的。
thread3:
1-18 從無鎖狀態直接獲取輕量級鎖 (thread2釋放鎖之後變為無鎖狀態)
19-40 偏向鎖復原,升級為輕量級鎖 (thread2釋放鎖之後為偏向鎖狀態)
41-50 達到偏向鎖復原的閾值40,批次復原偏向鎖,升級為輕量級鎖 (thread1釋放鎖之後為偏向鎖狀態)
新建立的物件: 無鎖狀態
總結
自旋優化
量級鎖競爭的時候,還可以使用自旋來進行優化,如果當前執行緒自旋成功(即這時候持鎖執行緒已經退出了同步塊,釋放了鎖),這時當前執行緒就可以避免阻塞。
注意:自旋的目的是為了減少執行緒掛起的次數,儘量避免直接掛起執行緒(掛起操作涉及系統呼叫,存在使用者態和核心態切換,這才是重量級鎖最大的開銷)
鎖粗化
假設一系列的連續操作都會對同一個物件反覆加鎖及解鎖,甚至加鎖操作是出現在迴圈體中的
,即使沒有出現執行緒競爭,頻繁地進行互斥同步操作也會導致不必要的效能損耗。如果JVM檢測到有一連串零碎的操作都是對同一物件的加鎖,將會擴大加鎖同步的範圍(即鎖粗化)到整個操作序列的外部。
StringBuffer buffer = new StringBuffer();
/**
* 鎖粗化
*/
public void append(){
buffer.append("aaa").append(" bbb").append(" ccc");
}
上述程式碼每次呼叫 buffer.append 方法都需要加鎖和解鎖,如果JVM檢測到有一連串的對同一個物件加鎖和解鎖的操作,就會將其合併成一次範圍更大的加鎖和解鎖操作,即在第一次append方法時進行加鎖,最後一次append方法結束後進行解鎖。
鎖消除
鎖消除即刪除不必要的加鎖操作。鎖消除是Java虛擬機器器在JIT編譯期間,通過對執行上下文的掃描,去除不可能存在共用資源競爭的鎖,通過鎖消除,可以節省毫無意義的請求鎖時間。
StringBuffer的append是個同步方法,但是append方法中的 StringBuffer 屬於一個區域性變數,不可能從該方法中逃逸出去,因此其實這過程是執行緒安全的,可以將鎖消除。
public void append(String str1, String str2) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(str1).append(str2);
}
public static void main(String[] args) throws InterruptedException {
LockEliminationTest demo = new LockEliminationTest();
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
demo.append("aaa", "bbb");
}
long end = System.currentTimeMillis();
System.out.println("執行時間:" + (end - start) + " ms");
}
通過比較實際,開啟鎖消除用時4秒多,未開啟鎖消除用時6秒多。
測試程式碼:
/**
進行兩種測試
* 關閉逃逸分析,同時調大堆空間,避免堆內GC的發生,如果有GC資訊將會被列印出來
* VM執行引數:-Xmx4G -Xms4G -XX:-DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
*
* 開啟逃逸分析 jdk8預設開啟
* VM執行引數:-Xmx4G -Xms4G -XX:+DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
*
* 執行main方法後
* jps 檢視程序
* jmap -histo 程序ID
*
*/
@Slf4j
public class EscapeTest {
private final static Logger log = LoggerFactory.getLogger(EscapeTest.class);
public static void main(String[] args) {
long start = System.currentTimeMillis();
for (int i = 0; i < 500000; i++) {
alloc();
}
long end = System.currentTimeMillis();
log.info("執行時間:" + (end - start) + " ms");
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
/**
* JIT編譯時會對程式碼進行逃逸分析
* 並不是所有物件存放在堆區,有的一部分存線上程棧空間
* Ponit沒有逃逸
*/
private static String alloc() {
Point point = new Point();
return point.toString();
}
/**
*同步省略(鎖消除) JIT編譯階段優化,JIT經過逃逸分析之後發現無執行緒安全問題,就會做鎖消除
*/
public void append(String str1, String str2) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(str1).append(str2);
}
/**
* 標量替換
*
*/
private static void test2() {
Point point = new Point(1,2);
System.out.println("point.x="+point.getX()+"; point.y="+point.getY());
// int x=1;
// int y=2;
// System.out.println("point.x="+x+"; point.y="+y);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Point{
private int x;
private int y;
}
通過測試發現,開啟逃逸分析後,執行緒範例總共50萬個,只有8萬多個在堆中,其他都在棧上分配;
關閉逃逸分析後50萬全部都在堆中。
首先準備好HotSpot原始碼
jdk8 hotspot原始碼下載地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/,選擇gz或者zip包下載。
目錄結構
share下面還有兩個目錄
然後使用Source Insight工具開啟原始碼工程,如下圖
首先看入口InterpreterRuntime:: monitorenter方法
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
看到上面註解Retry fast entry if bias is revoked to avoid unnecessary inflation
,意思就是如果偏向鎖開啟,就直接進入ObjectSynchronizer的fast_enter方法,避免不必要的膨脹,否則進入slow_enter方法,由此可知偏向鎖執行fast_enter方法,鎖的升級則進入slow_enter方法。
接著進入synchronizer.cpp
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 如果偏向鎖開啟
if (UseBiasedLocking) {
// 非安全點
if (!SafepointSynchronize::is_at_safepoint()) {
// 重偏向
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
// 回到安全點
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 重點看
slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 判斷是否無鎖
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
//把mark word儲存到偏向鎖的displaced_header欄位上
lock->set_displaced_header(mark);
// 通過CAS將mark word更新為指向Lock Record的指標
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
} else
// 已經有鎖,並且mark中的指標指向當前執行緒的指標
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
// 鎖重入,將null入棧到本地執行緒棧
lock->set_displaced_header(NULL);
return;
}
#if 0
// The following optimization isn't particularly useful.
if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {
lock->set_displaced_header (NULL) ;
return ;
}
#endif
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
lock->set_displaced_header(markOopDesc::unused_mark());
// 1.先膨脹生成ObjectMonitor物件,
// 2.再進入enter方法
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
首先看ObjectMonitor
生成的過程。
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// Inflate mutates the heap ...
// Relaxing assertion for bug 6320749.
assert (Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant") ;
//迴圈,保證多執行緒同時呼叫
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// The mark can be in one of the following states:
// * Inflated - just return 重量級鎖直接返回
// * Stack-locked - coerce it to inflated 輕量級鎖則膨脹
// * INFLATING - busy wait for conversion to complete 膨脹中,則等膨脹完成
// * Neutral - aggressively inflate the object. 無鎖狀態則膨脹
// * BIASED - Illegal. We should never see this 非法狀態, 不會出現
// CASE: inflated 如果重量級鎖
if (mark->has_monitor()) {
// 獲取指向ObjectMonitor的指標,並直接返回
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
// CASE: inflation in progress - inflating over a stack-lock.
// Some other thread is converting from stack-locked to inflated.
// Only that thread can complete inflation -- other threads must wait.
// The INFLATING value is transient.
// Currently, we spin/yield/park and poll the markword, waiting for inflation to finish.
// We could always eliminate polling by parking the thread on some auxiliary list.
// 檢查是否在膨脹狀態,如果膨脹,呼叫ReadStableMark等待
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
// CASE: stack-locked
// Could be stack-locked either by this thread or by some other thread.
//
// Note that we allocate the objectmonitor speculatively, _before_ attempting
// to install INFLATING into the mark word. We originally installed INFLATING,
// allocated the objectmonitor, and then finally STed the address of the
// objectmonitor into the mark. This was correct, but artificially lengthened
// the interval in which INFLATED appeared in the mark, thus increasing
// the odds of inflation contention.
//
// We now use per-thread private objectmonitor free lists.
// These list are reprovisioned from the global free list outside the
// critical INFLATING...ST interval. A thread can transfer
// multiple objectmonitors en-mass from the global free list to its local free list.
// This reduces coherency traffic and lock contention on the global free list.
// Using such local free lists, it doesn't matter if the omAlloc() call appears
// before or after the CAS(INFLATING) operation.
// See the comments in omAlloc().
/// 輕量級鎖時,開始膨脹
if (mark->has_locker()) {
// 建立ObjectMonitor物件
ObjectMonitor * m = omAlloc (Self) ;
// Optimistically prepare the objectmonitor - anticipate successful CAS
// We do this before the CAS in order to minimize the length of time
// in which INFLATING appears in the mark.
// 物件初始化
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// CAS設定為膨脹中
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
// CAS失敗,釋放重試
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
// We've successfully installed INFLATING (0) into the mark-word.
// This is the only case where 0 will appear in a mark-work.
// Only the singular thread that successfully swings the mark-word
// to 0 can perform (or more precisely, complete) inflation.
//
// Why do we CAS a 0 into the mark-word instead of just CASing the
// mark-word from the stack-locked value directly to the new inflated state?
// Consider what happens when a thread unlocks a stack-locked object.
// It attempts to use CAS to swing the displaced header value from the
// on-stack basiclock back into the object header. Recall also that the
// header value (hashcode, etc) can reside in (a) the object header, or
// (b) a displaced header associated with the stack-lock, or (c) a displaced
// header in an objectMonitor. The inflate() routine must copy the header
// value from the basiclock on the owner's stack to the objectMonitor, all
// the while preserving the hashCode stability invariants. If the owner
// decides to release the lock while the value is 0, the unlock will fail
// and control will eventually pass from slow_exit() to inflate. The owner
// will then spin, waiting for the 0 value to disappear. Put another way,
// the 0 causes the owner to stall if the owner happens to try to
// drop the lock (restoring the header from the basiclock to the object)
// while inflation is in-progress. This protocol avoids races that might
// would otherwise permit hashCode values to change or "flicker" for an object.
// Critically, while object->mark is 0 mark->displaced_mark_helper() is stable.
// 0 serves as a "BUSY" inflate-in-progress indicator.
// fetch the displaced mark from the owner's stack.
// The owner can't die or unwind past the lock while our INFLATING
// object is in the mark. Furthermore the owner can't complete
// an unlock on the object, either.
// CAS成功,替換mark word到本地執行緒棧,並設定monitor的_header、_owner、_object
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
// Setup monitor fields to proper values -- prepare the monitor
m->set_header(dmw) ;
// Optimization: if the mark->locker stack address is associated
// with this thread we could simply set m->_owner = Self and
// m->OwnerIsThread = 1. Note that a thread can inflate an object
// that it has stack-locked -- as might happen in wait() -- directly
// with CAS. That is, we can avoid the xchg-NULL .... ST idiom.
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
// 設定mark word為重量級鎖狀態
object->release_set_mark(markOopDesc::encode(m));
// Hopefully the performance counters are allocated on distinct cache lines
// to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
// CASE: neutral
// TODO-FIXME: for entry we currently inflate and then try to CAS _owner.
// If we know we're inflating for entry it's better to inflate by swinging a
// pre-locked objectMonitor pointer into the object header. A successful
// CAS inflates the object *and* confers ownership to the inflating thread.
// In the current implementation we use a 2-step mechanism where we CAS()
// to inflate and then CAS() again to try to swing _owner from NULL to Self.
// An inflateTry() method that we could call from fast_enter() and slow_enter()
// would be useful.
assert (mark->is_neutral(), "invariant");
// 如果無鎖狀態,建立ObjectMonitor並初始化
ObjectMonitor * m = omAlloc (Self) ;
// prepare m for installation - set monitor to initial state
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
// CAS設定物件為重量級鎖狀態,CAS失敗,釋放重量級鎖再重試
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
}
// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
// 下面是快取行避免偽共用發生的情況
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
}
ObjectMonitor生成後,進入ObjectMonitor.enter方法,重量級加鎖的邏輯都是在這裡完成的。
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
//通過CAS操作嘗試將_owner變數設定為當前執行緒,如果_owner為NULL表示鎖未被佔用
//CAS:記憶體值、預期值、新值,只有當記憶體值==預期值,才能將新值替換記憶體值
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) { //如果NULL,表示獲取鎖成功,直接返回即可
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
return ;
}
//執行緒重入,synchronized的可重入特性原理,_owner儲存的執行緒與當前正在執行的執行緒相同,將_recursions++
if (cur == Self) {
_recursions ++ ;
return ;
}
//表示執行緒第一次進入monitor,則進行一些設定
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ; //鎖的次數設定為1
_owner = Self ; //將_owner設定為當前執行緒
OwnerIsThread = 1 ;
return ;
}
.....
//獲取鎖失敗
for (;;) {
jt->set_suspend_equivalent();
//等待鎖的釋放
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
}
上面步驟總結為
接著進入所等待原始碼
在鎖競爭原始碼中最後一步,如果獲取鎖失敗,則等待鎖的釋放,由MonitorObject類中的EnterI()方法來實現
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
//再次嘗試獲取鎖,獲取成功直接返回
if (TryLock (Self) > 0) {
....
return ;
}
DeferredInitialize () ;
//嘗試自旋獲取鎖,獲取鎖成功直接返回
if (TrySpin (Self) > 0) {
....
return ;
}
//前面的嘗試都失敗,則將該執行緒資訊封裝到node節點
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
//將node節點插入到_cxq的頭部,前面說過鎖獲取失敗的執行緒首先會進入_cxq
//_cxq是一個單連結串列,等到一輪過去在該_cxq列表中的執行緒還未成功獲取鎖,
//則進入_EntryList列表
for (;;) { //注意這裡的死迴圈操作
node._next = nxt = _cxq ;
//這裡插入節點時也使用了CAS,因為可能有多個執行緒失敗將加入_cxq連結串列
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
//如果執行緒CAS插入_cxq連結串列失敗,它會再搶救一下看看能不能獲取到鎖
if (TryLock (Self) > 0) {
...
return ;
}
}
//競爭減弱時,將該執行緒設定為_Responsible(負責執行緒),定時輪詢_owner
//後面該執行緒會呼叫定時的park方法,防止死鎖
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
//前面獲取鎖失敗的執行緒已經放入到了_cxq列表,但還未掛起
//下面是將_cxq列表掛起的程式碼,執行緒一旦掛起,必須喚醒之後才能繼續操作
for (;;) {
//掛起之前,再次嘗試獲取鎖,看看能不能成功,成功則跳出迴圈
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
//將當前執行緒掛起(park()方法)
// park self
//如果當前執行緒是_Responsible執行緒,則呼叫定時的park方法,防止死鎖
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
//當執行緒被喚醒之後,會再次嘗試獲取鎖
if (TryLock(Self) > 0) break ;
//喚醒鎖之後,還出現競爭,記錄喚醒次數,這裡的計數器
//並沒有受鎖的保護,也沒有原子更新,為了獲取更低的探究影響
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ; //喚醒次數
//自旋嘗試獲取鎖
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
//已經獲取到了鎖,將當前節點從_EntryList佇列中刪除
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
...
return ;
}
步驟分為如下幾步:
這裡的設計精髓是通過多次tryLock嘗試獲取鎖和CAS獲取鎖無限推遲了執行緒的掛起操作
,你可以看到從開始到執行緒掛起的程式碼中,出現了多次的嘗試獲取鎖;因為執行緒的掛起與喚醒涉及到了狀態的轉換(核心態和使用者態),這種頻繁的切換必定會給系統帶來效能上的瓶頸。所以它的設計意圖就是儘量推辭執行緒的掛起時間,取一個極限的時間掛起執行緒。另外原始碼中定義了負責執行緒_Responsible,這種標識的執行緒呼叫的是定時的park(執行緒掛起),避免死鎖.
鎖釋放原始碼
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) { //判斷當前執行緒是否是執行緒持有者
//當前執行緒是之前持有輕量級鎖的執行緒。由輕量級鎖膨脹後還沒呼叫過enter方法,_owner會是指向Lock Record的指標
if (THREAD->is_lock_owned((address) _owner)) {
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else { //當前執行緒不是鎖的持有者--》出現異常
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
//重入,計數器-1,返回
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
//_Responsible設定為NULL
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
#if INCLUDE_JFR
if (not_suspended && EventJavaMonitorEnter::is_enabled()) {
_previous_owner_tid = JFR_THREAD_ID(Self);
}
#endif
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
//先釋放鎖,這時如果有其他執行緒獲取鎖,則能獲取到
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ; // See if we need to wake a successor
//等待佇列為空,或者有"醒著的執行緒」,則不需要去等待佇列喚醒執行緒了,直接返回即可
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
//當前執行緒重新獲取鎖,因為後序要喚醒佇列
//一旦獲取失敗,說明有執行緒獲取到鎖了,直接返回即可,不需要獲取鎖再去喚醒執行緒了
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
//當前執行緒重新獲取鎖,因為後序要喚醒佇列
//一旦獲取失敗,說明有執行緒獲取到鎖了,直接返回即可,不需要獲取鎖再去喚醒執行緒了
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ; //根據QMode的不同,會有不同的喚醒策略
if (QMode == 2 && _cxq != NULL) {
//QMode==2,_cxq中有優先順序更高的執行緒,直接喚醒_cxq的隊首執行緒
.........
return ;
}
//當QMode=3的時候 講_cxq中的資料加入到_EntryList尾部中來 然後從_EntryList開始獲取
if (QMode == 3 && _cxq != NULL) {
.....
}
....... //省略
.......
//當QMode=4的時候 講_cxq中的資料加入到_EntryList前面來 然後從_EntryList開始獲取
if (QMode == 4 && _cxq != NULL) {
......
}
//批次修改狀態標誌改成TS_ENTER
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
//插到原有的_EntryList前面 從員_EntryList中獲取
// Prepend the RATs to the EntryList
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
}
..........
}
}
上述程式碼總結如下步驟:
QMode = 2且cxq非空:cxq中有優先順序更高的執行緒,直接喚醒_cxq的隊首執行緒;
QMode = 3且cxq非空:把cxq佇列插入到EntryList的尾部;
QMode = 4且cxq非空:把cxq佇列插入到EntryList的頭部;
QMode = 0:暫時什麼都不做,繼續往下看;
只有QMode=2的時候會提前返回,等於0、3、4的時候都會繼續往下執行:
這裡的設計精髓是首先就將鎖釋放,然後再去判斷是否有醒著的執行緒,因為可能有執行緒正在嘗試或者自旋獲取鎖,如果有執行緒活著,需要再讓該執行緒重新獲取鎖去喚醒執行緒。
最後通過流程兩張圖把建立ObjectMonitor物件的過程與進入enter方法加鎖與解鎖的過程呈現出來,把我主要流程,更多地程式碼細節也可以從原始碼的英文註解中得到答案。
通過本篇由淺入深,逐步分析了synchronized從各層角度實現的細節以及原理,我們可以從中學到一些思路,比如鎖的設計,能夠通過CAS,偏向鎖,輕量級鎖等方式來實現的時候,儘量不要升級到重量級鎖,等到競爭太大浪費cpu開銷的時候,才引入重量級鎖;比如synchronized原子性是通過鎖物件保證只有一個執行緒存取臨界資源來實現,可見性通過原始碼裡的組合volatile結合硬體底層指令實現,有序性通過原始碼底層的讀寫屏障並藉助於硬體指令完成。synchronized底層在鎖優化的時候也用了大量的CAS操作,提升效能;以及等待佇列與阻塞佇列設計如何同步進入管程的設計,這些設計思想也是後面Reentrantlock設計的中引入條件佇列的思想,底層都是相同的,任何應用層的軟體設計都能從底層的設計思想與精髓實現中找到原型與影子
,當看到最底層C、C++或者組合以及硬體指令級別的實現時,一切頂層彷彿就能通透。