1. 线程基础知识
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class DaemonDemo { public static void main (String[] args) { Thread t1 = new Thread (() -> { System.out.println(Thread.currentThread().getName() + " 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程" )); while (true ) { } }, "t1" ); t1.setDaemon(true ); t1.start(); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 主线程结束" ); } } 输出: Thread-0 开始运行,守护线程 main 主线程结束
2. Future Future是Java5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
3. FutureTask 目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务 (班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
代码实现:Runnable接口 + Callable接口 + Future接口 + FutureTask实现类。
线程 Thread 构造函数仅接收 Runnable 类型参数,Runnable 无返回值
FutureTask 解决了无返回值问题,构造函数接收 Callable 类型,并且本身实现 Runnable 接口,可以作为 Thread 的构造函数参数
使用线程池的 submit() 可以省略构造线程并 start() 的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void futureTaskDemo () throws Exception { FutureTask<String> futureTask = new FutureTask <>(() -> { System.out.println(Thread.currentThread().getName() + "come in" ); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } return "task over" ; }); Thread t1 = new Thread (futureTask, "t1" ); t1.start(); System.out.println(Thread.currentThread().getName() + " ------忙其他任务" ); while (true ) { if (futureTask.isDone()) { System.out.println(futureTask.get()); break ; } else { TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("正在处理中,不要催了,越催越慢" ); } } }
4. CompletableFuture FutureTask 只能通过阻塞或轮询 的方式得到任务的结果,显然不是我们想要的
对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。JDK8 设计的 CompletableFuture提供了一种观察者模式 类似的机制,可以让任务执行完成后通知监听的一方。
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:
stage.thenApply(x -> square(x)).thenAccept(x->System.out.println(x)).thenRun(()->System.out.println("123"))
CompletableFuture
提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果 ,也提供了转换和组合CompletableFuture的方法
它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作
核心的四个静态方法,来创建一个异步任务
函数式编程相关见 Java8函数式编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void completableFutureBuildDemo () throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, executorService); System.out.println(completableFuture.get()); CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "hello supplyAsync" ; }, executorService); System.out.println(objectCompletableFuture.get()); executorService.shutdown(); }
executorService.shutdown()
线程池会把正在执行的任务及队列中等待执行的任务都执行完毕后,再去关闭;
如果还有新的任务过来,线程池就会拒绝
4.1 CompletableFuture 常用方法 获得结果
public T get()
: 获取结果,会阻塞
public T get(long timeout, TimeUnit unit)
: 设置超时时间
public T join()
: 和get一样的作用,只是不需要抛出异常
public T getNow(T valuelfAbsent)
: 计算完成就返回正常值,否则返回valuelfAbsent, 立即获取结果不阻塞
触发计算
public boolean complete(T value)
: 是否打断get方法立即返回括号值
一旦调用了 complete()
方法,CompletableFuture 对象的状态会立即变为已完成,而且之后任何对该对象的计算都不会再触发异步任务的执行。如果该对象已经处于完成状态,再次调用 complete()
方法不会有任何效果。
如果异步任务已经抛出了异常,调用 complete()
方法将不会有任何效果。此时,可以使用 completeExceptionally(Throwable ex)
方法手动设置异步任务的异常结果。
如果有多个线程同时尝试调用 complete()
方法,只有第一个成功的线程能够设置结果,其他线程的调用将被忽略。
对计算结果进行处理
thenApply
:上一步有异常就叫停
handle
:上一步有异常,该步骤正常执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void completableFutureApiDemo () { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return 1 ; }, threadPool).thenApply(f -> { System.out.println("222" ); int i = 10 / 0 ; return f + 2 ; }).handle((f, e) -> { System.out.println("3333" ); return f + 2 ; }).whenComplete((v, e) -> { if (e == null ) { System.out.println("----计算结果" + v); } else { System.out.println("----出现异常, 计算结果" + v); } }).exceptionally(e -> { System.out.println("出现异常" ); return null ; }); System.out.println(Thread.currentThread().getName() + "------主线程先去做其他事情" ); threadPool.shutdown(); }
发生异常后进入exceptionally代码块,但是thenApply中的代码不会执行,whenComplete依旧会执行
1 2 3 4 5 main------主线程先去做其他事情 222 3333 ----出现异常, 计算结果null 出现异常
对计算结果进行消费
对比补充
thenRun(Runnable runnable)
:任务A执行完执行B,并且不需要A的结果
thenAccept(Consumer action)
: 任务A执行完执行B,B需要A的结果,但是任务B没有返回值
thenApply(Function fn)
: 任务A执行完执行B,B需要A的结果,同时任务B有返回值
1 2 3 4 5 public static void main (String[] args) { System.out.println(CompletableFuture.supplyAsync(() -> "result" ).thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "result" ).thenAccept(r -> System.out.println(r)).join()); System.out.println(CompletableFuture.supplyAsync(() -> "result" ).thenApply(f -> f + 2 ).join()); }
CompletableFuture和线程池说明
对计算速度进行选用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public static void main (String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(3 ); CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { try { System.out.println("A come in" ); TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "playA" ; }, threadPool); CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { try { System.out.println("B come in" ); TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } return "playB" ; }, threadPool); CompletableFuture<String> result = playA.applyToEither(playB, f -> { return f + " is winner" ; }); System.out.println(Thread.currentThread().getName() + "-----------winner:" + result.join()); }
对计算结果合并
两个CompletableStage任务都完成后,最终能把两个任务的结果一起交给thenCombine
来处理
先完成的等待其他分支任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public static void main (String[] args) { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " 启动" ); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } return 10 ; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " 启动" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return 20 ; }); CompletableFuture<Integer> finalResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println("----------开始两个结果合并" ); return x + y; }); System.out.println(finalResult.join()); }
并行执行
allOf()
:当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
anyOf()
:只要有一个任务执行完成后就返回 future 并将第一个完成的参数带着一起返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public static void main (String[] args) throws Exception { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } return "f1" ; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "f2" ; }); CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2).thenApply(x -> { System.out.println("all" ); return null ; }); CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2).thenApply(x -> { System.out.println(x); return x; });; System.out.println(System.currentTimeMillis() + ":阻塞" ); Void aVoid = all.get(); System.out.println(System.currentTimeMillis() + ":阻塞结束" ); System.out.println("任务均已完成。" ); System.out.println(any.get()); }