4 引入流

集合是Java中使用最多的API。几乎每个Java应用程序都会制造和处理集合。集合对于很多编程任务来说都是非常基本的:它们可以把数据分组并加以处 理。尽管集合对于几乎任何一个Java应用都是不可或缺的,但集合操作却远远算不上完美。

  • 很多业务逻辑都涉及类似于数据库的操作。
  • 在处理大量元素时为了提高性能,需要并行处理,并利用多核架构。 但写并行代码比用迭代器还要复杂,而且调试复杂。

4.1 流是什么?

流是 Java API 的新成员,它允许以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。

下面两段代码都是用来返回低热量的菜肴名称的, 并按照卡路里排序。分别使用 Java 7 和 Java 8。

Java 7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
List<Dish> lowCaloricDishes = new ArrayList<>();
for (Dish d : menu) {
if (d.getCalories() < 400) {
lowCaloricDishes.add(d);
}
}

Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
public int compare(Dish d1, Dish d2) {
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});

List<String> lowCaloricDishesName = new ArrayList<>();
for(Dish d: lowCaloricDishes) {
lowCaloricDishesName.add(d.getName());
}

在上面的代码中,用了一个“垃圾变量” lowCaloricDishes。它唯一的作用就是作为一次性的中间容器。在 Java 8 中,实现的细节被放在它本该归属的库里了。

1
2
3
4
5
List<String> lowCaloricDishesName = menu.stream()
.filter(d -> d.getCalories() < 400)
.sorted(comparing(Dish::getCalories))
.map(Dish::getName)
.collect(toList());

为了利用多核架构并行执行这段代码,只需要把 stream() 换成 parallelStream()。

在调用 parallelStream 方法的时候到底发生了什么。用了多少个线程?对性能有多大提升?这些会在第 7 章详细讨论,目前看来上面的代码至少有以下几点好处。

  • 代码是以声明性方式写的。
  • 可以把几个基础操作链接起来,来表达复杂的数据处理流水线,同时保持代码清晰可读。

因为 filter、sorted、map 和 collect 等操作是与具体线程模型无关的高层次构件,所以它们的内部实现可以是单线程的,也可能透明地充分利用多核架构。这意味着用不着为了让某些数据处理任务并行而去操心线程和锁了,Stream API 都做好了!

新的 Stream API 表达能力非常强。在读完第4、5、6章之后,应该就可以写出像下面这样的代码:

1
Map<Dish.Type, List<Dish> dishesByType> = menu.stream().collect(groupingBy(Dish::getType));

简单来说就是,按照 Map 里面的类别对菜肴进行分组。比如,Map 可能包含下列结果:

1
2
3
{FISH=[prawns, salmon],
OTHER=[french fries, rice, season fruit, pizza],
MEAT=[pork, beef, chicken]}

其他库:Guava、Apache和lambdaj

为了给Java程序员提供更好的库操作集合,前人已经做过了很多尝试。比如,Guava就是谷歌创建的一个很流行的库。它提供了multimaps和multisets等额外的容器类。Apache Commons Collections库也提供了类似的功能。最后,本书作者Mario Fusco编写的lambdaj受到函数式编程的启发,也提供了很多声明性操作集合的工具。

在本章剩下的部分和下一章中,会使用这样一个例子:一个 menu,它只是一张菜肴列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Dish {

private final String name;
private final boolean vegetarian;
private final int calories;
private final Type type;

public static final List<Dish> menu =
Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT),
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER),
new Dish("rice", true, 350, Dish.Type.OTHER),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("pizza", true, 550, Dish.Type.OTHER),
new Dish("prawns", false, 400, Dish.Type.FISH),
new Dish("salmon", false, 450, Dish.Type.FISH));
}

4.2 流简介

Java 8 中的集合支持一个新的 stream 方法,它会返回一个流。那么,流到底是什么呢?简短的定义就是“从支持数据处理操作的源生成的元素序列”。

  • 元素序列—就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。但流的目的在于表达计算,比如你前面见到的 filter、sorted 和 map。集合讲的是数据,流讲的是计算
  • 源—流会使用一个提供数据的源,如集合、数组或输入/输出资源。从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
  • 数据处理操作—流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如 filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。

此外,流操作有两个重要的特点。

  • 流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。流水线的操作可以看做是对数据源进行数据库式查询。
  • 内部迭代:与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

先来看一个能体现所有这些概念的代码:

1
2
3
4
5
List<String> threeHighCaloricDishName = menu.stream() // 获得 流
.filter(d -> d.getCalories() > 300) // 建立操作流水线,首先选出高热量的菜肴。
.map(Dish::getName) // 获取菜名
.limit(3) // 只选择前三条
.collect(toList()); // 将结果收集为 List。

除 collect 之外,所有操作都会返回另一个流,它们接成一条流水线,于是就可以看作对源的一个查询。最后,collect 操作开始处理流水线,并返回结果。在调用 collect 之前,没有任何结果产生,实际上根本就没有从 menu 里选择元素。可以这么理解:链中的方法调用都在排队等待,直到调用 collect。

