ReentrantLock使用:

1
2
3
4
5
6
7
8
9
public class Demo {
private static int count=0;
static Lock lock=new ReentrantLock();
public static void inc(){
lock.lock();
count++;
lock.unlock();
}
}

ReentrantLock的使用很简单,调用lock()方法获得锁及unlock()方法释放锁。

AQS

同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步的工具。AQS 队列内部维护的是一个 FIFO 的双向链表,每个节点都有两个指针,分别指向直接的后继节点和直接前驱节点。

当线程争抢锁失败后会封装为一个节点加入到 AQS队列中,当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点。

以下是一个AQS同步队列的基本结构:

当向AQS中添加节点时,变化如下:

将新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点指向尾节点,以及将尾节点的next指针指向自己;之后通过CAS 将 tail 重新指向新的尾部节点。

AQS中的head节点即表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下:

将head节点指向下一个获取到锁的节点,新获得锁的节点将prev指针指向null。

设置 head 节点不需要用 CAS,因为设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得。

ReentrantLock 源码分析

以 ReentrantLock 作为切入点来看如何使用 AQS 来实现线程的同步。

ReentrantLock.lock()

ReentrantLock.lock()方法为获取锁的入口,代码如下:

1
2
3
public void lock() {
sync.lock();
}

sync 是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,在不同的同步场景中,会继承 AQS 来实现对应场景的功能。

Sync 有两个具体的实现类,分别是:

  • NofairSync 非公平锁:不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
  • FailSync 公平锁:表示所有线程严格按照 FIFO 来获取锁

以非公平锁为例来看 lock()方法中的实现

NofairSync.lock()

非公平锁和公平锁最大的区别在于,在非公平锁中不管有没有线程排队,先使用cas 去尝试获取锁;cas成功,就表示成功获得了锁 ,cas失败,调用 acquire(1)走锁竞争逻辑 。

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

cas方法代码如下:

1
2
3
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

通过 cas 乐观锁的方式来做比较并替换,如果当前内存中的 state 的值和预期值 expect 相等,则替换为update。更新成功返回 true,否则返回 false。

state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入锁的实现来说,表示一个同步状态。当 state等于0时,表示无锁状态;当 state大于0时,表示已经有线程获得了锁,因为ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增, 而在释放锁的时候,需要相同次数直到state等于0 其他线程才有资格获得锁。

acquire()

acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此时执行acquire(1)操作。acquire方法的代码如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

首先通过tryAcquire()方法尝试获取锁,如果获取锁失败,则通过addWaiter()方法将当前线程封装为Node添加到AQS队列的尾部。

之后acquireQueued()方法将Node作为参数通过自旋去尝试获取锁。

tryAcquire()

调用NofairSync非公平锁的尝试获取锁方法,成功返回true,失败返回false,代码如下:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

调用ReentrantLock.nofairTryAcquire()方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //获取当前执行线程
int c = getState(); //获取state
if (c == 0) { //如果state等于0,则表示无锁
if (compareAndSetState(0, acquires)) { //使用cas替换state的值
setExclusiveOwnerThread(current); //成功获得锁后保存当前线程,可重入
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //同一线程获取锁,增加重入次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

首先获取当前线程后判断当前锁状态,如果state等于0为无锁状态,则通过cas更新state的值,cas成功则成功获得锁;如果当前线程为重入线程,则直接增加重入次数。

addWaiter()

如果获取锁失败,则通过addWaiter()方法将当前线程封装为Node添加到AQS队列的尾部。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //将当前线程封装为Node
Node pred = tail; //获取当前AQS队列的tail节点
if (pred != null) { //如果尾节点不为空,则设置当前节点为尾节点
node.prev = pred; //将当前线程的prev指针指向尾节点
if (compareAndSetTail(pred, node)) { //通过cas操作将node设置为尾节点
pred.next = node; //设置成功后,将原尾节点的next指针指向当前node
return node;
}
}
enq(node); //尾节点为空或cas失败,通过enq()方法加入到AQS队列
return node;
}

首先将当前线程封装为Node,如果尾节点不为空,则通过cas将当前线程设置为AQS尾节点;如果尾节点为空或者cas失败则调用enq()方法将节点添加到AQS队列。

enq()方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) { //自旋
Node t = tail; //获取当前AQS队列的tail节点
if (t == null) { //如果尾节点为空,则初始化AQS队列
if (compareAndSetHead(new Node())) //通过cas操作设置头节点
tail = head; //初始化AQS队列,头节点和尾节点都是new Node();
} else { //如果尾节点不为空,则设置当前节点为尾节点
node.prev = t; //将当前线程的prev指针指向尾节点
if (compareAndSetTail(t, node)) { //通过cas操作将node设置为尾节点
t.next = node; //设置成功后,将原尾节点的next指针指向当前node
return t;
}
}
}
}

