Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 线程

  • 共享模型

    • 管程
    • JMM
    • 乐观锁
    • 线程池
    • J.U,C
      • AQS
      • 自定义同步器
      • 自定义不可重入锁
      • ReentrantLock原理
      • ReentrantReadWriteLock
      • Semaphore
      • CountdownLatch
      • CyclicBarrier
      • 原子类
        • AtomicInteger
        • AtomicReference
        • LongAddr
    • 异步编程
    • 多线程编程
    • 动态线程池
  • 非共享模型

  • 并行

  • 多线程设计模式

  • JUC
  • 共享模型
Nreal
2023-11-01
目录

J.U,C

# AQS

  • 用state属性表示资源状态(独占模式,共享模式(信号量)),子类需要定义如何维护这个状态,控制如何获取锁和释放锁;
  • 提供基于FIFO等待队列,类似于Monitor的EntryList;
  • 条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet;

同步器提供的模板方法分为3类:

  1. 独占式获取和释放同步状态;
  2. 共享式获取和释放同步状态;
  3. 查询同步队列中的等待线程情况;

同步状态用什么修饰?

volatile int

独占式获取/释放同步状态流程?

获取同步状态时,AQS维护一个同步队列,获取状态失败的线程被加入到队列并在队列中自旋;

移出队列的条件是前驱节点是头节点并且成功获取了同步状态;

释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点;

# 自定义同步器

子类需要实现的方法:

  • tryAcquire 阻塞线程用park/unpark
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively
class MySync extends AbstractQueuedSynchronizer{
    @Override
    protected boolean tryAcquire(int acquires) {//尝试一次
        if(acquires == 1){
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());//当前线程为独占线程
                return true;
            }
        }
        return false;
    }
    @Override
    protected boolean tryRelease(int acquires) {
        if(acquires == 1){
            if(getState() == 0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        return false;
    }
    @Override
    protected boolean isHeldExclusively() {//是否独占锁
        return getState() == 1;
    }

    protected Condition newCondition(){
        return new ConditionObject();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 自定义不可重入锁

class MyLock implements Lock{
    private MySync sync = new MySync();
    @Override
    public void lock() {//加锁,不成功进入等待队列
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {//只会尝试一次
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);

    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# ReentrantLock原理

类图:

加锁流程:(非公平锁)

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
1
2
3
4
5
6
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//创建节点对象,加入等待队列,并再次尝试
        selfInterrupt();
}
1
2
3
4
5

state=-1,代表有责任唤醒后继节点

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);//抢锁成功,自己设为头节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//将节点加入到等待队列尾部,返回前驱节点
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# ReentrantReadWriteLock

同一时刻允许多个读线程访问,但写线程访问时,所有读线程和其它写线程均被阻塞;

写操作开始时,所有晚于读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(所有写操作依靠synchronzied进行同步),使读操作都能读取到正确数据,不会出现脏读;

原理:

读写状态设计:

一个整形变量,高16位表示读,低16位表示写;

写状态 = S & 0x0000FFFF(高16位抹去);

读状态 = S >> 16

推论:S≠0时,写状态=0,读状态>0,表示读锁被获取;

# Semaphore

限制同时访问共享资源的线程上限;

获取与释放资源:acquire(),release();

public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(3);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.debug("running...");
                sleep(1);
                log.debug("end...");
            } finally {
                semaphore.release();
            }
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

限流应用:限制线程数,非资源数(Tomcat连接数是LimitLatch实现)

//对比享元模式的wait/notify
class Pool {
    private final int poolSize;
    private Connection[] connections;
    private AtomicIntegerArray states;
    private Semaphore semaphore;

    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i+1));
        }
    }

    public Connection borrow() {
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        return null;
    }

    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# CountdownLatch

用于线程同步协作;

await()等待计数归零,countDown()让计数减一;

使用Demo:

