Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 线程

  • 共享模型

    • 管程
    • JMM
    • 乐观锁
    • 线程池
    • J.U,C
    • 异步编程
      • FutureTask
      • CompletableFuture
      • 常用API
    • 多线程编程
    • 动态线程池
  • 非共享模型

  • 并行

  • 多线程设计模式

  • JUC
  • 共享模型
Nreal
2023-11-01
目录

异步编程

# FutureTask

实现了RunnableFuture接口(继承了Runnable和Future接口)

案例代码:

public class futureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask<>(new MyThread());
        Thread t = new Thread(futureTask, "t");
        t.start();
        System.out.println(futureTask.get());
    }
}
class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println("异步执行中。。。");
        return "执行完成";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Future缺点:

  • get阻塞

    相当于join阻塞了main

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        System.out.println(futureTask.get());
        //main被阻塞
        System.out.println(Thread.currentThread().getName());
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
  • isDone()轮询

    利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu

    #public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask&lt;String&gt; futureTask = new FutureTask&lt;&gt;(() -&gt; {
            TimeUnit.SECONDS.sleep(5);
            return &quot;task over&quot;;
        });
        Thread t1 = new Thread(futureTask, &quot;t1&quot;);
        t1.start();
        while(true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else{
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println(&quot;处理中...&quot;);
            }
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

# CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 
1
  • 无返回值runAsync

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
    
    1
    2

    配合线程池案例代码

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },threadPool);
        System.out.println(completableFuture.get());
    }
    //pool-1-thread-1
    //null 无返回值
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  • 有返回值supplyAsync

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    
    1
    2

    配合whenComplete减少阻塞轮询

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            int res = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return res;
        },threadPool).whenComplete((v,e)->{//v:值e:异常
            if(e==null){
                System.out.println("计算完成,v="+v);
            }
        }).exceptionally(e->{
            e.printStackTrace();;
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

优点:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法。

# 常用API

  1. 获得结果和触发计算

    • get()阻塞
    • join()类似get,区别在于不抛出异常
    • getNow(T valueIfAbsent)没计算完成情况下,给一个替代结果
    • complete(T val)是否立即打断get()方法返回括号值

    案例代码:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);//执行需要2秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });
        //System.out.println(uCompletableFuture.getNow("xxx"));//直接返回xxx
        //打断了返回true completeValue;没打断false abc
        System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  2. 对计算结果进行处理

    • thenApply

      计算结果存在在依赖关系,使得线程串行化,因为依赖关系,所以一旦有异常,直接叫停

      public static void main(String[] args) throws ExecutionException, InterruptedException
      {
          //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
          CompletableFuture.supplyAsync(() -> {
              try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
              System.out.println("111");
              return 1024;
          }).thenApply(f -> {
              System.out.println("222");
              return f + 1;
          }).thenApply(f -> {
              //int age = 10/0; // 异常情况:那步出错就停在那步。
              System.out.println("333");
              return f + 1;
          }).whenCompleteAsync((v,e) -> {
              System.out.println("v: "+v);
          }).exceptionally(e -> {
              e.printStackTrace();
              return null;
          });
      
          System.out.println("-----主线程结束,END");
          // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
          //或者自己开线程池
          try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
    • handle

      类似于thenApply,有异常任然可以往下走一步

  3. 对计算结果进行消费

    接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,1&2的是Function

    • thenAccept

      public static void main(String[] args) throws ExecutionException, InterruptedException
      {
          CompletableFuture.supplyAsync(() -> {
              return 1;
          }).thenApply(f -> {
              return f + 2;
          }).thenApply(f -> {
              return f + 3;
          }).thenApply(f -> {
              return f + 4;
          }).thenAccept(r -> System.out.println(r));
      }
      //6
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13

    Code之任务之间执行顺序

    • thenRun(Runnable runnable)

      任务A执行完执行B,并且B不需要A的结果

    • thenAccept(Consumer action)

      任务A执行完执行B,B需要A的结果,但是任务B无返回值

    • thenApply(Function fn)

      任务A执行完执行B,B需要A的结果,同时任务B有返回值

J.U,C
多线程编程

← J.U,C 多线程编程→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式