第四章:引入流

流的定义

从支持数据处理操作的源生成的元素序列。

元素序列

就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。

因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。

但流的目的在于表达计算,比如前面见到的filter、sorted和map。

集合讲的是数据,流讲的是计算。

流会使用一个提供数据的源,如集合、数组或输入/输出资源。

请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。

数据处理操作

流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,

如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。

流操作的特点

1. 流水线

很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。

这让之后章节中的一些优化成为可能,如延迟和短路。

流水线的操作可以看作对数据源进行数据库式(链式)查询。

2. 内部迭代

与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的(内部迭代)。

3. 流只能被消费一次

终止操作只能有一次,重复消费会抛出java.lang.IllegalStateException异常。

4. 对流的操作分为两类:中间操作和终止操作

  • 中间操作总是会惰性执行:执行一个中间操作不会在原流上做任何处理,而是创建一个标记了该操作的新流返回。
  • 终止操作才会进行真实的计算:计算发生时会把所有中间操作积攒的操作以Pipeline的方式执行,这样可以减少迭代次数,计算完成后流即失效。

集合与流的不同点:

  • 无存储:Stream不是一种数据结构,它只是某种数据源的一个视图。数据源可以是一个数组、Java容器或I/O资源等。
  • 为函数式编程而生:对Stream的任何修改都不会修改背后的数据源,比如对一个流执行filter过滤操作并不会删除被过滤的元素,而是会产生一个新的不包含被过滤元素的新流。
  • 惰性执行:对流的中间操作并不会立即执行,只有等一个终止操作来临时才会一次性全部执行。
  • 可消费性:一个流只能被消费一次,一旦产生终止操作,流即被消费。

第五章:使用流

1. 筛选和切片

选择流中的元素:用谓词筛选,筛选出各不相同的元素,忽略流 中的头几个元素,或将流截短至指定长度。

1.1 用谓词筛选

