JUC(1) CompletableFuture
2023-12-27 22:55:14 # Language # Java

1. 线程基础知识

  • 1把锁:synchronized

  • 2个并:

    • 并发(concurrent):是在同一实体上的多个事件,是在一台机器上“同时”处理多个任务,同一时刻,其实是只有一个事情再发生。

    • 并行(parallel):是在不同实体上的多个事件,是在多台处理器上同时处理多个任务,同一时刻,大家都在做事情,你做你的,我做我的,各干各的。

  • 3个程:

    • 进程:在系统中运行的一个应用程序,每个进程都有它自己的内存空间和系统资源

    • 线程:也被称为轻量级进程,在同一个进程内会有1个或多个线程,是大多数操作系统进行时序调度的基本单元。

    • 管程:Monitor(锁),也就是我们平时所说的锁。Monitor其实是一种同步机制,它的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码,JVM中同步是基于进入和退出监视器(Monitor管程对象)来实现的,每个对象实例都会有一个Monitor对象,Monitor对象和Java对象一同创建并销毁,底层由C++语言实现。

  • 线程分类(一般不做特别说明配置,默认都是用户线程):

    • 用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作。

    • 守护线程:是一种特殊的线程为其他线程服务的,在后台默默地完成一些系统性的任务,比如垃圾回收线程就是最典型的例子。守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以假如当系统只剩下守护线程的时候,守护线程伴随着JVM一同结束工作。

public class DaemonDemo {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));
while (true) {

}
}, "t1");
t1.setDaemon(true);//通过设置属性Daemon来设置当前线程是否为守护线程
t1.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + " 主线程结束");
}
}


输出:
Thread-0 开始运行,守护线程
main 主线程结束

// 在main主线程结束后,守护线程会伴随着JVM一同结束工作,即使还有循环没有结束

2. Future

Future是Java5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

image.png

3. FutureTask

目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

代码实现:Runnable接口 + Callable接口 + Future接口 + FutureTask实现类。

在这里插入图片描述

线程 Thread 构造函数仅接收 Runnable 类型参数,Runnable 无返回值

FutureTask 解决了无返回值问题,构造函数接收 Callable 类型,并且本身实现 Runnable 接口,可以作为 Thread 的构造函数参数

使用线程池的 submit() 可以省略构造线程并 start() 的代码

public void futureTaskDemo() throws Exception {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});

Thread t1 = new Thread(futureTask, "t1");
t1.start();

System.out.println(Thread.currentThread().getName() + " ------忙其他任务");

// 获取结果的三种方式
// System.out.println(futureTask.get());//这样会有阻塞的可能,在程序没有计算完毕的情况下。
// System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等待三秒,计算未完成直接抛出异常
while (true) {//轮询
if(futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("正在处理中,不要催了,越催越慢");
}
}
}
  • 优点:Future + 线程池异步多线程任务配合,能显著提高程序的运行效率。

  • 缺点:

    • get()阻塞—-一旦调用get()方法求结果,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,如果没有计算完成容易程序堵塞。

    • isDone()轮询—-轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常会以轮询的方式去获取结果,尽量不要阻塞。

  • 结论:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果

4. CompletableFuture

FutureTask 只能通过阻塞或轮询的方式得到任务的结果,显然不是我们想要的

对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。JDK8 设计的 CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

在这里插入图片描述

  • CompletionStage
    • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
    • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:
      • stage.thenApply(x -> square(x)).thenAccept(x->System.out.println(x)).thenRun(()->System.out.println("123"))
  • CompletableFuture
    • 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
    • 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作

核心的四个静态方法,来创建一个异步任务

在这里插入图片描述

函数式编程相关见 Java8函数式编程

img

public static void completableFutureBuildDemo() throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executorService);

System.out.println(completableFuture.get()); // null

CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello supplyAsync";
}, executorService);

System.out.println(objectCompletableFuture.get()); // hello supplyAsync

executorService.shutdown();
}

executorService.shutdown()

  • 线程池会把正在执行的任务及队列中等待执行的任务都执行完毕后,再去关闭;
  • 如果还有新的任务过来,线程池就会拒绝

4.1 CompletableFuture 常用方法

获得结果

  • public T get(): 获取结果,会阻塞
  • public T get(long timeout, TimeUnit unit): 设置超时时间
  • public T join(): 和get一样的作用,只是不需要抛出异常
  • public T getNow(T valuelfAbsent): 计算完成就返回正常值,否则返回valuelfAbsent, 立即获取结果不阻塞