4.3 流与集合

粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构, 它包含数据结构中目前所有的值—集合中的每个元素都得先算出来才能添加到集合中。

相比之下,流则是在概念上固定的数据结构(不能添加或删除元素),其元素则是按需计算的。从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值。与此相反,集合则是急切创建的。

4.3.1 只能遍历一次

和迭代器类似,流只能遍历一次。

1
2
3
4
List<String> title = Arrays.asList("Java8", "In", "Action");
Stream<String> s = title.stream();
s.forEach(System.out::println);
s.forEach(System.out::println); // java.lang.IllegalStateException:流已被操作或关闭

4.3.2 外部迭代与内部迭代

使用 Collection 接口需要用户去做迭代(比如用 for-each),这称为外部迭代。 相反, Streams 库使用内部迭代—它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出 一个函数说要干什么就可以了。

4.4 流操作

java.util.stream.Stream 中的 Stream 接口定义了许多操作。可以分为两大类:

  • filter、map 和 limit 可以连成一条流水线;
  • collect 触发流水线执行并关闭它。

可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。

4.4.1 中间操作

除非流水线上触发一个终端操作,否则中间操作不会执行任何处理。这是因为中间操作一般都可以合并起来,在终端操作时一次性全部处理。

在 4.2 节的代码中,有好几种优化利用了流的延迟性质。

  • 第一,尽管很多菜的热量都高于 300 卡路里,但只选出了前三个。这是因为 limit 操作和一种称为短路的技巧。
  • 第二,尽管 filter 和 map 是两个独立的操作,但它们合并到同一次遍历中了(循环合并)。

4.4.2 终端操作

终端操作会从流的流水线生成结果。其结果是任何不是流的值,比如 List、Integer,甚至 void。例如,在下面的流水线中,forEach 是一个返回 void 的终端操作,它会对源中的每道菜应用一个 Lambda。把 System.out.println 传递给 forEach,并要求它打印出由 menu 生成的流中的每一个 Dish。

1
menu.stream().forEach(System.out::println);

4.4.3 使用流

使用流一般包括三件事:

  • 一个数据源(如集合)来执行一个查询;
  • 一个中间操作链,形成一条流的流水线;
  • 一个终端操作,执行流水线,并能生成结果。

流的流水线背后的理念类似于构建器模式。在构建器模式中有一个调用链用来设置一套配置(对流来说这就是一个中间操作链),接着是调用 built 方法(对流来说就是终端操作)。

部分中间操作方法。

操作 返回类型 操作参数 函数描述符
filter Stream Predicate T -> boolean
map Stream Function<T, R> T -> R
limit Stream
sorted Stream Comparator (T, T) -> int
distinct Stream

终端操作部分方法

操作 目的
forEach 消费流中的每个元素并对其应用 Lambda。这一操作返回 void。
count 返回流中元素的个数。这一操作返回 long。
collect 把流归约成一个集合,比如 List、Map 甚至是 Integer。

4.5 小结

  • 流是“从支持数据处理操作的源生成的一系列元素”。
  • 流利用内部迭代:迭代通过 filter、map、sorted 等操作被抽象掉了。
  • 流操作有两类:中间操作和终端操作。
  • filter 和 map 等中间操作会返回一个流,并可以链接在一起。可以用它们来设置一条流水线,但并不会生成任何结果。
  • forEach 和 count 等终端操作会返回一个非流的值,并处理流水线以返回结果。
  • 流中的元素是按需计算的。

5 使用流

在接下来将会看到 Stream API 支持的许多操作。这些操作能快速完成复杂的数据查询,如筛选、切片、映射、查找、匹配和归约。还有一些特殊的流数值流、来 自文件和数组等多种来源的流,最后是无限流。

5.1 筛选和切片

在本节中,将看到如何选择流中的元素:用谓词筛选,筛选出各不相同的元素,忽略流中的头几个元素,或将流截短至指定长度。

5.1.1 用谓词筛选 filter

Streams 接口支持 filter 方法。该操作会接受一个谓词(一个返回 boolean 的函数)作为参数,并返回一个包括所有符合谓词的元素的流。

1
List<Dish> vegetarianMenu = menu.stream().filter(Dish::isVegetarian).collect(toList());

5.1.2 筛选各异的元素 distinct

distinct 方法,它会返回一个元素各异(根据流所生成元素的 hashCode 和 equals 方法实现)的流。

1
2
3
4
List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
numbers.stream().filter(i -> i % 2 == 0)
.distinct()
.forEach(System.out::println);

5.1.3 截短流 limit

流支持 limit(n) 方法,该方法会返回一个不超过给定长度的流。所需的长度作为参数传递给 limit。如果流是有序的,则最多会返回前 n 个元素。

1
2
3
4
List<Dish> dishes = menu.stream()
.filter(d -> d.getCalories() > 300)
.limit(3)
.collect(toList());

limit 也可以用在无序流上,比如源是一个 Set 。这种情况下,limit 的结果不会以任何顺序排列。

5.1.4 跳过元素 skip

