Java8-流收集数据

collect是一个归约操作,就像reduce一样可以接受各种做法作为参数,将流中的元素累积成一个汇总结果。这里主要学习一下用Collectors类创建和使用收集器,将数据流归约为一个值,汇总:归约的特殊情况,数据分组和分区,开发自己的自定义收集器


收集器简介

  • 函数式编程相对于指令式编程的一个主要优势:你只需指出希望的结果——“做什么”,而不用操心执行的步骤——“如何做”。
  • Java 8的流支持两种类型的操作:中间操作(如filter或map)和终端操作(如count、findFirst、forEach和reduce)。中间操作可以链接起来,将一个流转换为另一个流。这些操作不会消耗流,其目的是建立一个流水线。与此相反,终端操作会消耗流,以产生一个最终结果
  • 传递给collect方法的参数是Collector接口的一个实现,也就是给Stream中元素做汇总的方法。toList只是说“按顺序给每个元素生成一个列表”;groupingBy说的是“生成一个Map,它的键是xx,值则是那些元素的列表”。
  • 对流调用collect方法将对流中的元素触发一个归约操作(由Collector来参数化) 。
  • 一般来说,Collector会对元素应用一个转换函数(很多时候是不体现任何效果的恒等转换,例如toList),并将结果累积在一个数据结构中,从而产生这一过程的最终输出。
  • Collector接口中方法的实现决定了如何对流执行归约操作。Collectors实用类提供了很多静态工厂方法,可以方便地创建常见收集器的实例,最直接和最常用的收集器是toList静态方法,它会把流中所有的元素收集到一个List中.
  • java.util.stream.Collectors中预定义了大量的收集器.

demo

  • 按照货币类型进行分组.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class GroupingTransactions {

public static List<Transaction> transactions =
Arrays.asList(new Transaction(Currency.EUR, 1500.0),
new Transaction(Currency.USD, 2300.0),
new Transaction(Currency.GBP, 9900.0),
new Transaction(Currency.EUR, 1100.0),
new Transaction(Currency.JPY, 7800.0),
new Transaction(Currency.CHF, 6700.0),
new Transaction(Currency.EUR, 5600.0),
new Transaction(Currency.USD, 4500.0),
new Transaction(Currency.CHF, 3400.0),
new Transaction(Currency.GBP, 3200.0),
new Transaction(Currency.USD, 4600.0),
new Transaction(Currency.JPY, 5700.0),
new Transaction(Currency.EUR, 6800.0));

public static void main(String... args) {
groupImperatively();
groupFunctionally();

}

// 指令式风格对Transactions List进行分组.
private static void groupImperatively() {
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
for (Transaction transaction : transactions) {
Currency currency = transaction.getCurrency();
List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
if (transactionsForCurrency == null) {
transactionsForCurrency = new ArrayList<>();
transactionsByCurrencies.put(currency, transactionsForCurrency);
}
transactionsForCurrency.add(transaction);
}

System.out.println(transactionsByCurrencies);
}

// 函数式编程风格进行分组
private static void groupFunctionally() {
Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream()
// groupingBy生成一个Map,它的键是(货币),值则是元素的列表;元素的属性是该键类型
.collect(groupingBy(Transaction::getCurrency));
System.out.println(transactionsByCurrencies);
}

public static class Transaction {
private final Currency currency;
private final double value;

public Transaction(Currency currency, double value) {
this.currency = currency;
this.value = value;
}

public Currency getCurrency() {
return currency;
}

public double getValue() {
return value;
}

@Override
public String toString() {
return currency + " " + value;
}
}

public enum Currency {
EUR, USD, JPY, GBP, CHF
}
}

归约和汇总

  • 在需要将流项目重组成集合时,一般会使用收集器(Stream方法collect的参数)。再宽泛一点来说,但凡要把流中所有的项目合并成一个结果时就可以用。
1
2
3
4
5
// 利用counting工厂方法返回的收集器,数一数菜单里有多少种菜
long howManyDishes = menu.stream().collect(Collectors.counting());

// 简便写法
long howManyDishes = menu.stream().count();