触发计算

  • public boolean complete(T value): 是否打断get方法立即返回括号值
    • 一旦调用了 complete() 方法,CompletableFuture 对象的状态会立即变为已完成,而且之后任何对该对象的计算都不会再触发异步任务的执行。如果该对象已经处于完成状态,再次调用 complete() 方法不会有任何效果。
    • 如果异步任务已经抛出了异常,调用 complete() 方法将不会有任何效果。此时,可以使用 completeExceptionally(Throwable ex) 方法手动设置异步任务的异常结果。
    • 如果有多个线程同时尝试调用 complete() 方法,只有第一个成功的线程能够设置结果,其他线程的调用将被忽略。

对计算结果进行处理

  • thenApply:上一步有异常就叫停
  • handle:上一步有异常,该步骤正常执行
public static void completableFutureApiDemo() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, threadPool).thenApply(f -> {
System.out.println("222");
int i = 10 / 0; // 除0异常
return f + 2;
}).handle((f, e) -> { // 会正常执行
System.out.println("3333");
return f + 2;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("----计算结果" + v);
} else {
System.out.println("----出现异常, 计算结果" + v);
}
}).exceptionally(e -> {
System.out.println("出现异常");
return null;
});
System.out.println(Thread.currentThread().getName() + "------主线程先去做其他事情");
threadPool.shutdown();
}

发生异常后进入exceptionally代码块,但是thenApply中的代码不会执行,whenComplete依旧会执行

main------主线程先去做其他事情
222
3333
----出现异常, 计算结果null
出现异常

img

对计算结果进行消费

  • thenAccept

对比补充

  • thenRun(Runnable runnable) :任务A执行完执行B,并且不需要A的结果
  • thenAccept(Consumer action): 任务A执行完执行B,B需要A的结果,但是任务B没有返回值
  • thenApply(Function fn): 任务A执行完执行B,B需要A的结果,同时任务B有返回值
public static void main(String[] args) {
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenRun(() -> {}).join());//null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenAccept(r -> System.out.println(r)).join());//result null
System.out.println(CompletableFuture.supplyAsync(() -> "result").thenApply(f -> f + 2).join());//result2
}

CompletableFuture和线程池说明

  • 如果没有传入自定义线程池,都用默认线程池ForkJoinPool

  • 如果你执行第一个任务时,传入了一个自定义线程池

    • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时共用同一个线程池

    • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自定义的线程池,第二个任务使用的是ForkJoin线程池

  • 备注:可能是线程处理太快,系统优化切换原则,直接使用了 main 线程处理

  • thenAcceptthenAcceptAsyncthenApplythenApplyAsync等之间的区别同理

对计算速度进行选用

  • 谁快用谁
  • applyToEither
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("A come in");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playA";
}, threadPool);


CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("B come in");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playB";
}, threadPool);

CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winner";
});

/**
* A come in
* B come in
* main-----------winner:playA is winner
*/
System.out.println(Thread.currentThread().getName() + "-----------winner:" + result.join());
}

对计算结果合并

  • 两个CompletableStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理
  • 先完成的等待其他分支任务
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});

CompletableFuture<Integer> finalResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("----------开始两个结果合并");
return x + y;
});
System.out.println(finalResult.join());

}
/*
ForkJoinPool.commonPool-worker-9 启动
ForkJoinPool.commonPool-worker-2 启动
----------开始两个结果合并
30
*/

并行执行

  • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
  • anyOf():只要有一个任务执行完成后就返回 future 并将第一个完成的参数带着一起返回
public static void main(String[] args) throws Exception {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f1";
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f2";
});

// allof测试
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2).thenApply(x -> {
System.out.println("all");
return null;
});
// anyof测试
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2).thenApply(x -> {
System.out.println(x);
return x;
});;


System.out.println(System.currentTimeMillis() + ":阻塞");
Void aVoid = all.get();
System.out.println(System.currentTimeMillis() + ":阻塞结束");

//一个需要耗时2秒,一个需要耗时3秒,只有当最长的耗时3秒的完成后,才会结束。
System.out.println("任务均已完成。");
/*
1703688408160:阻塞
all
1703688411161:阻塞结束
任务均已完成。
*/

System.out.println(any.get());
/*
f2
f2
*/
}