skip(n) 方法,返回一个扔掉了前 n 个元素的流。如果流中元素不足 n 个,则返回一个空流。limit(n)skip(n) 是互补的。

1
2
3
4
List<Dish> dishes = menu.stream()
.filter(d -> d.getCalories() > 300)
.skip(2)
.collect(toList());

5.2 映射 map

一个非常常见的数据处理套路就是从某些对象中选择信息。比如在 SQL 里,可以从表中选择一列。Stream API 也通过 mapflatMap 方法提供了类似的工具。

5.2.1 对流中每一个元素应用函数

流支持 map 方法,它接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素。

1
2
3
List<String> dishNames = menu.stream()
.map(Dish::getName) // 提取每个元素的菜名
.collect(toList());

5.2.2 流的扁平化 flatMap

对于一张单词表,如何返回一张列表,列出里面各不相同的字符?例如,给定单词列表 [“Hello”,”World”],返回列表[“H”,”e”,”l”, “o”,”W”,”r”,”d”]。

可以把每个单词映射成一张字符表,然后调用 distinct 来过滤重复的字符。

1
2
3
4
words.stream()
.map(word -> word.split(""))
.distinct()
.collect(toList());

这个方法的问题在于,传递给 map 方法的 Lambda 为每个单词返回了一个 String[]。因此,map 返回的流实际上是 Stream<String[]> 类型的。而目标结果是用 Stream<String> 来表示一个字符流。

可以用 flatMap 来解决这个问题。

  1. 尝试使用 mapArrays.stream()

有一个叫作 Arrays.stream() 的方法可以接受一个数组并产生一个流。

1
2
3
4
words.stream().map(word -> word.split("")) // 将每个单词转换为由其字母构成的数组
.map(Arrays::stream) // 让每个数组变成一个单独的流
.distinct()
.collect(toList());

现在得到的是一个流的列表(更准确地说是 Stream<String>)。先是把每个单词转换成一个字母数组,然后把每个数组变成了一 个独立的流。

  1. 使用 flatMap
1
2
3
4
words.stream().map(word -> word.split("")) // 将每个单词转换为由其字母构成的数组
.flapMap(Arrays::stream) // 让各个生成流扁平化为单个流
.distinct()
.collect(toList());

使用 flatMap 方法的效果是,各个数组并不是分别映射成一个流,而是映射成流的内容。所有使用 map(Arrays::stream) 时生成的单个流都被合并起来,即扁平化为一个流。

一言以蔽之,flapMap 方法把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

5.3 查找和匹配

另一个常见的数据处理套路是看看数据集中的某些元素是否匹配一个给定的属性。Stream API 通过 allMatch、anyMatch、noneMatch、findFirstfindAny 方法提供了这样的工具。

5.3.1 检查谓词是否至少匹配一个元素

anyMatch 方法可以回答“流中是否有一个元素能匹配给定的谓词”。anyMatch方法返回一个 boolean,因此 是一个终端操作。

1
2
3
if (menu.stream().anyMatch(Dish::isVegetarian)) {
System.out.println("The menu is (somewhat) vegetarian friendly!!");
}

5.3.2 检查谓词是否匹配所有元素

allMatchnoneMatch正好相反。

anyMatch、allMatchnoneMatch 这三个操作都用到了短路,这就是Java中 &&|| 运算符短路在流中的版本。

5.3.3 查找元素

findAny 方法将返回当前流中的任意元素。流水线将在后台进行优化使其只需走一遍,并在利用短路找到结果时立即结束。

1
2
Optional<Dish> dish = menu.stream().filter(Dish::isVegetarian)
.findAny();

Optional<T>java.util.Optional 是一个容器类,代表一个值存在或不存在。下面是 Optional 里面几种可以迫使显式地检查值是否存在或处理值不存在的情形的方法。

  • isPresent() 将在 Optional 包含值的时候返回 true, 否则返回 false。
  • ifPresent(Consumer<T> block) 会在值存在的时候执行给定的代码块。接收 T 类型参数,并返回 void 的 Lambda 表达式。
  • T get() 会在值存在时返回值,否则抛出一个 NoSuchElement 异常。
  • T orElse(T other) 会在值存在时返回值,否则返回一个默认值。
1
2
3
menu.stream().filter(Dish::isVegetarian)
.findAny() // 返回一个 Optional<Dish>
.ifPresent(d -> System.out.println(d.getName())); // 如果包含一个值就打印,否则什么都不做。

5.3.4 查找第一个元素

findFirst 工作方式类似于 findAny

何时使用findFirstfindAny

并行。找到第一个元素在并行上限制更多。如果不关心返回的元素是哪个,使用 findAny,因为它在使用并行流时限制较少。

5.4 归约

在此之前见到的终端操作都是返回一个 boolean(allMatch之类的)、void (forEach) 或 Optional 对象(findAny等)。或者是使用 collect 来将流中的所有元素组合成一个 List。

所谓归约,在这里就是指使用 reduce 操作来表达更复杂的查询。此类查询需要将流中所有元素反复结合起来,得到一个值,比如一个 Integer。