查找流中的最大值和最小值

  • 收集器Collectors.maxBy和Collectors.minBy用来计算流中的最大或最小值。这两个收集器接收一个Comparator参数来比较流中的元素。

demo-1

  • 找出菜单中热量最高的菜
1
2
3
4
// comparingInt接收一个Function<T>, 生成一个Comparator
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish = menu.stream()
.collect(maxBy(dishCaloriesComparator));

汇总

  • Collectors.summingInt可接受一个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递给普通的collect方法后即执行我们需要的汇总操作。
  • Collectors.summingLongCollectors.summingDouble方法的作用完全一样,可以用于求和字段为long或double的情况
  • Collectors.averagingIntaveragingLongaveragingDouble可以计算数值的平均数
  • summarizing操作你可以就数出菜单中元素的个数,并得到菜肴热量总和、平均值、最大值和最小值

demo-1

  • 求出菜单列表的总热量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// summingInt(ToIntFunction<? super T> mapper) (T -> int)
int totalCalories = menu.stream()
.collect(summingInt(Dish::getCalories));

double avgCalories = menu.stream()
.collect(averagingInt(Dish::getCalories));

IntSummaryStatistics menuStatistics = menu.stream()
.collect(summarizingInt(Dish::getCalories));

> output
4300
477.77777777777777
IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}

连接字符串

  • joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

demo-1

  • 把菜单中所有菜肴的名称连接起来
1
2
3
4
5
6
7
String shortMenu = menu.stream()
.map(Dish::getName)
.collect(joining());

String menu2 = menu.stream()
.map(Dish::getName)
.collect(joining(", "));

广义的归约汇总

  • 讨论的所有收集器,都是一个可以用reducing工厂方法定义的归约过程的特殊情况而已。Collectors.reducing工厂方法是所有这些特殊情况的一般化。
  • reducing的三个参数
    • 第一个参数是归约操作的起始值,也是流中没有元素时的返回值,所以很显然对于数值和而言0是一个合适的值。
    • 第二个参数是一个函数引用,将菜肴转换成一个表示其所含热量的int。
    • 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。这里它就是对两个int求和。
  • 单参数reducing工厂方法创建的收集器看作三参数方法的特殊情况,它把流中的第一个项目作为起点,把恒等函数(即一个函数仅仅是返回其输入参数)作为一个转换函数。这也意味着,要是把单参数reducing收集器传递给空流的collect方法,收集器就没有起点;因此而返回一个Optional对象。
  • Stream接口的collectreduce方法有何不同
    • reduce方法旨在把两个值结合起来生成一个新值,它是一个不可变的归约。
    • collect方法的设计就是要改变容器,从而累积要输出的结果。
  • counting收集器也是类似地利用三参数reducing工厂方法实现的。它把流中的每个元素都转换成一个值为1的Long型对象,然后再把它们相加

demo-1

  • 计算菜单的总热量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int totalCalories1 = menu.stream()
.collect(reducing(0, Dish::getCalories, (i, j) -> i + j));

// 等价
int totalCalories2 = menu.stream()
.collect(reducing(0, Dish::getCalories, Integer::sum));

// 等价, 但是 orElse或orElseGet来解开Optional中包含的值更为安全。
// reduce
// 避免自动拆箱操作
int totalCalories3 = menu.stream()
.map(Dish::getCalories)
.reduce(Integer::sum)
.get();

// 等价
int totalCalories4 = menu.stream()
.mapToInt(Dish::getCalories) // 映射为一个IntStream
.sum();

demo-2

  • 找到热量最高的菜
1
2
Optional<Dish> mostCalorieDish = menu.stream()
.collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));

demo-3

  • 用reducing连接字符串
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ok
String shortMenu = menu.stream()
.map(Dish::getName)
.collect( reducing ( (s1, s2) -> s1 + s2 ) )
.get();

// wrong
String shortMenu = menu.stream()
// 因为reducing接收的是一个BinaryOperator<U>, 看reducing的实现.
.collect( reducing( (d1, d2) -> d1.getName() + d2.getName())) //
.get();

// ok
String shortMenu = menu.stream()
.collect( reducing( "",Dish::getName, (s1, s2) -> s1 + s2 ) );