Streams接口支持filter方法。该操作会接受一个谓词(一个返回 boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流。

1.2 筛选各异的元素

流支持一个叫作distinct的方法,它会返回一个元素各异(根据流所生成元素的 hashCode和equals方法实现)的流。

1.3 截短流

流支持limit(n)方法,该方法会返回一个不超过给定长度的流。

所需的长度作为参数传递给limit。如果流是有序的,则多会返回前n个元素。

1.4 跳过元素

流还支持skip(n)方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流。

请注意,limit(n)skip(n)是互补的。

2. 映射

一个非常常见的数据处理套路就是从某些对象中选择信息。

比如在SQL里,你可以从表中选择一列。Stream API也通过mapflatMap方法提供了类似的工具。

2.1 对流中每一个元素应用函数

流支持map方法,它会接受一个函数作为参数。这个函数会被应用到每个元素上,

并将其映射成一个新的元素(使用映射一词,是因为它和转换类似,但其中的细微差别在于它是“创建一个新版本”而不是去“修改”)。

2.2 流的扁平化

流支持flatMap方法,将各个单独的流合并起来,扁平化成一个流。

flatMap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

3. 查找和匹配

另一个常见的数据处理套路是看看数据集中的某些元素是否匹配一个给定的属性。

Stream API通过anyMatchallMatchnoneMatchfindAnyfindFirst方法提供了这样的工具。

3.1 检查谓词是否至少匹配一个元素

anyMatch():流中是否有一个元素能匹配给定的谓词。

返回一个boolean,因此是一个终端操作。

3.2 检查谓词是否匹配所有元素

allMatch():流中的元素是否都能匹配给定的谓词。是一个终端操作。

3.3 检查谓词是否不匹配所有元素

noneMatch():流中没有任何元素与给定的谓词匹配。

3.4 查找元素

findAny():将返回当前流中的任意元素。如果流中没有元素,返回值可能为空,于是Java 8引入了Optional<T>类。

Optional简介

Optional<T>类(java.util.Optional)是一个容器类,代表一个值存在或不存在。

方法简介:

1、isPresent():将在Optional包含值的时候返回true, 否则返回false。

2、ifPresent(Consumer<T> block):会在值存在的时候执行给定的代码块。

3、T get():会在值存在时返回值,否则抛出一个NoSuchElement异常。

4、T orElse(T other):会在值存在时返回值,否则返回一个默认值。

3.5 查找第一个元素

findFirst():返回流中的第一个元素。同样地,如果流中没有元素,返回值可能为空,返回类型为Optional<T>类。

有些流有一个出现顺序(encounter order)来指定流中项目出现的逻辑顺序(比如由List或排序好的数据列生成的流)。对于这种流,可能想要找到第一个元素。

4. 归约

reduce():将流中所有元素反复结合起来,得到一个值。

reduce()方法有两个重载的方法:

1
2
3
4
5
6
7
8
9
10
11
/**
* @param identity 归约的初始值
* @param accumulator 归约操作
*/
T reduce(T identity, BinaryOperator<T> accumulator);

/**
* @param accumulator 归约操作
* @return Optional<T>对象,由于没有初始值,流中元素可能为空,故返回值会存在空的情况。
*/
Optional<T> reduce(BinaryOperator<T> accumulator);

map和reduce的连接通常称为map-reduce模式,因Google用它来进行网络搜索而出名,因为它很容易并行化。

5. 数值流

流中的元素是数值型的,例如int,long,double。尽管可以把这些数值型元素放入流中,但实际上是暗含了装箱操作,将数值型元素转换成对应的包装类型,从而形成对象流。

Stream API提供了原始类型流特化,专门支持处理数值流的方法。减去装箱操作的损耗。

5.1 原始类型流特化

Java 8引入了三个原始类型特化流接口:IntStreamDoubleStreamLongStream,分别将流中的元素特化为int、long和double,从而避免了暗含的装箱成本。

每个接口都带来了进行常用数值归约的新方法,比如对数值流求和的sum,找到大元素的max。此外还有在必要时再把它们转换回对象流的方法。

5.1.1 映射到数值流

将流转换为特化版本的常用方法是mapToIntmapToDoublemapToLong。这些方法map方法的工作方式一样,只是它们返回的是一个特化流,而不是Stream<T>

5.1.2 转换回对象流

一旦有了数值流,可能需要将其转换回非特化流。

5.1.3 默认值OptionalInt

对于原始类型特化流接口中的max,min,average等方法的返回值。如果流是空的,这些方法的返回值为空,但不能默认为0。因为可能真实计算的结果恰好为0。

可以使用Optional类来解决返回值为空的情况。但Optional<T>只能接收包装类型。传递原始类型会触发自动装箱操作,产生损耗。

Java 8同样引入了Optional原始类型特化版本:OptionalIntOptionalDoubleOptionalLong,用这些Optional类来解决传递原始类型时自动装箱的问题。

5.2 数值范围

和数字打交道时,有一个常用的东西就是数值范围。比如生成1和100之间的所有数字。

Java 8引入了两个可以用于IntStreamLongStream的静态方法,帮助生成这种范围: rangerangeClosed

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* range方法表示的范围为:[startInclusive,endExclusive)
* @param startInclusive 数值范围开始
* @param endExclusive 数值范围结束(不包含该值)
*/
IntStream range(int startInclusive, int endExclusive);

/**
* rangeClosed方法表示的范围为:[startInclusive,endExclusive]
* @param startInclusive 数值范围开始
* @param endExclusive 数值范围结束(包含该值)
*/
IntStream rangeClosed(int startInclusive, int endInclusive);

这两个方法都是第一个参数接受起始值,第二个参数接受结束值。但range是不包含结束值的,而rangeClosed则包含结束值。

6. 构建流

介绍如何从值序列、数组、文件来创建流,甚至由生成函数来创建无限流。

6.1 由值创建流

使用静态方法Stream.of(T...values),通过显式值创建一个流。它可以接受任意数量的参数。

使用静态方法Stream.empty(),创建一个空流。

6.2 由数组创建流

使用静态方法Arrays.stream从数组创建一个流。它接受一个数组作为参数。

6.3、由文件生成流

Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。

java.nio.file.Files中的很多静态方法都会返回一个流。

例如,Files.lines,它会返回一个由指定文件中的各行构成的字符串流。

6.4 由函数生成流:创建无限流

Stream API提供了两个静态方法来从函数生成流:Stream.iterateStream.generate

这两个操作可以创建所谓的无限流:不像从固定集合创建的流那样有固定大小的流。

iterategenerate产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去!

一般来说,应该使用limit(n)来对这种流加以限制,以避免打印无穷多个值。

第六章:用流收集数据

1. 汇总

Collectors类专门为汇总提供了一个工厂方法:Collectors.summingInt

1
Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper);

它可接受一个把对象映射为求和所需int的函数,并返回一个收集器,该收集器在传递给普通的collect方法后即执行我们需要的汇总操作。

类似的还有Collectors.summingDouble方法和Collectors.summingLong方法,汇总为double和long类型。

