剑客
关注科技互联网

Java Streams,第 5 部分: 并行流性能

系列内容:

此内容是该系列 5
部分中的第 #
部分: Java Streams,第 5 部分

https://www.ibm.com/developerworks/cn/views/java/libraryview.jsp?sort_by=&show_abstract=true&show_all=&search_flag=&contentarea_by=Java+technology&search_by=Java+Streams+%E9%83%A8%E5%88%86&topic_by=-1&type_by=%E6%89%80%E6%9C%89%E7%B1%BB%E5%88%AB&ibm-search=%E6%90%9C%E7%B4%A2

敬请期待该系列的后续内容。

此内容是该系列的一部分: Java Streams,第 5 部分

敬请期待该系列的后续内容。


Java Streams

的第 4 部分 讨论了可能影响并行化有效性的因素。这些因素包括问题的特征,用于实现解决方案的算法,用于调入任务来并行执行的运行时框架,以及数据集的大小和内存布局。本期文章会将这些概念应用于 Streams 库,介绍为什么一些流管道的并行化比其他流管道更好。

并行流

关于本系列

借助 java.util.stream
包,您可以简明地、声明性地表达集合、数组和其他数据源上可能的并行批量操作。在 Java 语言架构师 Brian Goetz 编写的这个系列 中,我们将全面了解 Streams 库并学习如何最充分地使用它。

在第 3 部分 中我们已经了解到,流管道包含一个流来源、0 或多个中间操作和一个终止操作。要执行流管道,可以构造一个 “机器”,将来自来源的元素提供给这个机器来实现中间和终止操作。要并行执行流管道,可以使用 Spliterator
方法 trySplit()
,通过递归分解将来源数据分解为分段。结果会创建一个二叉计算树,其中的叶节点分别对应于来源数据的一个分段,内部节点分别对应于将问题拆分为子任务的点,最后需要组合两个子任务的结果。顺序执行会为整个数据集构造一个机器;并行执行会为来源数据的每个分段构造一个机器,生成每个叶节点的部分结果。然后,我们沿树往上,依据特定于终止操作的合并函数,将部分结果合并到更大的结果中。例如,对于终止操作 reduce()
,用于相减的二元运算符也是合并函数;对于 collect()
Collector
有一个合并函数用于将一个结果容器合并到另一个中。

学习更多知识。开发更多项目。联系更多同行。

developerWorks Premium
订阅计划提供了强大的开发工具和资源,包括 500 篇通过 Safari Books Online 提供的顶级技术文章(包含作者编写的 Java 并发性实战
),最重要开发人员活动的大幅折扣,最新的 O’Reilly 大会的视频录像,等等。立即注册。

上一期确定了可能导致并行执行效率降低的多个因素:

  • 来源的拆分成本很高或拆分不均。
  • 合并部分结果的成本很高。
  • 问题不允许足够的可利用并行性。
  • 数据布局导致糟糕的访问位置。
  • 没有足够的数据来克服并行性的启动成本。

我们现在将查看每个考虑因素,看看它们在并行流管道中的表现情况。

来源拆分

借助并行流,我们使用 Spliterator
方法 trySplit()
将来源数据的一个分段拆分为两个。计算树中的每个节点对应于一次二叉拆分,形成一个二叉树。理想情况下,这棵树是平衡的(每个叶节点表示完全相同的工作量),而且拆分成本为 0。

这种理想状态在实际中无法实现,但一些来源比其他来源更接近理想状态。数组是最佳情形。我们可以使用一个基础数组的引用,分段的起点和终点的完整偏移来描述数组的分段。将此分段拆分为两个相等分段的成本很低:我们计算分段的中点,为前半部分创建一个新描述符,并将当前分段的开始索引移到后半部分中的第一个元素。

清单 1 给出了 ArraySpliterator
中的 trySplit()
代码。数组拥有较低的拆分成本 — 一些算术运算和创建一个对象;它们也会均匀地被拆分(得到平衡的计算树)。 ArrayList
Spliterator
拥有同样令人满意的特征。(作为一个额外优势,在拆分数组时,我们还会知道所有拆分部分的准确大小,这使我们能够通过优化来删除一些流管道中的副本。)