5.4.1 元素求和

没什么好说的,写几个例子算了。

1
2
3
4
5
6
7
8
9
10
11
12
// v1
int sum = 0;
for (int x : numbers) {
sum += x;
}
// v2
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
// v3
int sum = numbers.stream().reduce(0, Integer::sum);

// 无初始值的求和,会返回一个 Optional 对象。
Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));

5.4.2 最大值和最小值

reduce 接受两个参数:

  • 一个初始值;
  • 一个 Lambda 来把两个流元素结合起来并产生一个新值。
1
2
3
4
// 求最大值
Optional<Integer> max = numbers.stream().reduce(Integer::max);
// 求最小值,也可以写成 (x, y) -> x < y ? x : y
Optional<Integer> min = numbers.stream().reduce(Integer::min);

怎样用 map 和 reduce 方法数一数流中有多少个菜?

1
int count = menu.stream().map(d -> 1).reduce(0, (a, b) -> a + b);

map 和 reduce 的连接通常成为 map-reduce 模式,因 Google 用它来进行网络搜索而出名, 因为它很容易并行化。

传递给 reduce 的 Lambda 不能更改状态(如实例变量),而且操作必须满足结合律才可以按任意顺序执行。

![中间操作和终端操作](/Users/raymond/Library/Application Support/typora-user-images/image-20210130133213822.png)

5.5 付诸实践

5.5.1 领域:交易员和交易

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Data
public class Trader{
private String name;
private String city;
}
@Data
public class Transaction{
private Trader trader;
private int year;
private int value;
}

Trader raoul = new Trader("Raoul", "Cambridge");
Trader mario = new Trader("Mario","Milan");
Trader alan = new Trader("Alan","Cambridge");
Trader brian = new Trader("Brian","Cambridge");
List<Transaction> transactions = Arrays.asList(
new Transaction(brian, 2011, 300),
new Transaction(raoul, 2012, 1000),
new Transaction(raoul, 2011, 400),
new Transaction(mario, 2012, 710),
new Transaction(mario, 2012, 700),
new Transaction(alan, 2012, 950)
);
  • 找出2011年发生的所有交易,并按交易额排序(从低到高)。
1
2
3
4
List<Transaction> tr2011 = transactions.stream()
.filter(transaction -> transaction.getYear() == 2011)
.sorted(comparing(Transaction::getValue))
.collect(toList());
  • 交易员都在哪些不同的城市工作过?
1
2
3
4
List<String> cities = transactions.stream()
.map(transaction -> transaction.getTrader().getCity())
.distinct()
.collect(toList());
  • 查找所有来自于剑桥的交易员,并按姓名排序。
1
2
3
4
5
6
Set<String> cities = transactions.stream()
.map(Transaction::getTrader)
.filter(trader -> trader.getCity().equals("Cambridge"))
.distinct()
.sorted(comparing(Trader::getName))
.collect(toList());
  • 返回所有交易员的姓名字符串,按字母顺序排序。
1
2
3
4
5
String traderStr = transactions.stream()
.map(transaction -> transaction.getTrader().getName())
.distinct()
.sorted()
.reduce("", (n1, n2) -> n1 + n2);
  • 有没有交易员是在米兰工作的?
1
2
boolean milanBased = transactions.stream()
.anyMatch(transaction -> transaction.getTrader().getCity().equals("Milan"));
  • 打印生活在剑桥的交易员的所有交易额。
1
2
3
4
transactions.stream()
.filter(t -> "Cambridge".equals(t.getTrader().getCity()))
.map(Transaction::getValue)
.forEach(System.out::println);
  • 所有交易中,最高的交易额是多少?
1
2
3
Optional<Integer> highestValue = transactions.stream()
.map(Transaction::getValue)
.reduce(Integer::max);
  • 找到交易额最小的交易。
1
2
Optional<Transaction> smallestTransaction = transactions.stream()
.reduce((t1, t2) -> t1.getValue() < t2.getValue() ? t1 : t2);

5.6 数值流

使用 reduce 计算菜单的热量:

1
2
int calories = menu.stream().map(Dish::getCalories)
.reduce(0, Integer::sum);

上面这段代码的问题是,它有一个暗含的装箱成本。每个 Integer 都必须拆箱成一个原始类型,再进行求和。 Stream API 提供了原始类型流特化,专门支持处理数值流的方法。

5.6.1 原始类型流特化

Java 8 引入了三个原始类型特化流接口来解决这个问题:IntStream、DoubleStream 和 LongStream,分别将流中的元素特化为 int、long 和 double,从而避免了暗含的装箱成本。每个接口都带来了进行常用数值归约的新方法,比如对数值流求和的 sum,找到最大元素的 max。 此外还有在必要时再把它们转换回对象流的方法。

特化的原因并不在于流的复杂性,而是装箱造成的复杂性—即类似 int 和 Integer 之间的效率差异。

  1. 映射到数值流

将流转换为特化版本的常用方法是 mapToInt、mapToDouble 和 mapToLong。这些方法和前面说的 map 方法的工作方式一样,只是它们返回的是一个特化流,而不是 Stream<T>

