0%

stream and lambda(19) - 终止操作之自定义收集器 Collector

JDK 内置的收集器基础上已经可以满足我们的日常开发需求,但有时候我们也可能会遇到不满足的情况,这种情况下我们可以自定义收集器来完成我们的需求。

收集器接口 Collector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Collector<T, A, R> {

Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
...
enum Characteristics {
CONCURRENT,
UNORDERED,
IDENTITY_FINISH
}
}

可以看出,收集器内部有一个抽象接口需要我们来实现。

五个抽象方法

  • supplier

    此方法提供一个容器,可以承载收集过程中单个线程处理的中间值。

  • accumulator

    此方法提供处理中间值与新的流内元素,可以参考 reduce 的 accumulator

  • combiner

    此方法用来处理并行流多线程的中间值合并,可以参考 reduce 的 combiner

  • finisher

    此方法是终止方法,可以将中间值处理成最终值

  • characteristics

    该方法用于给收集器设置特征值,特征值有三种

Characteristics 特征值

枚举类 Characteristics 中共有三个特征值下:

  • CONCURRENT

    表示此收集器是并发收集的,中间值承载的容器,可以正常正确处理并行流,因此,如果设置了这个值,结果容器必须是线程安全的。
    串行流时,combiner 不会被调用。
    并行流且设置了 CONCURRENT、UNORDERED 特征值,combiner 不会被调用。
    并行流且设置了 CONCURRENT 特征值,如果流数据源是无序的,combiner 不会被调用。

  • UNORDERED

    表示流中的元素处理顺序在并行流时可能是无序的。

  • IDENTITY_FINISH

    表示中间结果容器类型与最终结果类型一致,或者类型可以强转。
    设置此特征值时finiser()方法不会被调用。

你品,你细品,你细细品,品不懂看源码一起品。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}

自定义字符串 join 收集器

我们也来实现一个字符串收集器,可以将分隔符、前缀、后缀当成参数传进来。思路如下:

  1. 由于是将多个字符串连接起来,我们可以用 StringBuffer 或者 StringBuilder 当容器。
  2. 分隔符可以放在 accumulator 里在每个字母后面加上,产生的问题是最后一个字符串后面会多一个分隔符。
  3. 分隔符可以放在 combiner 里在合并不同线程结果时加上,但是问题时,如果是串行流不会起作用,并行流才会起作用。当和 accumulator 都操作分隔符时,就会错误处理 2 遍。
  4. 由于流有可能不是并行流,我们不能将分隔符放在 combiner 里处理。因为只能放在 accumulator 处理,并在最后将多余的分隔符去除。
  5. 将中间结果转成最终 String 的时候,我们可以在这个时机加上前缀和后缀。
  6. 特征值不能设置 IDENTITY_FINISH,因为中间结果容器不是String。不能设置 UNORDERED,因为顺序我们还是很重要的。

接着,就是编码时间了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class MyStringJoinCollector implements Collector<String, StringBuffer, String> {
private final String delimiter;
private final String prefix;
private final String suffix;

public MyStringJoinCollector(String delimiter, String prefix, String suffix) {
this.delimiter = delimiter == null ? "" : delimiter;
this.prefix = prefix == null ? "" : prefix;
this.suffix = suffix == null ? "" : suffix;
}

@Override
public Supplier<StringBuffer> supplier() {
return StringBuffer::new;
}

@Override
public BiConsumer<StringBuffer, String> accumulator() {
return (sb, s) -> sb.append(s).append(delimiter);
}

@Override
public BinaryOperator<StringBuffer> combiner() {
return StringBuffer::append;
}

@Override
public Function<StringBuffer, String> finisher() {
return sb -> {
String tmp = sb.toString();
int dl = delimiter.length();
tmp = tmp.length() >= dl ? tmp.substring(0, tmp.length() - dl) : tmp;
return prefix + tmp + suffix;
};
}

@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}

}

下面,来到我们的验证时间。

1
2
3
4
5
public static void main(String[] args) {
Stream<String> stream = Stream.of("this", "is", "myStringJoinCollector");
String collect = stream.collect(new MyStringJoinCollector(",", "[", "]"));
System.out.println(collect);
}

ok,程序正确输出:[this,is,myStringJoinCollector]

总结

  • 自定义收集器其实不难,只要我们实现接口的几个抽象方法就可以。
  • 有些内容纯靠 JDK 的文档不容易理解,但是我们可以看部分源码,比如 Characteristics 的几个值到底是什么意思。