清单 1. ArraySpliterator.trySplit() 的实现。

public Spliterator<T> trySplit() {
    int lo = index, mid = (lo + fence) >>> 1;
    return (lo >= mid)
           ? null
           : new ArraySpliterator<>(array,
                                    lo, index = mid,
                                    characteristics);
}

另一方面,链接列表的拆分结果很差。拆分成本很高 — 要找到中点,必须一次一个节点地遍历列表的一半内容。为了降低拆分成本,我们可以尝试接受不均衡的拆分 — 但这仍没有多大帮助。在极端情况下,我们最终会得到一个病态的不均衡(右侧更重)的树,每个拆分部分仅包含 (first element, rest of list)
。所以拆分部分的数量不是 O(lg n)
,而是 O(n)
,这需要 O(n)
个组合步骤。我们最终需要在两个选项中进行艰难选择:创建成本非常高(由于导致 阿姆达尔定律
的串行碎片而限制了并行性)的计算树,以及允许相对较低的并行性且拥有很高的组合成本(因为它非常不均衡)的计算树。也就是说,在链接列表中放弃并行性不是 不可能
— 只要为每个节点执行的操作的成本足够高。(参阅)

二叉树(比如 TreeMap
)和基于哈希值的集合(比如 HashSet
)的拆分结果比链接列表更好,但仍比不上数组。二叉树被拆分为两个,成本很低,而且如果树是相对平衡的,得到的计算树也将是平衡的。我们将 HashMap
实现为一个桶数组,每个桶是一个链接列表。如果该哈希函数将各个元素均匀地分散各个桶中,该集合应得到相对较好的拆分(直到您达到单个桶 (single bucket),然后您会获得一个链接列表,该列表理想情况下很小)。但是,基于树和基于哈希的集合的拆分通常都没有数组那样可预测 — 我们无法预测结果拆分部分的大小,所以在某些情况下会丧失通过优化来删除副本的能力。

生成器作为来源

不是所有流都使用集合作为来源;一些流使用生成器函数,比如 IntStream.range()
。应用于集合来源的考虑因素也可直接应用于生成器。

下面两个示例展示了两种生成由 0 到 99 组成的流的方式。此代码使用了 Stream.iterate()
(Java 9 中添加了 iterate()
的 3 参数版本):

IntStream stream1 = IntStream.iterate(0, n -> n < 100,
                                      n -> n + 1);

此代码使用了 IntStream.range()

IntStream stream2 = IntStream.range(0, 100);

这两个示例生成相同的结果,但拥有明显不同的拆分特征 — 因此将拥有显著不同的并行性能。

Stream.iterate()
接受一个初始值和两个函数(一个用于生成下一个值,另一个用于确定是否停止生成元素),类似一个 for
循环。可以直观地看到,此生成器根本上是顺序性的:仅在生成元素 n-1
后才能生成元素 n
。因此,拆分顺序生成器函数拥有与拆分链接列表相同的特征(高拆分成本与高度不均匀的拆分之间的艰难选择),而且会得到同样糟糕的并行性。

另一方面, range()
生成器拆分更像一个数组 — 计算范围的中点很容易且成本很低,无需计算介于范围中的元素。

尽管 Streams 库的设计受到函数编程原理的很大影响,但前面的示例中两个生成器函数之间的并行性能特征差异演示了针对拥有函数编程背景的用户的一个潜在陷阱。在函数编程中,从一个应用迭代式函数构造的无限流以惰性消耗方式生成一个范围,这是常见和自然的形成范围的方式 — 但这种方言出现在数据并行性完全还是理论概念的时代。函数编程人员很容易借用熟悉的 iterate
方言,而没有立即认识到此方法的内在顺序性。

结果组合

拆分来源(无论是否高效且均匀)是实现并行计算的必要成本。如果我们够幸运,拆分的成本可能比较适中,我们可以尽早开始分解工作,避免与阿姆达尔定律发生冲突。

每次拆分来源时,都会积累一种组合该拆分的中间结果的责任。在叶节点完成其输入分段上的工作后,我们继续沿树向上,不断组合结果。