public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(2);
    ExecutorService service = Executors.newFixedThreadPool(3);
    service.submit(() -> {
        sleep(1000);
        latch.countDown();
        log.debug("end...{}", latch.getCount());
    });
    service.submit(() -> {
        sleep(1500);
        latch.countDown();
        log.debug("end...{}", latch.getCount());
    });
    service.submit(()->{
        try {
            latch.await();//等到上面减为0才执行以下代码
            log.debug("wait end...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

案例:等待多线程全部到100%

public static void main(String[] args) throws InterruptedException {
    AtomicInteger num = new AtomicInteger(0);
    ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
        return new Thread(r, "t" + num.getAndIncrement());
    });
    CountDownLatch latch = new CountDownLatch(10);
    String[] all = new String[10];
    Random r = new Random();
    for (int j = 0; j < 10; j++) {
        int x = j; /*做成局部常量为了all[x]*/
        service.submit(() -> {
            for (int i = 0; i <= 100; i++) {
                try {
                    Thread.sleep(r.nextInt(100));
                } catch (InterruptedException e) {
                }
                all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
                System.out.print("\r" + Arrays.toString(all));
            }
            latch.countDown();
        });
    }
    latch.await();
    System.out.println("\n游戏开始...");
    service.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# CyclicBarrier

区别于CountdownLatch:

等待的线程满足计数个数时,可以继续执行

案例:循环执行任务

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(3);
    CyclicBarrier barrier = new CyclicBarrier(2, ()-> {
        log.debug("task1, task1, task2 finish...");
    });
    for (int i = 0; i < 3; i++) { // task1  task2  第二次循环的task1
        service.submit(() -> {
            log.debug("task1 begin...");
            sleep(1);
            try {
                barrier.await(); // 2-1=1
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
        service.submit(() -> {
            log.debug("task2 begin...");
            sleep(2);
            try {
                barrier.await(); // 1-1=0
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
    service.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 原子类

# AtomicInteger

AtomicInteger i = new AtomicInteger(5);
i.incrementAndGet();
i.getAndIncrement();

i.getAndAdd(5);
updateAndGet(i, p -> p / 2);//自定义的applyAsInt
1
2
3
4
5
6

原理:

public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator/*策略模式*/) {
    while (true) {
        int prev = i.get();
        int next = operator.applyAsInt(prev);//任意计算方式
        if (i.compareAndSet(prev, next)) {
            return next;
        }
    }
}
1
2
3
4
5
6
7
8
9
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

1
2
3
4
5
6
7
8
9
10
11
12
13

利用CAS+volatile保证原子性,volatile保证可见性,JVM保证任何时刻任何线程总能拿到该变量的最新值;

# AtomicReference

原子引用

使用案例代码:

@Slf4j
public class Atom {
    public static void main(String[] args) {
        DecimalAccount.test(new DecimalAccountCas(new BigDecimal("10000")));
    }
}

class DecimalAccountCas implements DecimalAccount{

    private AtomicReference<BigDecimal> balance;

    public DecimalAccountCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while(true){
            BigDecimal prev = balance.get();
            BigDecimal nxt = prev.subtract(amount);
            if(balance.compareAndSet(prev,nxt)){
                break;
            }
        }
    }
}

interface DecimalAccount{
    BigDecimal getBalance();
    void withdraw(BigDecimal amount);
    static void test(DecimalAccount account){
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(()->{
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        /*ts.forEach(thread -> thread.start());*/
        ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# LongAddr

原子累加器

public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
        demo(
                () -> new AtomicLong(0),
                (adder) -> adder.getAndIncrement()
        );
    }

    for (int i = 0; i < 5; i++) {
        demo(
                () -> new LongAdder(),
                adder -> adder.increment()
        );
    }
}

/*
() -> 结果    提供累加器对象 supplier没有参数有返回值
(参数) ->     执行累加操作   consumer有参数没有返回值
 */
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    T adder = adderSupplier.get();
    List<Thread> ts = new ArrayList<>();
    // 4 个线程,每人累加 50 万
    for (int i = 0; i < 4; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 500000; j++) {
                action.accept(adder);
            }
        }));
    }
    long start = System.nanoTime();
    ts.forEach(t -> t.start());
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    long end = System.nanoTime();
    System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

LongAddr为什么比AtomicLong性能好?

CAS一个竞争单元,大部分线程需要自旋,消耗性能;

LongAddr竞争时设置多个累加单元,Thread0累加Cell[0],Thread1累加Cell[1]...最后汇总,减少CAS重试失败次数,提高性能;

LongAddr几个关键域:

//累加单元组 transient禁止序列化
transient volatile Cell[] cells;
//基础值,没有竞争,cas累加这个域
transient volatile long base;
//在cells创建或扩容时,置为1,表示加锁
transient volatile int cellsBusy
1
2
3
4
5
6

一个Cell 24字节:16字节的对象头,8字节的value,缓存行可以放下2个Cell对象

线程池
异步编程

← 线程池 异步编程→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式