// reducing的实现
public static <T, U> Collector<T, ?, U> reducing(U identity,
Function<? super T, ? extends U> mapper,
BinaryOperator<U> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); },
(a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
a -> a[0], CH_NOID);
}

分组

  • 给groupingBy方法传递了一个Function,分类函数,因为它用来把流中的元素分成不同的组。分组操作的结果是一个Map,把分组函数返回的值作为映射的键,把流中所有具有这个分类值的项目的列表作为对应的映射值

demo-1

  • 把菜单中的菜按照类型进行分类,有肉的放一组,有鱼的放一组,其他的都放另一组。
1
2
3
4
Map<Dish.Type, List<Dish>> dishesByType = menu.stream()
.collect(groupingBy(Dish::getType));
> output
{FISH=[prawns, salmon], MEAT=[pork, beef, chicken], OTHER=[french fries, rice, season fruit, pizza]}

demo-2

  • 把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat),这里没有这样的一个方法引用,所以需要提供一个Lambda表达式.
1
2
3
4
5
6
7
8
9
public enum CaloricLevel { DIET, NORMAL, FAT }
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}));
> output
{DIET=[chicken, rice, season fruit, prawns], NORMAL=[beef, french fries, pizza, salmon], FAT=[pork]}

多级分组

  • 实现多级分组,我们可以使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数
  • 要进行二级分组的话,我们可以把一个内层groupingBy传递给外层groupingBy,并定义一个为流中项目分类的二级标准。这种多级分组操作可以扩展至任意层级, n级分组就会得到一个代表n级树形结构的n级
    Map。

demo-1

  • 菜单中的菜肴同时按照类型和热量进行分组
1
2
3
4
5
6
7
8
9
10
11
12
13
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> collect = menu.stream().collect(
// 外层Map的键就是第一级分类函数生成的值:“ fish, meat, other”
groupingBy(Dish::getType, // 一级分类函数
// 键是二级分类函数生成的值:“ normal, diet, fat”
groupingBy((Dish dish) -> { // 二级分类函数
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
})
)
);
> output
{FISH={DIET=[prawns], NORMAL=[salmon]}, MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]}, OTHER={DIET=[rice, season fruit], NORMAL=[french fries, pizza]}}

按子组收集数据

  • 传递给第一个groupingBy的第二个收集器可以是任何类型,而不一定是另一个groupingBy
  • 普通的单参数groupingBy(f)(其中f是分类函数)实际上是groupingBy(f, toList())的简便写法。也就是第二个collector是toList()静态方法生成的收集器,即key对应的List列表值直接转为List即可.
  • demo2的maxBy实现中:Map中的值是Optional,因为这是maxBy工厂方法生成的收集器的类型,但实际上,如果菜单中没有某一类型的Dish,这个类型就不会对应一个Optional.empty()值,而且根本不会出现在Map的键中。groupingBy收集器只有在应用分组条件后,第一次在流中找到某个键对应的元素时才会把键加入分组Map中

demo-1

  • 菜单中每类菜有多少个,可以传递counting收集器作为groupingBy收集器的第二个参数
1
2
3
4
Map<Dish.Type, Long> collect = menu.stream()
.collect(groupingBy(Dish::getType, counting()));
> output
{FISH=2, MEAT=3, OTHER=4}

demo-2

  • 查找菜单不同分类中热量最高的菜肴
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 实现1
Map<Dish.Type, Optional<Dish>> collect = menu.stream()
.collect(
// 因为存在Stream为为空的情况.所以结果用Optional来包装起来.
groupingBy(Dish::getType,reducing((Dish d1, Dish d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2)));

// 实现2
Map<Dish.Type, Optional<Dish>> collect = menu.stream()
.collect(groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));

// 实现3
Map<Dish.Type, Dish> collect = menu.stream()
.collect(
groupingBy(Dish::getType,
collectingAndThen(
reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2), Optional::get)
)
);
> output:
{FISH=Optional[salmon], MEAT=Optional[pork], OTHER=Optional[pizza]}
{FISH=Optional[salmon], MEAT=Optional[pork], OTHER=Optional[pizza]}
{FISH=salmon, MEAT=pork, OTHER=pizza}


