新聞中心
上一篇我們講了Future機(jī)制,有興趣的可以參考談?wù)凢uture、Callable、FutureTask關(guān)系

成都創(chuàng)新互聯(lián)公司專(zhuān)注于普洱網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供普洱營(yíng)銷(xiāo)型網(wǎng)站建設(shè),普洱網(wǎng)站制作、普洱網(wǎng)頁(yè)設(shè)計(jì)、普洱網(wǎng)站官網(wǎng)定制、微信平臺(tái)小程序開(kāi)發(fā)服務(wù),打造普洱網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供普洱網(wǎng)站排名全網(wǎng)營(yíng)銷(xiāo)落地服務(wù)。
但Future機(jī)制,還不那么靈活,比如怎么去利用Future機(jī)制描述兩個(gè)任務(wù)串行執(zhí)行,又或是兩個(gè)任務(wù)并行執(zhí)行,又或是只關(guān)心最先執(zhí)行結(jié)束的任務(wù)結(jié)果。
Future機(jī)制在一定程度上都無(wú)法快速地滿(mǎn)足以上需求,CompletableFuture便應(yīng)運(yùn)而生了。
本片會(huì)介紹CompletableFuture的api,并用一些示例演示如何去使用。
1. 創(chuàng)建一個(gè)異步任務(wù)
- public static CompletableFuture supplyAsync(Supplier supplier)
- public static CompletableFuture supplyAsync(Supplier supplier,Executor executor);
- public static CompletableFuture
runAsync(Runnable runnable); - public static CompletableFuture
runAsync(Runnable runnable,Executor executor);
supplyAsync與runAsync的區(qū)別在于:supplyAsync有返回值,而runAsync沒(méi)有返回值
帶Executor參數(shù)的構(gòu)造函數(shù),則使用線程池中的線程執(zhí)行異步任務(wù)(線程池可以參考說(shuō)說(shuō)線程池)
不帶Executor參數(shù)的構(gòu)造函數(shù),則使用ForkJoinPool.commonPool()中的線程執(zhí)行異步任務(wù)(Fork/Join框架可以參考談?wù)劜⑿辛鱬arallelStream)
1.1 示例:使用supplyAsync創(chuàng)建一個(gè)有返回值的異步任務(wù)
- public class Case1 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture=CompletableFuture.supplyAsync(()->{ - try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 1;
- });
- //該方法會(huì)一直阻塞
- Integer result = completableFuture.get();
- System.out.println(result);
- }
- }
2. 異步任務(wù)的回調(diào)
- public CompletableFuture
whenComplete(BiConsumer super T, ? super Throwable> action); - public CompletableFuture
whenCompleteAsync(BiConsumer super T, ? super Throwable> action); - public CompletableFuture
whenCompleteAsync(BiConsumer super T, ? super Throwable> action, Executor executor); - public CompletableFuture
exceptionally(Function fn);
whenComplete開(kāi)頭的方法在計(jì)算任務(wù)完成(包括正常完成與出現(xiàn)異常)之后會(huì)回調(diào)
而exceptionally則只會(huì)在計(jì)算任務(wù)出現(xiàn)異常時(shí)才會(huì)被回調(diào)
如何確定哪個(gè)線程去回調(diào)whenComplete,比較復(fù)雜,先略過(guò)。
而回調(diào)whenCompleteAsync的線程比較簡(jiǎn)單,隨便拿一個(gè)空閑的線程即可,后綴是Async的方法同理。
2.1 示例:計(jì)算出現(xiàn)異常,使用whenComplete與exceptionally進(jìn)行處理
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.function.BiConsumer;
- import java.util.function.Function;
- import java.util.stream.IntStream;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case2 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("執(zhí)行supplyAsync的線程:" + Thread.currentThread().getName());
- int i = 1 / 0;
- return 1;
- });
- completableFuture.whenComplete(new BiConsumer
() { - @Override
- public void accept(Integer integer, Throwable throwable) {
- System.out.println("執(zhí)行whenComplete的線程:" + Thread.currentThread().getName());
- if (throwable == null) {
- System.out.println("計(jì)算未出現(xiàn)異常,結(jié)果:" + integer);
- }
- }
- });
- completableFuture.exceptionally(new Function
() { - @Override
- public Integer apply(Throwable throwable) {
- //出現(xiàn)異常時(shí),則返回一個(gè)默認(rèn)值
- System.out.println("計(jì)算出現(xiàn)異常,信息:" + throwable.getMessage());
- return -1;
- }
- });
- System.out.println(completableFuture.get());
- }
- }
輸出:
當(dāng)然,CompletableFuture內(nèi)的各種方法是支持鏈?zhǔn)秸{(diào)用與Lambda表達(dá)式的,我們進(jìn)行如下改寫(xiě):
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("執(zhí)行supplyAsync的線程:" + Thread.currentThread().getName());
- int i = 1 / 0;
- return 1;
- }).whenComplete((integer, throwable) -> {
- System.out.println("執(zhí)行whenComplete的線程:" + Thread.currentThread().getName());
- if (throwable == null) {
- System.out.println("計(jì)算未出現(xiàn)異常,結(jié)果:" + integer);
- }
- }).exceptionally(throwable -> {
- //出現(xiàn)異常時(shí),則返回一個(gè)默認(rèn)值
- System.out.println("計(jì)算出現(xiàn)異常,信息:" + throwable.getMessage());
- return -1;
- });
- System.out.println("計(jì)算結(jié)果:" + completableFuture.get());
- }
3. 任務(wù)串行化執(zhí)行
- public CompletableFuture thenApply(Function super T,? extends U> fn);
- public CompletableFuture
thenRun(Runnable action); - public CompletableFuture
thenAccept(Consumer super T> action); - public CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn);
- public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
thenApply,依賴(lài)上一次任務(wù)執(zhí)行的結(jié)果,參數(shù)中的Function super T,? extends U>,T代表上一次任務(wù)返回值的類(lèi)型,U代表當(dāng)前任務(wù)返回值的類(lèi)型,當(dāng)上一個(gè)任務(wù)沒(méi)有出現(xiàn)異常時(shí),thenApply才會(huì)被調(diào)用
thenRun,不需要知道上一個(gè)任務(wù)的返回結(jié)果,只是在上一個(gè)任務(wù)執(zhí)行完成之后開(kāi)始執(zhí)行Runnable
thenAccept,依賴(lài)上一次任務(wù)的執(zhí)行結(jié)果,因?yàn)槿雲(yún)⑹荂onsumer,所以不返回任何值。
handle和thenApply相似,不過(guò)當(dāng)上一個(gè)任務(wù)出現(xiàn)異常時(shí),能夠執(zhí)行handle,卻不會(huì)去執(zhí)行thenApply
thenCompose,傳入一次任務(wù)執(zhí)行的結(jié)果,返回一個(gè)新的CompleteableFuture對(duì)象
3.1 示例:使用串行化任務(wù)分解兩數(shù)相乘并輸出
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case4 {
- public static void main(String[] args) {
- CompletableFuture.supplyAsync(() -> 2)
- .thenApply(num -> num * 3)
- .thenAccept(System.out::print);
- }
- }
很顯然,輸出為6
3.2 示例:使用串行化任務(wù)并且模擬出現(xiàn)異常
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.function.BiFunction;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case4 {
- public static void main(String[] args) {
- CompletableFuture.supplyAsync(() -> 2)
- .thenApply(num -> num / 0)
- .thenApply(result -> result * 3)
- .handle((integer, throwable) -> {
- if (throwable == null) {
- return integer;
- } else {
- throwable.printStackTrace();
- return -1;
- }
- }).thenAccept(System.out::print);
- }
- }
最終會(huì)輸出-1
4. 任務(wù)同時(shí)執(zhí)行,且都需要執(zhí)行完成
- public CompletableFuture
thenCombine(CompletionStage extends U> other, - Function super T,? super U,? extends V> fn);
- public CompletableFuture
thenAcceptBoth(CompletionStage extends U> other, - Consumer super T, ? super U> action);
- public CompletableFuture
runAfterBoth(CompletionStage> other,Runnable action); - public static CompletableFuture
allOf(CompletableFuture>... cfs);
thenCombine,合并兩個(gè)任務(wù),兩個(gè)任務(wù)可以同時(shí)執(zhí)行,都執(zhí)行成功后,執(zhí)行最后的BiFunction操作。其中T代表第一個(gè)任務(wù)的執(zhí)行結(jié)果類(lèi)型,U代表第二個(gè)任務(wù)的執(zhí)行結(jié)果類(lèi)型,V代表合并的結(jié)果類(lèi)型
thenAcceptBoth,和thenCombine特性用法都極其相似,唯一的區(qū)別在于thenAcceptBoth進(jìn)行一個(gè)消費(fèi),沒(méi)有返回值
runAfterBoth,兩個(gè)任務(wù)都執(zhí)行完成后,但不關(guān)心他們的返回結(jié)構(gòu),然后去執(zhí)行一個(gè)Runnable。
allOf,當(dāng)所有的任務(wù)都執(zhí)行完成后,返回一個(gè)CompletableFuture
4.1 示例:使用thenCombine合并任務(wù)
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case5 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
cf1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)1開(kāi)始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)1結(jié)束");
- return 2;
- });
- CompletableFuture
cf2 = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)2開(kāi)始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)2結(jié)束");
- return 3;
- });
- CompletableFuture
completableFuture = cf1.thenCombine(cf2, (result1, result2) -> result1 * result2); - System.out.println("計(jì)算結(jié)果:" + completableFuture.get());
- }
- }
輸出:
可以看到兩個(gè)任務(wù)確實(shí)是同時(shí)執(zhí)行的
當(dāng)然,熟練了之后,直接使用鏈?zhǔn)讲僮鳎a如下:
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case6 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)1開(kāi)始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)1結(jié)束");
- return 2;
- }).thenCombine(CompletableFuture.supplyAsync(() -> {
- System.out.println("任務(wù)2開(kāi)始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)2結(jié)束");
- return 3;
- }), (result1, result2) -> result1 * result2);
- System.out.println("計(jì)算結(jié)果:" + completableFuture.get());
- }
- }
5. 任務(wù)同時(shí)執(zhí)行,且只取最先完成的那個(gè)任務(wù)
- public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T, U> fn);
- public CompletableFuture
acceptEither(CompletionStage extends T> other, Consumer super T> action); - public CompletableFuture
runAfterEither(CompletionStage> other,Runnable action); - public static CompletableFuture
applyToEither,最新執(zhí)行完任務(wù),將其結(jié)果執(zhí)行Function操作,其中T是最先執(zhí)行完的任務(wù)結(jié)果類(lèi)型,U是最后輸出的類(lèi)型
acceptEither,最新執(zhí)行完的任務(wù),將其結(jié)果執(zhí)行消費(fèi)操作
runAfterEither,任意一個(gè)任務(wù)執(zhí)行完成之后,執(zhí)行Runnable操作
anyOf,多個(gè)任務(wù)中,返回最先執(zhí)行完成的CompletableFuture
5.1 示例:兩個(gè)任務(wù)同時(shí)執(zhí)行,打印最先完成的任務(wù)的結(jié)果
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case7 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)1開(kāi)始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)1結(jié)束");
- return 2;
- }).acceptEither(CompletableFuture.supplyAsync(() -> {
- System.out.println("任務(wù)2開(kāi)始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)2結(jié)束");
- return 3;
- }), result -> System.out.println(result));
- //等待CompletableFuture返回,防止主線程退出
- completableFuture.join();
- }
- }
輸出:
可以看得到,任務(wù)2結(jié)束后,直接不再執(zhí)行任務(wù)1的剩余代碼
5.2 示例:多個(gè)任務(wù)同時(shí)執(zhí)行,打印最先完成的任務(wù)的結(jié)果
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case8 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
cf1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)1開(kāi)始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)1結(jié)束");
- return 2;
- });
- CompletableFuture
cf2 = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)2開(kāi)始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)2結(jié)束");
- return 3;
- });
- CompletableFuture
cf3 = CompletableFuture.supplyAsync(() -> { - System.out.println("任務(wù)3開(kāi)始");
- try {
- Thread.sleep(4000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任務(wù)3結(jié)束");
- return 4;
- });
- CompletableFuture
- System.out.println(firstCf.get());
- }
- }
輸出:
名稱(chēng)欄目:什么,你還不會(huì)用CompletableFuture?
網(wǎng)站路徑:http://www.fisionsoft.com.cn/article/dghhihg.html


咨詢(xún)
建站咨詢(xún)
