J.U,C
# AQS
- 用state属性表示资源状态(独占模式,共享模式(信号量)),子类需要定义如何维护这个状态,控制如何获取锁和释放锁;
- 提供基于FIFO等待队列,类似于Monitor的EntryList;
- 条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet;
同步器提供的模板方法分为3类:
- 独占式获取和释放同步状态;
- 共享式获取和释放同步状态;
- 查询同步队列中的等待线程情况;
同步状态用什么修饰?
volatile int
独占式获取/释放同步状态流程?
获取同步状态时,AQS维护一个同步队列,获取状态失败的线程被加入到队列并在队列中自旋;
移出队列的条件是前驱节点是头节点并且成功获取了同步状态;
释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点;
# 自定义同步器
子类需要实现的方法:
- tryAcquire 阻塞线程用park/unpark
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int acquires) {//尝试一次
if(acquires == 1){
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());//当前线程为独占线程
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
if(acquires == 1){
if(getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
@Override
protected boolean isHeldExclusively() {//是否独占锁
return getState() == 1;
}
protected Condition newCondition(){
return new ConditionObject();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 自定义不可重入锁
class MyLock implements Lock{
private MySync sync = new MySync();
@Override
public void lock() {//加锁,不成功进入等待队列
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {//只会尝试一次
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# ReentrantLock原理
类图:
加锁流程:(非公平锁)
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//创建节点对象,加入等待队列,并再次尝试
selfInterrupt();
}
2
3
4
5
state=-1,代表有责任唤醒后继节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);//抢锁成功,自己设为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2
3
4
5
6
7
8
9
10
11
12
13
//将节点加入到等待队列尾部,返回前驱节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ReentrantReadWriteLock
同一时刻允许多个读线程访问,但写线程访问时,所有读线程和其它写线程均被阻塞;
写操作开始时,所有晚于读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(所有写操作依靠synchronzied进行同步),使读操作都能读取到正确数据,不会出现脏读;
原理:
读写状态设计:
一个整形变量,高16位表示读,低16位表示写;
写状态 = S & 0x0000FFFF(高16位抹去);
读状态 = S >> 16
推论:S≠0时,写状态=0,读状态>0,表示读锁被获取;
# Semaphore
限制同时访问共享资源的线程上限;
获取与释放资源:acquire(),release();
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
semaphore.release();
}
}).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
限流应用:限制线程数,非资源数(Tomcat连接数是LimitLatch实现)
//对比享元模式的wait/notify
class Pool {
private final int poolSize;
private Connection[] connections;
private AtomicIntegerArray states;
private Semaphore semaphore;
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
public Connection borrow() {
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
return null;
}
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# CountdownLatch
用于线程同步协作;
await()等待计数归零,countDown()让计数减一;
使用Demo:
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
ExecutorService service = Executors.newFixedThreadPool(3);
service.submit(() -> {
sleep(1000);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
sleep(1500);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(()->{
try {
latch.await();//等到上面减为0才执行以下代码
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
案例:等待多线程全部到100%
public static void main(String[] args) throws InterruptedException {
AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
return new Thread(r, "t" + num.getAndIncrement());
});
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) {
int x = j; /*做成局部常量为了all[x]*/
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
}
all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始...");
service.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# CyclicBarrier
区别于CountdownLatch:
等待的线程满足计数个数时,可以继续执行
案例:循环执行任务
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
log.debug("task1, task1, task2 finish...");
});
for (int i = 0; i < 3; i++) { // task1 task2 第二次循环的task1
service.submit(() -> {
log.debug("task1 begin...");
sleep(1);
try {
barrier.await(); // 2-1=1
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
log.debug("task2 begin...");
sleep(2);
try {
barrier.await(); // 1-1=0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 原子类
# AtomicInteger
AtomicInteger i = new AtomicInteger(5);
i.incrementAndGet();
i.getAndIncrement();
i.getAndAdd(5);
updateAndGet(i, p -> p / 2);//自定义的applyAsInt
2
3
4
5
6
原理:
public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator/*策略模式*/) {
while (true) {
int prev = i.get();
int next = operator.applyAsInt(prev);//任意计算方式
if (i.compareAndSet(prev, next)) {
return next;
}
}
}
2
3
4
5
6
7
8
9
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
2
3
4
5
6
7
8
9
10
11
12
13
利用CAS+volatile保证原子性,volatile保证可见性,JVM保证任何时刻任何线程总能拿到该变量的最新值;
# AtomicReference
原子引用
使用案例代码:
@Slf4j
public class Atom {
public static void main(String[] args) {
DecimalAccount.test(new DecimalAccountCas(new BigDecimal("10000")));
}
}
class DecimalAccountCas implements DecimalAccount{
private AtomicReference<BigDecimal> balance;
public DecimalAccountCas(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while(true){
BigDecimal prev = balance.get();
BigDecimal nxt = prev.subtract(amount);
if(balance.compareAndSet(prev,nxt)){
break;
}
}
}
}
interface DecimalAccount{
BigDecimal getBalance();
void withdraw(BigDecimal amount);
static void test(DecimalAccount account){
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(()->{
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
/*ts.forEach(thread -> thread.start());*/
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# LongAddr
原子累加器
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(
() -> new AtomicLong(0),
(adder) -> adder.getAndIncrement()
);
}
for (int i = 0; i < 5; i++) {
demo(
() -> new LongAdder(),
adder -> adder.increment()
);
}
}
/*
() -> 结果 提供累加器对象 supplier没有参数有返回值
(参数) -> 执行累加操作 consumer有参数没有返回值
*/
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
LongAddr为什么比AtomicLong性能好?
CAS一个竞争单元,大部分线程需要自旋,消耗性能;
LongAddr竞争时设置多个累加单元,Thread0累加Cell[0],Thread1累加Cell[1]...最后汇总,减少CAS重试失败次数,提高性能;
LongAddr几个关键域:
//累加单元组 transient禁止序列化 transient volatile Cell[] cells; //基础值,没有竞争,cas累加这个域 transient volatile long base; //在cells创建或扩容时,置为1,表示加锁 transient volatile int cellsBusy
1
2
3
4
5
6一个Cell 24字节:16字节的对象头,8字节的value,缓存行可以放下2个Cell对象