// groupingBy的实现, 接收一个Function返回分类标准.后接一个Collector对象进行二次collect操作.
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream);
}
```

### 把收集器的结果转换为另一种类型
- 如上分组操作的Map结果中的每个值上包装的Optional没什么用,所以可能想要把它们去掉。要做到这一点,或者更一般地来说,**把收集器返回的结果转换为另一种类型**,你可以使用Collectors.collectingAndThen工厂方法返回的收集器
- `Collectors.collectingAndThen`工厂方法接受两个参数 -- **要转换的收集器**以及**转换函数**,并**返回另一个收集器。这个收集器相当于旧收集器的一个包装**, collect(collectingAndThen)操作的最后一步就是将返回值用转换函数做一个映射。在这里,被包起来的收集器就是用maxBy建立的那个,而转换函数Optional::get则把返回的Optional中的值提取出来。
- 执行过程
- groupingBy是最外层,根据菜肴的类型把菜单流分组,得到三个子流。
- groupingBy收集器包裹着collectingAndThen收集器,因此分组操作得到的每个子流都用这第二个收集器做进一步归约。
- collectingAndThen收集器又包裹着第三个收集器maxBy。
- 随后由归约收集器进行子流的归约操作,然后包含它的collectingAndThen收集器会对其结果应用Optional:get转换函数。
- 对三个子流分别执行这一过程并转换而得到的三个值,也就是各个类型中热量最高的Dish,将成为**groupingBy收集器返回的Map中与各个分类键(Dish的类型)相关联的值**。

``` java
// 实现4
Map<Dish.Type, Dish> collect = menu.stream()
.collect(groupingBy(Dish::getType,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
  • java8-groupby-1.png-115.2kB

与groupingBy联合使用的其他收集器的例子

  • 通过groupingBy工厂方法的第二个参数传递的收集器将会对分到同一组中的所有流元素执行进一步归约操作。
  • 常常和groupingBy联合使用的另一个收集器是mapping方法生成的。这个方法接受两个参数:一个函数对流中的元素做变换,另一个则将变换的结果对象收集起来。其目的是在累加之前对每个输入元素应用一个映射函数,这样就可以让接受特定类型元素的收集器适应不同类型的对象。
  • toSet()对于返回的Set是什么类型并没有任何保证。但通过使用toCollection,你可以有更多的控制。

demo-1

  • 各个类型菜肴的热量总和
1
2
3
4
Map<Dish.Type, Integer> collect = menu.stream().collect(groupingBy(Dish::getType,
summingInt(Dish::getCalories)));

//

demo-2

  • 对于每种类型的Dish,菜单中都有哪些CaloricLevel。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Map<Dish.Type, Set<CaloricLevel>> collect = menu.stream().collect(
groupingBy(Dish::getType, mapping(
dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
},
// 流中的元素累积到一个Set
toSet())));
// 指定生成Hashset
Map<Dish.Type, HashSet<CaloricLevel>> collect = menu.stream().collect(
groupingBy(Dish::getType, mapping(
dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}, toCollection(HashSet::new)
))
);

分区

  • 分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组, false是一组。

demo-1

  • 把菜单按照素食和非素食分开
1
2
3
4
5
6
7
8
9
10
11
12
Map<Boolean, List<Dish>> partitionedMenu = enu.stream()
.collect(partitioningBy(Dish::isVegetarian));

// 所有的素食菜肴
List<Dish> vegetarianDishes = partitionedMenu.get(true);

// 以上等价于
List<Dish> vegetarianDishes = menu.stream()
.filter(Dish::isVegetarian)
.collect(toList());
> output
{false=[pork, beef, chicken, prawns, salmon], true=[french fries, rice, season fruit, pizza]}

分区的优势

  • 分区的好处在于保留了分区函数返回true或false的两套流元素列表。同分组类似, partitioningBy工厂方法有一个重载版本,可以传递第二个收集器.

demo-1

  • 先按照素食与否分组,然后按类型对菜肴分组
