首页
首页 教程 枫叶助手
  • 首页
  • 教程
  • 登录
登录枫叶社区畅享更多权益

用户名密码登录

其他登录:
icon_GitHubCreated with sketchtool.
其他登录敬请期待

微信扫码/长按识别登录

输入验证码
有效期五分钟 👉 手动刷新

登录即同意 用户协议 和 隐私政策

1
8个经典谬误
更新时间: 2022年10月08日
限免
2
使用ForkJoin
更新时间: 2024年11月30日
限免
3
使用CompletableFuture
更新时间: 2024年11月30日
限免
关注公众号
原创
使用CompletableFuture

使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用CompletableFuture:

import java.util.concurrent.CompletableFuture;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        // 1、创建异步任务
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
        // 2、异步任务执行成功
        cf.thenAccept((result)->{
            System.out.println("price: "+ result);
        });
        // 2、异步任务执行失败
        cf.exceptionally((e)->{
            e.printStackTrace();
            return null;
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
        Thread.sleep(200);
    }
    
    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed");
        }
        return 5 + Math.random() * 20;
    }
}

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个Supplier接口的对象:

@FunctionalInterface
public interface Supplier<T> {
    T get();
}

这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice静态方法的签名符合Supplier接口的定义(除了方法名外)。

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,ComplatableFuture会调用Consumer对象:

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {
    R apply(T t);
}

这里我们都用lambda语法简化了代码。

可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个ComplatableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 第一个任务:
        CompletableFuture<String> cfQueury = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        // cfQuery成功后继续执行下一个任务
        CompletableFuture<Double> cfFetch = cfQueury.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        cfFetch.thenAccept((result)->{
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(300);
    }


    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "601857";
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 5 + Math.random() * 20;
    }
}

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们可以考虑这样的场景:

同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueuryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://money.163.com/code/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueuryFrom163);

        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });

        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFrom163, cfFetchFromSina);

        // 最终结果:
        cfFetch.thenAccept((result)->{
            System.out.println("price: " + result);
        });

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);

    }


    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "601857";
    }

    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 5 + Math.random() * 20;
    }

}

上述逻辑实现的异步查询规则实际上是:

流程

小结:

CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;
  • execeptional()处理异常结果;
  • thenApplyAsync用于串行化另一个CompletableFuture;
  • anyOf()和allOf()用于并行化多个CompletableFuture
删除提醒

确定删除《使用CompletableFuture》吗

1人已点赞

回复