1
2
int calories = menu.stream().mapToInt(Dish::getCalories)
.sum();

这里,mapToInt 会从每道菜中提取热量(用一个 Integer 表示),并返回一个 IntStream (而不是一个 Stream<Integer>)。接下来就可以调用 IntStream 接口中定义的 sum 方法。如果流是空的,sum 默认返回 0。IntStream 还支持其他的方便方法,如 max、min、average 等。

  1. 转换会对象流

IntStream 上的操作只能产生原始整数:IntStream 的 map 操作接受的 Lambda 必须接受 int 并返回 int。但如果想利用 int 生成 Dish 对象呢?要把原始流转换成一般流(每个 int 都会装箱成一个 Integer),可以使用 boxed 方法,如下所示:

1
2
IntStream intStream = menu.stream().mapToInt(Dish::getCalories);
Stream<Integer> stream = intStream.boxed();
  1. 默认值 OptionalInt

对于三种原始流特化,也分别有一个 Optional 原始类型特化版本:OptionalInt、OptionalDouble 和 OptionalLong。

要找到 IntStream 中的最大元素,可以调用 max 方法,它会返回一个 OptionalInt:

1
2
3
OptionalInt maxCalories = menu.stream()
.mapToInt(Dish::getCalories)
.max();

如果没有最大值,可以显式处理 OptionalInt 去定义一个默认值:

1
int max = maxCalories.orElse(1);

5.6.2 数值范围

Java 8 引入了两个可以用于 IntStream 和 LongStream 的静态方法,帮助生成范围或者区间内的所有数值: range 和 rangeClosed。这两个方法都是第一个参数接受起始值,第二个参数接受结束值。但 range 是不包含结束值的,而 rangeClosed 则包含结束值。

1
IntStream evenNumbers = IntStream.rangeClosed(1, 100) .filter(n -> n % 2 == 0);

5.6.3 生成勾股数流

1
2
3
4
Stream<double[]> pythagoreanTriples = IntStream().rangeClosed(1, 100).boxed()
.flatMap(a -> IntStream().rangeClosed(a, 100)
.mapToObj(b -> new double[]{a, b, Math.sqrt(a * a + b * b)})
.filter(t -> t[2] % 1 == 0));

5.7 构建流

介绍如何从值序列、数组、文件来创建流,甚至由生成函数来创建无限流!

5.7.1 由值创建流

可以使用静态方法 Stream.of,通过显式值创建一个流。它可以接受任意数量的参数。

1
2
3
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
// 使用 empty 得到一个空流
Stream<String> emptyStream = Stream.empty();

5.7.2 由数组创建流

使用静态方法 Arrays.stream 从数组创建一个流。它接受一个数组作为参数。

1
2
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();

5.7.3 由文件生成流

Java 中用于处理文件等 I/O 操作的 NIO API(非阻塞 I/O)已更新,以便利用 Stream API。java.nio.file.Files 中的很多静态方法都会返回一个流。例如,方法 Files.lines,它会返回一个由指定文件中的各行构成的字符串流。

5.7.4 由函数生成流:创建无限流

Stream API 提供了两个静态方法来从函数生成流:Stream.iterateStream.generate。 这两个操作可以创建无限流:不像从固定集合创建的流那样有固定大小的流。由 iterate 和 generate 产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!一般来说, 应该使用 limit(n) 来对这种流加以限制。

  1. 迭代,生成斐波那契数列前 20 个值。
1
2
3
Stream.iterate(new int[]{0, 1}, t -> new int[]{ t[1], t[0] + t[1]})
.limit(20)
.forEach(t -> System.out.println("(" + t[0] + "," + t[1] +")"));

一般来说,在需要依次生成一系列值的时候应该使用 iterate

  1. 生成,generate 不是依次对每个新生成的值应用函数的。它接受一个 Supplier<T> 类型的 Lambda 提供新的值。
1
2
Stream.generate(Math::random).limit(5)
.forEach(System.out::println);

5.8 小结

  • Streams API 可以表达复杂的数据处理查询。
  • 可以使用 filter、distinct、skip 和 limit 对流做筛选和切片。
  • 可以使用 map 和 flatMap 提取或转换流中的元素。
  • 可以使用 findFirst 和 findAny 方法查找流中的元素。
  • 可以使用 allMatch、noneMatch 和 anyMatch 方法让流匹配给定的谓词。
  • 这些方法都利用了短路:找到结果就立即停止计算;没有必要处理整个流。
  • 可以利用 reduce 方法将流中所有的元素迭代合并成一个结果,例如求和或查找最大元素。
  • filter 和 map 等操作是无状态的,它们并不存储任何状态。reduce 等操作要存储状态才能计算出一个值。 sorted 和 distinct 等操作也要存储状态,因为它们需要把流中的所有元素缓存起来才能返回一个新的流。这种操作称为有状态操作。
  • 流有三种基本的原始类型特化:IntStream、DoubleStream 和 LongStream。它们的操作也有相应的特化
  • 流不仅可以从集合创建,也可从值、数组、文件以及 iterate 与 generate 等特定方法创建。
  • 无限流是没有固定大小的流。

