异步-CompletableFuture详解

it2024-01-08  60

1.0 线程回顾

1.1 初始化线程的4中方式

继承Thread实现Runnable接口实现Callable 接口+FutureTask (可以拿到返回结果,可以处理异常)线程池

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

import java.util.concurrent.*; public class ThreadTest { public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main......start..."); // Thread01 thread01 = new Thread01(); thread01.start(); //启动线程 // Runnable01 runnable01 = new Runnable01(); // new Thread(runnable01).start(); // FutureTask<Integer> futureTask = new FutureTask<>(new Callable01()); // new Thread(futureTask).start(); // //阻塞等待 整个线程执行完成,获取返回结果 // Integer integer = futureTask.get(); // System.out.println("main......end...."+integer ); //这个最后执行 //将所有的多线程异步任务都交给线程池执行,这样不用一个任务申请一个线程了 // new Thread(()-> System.out.println("hello")).start(); //当前系统中只有一二个,每个异步任务直接提交给线程池 service.execute(new Runnable01()); System.out.println("main......end...."); } public static class Thread01 extends Thread{ @Override public void run() { System.out.println("dangqiang线程" +Thread.currentThread().getId()); int i =10 /2; System.out.println("运行结果"+i); } } public static class Runnable01 implements Runnable{ @Override public void run() { System.out.println("dangqiang线程" +Thread.currentThread().getId()); int i =10 /2; System.out.println("运行结果"+i); } } public static class Callable01 implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("dangqiang线程" +Thread.currentThread().getId()); int i =10 /2; System.out.println("运行结果"+i); return i; } } } main......start... main......end.... dangqiang线程11 运行结果5 方式1和2 : 主线程无法获取线程的运算结果,不适合当前场景 方式3: 主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源.可以导致服务器的资源耗尽 方式4: 通过以下二种方式初始化线程池 Executors.newFiexedThreadPool(3); //或者 new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,TimeUnit unit,workQueue,threadFactory,handler)

1.2 线程池

2.CompletableFuture 异步编排

2.1 启动异步任务

public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main....start....."); // CompletableFuture.runAsync(()->{ // System.out.println("当前线程: "+Thread.currentThread().getId()); // int i=10/2; // System.out.println("运行结果:" +i); // },service); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程: " + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, service); Integer integer = future.get(); System.out.println("main........end......."+integer); } main....start..... 当前线程: 11 运行结果:5 main........end.......5

.2.2 CompletableFuture-完成回调与异常感知

public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main....start....."); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程: " + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, service).whenComplete((res,excption)->{ // 虽然得到异常,但是无法修改返回数据 System.out.println("异常任务成功...结果是:"+res +";异常是: "+excption); }).exceptionally(throwable -> { //可以感知异常,同时返回默认值 return 10; }); Integer integer = future.get(); System.out.println("main........end......."+integer); } } main....start..... 当前线程: 11 运行结果:5 异常任务成功...结果是:5;异常是: null main........end.......5 如过业务有异常(int i =10/0), main....start..... 当前线程: 11 异常任务成功...结果是:null;异常是: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero main........end.......10

2.3 handle 方法完成后的处理

.handle((res,thr)->{ if (res!=null){ return res*2; //没异常 } if (thr!=null){ //有异常 return 0; } return 1; });

2.4 线程串行化方法

public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main....start....."); CompletableFuture<String> applyAsync = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程: " + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, service).thenApplyAsync((res) -> { System.out.println("任务二启动了 ..." + res); return "hello" + res; }, service); String s = applyAsync.get(); System.out.println("main........end......."+s); } } main....start..... 当前线程: 11 运行结果:5 任务二启动了 ...5 main........end.......hello5

2.5 两任务组合-都要完成

System.out.println("main....start....."); /** * 两个都完成 */ CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("任务1结束:"); return i; }, service); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); System.out.println("任务2结束:"); return "Hello"; }, service); future01.runAfterBothAsync(future02,()->{ //不接受上二步的返回值,自己也没返回值 System.out.println("任务3开始"); }, service); future01.thenAcceptBothAsync(future02,(f1, f2)->{ System.out.println("任务3开始。。。之前的结果:" + f1 + "==>" + f2); }, service); System.out.println("main....end....." ); main....start..... 任务1线程:11 任务1结束: 任务2线程:12 任务2结束: 任务3开始 main....end..... 任务3开始。。。之前的结果:5==>Hello CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> { return f1 + ":" + f2 + " -> Haha"; }, service); System.out.println("main....end....." + future.get()); main....start..... 任务1线程:11 任务1结束: 任务2线程:12 任务2结束: main....end.....5:Hello -> Haha

2.6 两任务组合-只要一个完成

当两个任务中,任意一个future任务完成的时候,执行任务。 applyIoEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。 acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。 runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { System.out.println("任务1线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("任务1结束:"); return i; }, service); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { System.out.println("任务2线程:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务2结束:"); return "Hello"; }, service); future01.runAfterEitherAsync(future02, () -> { System.out.println("任务3结束:"); }, service); System.out.println("main....end....."); } main....start..... 任务1线程:11 任务1结束: 任务2线程:12 main....end..... 任务3结束: 任务2结束:

2.7 allof,anyof方法

allof: 顾名思义,就是所有的任务串行处理,无返回值. anyif: 就是只要有一个任务执行完成后就返回future并将第一个完成的参数带着一起返回

public class ThreadTest { public static ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); System.out.println("f1..."); } catch (InterruptedException e) { e.printStackTrace(); } return "f1"; }, service); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("f2..."); } catch (InterruptedException e) { e.printStackTrace(); } // throw new RuntimeException("aa"); // 如果想处理异常,可以直接处理,可以通过whencomplete,handler方法… return "f2"; }, service); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); System.out.println("f3..."); } catch (InterruptedException e) { e.printStackTrace(); } return "f3"; }, service); CompletableFuture<String> all = CompletableFuture.allOf(f1, f2, f3).thenApplyAsync(x -> { return "等到f1,f2和f3都结束了,才能执行 ,X="+x; }, service); System.out.println(":主线程..."); //join 或者get String aVoid = all.get(); System.out.println(aVoid); // 阻塞等待 System.out.println("任务均已完成。"); //这个最后显示,因为要等上一步的输出 } } :主线程... //第一个出来 f2... //2秒后出来 f1... //这个及下面的 差不多一起 f3... 等到f1,f2和f3都结束了,才能执行 ,X=null 任务均已完成。 CompletableFuture<Object> handle = CompletableFuture.anyOf(f1, f2, f3).handle((res, thr) -> { if (res != null) { System.out.println("可以直接返回一个已完成的线程: " + res); return "xiaoming"; } if (thr != null) { //有异常 return 0; } return 1; }); //不能异步处理 :主线程... f2... 可以直接返回一个已完成的线程: f2 xiaoming 任务均已完成。 f1... f3...
最新回复(0)