ParallelStream使用的坑怎么解决
比如下面的代码片段,让人阅读的时候就像是读诗一样。但是一旦用不好,也是会要命的。
List<Integer> transactionsIds = widgets.stream() .filter(b -> b.getColor() == RED) .sorted((x,y) -> x.getWeight() - y.getWeight()) .mapToInt(Widget::getWeight) .sum();
这段代码有一个关键的函数,那就是stream。利用它,我们可以将一个普通的列表转换为流,进而利用管道的方式处理该列表。总之,用过的都说好。
对这些函数还不是太熟悉?可以参考:《到处是map、flatMap,啥意思?》
问题来了
假如我们把stream换成parallelStream,会发生什么情况?
根据字面上的意思,流会从串行 变成并行。
考虑到这是一个并行的情况,很明显会存在线程安全问题。然而,我们在此讨论的并不是需要使用线程安全集合,因为这个话题太基础了。在当今时代,学会在线程不安全的情况下使用线程安全的集合已成为一项基本技能。
这次踩坑的地方,是并行流的性能问题。
我们用代码来说话。
以下代码同时启用了8个线程,所有线程都在使用并行流进行数据计算。在执行的逻辑中,我们让每个任务都sleep 1秒钟,这样就能够模拟一些I/O请求的耗时等待。
使用stream,程序会在30秒后返回,但我们期望程序能够在1秒多返回,因为它是并行流,得对得起这个称号。
测试发现,我们等了好久,任务才执行完毕。
static void paralleTest() { List<Integer> numbers = Arrays.asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ); final long begin = System.currentTimeMillis(); numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList()); } public static void main(String[] args) { // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); new Thread(() -> paralleTest()).start(); }
坑
实际上,在不同的机器上执行,这段代码花费的时间都不一样。
既然是并行,那肯定得有个并行度。如果并行度太低,就无法发挥其能力;如果并行度太高,会浪费上下文切换的时间。我是很沮丧的发现,很多高级研发,将线程池的各种参数背的滚瓜烂熟,各种调优,竟然敢睁一只眼闭一只眼的在I/O密集型业务中用上parallelStream。
要了解这个并行度,我们需要查看具体的构造方法。在ForkJoinPool类中找到这样的代码。
try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( "java.util.concurrent.ForkJoinPool.common.exceptionHandler"); } catch (Exception ignore) { } if (fac == null) { if (System.getSecurityManager() == null) fac = defaultForkJoinWorkerThreadFactory; else // use security-managed default fac = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP;
可以看到,并行度到底是多少,是由下面的参数来控制的。如果无法获取这个参数,则默认使用 CPU个数-1 的并行度。
可以看到,这个函数是为了计算密集型业务去设计的。当你向它分配过多的任务时,它的并行执行会降级为类似于串行的效果。
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N设置了一个初始值大小,它依然有问题。
一旦设定,parallelism变量就被设为final,禁止修改。也就是说,上面的参数只会生效一次。
张三可能使用下面的代码,设置了并行度大小为20。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
李四可能用同样的方式,设置了这个值为30。要确定项目中使用的是哪个值,需要询问JVM如何加载类信息。
这种方式并不太非常靠谱。
一种解决方式
我们可以通过提供外置的forkjoinpool,也就是改变提交方式,来实现不同类型的任务分离。
代码如下所示,通过显式的代码提交,即可实现任务分离。
ForkJoinPool pool = new ForkJoinPool(30); final long begin = System.currentTimeMillis(); try { pool.submit(() -> numbers.parallelStream().map(k -> { try { Thread.sleep(1000); System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } return k; }).collect(Collectors.toList())).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
这样,不同的场景,就可以拥有不同的并行度。手动管理资源是在这种方式和CountDownLatch之间有相似之处,它们有异曲同工之妙。
以上就是ParallelStream使用的坑怎么解决的详细内容,更多请关注其它相关文章!