1
2
3
4
5
6
Map<Boolean, Map<Dish.Type, List<Dish>>> collect = menu.stream()
.collect(partitioningBy(Dish::isVegetarian,
groupingBy(Dish::getType)));
> output
// 二级Map:
{false={FISH=[prawns, salmon], MEAT=[pork, beef, chicken]}, true={OTHER=[french fries, rice, season fruit, pizza]}}

demo-2

  • 素食和非素食中热量最高的菜
1
2
3
4
5
6
7
8
9
Map<Boolean, Dish> collect = menu.stream().collect(
partitioningBy(Dish::isVegetarian,
collectingAndThen(
// maxBy的结果作用于get()
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));

// output
{false=pork, true=pizza}

将数字按质数和非质数分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 测试某一个待测数字是否是质数的谓词
public boolean isPrime(int candidate) {
int candidateRoot = (int) Math.sqrt((double) candidate);
return IntStream.rangeClosed(2, candidateRoot)
.noneMatch(i -> candidate % i == 0);
}

// 创建一个包含这n个数的流,用刚刚写的isPrime方法作为谓词,再给partitioningBy收集器归约
public Map<Boolean, List<Integer>> partitionPrimes(int n) {
return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate)));
}

//output
{false=[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20], true=[2, 3, 5, 7, 11, 13, 17, 19]}

Collectors类的静态工厂方法能够创建的所有收集器

java8-collectors-1.png-105.7kB
java8-collectors-2.png-176.8kB
java8-collectors-3.png-142.5kB


收集器接口

Collector.java

  • 参数说明

    • T是流中要收集的项目的泛型。
    • A是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
    • R是收集操作得到的对象(通常但并不一定是集合)的类型。
  • 实现一个ToListCollector<T>类,将Stream<T>中的所有元素收集到一个List<T>里,它的签名如下:public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public interface Collector<T, A, R> {
/**
* A function that creates and returns a new mutable result container.
*/
Supplier<A> supplier();

/**
* A function that folds a value into a mutable result container.
*/
BiConsumer<A, T> accumulator();

/**
* A function that accepts two partial results and merges them. The
* combiner function may fold state from one argument into the other and
* return that, or may return a new result container.
*
* 返回一个函数用于聚合结果.
*/
BinaryOperator<A> combiner();

/**
* Perform the final transformation from the intermediate accumulation type
* {@code A} to the final result type {@code R}.
*
* <p>If the characteristic {@code IDENTITY_TRANSFORM} is
* set, this function may be presumed to be an identity transform with an
* unchecked cast from {@code A} to {@code R}.
*
* 返回一个函数用于将中间结果转换为最终结果.
*/
Function<A, R> finisher();

/**
* Returns a {@code Set} of {@code Collector.Characteristics} indicating
* the characteristics of this Collector. This set should be immutable.
* 提供了一系列特征,也就是一个提示列表,告诉collect方法在执行归约操作的时候可以应用哪优
* 化(比如并行化)
*/
Set<Characteristics> characteristics();

public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
BiConsumer<R, T> accumulator,
BinaryOperator<R> combiner,
Characteristics... characteristics) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
Objects.requireNonNull(characteristics);
Set<Characteristics> cs = (characteristics.length == 0)
? Collectors.CH_ID
: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
characteristics));
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
}

//
enum Characteristics {

CONCURRENT,
UNORDERED,
IDENTITY_FINISH
}
}

理解 Collector 接口声明的方法

supplier方法: 建立新的结果容器

  • supplier方法必须返回一个结果为空的Supplier,也就是一个无参数函数,在调用时它会创建一个空的累加器实例,供数据收集过程使用。
  • 在我们的ToListCollector中, supplier返回一个空的List
1
2
3
4
5
6
7
8
public Supplier<List<T>> supplier() {
return () -> new ArrayList<T>();
}

// 只传递一个构造函数引用:
public Supplier<List<T>> supplier() {
return ArrayList::new;
}

accumulator方法: 将元素添加到结果容器

  • accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数:保存归约结果的累加器(已收集了流中的前n-1个项目),还有第n个元素本身。该函数将返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。
  • ToListCollector,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表
1
2
3
4
5
6
7
public BiConsumer<List<T>, T> accumulator() {
return (list, item) -> list.add(item);
}
// 方法引用
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}