1
2
Collector<T, ?, Long> summingLong(ToLongFunction<? super T> mapper);
Collector<T, ?, Double> summingDouble(ToDoubleFunction<? super T> mapper);

汇总不仅仅只有求和。

平均数:

1
2
3
Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper);
Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper);
Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper);

一次操作取得多个汇总结果:

1
2
3
Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper);
Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper);
Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper);

可以通过对用getter方法取得汇总结果。

2. 连接字符串

joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

joining方法有3个重载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 内部使用StringBuilder拼接
*/
Collector<CharSequence, ?, String> joining();

/**
* @param delimiter 表示每个字符串连接时的分隔符
*/
Collector<CharSequence, ?, String> joining(CharSequence delimiter);

/**
* @param delimiter 分隔符
* @param prefix 字符串前缀
* @param suffix 字符串后缀
*/
Collector<CharSequence, ?, String> joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix);

3. 分组

groupingBy:跟数据库中的group by分组操作一样。同时支持多级分组。

groupingBy方法有多个重载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 内部调用的是带两个参数的groupingBy方法,第二个参数传递的是Collectors.toList()。
* @param classifier 一个Function函数型接口。
*/
groupingBy(Function<? super T, ? extends K> classifier);

/**
* 内部调用的是三个参数的groupingBy方法。
* @param classifier 一个Function函数型接口
* @param downstream 一个收集器对象
*/
groupingBy(Function<? super T, ? extends K> classifier,Collector<? super T, A, D> downstream);

/**
* 有三个参数,groupingBy分组的具体实现。
*/
groupingBy(Function<? super T, ? extends K> classifier,Supplier<M> mapFactory,Collector<? super T, A, D> downstream);

可以把第二个groupingBy收集器传递给外层收集器来实现多级分组。 但进一步说,传递给第一个groupingBy的第二个收集器可以是任何类型,而不一定是另一个groupingBy

收集器返回的结果可能是Optional包装后的对象,对于多级分组来说,第二个收集器对象参数返回的Optional对象可能没什么用。第一层groupingBy已经把为空的情况给排除掉了。

Collectors收集器提供了collectingAndThen方法将收集器进行转换。

1
2
3
4
5
6
/**
* 返回转换后的另一个收集器
* @param downstream 需要转换的收集器
* @param finisher 转换函数
*/
Collector<T,A,RR> Collectors.collectingAndThen(Collector<T,A,R> downstream,Function<R,RR> finisher);

groupingBy可以联合其它收集器使用,经常使用的是mapping方法。它可以让接受特定类型元素的收集器适应不同类型的对象。

1
2
3
4
5
/**
* @param mapper 一个函数型接口,对流中的元素做映射
* @param downstream 一个收集器,将映射后的元素收集起来
*/
Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,Collector<? super U, A, R> downstream);

4. 分区

partitioningBy分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。

分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它多可以分为两组——true是一组,false是一组。

partitioningBy收集器有两个重载的方法:

1
2
3
4
5
6
7
8
9
10
11
/**
* 内部调用了两个参数的重载方法,第二个参数传递的是一个Collectors.toList()收集器。
* @param predicate 断言型接口
*/
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate);

/**
* @param predicate 断言型接口
* @param downstream 收集器
*/
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,Collector<? super T, A, D> downstream);

分区也可用相同的谓词使用filter筛选来实现。

分区的好处:

  • 保留了分区函数返回true或false的两套流元素列表。
  • 使用filter筛选需要操作两次,一次利用谓词,一次利用谓词的非。

第七章:并行数据处理与性能

并行处理数据一定比串行处理快吗?

答案是不一定。

假设你现在要进行数字累加的操作,例如计算1至100万之间所有数字的和。

现在有三种方案可供选择。

方案一:原始迭代方式。

1
2
3
4
5
6
7
public static long iterateSum(long n) {
long result = 0;
for (int i = 0; i <= n; i++) {
result += i;
}
return result;
}

方案二:使用Stream串行流处理。