enq()方法就是通过自旋操作把当前节点加入到队列中,只有在节点成功加入AQS队列后,才会跳出循环。

acquireQueued()

将当前线程封装为Node添加到AQS队列后,acquireQueued()方法将Node作为参数通过自旋去尝试获取锁。该方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取Node的前置节点
if (p == head && tryAcquire(arg)) { //如果前置节点是head节点,则可以尝试获取锁
setHead(node); //获取锁成功,设置Node为新的head节点
p.next = null; //将原head节点的next指针设置为null,help GC
failed = false;
return interrupted;
}
//调用shouldParkAfterFailedAcquire()方法判断是否需要挂起
//调用parkAndCheckInterrupt()将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

获取当前节点的前置节点,如果前置节点为head节点,那么它就是下一个可以获取锁的幸运儿,所以有资格可以调用tryAcquire()方法尝试获取锁。

如果获取锁成功,则把获取锁的节点设置为head节点,将原来head节点的引用清除。如果获取锁失败,则需要调用shouldParkAfterFailedAcquire()方法判断是否需要挂起线程,parkAndCheckInterrupt()方法将线程挂起;

shouldParkAfterFailedAcquire()方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前置节点的状态
if (ws == Node.SIGNAL) //如果前置节点的状态为SIGNAL,则挂起当前线程
return true;
if (ws > 0) { //如果前置节点状态大于0,则移除这个节点并重新获取前置节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //循环移除状态大于0的节点
pred.next = node;
} else {
//cas设置前置节点的状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

通过Node的状态来判断获取锁失败后是否要被挂起。返回true时,表示需要调用调用parkAndCheckInterrupt()方法将当前线程挂起,返回false时,表示不需要挂起。

如果判断线程需要挂起,则调用parkAndCheckInterrupt()方法将线程挂起,代码如下:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

使用 LockSupport.park()方法挂起当前线程,线程状态变为WATING。

当线程重新被唤醒时,代码才继续往下执行,Thread.interrupted()方法返回当前线程是否被其他线程触发过中断请求,如果有触发过中断请求,那么这个方法会返回当前的中断标识true,并且对中断标识进行复位表示已经响应过了中断请求。

如果返回 true,意味着在 acquire()方法中会执行 selfInterrupt()方法。

selfInterrupt()方法的代码如下:

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

该方法就表示如果当前线程在 acquireQueued()方法中被中断过,则需要产生一个中断请求,原因是线程在调用 acquireQueued()方法的时候是不会响应中断请求的。

ReentrantLock.unlock()

ReentrantLock.unlock()方法为释放锁的入口,代码如下:

1
2
3
public void unlock() {
sync.release(1);
}

release方法代码如下:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { //调用tryRelease()方法释放锁
Node h = head; //获取head节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //调用unparkSuccessor()方法唤醒后续节点
return true;
}
return false;
}

ReentrantLock.tryRelease()

release()方法中调用tryRelease()方法来释放锁,该方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

该方法将 state 状态减掉传入的参数值,如果结果状态为 0,就将锁的 Owner 设置为 null。

因为ReentrantLock允许重入,所以只有unlock()的次数与lock()的次数相同才能成功释放锁。

unparkSuccessor()

release()方法中调用unparkSuccessor()方法唤醒后续节点,该方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 private void unparkSuccessor(Node node) {
int ws = node.waitStatus; //获取当前节点状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); //设置当前节点状态为0
Node s = node.next; //获取下一个节点
if (s == null || s.waitStatus > 0) {
//如果下一个节点为 null 或者 status>0 表示节点为 cancelled 状态
s = null;
//从尾部节点开始扫描,找到距离 head 最近的一个节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //下一个节点状态正常,则直接唤醒
LockSupport.unpark(s.thread);
}

acquireQueued()

在线程被唤醒后,从acquireQueued()方法中重新开始执行,因为线程就是在该方法中被挂起的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //获取Node的前置节点
if (p == head && tryAcquire(arg)) { //如果前置节点是head节点,则可以尝试获取锁
setHead(node); //获取锁成功,设置Node为新的head节点
p.next = null; //将原head节点的next指针设置为null,help GC
failed = false;
return interrupted;
}
//调用shouldParkAfterFailedAcquire()方法判断是否需要挂起
//调用parkAndCheckInterrupt()将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

因为锁已经被释放,所以此时可以成功通过tryAcquire()方法获取锁,然后设置该线程尾新的head节点,且将原head节点的next指针设置为null。

以上便是ReentrantLock获取锁和释放锁的全部过程。