6 用流收集数据

Java 8 的流支持两种类型的操作:中间操作(如 filter 或 map)和终端操作(如count、findFirst、forEach和reduce)。

  • 中间操作可以链接起来,将一个流转换为另一个流。这些操作不会消耗流,其目的是建立一个流水线。
  • 与此相反,终端操作会消耗流,以产生一个最终结果,例如返回流中的最大元素。它们通常可以通过优化流水线来缩短计算时间。

6.1 收集器简介

1
Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));

groupingBy 说的是“生成一个 Map,它的键是(货币)桶,值则是桶中那些元素的列表”。

6.1.1 收集器用作高级归约

对流调用 collect 方法将对流中的元素触发一个归约操作(由Collector来参数化)。

般来说,Collector 会对元素应用一个转换函数(很多时候是不体现任何效果的恒等转换, 例如toList),并将结果累积在一个数据结构中,从而产生这一过程的最终输出。

Collectors 实用类提供了很多静态工厂方法, 可以方便地创建常见收集器的实例。

1
List<Transaction> transactions = transactionStream.collect(Collectors.toList());

6.1.2 预定义收集器

系统中预定义的收集器主要提供三大功能:

  • 将流元素归约和汇总为一个值
  • 元素分组
  • 元素分区

6.2 归约和汇总

6.2.1 查找流中的最大值和最小值

Collectors.maxByCollectors.minBy

6.2.2 汇总

Collectors 类专门为汇总提供了一个工厂方法:Collectors.summingInt。它可接受一个把对象映射为求和所需 int 的函数,并返回一个收集;该收集器在传递给普通的 collect 方法后即执行所需要的汇总操作。

1
int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));

Collectors.summingLongCollectors.summingDouble 方法的作用完全一样,可以用 于求和字段为 long 或 double 的情况。

但汇总不仅仅是求和;还有 Collectors.averagingInt,连同对应 的averagingLongaveragingDouble 可以计算数值的平均数。

很多时候,可能想要得到两个或更多这样的结果,而且希望只需一次操作就可以完成。在这种情况下,可以使用 summarizingInt 工厂方法返回的收集器。

1
IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));

这个收集器会把所有这些信息收集到一个叫作 IntSummaryStatistics 的类里,它提供了方便的取值(getter)方法来访问结果。

1
IntSummaryStatistics {count=9, sum=4300, min=120, average=477.777778, max=800}

同样,相应的 summarizingLongsummarizingDouble 工厂方法有相关的 LongSummaryStatisticsDoubleSummaryStatistics 类型,适用于收集的属性是原始类型 longdouble 的情况。

6.2.3 连接字符串

joining 工厂方法返回的收集器会把对流中每一个对象应用 toString 方法得到的所有字符串连接成一个字符串。

1
String shortMenu = menu.stream().map(Dish::getName).collect(joining());

joining 在内部使用了 StringBuilder 来把生成的字符串逐个追加起来。如果 Dish 类有一个 toString 方法来返回菜肴的名称,无需用提取每一道菜名称的函数来对原流做映射就能够得到相同的结果:

1
String shortMenu = menu.stream().collect(joining());

joining 工厂方法有一个重载版本可以接受元素之间的分界符。

1
String shortMenu = menu.stream().map(Dish::getName).collect(joining(", "));

6.2.4 广义归约汇总

之前讨论的所有收集器,都是一个可以用 reducing 工厂方法定义的归约过程 的特殊情况。Collectors.reducing 工厂方法是所有这些特殊情况的一般化。

reduce 方法旨在把两个值结合起来生成一个新值,它是一个不可变的归约。与此相反,collect 方法的设计就是要改变容器,从而累积要输出的结果。

1
int totalCalories = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum));

reducing 三个参数:初始值、转换函数、累积函数。

6.3 分组

一个常见的数据库操作是根据一个或多个属性对集合中的项目进行分组。

假设要把菜单中的菜按照类型进行分类, 有肉的放一组,有鱼的放一组,其他的都放另一组。用 Collectors.groupingBy 工厂方法返回的收集器可以轻松地完成这项任务。

1
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));

分组操作的结果是一个 Map,把分组函数返回的值作为映射的键,把流中所有具有这个分类值的项目的列表作为对应的映射值。

6.3.1 多级分组

要实现多级分组,可以使用一个由双参数版本的 Collectors.groupingBy 工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受 collector 类型的第二个参数。要进行二级分组的话,可以把一个内层 groupingBy 传递给外层 groupingBy,并定义一个为流中项目分类的二级标准。

1
2
3
4
5
6
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream().collect(groupingBy(Dish::getType, 
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}) ));

二级分组结果如下

1
2
3
4
5
{
MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]},
FISH={DIET=[prawns], NORMAL=[salmon]},
OTHER={DIET=[rice, seasonal fruit], NORMAL=[french fries, pizza]}
}