finisher方法: 对结果容器应用最终转换

  • 遍历完流后, finisher方法必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果
  • ToListCollector实现中,累加器对象恰好符合预期的最终结果(也是一个List),因此无需进行转换。所以finisher方法只需返回identity函数
1
2
3
public Function<List<T>, List<T>> finisher() {  
return Function.identity();
}

顺序归约过程的逻辑步骤

java8-sequentail-reduce-1.png-61.8kB

combiner方法: 合并两个结果容器

  • combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并
  • ToListCollector的实现只要把从流的第二个部分收集到的项目列表加到遍历第一部分
    • 原始流会以递归方式拆分为子流;所有的子流都可以并行处理,而每个子流都采用如上的顺序归约过程,然后使用收集器combiner方法返回的函数,将所有的部分结果两两合并。
1
2
3
4
5
6
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
}
}

characteristics方法

  • 定义了收集器的行为 – 尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示
    • UNORDERED – 归约结果不受流中项目的遍历和累积顺序的影响。
    • CONCURRENT – accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。
    • IDENTITY_FINISH – 这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。

ToListCollector.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {

@Override
public Supplier<List<T>> supplier() {
return () -> new ArrayList<T>();
}

@Override
public BiConsumer<List<T>, T> accumulator() {
return (list, item) -> list.add(item);
}

@Override
public Function<List<T>, List<T>> finisher() {
return i -> i;
}

@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}

@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
}
}

// 使用, new 来初始化
List<Dish> dishes = menuStream.collect(new ToListCollector<Dish>());
// 标准实现
List<Dish> dishes = menuStream.collect(toList());

collect方法重载版本

  • 对于IDENTITY_FINISH (Finisher做的就是横等变换,所以不需要提供) 的收集操作,还可以使用Stream的一个重载方法来实现类似Collector的功能。Stream有一个重载的collect方法可以接受另外三个函数——supplier、accumulator和combiner,其语义和Collector接口的相应方法返回的函数完全相同。
  • 这种重载的collect方法不能传递任何Characteristics,所以它永远都是一个IDENTITY_FINISH和CONCURRENT但并非UNORDERED的收集器。

demo-1

  • 把菜肴流中的项目收集到一个List中
1
2
3
4
List<Dish> dishes = menuStream.collect(
ArrayList::new,
List::add,
List::addAll);

开发你自己的收集器以获得更好的性能

利用collect方法来划分质数和非质数

1
2
3
4
5
6
7
8
9
10
11
public Map<Boolean, List<Integer>> partitionPrimes(int n) {
return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate));
}

// 限制除数不超过被测试数的平方根
public boolean isPrime(int candidate) {
int candidateRoot = (int) Math.sqrt((double) candidate);
return IntStream.rangeClosed(2, candidateRoot)
.noneMatch(i -> candidate % i == 0);
}

自定义收集器来划分质数和非质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class PartitionPrimeNumbers {

public static void main(String... args) {
System.out.println("Numbers partitioned in prime and non-prime: " + partitionPrimes(20));
System.out.println("Numbers partitioned in prime and non-prime: " + partitionPrimesWithCustomCollector(100));

}

public static Map<Boolean, List<Integer>> partitionPrimes(int n) {
return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate)));
}

public static Map<Boolean, List<Integer>> partitionPrimesWithCustomCollector(int n) {
return IntStream.rangeClosed(2, n).boxed().collect(new PrimeNumbersCollector());
}

public static boolean isPrime(int candidate) {
return IntStream.rangeClosed(2, candidate - 1)
.limit((long) Math.floor(Math.sqrt((double) candidate)) - 1)
.noneMatch(i -> candidate % i == 0);
}

public static boolean isPrime(List<Integer> primes, Integer candidate) {
double candidateRoot = Math.sqrt((double) candidate);
return takeWhile(primes, i -> i <= candidateRoot).stream().noneMatch(i -> candidate % i == 0);
}

// takeWhile实现是即时的
public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {
int i = 0;
for (A item : list) {
if (!p.test(item)) { // //只要求list中所有满足p的item, 也就是所有小于root的item(都是质数), 进一步比sqrt快了.
return list.subList(0, i);
}
i++;
}
return list;
}