1
2
3
4
5
6
public static long streamSum(long n) {
Long result = Stream.iterate(0L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
return result;
}

方案三:使用ParallelStream并行流处理。

1
2
3
4
5
6
7
public static long parallelSum(long n) {
Long result = Stream.iterate(0L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
return result;
}

原始迭代方案中不需要对原始类型的数字进行装箱操作。

串行流方案中利用Stream接口的iterate方法生成0到n的自然数流,默认是串行流,然后归约求和。

并行流方案中利用Stream接口的iterate方法生成0到n的自然数流,使用parallel方法将流转换成并行流,然后归约求和。

测试方法:运行10次,取最短时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static long measureSumPerf(Function<Long,Long> adder,long n) {
long fastest = Integer.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_100_100;
if (duration < fastest) {
fastest = duration;
}
}
return fastest;
}

// test
public static void main(String[] args) {
// 原始迭代方式更快,它不需要对原始类型进行装箱/拆箱操作
System.out.println("iterate sum min time:" + measureSumPerf(
StreamPerformanceTest::iterateSum, 100000000));
// iterate生成的是装箱的对象,必须拆箱成数字才能求和。
System.out.println("stream sum min time:" + measureSumPerf(
StreamPerformanceTest::streamSum, 100000000));
// 整个数字在过程开始时并没有准备好,无法有效的把流划分为小块来并行处理。
System.out.println("parallel stream sum min time:" + measureSumPerf(
StreamPerformanceTest::parallelSum, 100000000));
}

测试结果:

1
2
3
原始迭代方案最快:iterate sum min time:11
串行流方案第二快:stream sum min time:198
并行流方案最慢:parallel stream sum min time:267

可见并行执行并不比串行执行快。但实际上这次试验的很大一部分时间消耗在对原始数据类型进行装箱的操作上。因为Stream接口的iterate方法生成的是包装对象,求和时需要拆箱成数字。同时使用iterate方法在程序开始时并没有把整个数字序列准备好,无法有效的把流划分为小块来并行处理。

接下来使用原始类型特化流来生成数字流。

原始类型特化流串行处理:

1
2
3
4
public static long longStreamSum(long n) {
long result = LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
return result;
}

原始类型特化流并行处理:

1
2
3
4
public static long longParallelSum(long n) {
long result = LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
return result;
}

测试代码:

1
2
3
4
5
// LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。
System.out.println("long stream sum min time:" + measureSumPerf(
StreamPerformanceTest::longStreamSum,100000000));
// LongStream.rangeClosed在过程开始时就会生成数字范围,很容易拆分为独立的小块进行并行处理。
System.out.println("long parallel stream sum min time:" + measureSumPerf(StreamPerformanceTest::longParallelSum,100000000));

测试结果:

1
2
原始类型特化流串行处理:long stream sum min time:7
原始类型特化流并行处理:long parallel stream sum min time:1

LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。

LongStream.rangeClosed在过程开始时就会生成数字范围,很容易拆分为独立的小块进行并行处理。

并行流有时候比串行流慢的原因

并行流有时候比串行流慢的原因:

  1. 没有使用合理的使用数据结构,导致时间浪费在其它非流处理的操作上。
  2. 并行流的底层实现是Fork/Join框架。它是基于多线程的,线程之间进行上下文切换需要耗时。同时线程是操作系统进行调度的。线程自己无法控制时间。

Fork/Join框架

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。体现了分治法的思想。Fork的意思是拆分,Join的意思是合并。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和, 最终汇总这10个子任务的结果。Fork/Join框架的运行流程图如下:

Fork-Join.png

Fork/Join的拆分重点在于任务什么时候足够小或不可分,不可再拆分的任务顺序执行,不同的任务之间并行执行。所有不可拆分的子任务分配到多个任务队列中等待线程去执行,每个队列都有一个单独的线程去执行任务。理想情况下,划分并行任务时,应该让每个任务都用相同的时间完成,让所有的CPU都同样繁忙,充分的利用CPU。但实际中,由于线程是由操作系统根据时间片进行调度的,每个子任务所花的时间可能天差地别。于是会出现一个任务队列的任务全部执行完了,另一个队列中还有很多任务的情况。这个时候CPU的利用率没有最大化。Java 8采用工作窃取算法来解决这一问题。

工作窃取(work-stealing)算法

工作窃取算法是指某个线程从其它队列中窃取任务来执行。某个线程,自己队列中的任务执行完了,就去别的还有任务的队列中窃取一个任务来执行。这个时候会存在线程竞争关系,窃取线程和被窃取线程之间同时访问同一个队列。通常使用双端队列来解决。被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点:

充分利用线程进行并行计算,减少了线程间的竞争。

工作窃取算法的缺点:

在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join框架的设计思路

  1. 首先需要一个Fork类去把大任务进行递归Fork拆分,直至拆分成不可再分的小任务。
  2. 拆分出来的子任务均匀地分配到n个双端队列中,启动n个线程分别从双端队列中获取任务执行。每个子任务执行完的结果统一放在一个结果队列中,启动一个线程从结果队列中取结果,然后Join合并成最终结果。