一些组合操作(比如加和减)的成本很低。但其他组合(比如合并两个集)的成本要高得多。组合步骤所花的时间量与计算树的深度成正比;平衡的树将拥有深度 O(lg n)
,而病态的不平衡树(比如我们拆分一个链接列表或一个迭代生成函数所得到的树)将拥有深度 O(n)

另一个合并成本很高的问题是最后一次合并(其中将合并两半部分的结果)将顺序地执行(因为没有其他需要做的工作)。合并步骤数为 O(n)
的流管道(比如使用 sorted()
collect(Collectors.joining())
终止操作的流管道)的并行性可能受到此影响的限制。


在操作与遇到顺序紧密关联时,您可能对并行执行的时间和空间成本感到很奇怪。对于顺序执行,在显明实现中,我们通常会按遇到顺序遍历输入,所以对遇到顺序的依赖很少很明显或成本很高。在并行执行中,这种依赖性可能具有非常高的成本

操作语义

像一些来源(比如链接列表或迭代生成器函数)具有内在顺序性一样,一些流操作也拥有内在的顺序方面,我们可以将这视为并行性的一种阻碍。这些操作的语义通常以 遇到顺序
的形式定义。

例如, findFirst()
终止操作得到流中的第一个元素。(此操作通常与过滤相组合,所以最终的意图通常是 “找到满足某个条件的第一个元素。”)顺序地实施 findFirst()
具有极低的成本:沿管道推送数据,直到生成一些结果,然后停止。在并行执行中,我们可以轻松地并行化上游操作,但当结果是由某个子任务生成的结果时,我们的工作还未完成。我们仍需要等待在遇到顺序中先出现的所有子任务完成。(至少我们可以取消在遇到顺序中后出现的所有子任务。)并行执行需要辅助分解和任务管理的所有成本,但不太可能获益。另一方面,终止操作 findAny()
更有可能获得并行提速,因为它让所有核心一直繁忙地搜索匹配值,而且可以在找到一个匹配值后立即终止。

另一个语义与遇到顺序关联的终止操作是 forEachOrdered()
。同样地,尽管通常可以全面并行化中间操作的执行,但最终适合的步骤是顺序性的。另一方面, forEach()
终止操作不受遇到顺序约束。可在任何时候和提供每个元素的任何线程中对该元素执行适合的步骤。

中间操作(比如 limit()
skip()
)可能也受遇到顺序约束。 limit(n)
操作在 前 n
个元素后截断输入流。像 findFirst()
一样,当元素由某个任务生成时, limit(n)
必须等待遇到顺序中位于它之前的所有任务完成,才知道是否将这些元素推送到管道的剩余部分 — 而且它必须缓存生成的元素,直到获悉是否需要这些元素。(对于一个 无序
流, limit(n)
可以选择 任意 n 个
元素 — 而且像 findAny()
一样,更加适合并行化。)

在操作与遇到顺序紧密关联时,您可能对并行执行的时间和空间成本感到很奇怪。 findFirst()
limit()
的显明顺序实现非常简单高效,而且几乎不需要空间开销,但并行实现很复杂,通常涉及到大量的等待和缓存。对于顺序执行,在显明实现中,通常会按遇到顺序遍历输入,所以对顺序执行的依赖很少是明显或昂贵的。在并行执行中,这些依赖性可能具有很高的成本。

幸运的是,这些对顺序执行的依赖性常常可通过对管道的细微更改来消除。我们通常可将 findFirst()
替换为 findAny()
,而不损失任何正确性。类似地,我们在第 3 部分中已经看到,通过 unordered()
操作可让流变得无序,我们可以删除 limit()
distinct()
sorted()
collect()
中内在的遇到顺序依赖性,而不损失正确性。

我们目前看到的对并行提速的各种危害都是累积性的。就像 3 参数 iterate()
来源比范围构造函数的效率低得多一样,将 2 参数 iterate()
来源与 limit()
组合的效率更低,因为它将一个顺序生成步骤与一个对顺序执行敏感的操作相组合。例如,下面给出了对不适合并行化的生成整数范围的方式:

IntStream stream3 = IntStream.iterate(0, n -> n+1).limit(100);

内存位置

现代计算机系统采用复杂的多级缓存将常用数据保存在离 CPU 尽可能近的地方(实际上,光速也是一个限制因素!)。从 L1 缓存抓取数据的速度很容易达到从主存储器抓取数据的 100 倍。CPU 能越高效地预测接下来需要哪个数据,CPU 执行计算所花的周期就越多,而且等待数据所花的周期就越少。

数据按 缓存行
的粒度级别分页存储到缓存中;如今的 x86 芯片使用的缓存行大小为 64 字节。这为拥有良好的 内存位置
的程序带来了好处 — 倾向于访问离最近访问的位置很近的内存位置。线性地处理某个数组不仅会拥有不错的位置,也会通过 预取
获得进一步的好处 — 当检测到线性的内存访问模式时,硬件就会开始预先获取假定可能很快需要的下一个缓存行中的内存数据。

主流 Java 实现在内存中连续放置对象的字段和数组的元素(但字段不一定按源文件中声明的顺序放置)。访问离最近访问的字段或元素 “很近” 的字段或数组元素,很有可能获得已在缓存中的数据。另一方面,对其他对象的引用被表示为指针,所以解除对象引用很有可能获得不在缓存中的数据,因而导致延迟。

原语数组提供了最佳的位置。最初解除数组引用后,数据连续地存储在内存中,所以我们可以最大化每批数据抓取的计算量。当抓取数组中的下一个引用时,对象引用数组将获得不错的位置,但在解除这些对象引用时,可能出现缓存缺失。类似地,一个包含多个原语字段的类可能将字段彼此邻近地放在内存中,而一个包含许多对象引用的类将需要多次解除引用才能访问它的状态。最终,数据结构中的指针越多,遍历这种数据结构给内存抓取单元带来的压力就越大,这可能对计算时间(CPU 会花时间等待数据时)和并行性(因为许多核心同时从内存抓取会给可用来将数据从内存传输到缓存的带宽造成压力)产生负面影响。

NQ
模型

要确定并行性是否会带来提速,需要考虑的最后两个因素是可用的数据量和针对每个数据元素执行的计算量。

在我们最初的并行分解描述中,我们采用的概念是拆分来源,直到分段足够小,以致解决该分段上的问题的顺序方法更高效。分段大小必须依赖于所解决的问题,确切的讲,取决于每个元素完成的工作量。例如,计算一个字符串的长度涉及的工作比计算字符串的 SHA-1 哈希值要少得多。为每个元素完成的工作越多,“大到足够利用并行性” 的阈值就越低。类似地,拥有的数据越多,拆分的分段就越多,而不会与 “太小” 阈值发生冲突。

一个简单但有用的并行性能模型是 NQ
模型,其中 N
是数据元素数量, Q
是为每个元素执行的工作量。乘积 N*Q
越大,就越有可能获得并行提速。对于具有很小的 Q
的问题,比如对数字求和,您通常可能希望看到 N
> 10,000 以获得提速;随着 Q
增加,获得提速所需的数据大小将会减小。

并行化的许多阻碍(比如拆分成本、组合成本或遇到顺序敏感性)都可以通过 Q
更高的操作来缓解。尽管拆分某个 LinkedList
特征的结果可能很糟糕,但只要拥有足够大的 Q
,仍然可能获得并行提速。

结束语

因为并行性通常仅提供了加快运行时间的潜力,所以仅在它能带来实际提速时才应使用它。使用顺序流开发和测试您的代码;然后,如果您的性能需求暗示需要进一步改进,可考虑采用并行性作为可能的优化战略。尽管度量对确保您的优化工作不会适得其反至关重要,但对于许多流管道,您可以通过检查来确定它们不太适合并行性。损害潜在并行提速的因素包括不良或不均匀的拆分来源、高组合成本、对遇到顺序的依赖性、位置不佳或没有足够的数据。另一方面,每个元素的大计算量 ( Q
) 可以弥补其中一些缺陷。

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址