/**
* public interface Collector<T, A, R>
* 其中T、 A和R分别是流中元素的类型、用于累积部分结果的对象类型,以及collect操作最
* 终结果的类型。这里应该收集Integer流,而累加器和结果类型则都是 Map<Boolean,
* List<Integer>> 键是true和false,值则分别是质数和非质数的List
*/
public static class PrimeNumbersCollector
implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {
/**
* 创建累加器并进行初始化,为true和false两个键下面初始化了对应的空列表。
* 在收集过程中会把质数和非质数分别添加到这里
* @return
*/
@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
return () -> new HashMap<Boolean, List<Integer>>() {{
put(true, new ArrayList<Integer>());
put(false, new ArrayList<Integer>());
}};
}

@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
acc.get(isPrime(acc.get(true), candidate)) // 根据isPrime的结果,获取质数或非质数列表
.add(candidate); // 将被测数添加到相应的列表中
};
}

/**
* 合并两个Map,即将第二个Map中质数和非质数列表中的所有数字合并到第一个Map的对应列表即可
*
* 本例子中这个收集器是不能并行使用的,因为该算法本身是顺序的。
* 这意味着永远都不会调用combiner方法,你可以把它的实现留空
* (更好的做法是抛出一个UnsupportedOperationException异常)
* @return
*/
@Override
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
map1.get(true).addAll(map2.get(true));
map1.get(false).addAll(map2.get(false));
return map1;
};
}

/**
* accumulator正好就是收集器的结果,
* 用不着进一步转换,那么finisher方法就返回identity函数
* @return
*/
@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
// return i -> i; // 也可以
return Function.identity();
}

/**
* 既不是CONCURRENT也不是UNORDERED,但却是IDENTITY_FINISH的
* @return
*/
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
}
}
}

比较收集器的性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class CollectorHarness {

public static void main(String[] args) {
System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::partitionPrimes) + " msecs");
System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::partitionPrimesWithCustomCollector) + " msecs" );
System.out.println("Partitioning done in: " + execute(PartitionPrimeNumbers::partitionPrimesWithInlineCollector) + " msecs" );
}

private static long execute(Consumer<Integer> primePartitioner) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
primePartitioner.accept(1_000_000);
long duration = (System.nanoTime() - start) / 1_000_000;
if (duration < fastest) fastest = duration;
System.out.println("done in " + duration);
}
return fastest;
}
}

// time explicity
done in 1524
done in 1392
done in 1378
done in 1349
done in 1275
done in 1380
done in 1255
done in 1253
done in 1258
done in 1253
Partitioning done in: 1253 msecs
done in 790
done in 554
done in 544
done in 616
done in 623
done in 598
done in 603
done in 636
done in 602
done in 600
Partitioning done in: 544 msecs
done in 632
done in 628
done in 622
done in 617
done in 613
done in 622
done in 624
done in 656
done in 619
done in 620
Partitioning done in: 613 msecs

collect方法的重载版本来划分质数和非质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static Map<Boolean, List<Integer>> partitionPrimesWithInlineCollector(int n) {
return Stream.iterate(2, i -> i + 1).limit(n)
.collect(
// supplier
() -> new HashMap<Boolean, List<Integer>>() {{
put(true, new ArrayList<Integer>());
put(false, new ArrayList<Integer>());
}},
// accumulator
(acc, candidate) -> {
acc.get(isPrime(acc.get(true), candidate))
.add(candidate);
},
// combiner
(map1, map2) -> {
map1.get(true).addAll(map2.get(true));
map1.get(false).addAll(map2.get(false));
});
}

小结

  • collect是一个终端操作,它接受的参数是将流中元素累积到汇总结果的各种方式(称为收集器)。
  • 预定义收集器包括将流元素归约和汇总到一个值,例如计算最小值、最大值或平均值。
  • 预定义收集器可以用groupingBy对流中元素进行分组,或用partitioningBy进行分区
  • 收集器可以高效地复合起来,进行多级分组、分区和归约。
  • 你可以实现Collector接口中定义的方法来开发你自己的收集器。