一般来说,把 groupingBy 看作“桶”比较容易明白。第一个 groupingBy 给每个键建立 一个桶。然后再用下游的收集器去收集每个桶中的元素,以此得到 n 级分组。

n层嵌套映射和n维分类表之间的等价关系

6.3.2 按子组收集数据

从上一节例子得知,可以把第二个 groupingBy 收集器传递给外层收集器来实现多级分组。但进一步说,传递给第一个 groupingBy 的第二个收集器可以是任何类型,而不一定是另一个 groupingBy。例如,要数一数菜单中每类菜有多少个,可以传递 counting 收集器作为 groupingBy 收集器的第二个参数。

1
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

普通的单参数 groupingBy(f)实际上是 groupingBy(f, toList()) 的简便写法。

  1. 把收集器的结果转换为另一种类型(Collectors.collectingAndThen
1
Map<Dish.Type, Dish> mostCalorcByType = menu.stream().collect(groupingBy(Dish::getType, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));

Optional::get 操作在这里是安全的,因为 reducing 收集器永远都不会返回 Optional.empty()。

  1. groupingBy 联合使用的其他收集器的例子

一般来说,通过 groupingBy 工厂方法的第二个参数传递的收集器将会对分到同一组中的所有流元素执行进一步归约操作。

常常和 groupingBy 联合使用的另一个收集器是 mapping 方法生成的。这个方法接受两个参数:一个函数对流中的元素做变换,另一个则将变换的结果对象收集起来。其目的是在累加之前对每个输入元素应用一个映射函数,这样就可以让接受特定类型元素的收集器适应不同类型的对象。

1
2
3
4
5
6
7
8
9
10
11
Map<Dish.Type, Set<CaloricLevel>> caloricLevelByType = menu.stream().collect(
groupingBy(Dish::getType, mapping(
dish -> {
if (dish.getCalories() <= 400)
return CaloricLevel.DIET;
else if (dish.getCalories() <= 700)
return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}, toSet()
))
);

6.4 分区

分区是分组的特殊情况:由一个谓词作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组 Map 的键类型是 Boolean,于是它最多可以分为两组—true 是一组,false 是一组。

1
2
3
4
5
// 将菜单按是否是素食分开
Map<Boolean, List<Dish>> partitiondMenu = menu.stream()
.collect(partitioningBy(Dish::isVegetarian));
// {false=[pork, beef, chicken, prawns, salmon],
// true=[french fries, rice, season fruit, pizza]}

6.4.1 分区的优势

分区的好处在于保留了分区函数返回 truefalse 的两套流元素列表,而避免使用两次 filter 进行筛选。partitioningBy 工厂方法有一个重载版本,可以传递第二个收集器:

1
2
3
4
Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianByType = menu.stream()
.collect(partitioningBy(Dish::isVegetarian, groupingBy(Dish::getType)));
// {false={FISH=[prawns, salmon], MEAT=[pork, beef, chicken]},
// true={OTHER=[french fries, rice, season fruit, pizza]}}

6.4.2 Collectors类的静态工厂方法

工厂方法 返回类型 用途
toList List 把流中所有项目收集到一个 List
toSet Set 把流中所有项目收集到一个 Set,删除重复项
toCollection Collection 把流中所有项目收集到给定的供应源创建的集合
counting Long 计算流中元素的个数
summingInt Integer 对流中项目的一个整数属性求和
averagingInt Double 计算流中项目 Integer 属性的平均值
summarizingInt IntSummaryStatistics 收集关于流中项目 Integer 属性的统计值,例如最大、最小、总和与平均值
joining String 连接对流中每个项目调用 toString 方法所生成的字符串
maxBy Optionnal 一个包裹了流中按照给定比较器选出的最大元素的 Optional,或如果流为空则为 Optional.empty()
minBy Optional 一个包裹了流中按照给定比较器选出的最小元素的 Optional,或如果流为空则为 Optional.empty()
reducing 归约操作产生的类型 从一个作为累加器的初始值开始,利用 BinaryOperator 与流 中的元素逐个结合,从而将流归约为单个值
collectingAndThen 转换函数返回类型 包裹另一个收集器,对其结果应用转换函数
groupingBy Map<K, List> 根据项目的一个属性的值对流中的项目作问组,并将属性值作 为结果 Map 的键
partitioningBy Map<Boolean, List> 根据对流中每个项目应用谓词的结果来对项目进行分区

6.5 收集器接口

Collector 接口的定义,它列出了接口的签名以及声明的五个方法。

1
2
3
4
5
6
7
8
9
10
// T 是流中要收集的项目的泛型
// A 是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
// R 是收集操作得到的对象(通常但并不一定是集合)的类型。
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();
}

如果要实现一个 ToListCollector<T> 类,将 Stream<T> 中的所有元素收集到一个 List<T> 里,它的签名应该是:

1
public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

6.5.1 理解Collector接口声明的方法

一个个分析 Collector 接口声明的五个方法。

6.5.2 全部融合到一起

6.6 开发自定义收集器改善性能

