异步编程
# FutureTask
实现了RunnableFuture接口(继承了Runnable和Future接口)
案例代码:
public class futureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(new MyThread());
Thread t = new Thread(futureTask, "t");
t.start();
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("异步执行中。。。");
return "执行完成";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Future缺点:
get阻塞
相当于join阻塞了main
public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(() -> { TimeUnit.SECONDS.sleep(5); return "task over"; }); Thread t1 = new Thread(futureTask, "t1"); t1.start(); System.out.println(futureTask.get()); //main被阻塞 System.out.println(Thread.currentThread().getName()); }
1
2
3
4
5
6
7
8
9
10
11isDone()轮询
利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu
#public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(() -> { TimeUnit.SECONDS.sleep(5); return "task over"; }); Thread t1 = new Thread(futureTask, "t1"); t1.start(); while(true){ if(futureTask.isDone()){ System.out.println(futureTask.get()); break; }else{ TimeUnit.MILLISECONDS.sleep(500); System.out.println("处理中..."); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
1
无返回值runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
1
2配合线程池案例代码
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },threadPool); System.out.println(completableFuture.get()); } //pool-1-thread-1 //null 无返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14有返回值supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
1
2配合whenComplete减少阻塞轮询
public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); int res = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return res; },threadPool).whenComplete((v,e)->{//v:值e:异常 if(e==null){ System.out.println("计算完成,v="+v); } }).exceptionally(e->{ e.printStackTrace();; System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
优点:
- 异步任务结束时,会自动回调某个对象的方法;
- 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法。
# 常用API
获得结果和触发计算
- get()阻塞
- join()类似get,区别在于不抛出异常
- getNow(T valueIfAbsent)没计算完成情况下,给一个替代结果
- complete(T val)是否立即打断get()方法返回括号值
案例代码:
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2);//执行需要2秒 } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); //System.out.println(uCompletableFuture.getNow("xxx"));//直接返回xxx //打断了返回true completeValue;没打断false abc System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get()); }
1
2
3
4
5
6
7
8
9
10
11
12
13对计算结果进行处理
thenApply
计算结果存在在依赖关系,使得线程串行化,因为依赖关系,所以一旦有异常,直接叫停
public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化, CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).thenApply(f -> { System.out.println("222"); return f + 1; }).thenApply(f -> { //int age = 10/0; // 异常情况:那步出错就停在那步。 System.out.println("333"); return f + 1; }).whenCompleteAsync((v,e) -> { System.out.println("v: "+v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: //或者自己开线程池 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26handle
类似于thenApply,有异常任然可以往下走一步
对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,1&2的是Function
thenAccept
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f + 2; }).thenApply(f -> { return f + 3; }).thenApply(f -> { return f + 4; }).thenAccept(r -> System.out.println(r)); } //6
1
2
3
4
5
6
7
8
9
10
11
12
13
Code之任务之间执行顺序
thenRun(Runnable runnable)
任务A执行完执行B,并且B不需要A的结果
thenAccept(Consumer action)
任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply(Function fn)
任务A执行完执行B,B需要A的结果,同时任务B有返回值