6.7 小结

  • collect 是一个终端操作,它接受的参数是将流中元素累积到汇总结果的各种方式(称为收集器)。
  • 预定义收集器包括将流元素归约和汇总到一个值,例如计算最小值、最大值或平均值。
  • 预定义收集器可以用 groupingBy 对流中元素进行分组,或用 partitioningBy 进行分区。
  • 收集器可以进行高效复合,进行多级分组、分区和归约。
  • 可以实现 Collector 接口中自定义收集器。

7 并行数据处理与性能

在 Java 7 之前,并行处理数据集合非常麻烦。

  • 第一,需要明确地把包含数据的数据结分成若干子部分。
  • 第二,要给每个子部分分配一个独立的线程。
  • 第三,需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。

Java 7 引入了一个叫作 分支/合并 的框架,Stream 则在幕后利用到了 Fork/Join 框架。

7.1 并行流

并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流,自动把给定操作的工作负荷分配给多核处理器的所有内核。

7.1.1 将顺序流转换为并行流

对顺序流调用 parallel 方法:

1
2
3
4
5
6
7
// 求 1 ~ n 之和
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}

同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。

![并行归纳操作](/Users/raymond/Library/Application Support/typora-user-images/image-20210202153937167.png)

对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示想让调用 parallel 之后进行的所有操作都并行执行。只需要对并行流调用 sequential 方法就可以把它变成顺序流。

配置并行流使用的线程池

并行流内部使用了默认的 ForkJoinPool,它默认的线程数量就是处理器数量,这个值可以由 Runtime.getRuntime().availableProcessors() 得到的。

可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common. parallelism 来改变线程池大小。

这是全局设置,因此将影响代码中所有的并行流。目前还无法专为某个并行流指定这个值。一般而言,让 ForkJoinPool 的大小等于处理器数量。

1
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

7.1.2 测量流性能

软件工程可不是靠猜靠想靠说的学科,在优化性能时,应该始终遵循三个黄金规则:测量,测量,再测量。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 测量对前 n 个自然数求和的函数的性能
public long measureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1000000;
System.out.println("result: " + sum);
if (duration < fastest)
fastest = duration;
}
return fastest;
}

iterate 生成的流不易分块,并且生成的是装箱的对象,必须拆箱成数字才能求和,所以效果是反直觉的。高效的求和方法:

1
2
3
4
5
public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n)
.parallel()
.reduce(0L, Long::sum);
}

并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能很大,所以要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。

7.1.3 正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。一些在本质上就是顺序执行的代码使用 parallel 是没有意义的。

7.1.4 高效使用并行流

  • 并行流并不总是比顺序流快,在考虑选择顺序流还是并行流时,最重要的考虑就是用适当的基准来检查其性能。
  • 留意装箱。
  • 有些操作本身在并行流上的性能就比顺序流差。特别是 limitfindFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。
  • 要考虑流的操作流水线的总计算成本。
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
  • 要考虑流背后的数据结构是否易于分解。ArrayList 的拆分效率比 LinkedList 高得多,用 range 工厂方法创建的原始类型流也可以快速分解。
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。
  • 要考虑终端操作中合并步骤的代价是大是小。

7.2 分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool)中的工作线程。

7.2.1 使用RecursiveTask

要把任务提交到这个池,必须创建 RecursiveTask<R> 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型。要定义 RecursiveTask,需实现抽象方法 compute

1
protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。伪代码类似于:

1
2
3
4
5
6
7
if (任务足够小或不可分) { 
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}

接下来使用 RecursiveTask 实现并行对前 n 个自然数求和加法计算器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
// 要求和的数组
private final long[] numbers;
// 子任务处理的数组的起始和终止位置
private final int start, end;
// 不再将任务分解为子任务的数组大小
private static final long THRESHOLD = 1_0000;

private ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}

public ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;
if (length < THRESHOLD)
return computeSequentially();
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}

private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++)
sum += numbers[i];
return sum;
}

public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinSumCalculator task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}

在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用。

7.2.2 使用分支/合并框架的最佳做法

  • 对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。
  • 不应该在 RecursiveTask 内部使用 ForkJoinPoolinvoke 方法。应该始终直接调用 computefork 方法,只有顺序代码才应该用 invoke 来启动并行计算。
  • 对子任务调用 fork 方法可以把它排进 ForkJoinPool
  • 调试使用分支/合并框架的并行计算可能有点棘手。因为调用 compute 的线程并不是概念上的调用方,后者是调用 fork 的那个。
  • 和并行流一样,不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个子任务里。

7.2.3 工作窃取

理想情况下,划分并行任务时, 应该让每个任务都用完全相同的时间完成,让所有的 CPU 内核都同样繁忙。实际中每个子任务所花的时间可能天差地别。

分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。

因为某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这个线程不用闲下来,而是随机选一个别的线程,从队 列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。用于在池中的工作线程之间重新分配和平衡任务。

7.3 Spliterator

SpliteratorJava 8 中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和 Iterator 一样,Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。在实践中基本上用不到自己开发 Spliterator

1
2
3
4
5
6
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimatedSize();
int characteristics();
}

评论