Java 8 Friday:使用Streams API时的10个细微错误

Data Geekery ,我们喜欢Java。 而且,由于我们真的很喜欢jOOQ的流畅的API和查询DSL ,我们对Java 8将为我们的生态系统带来什么感到非常兴奋。

Java 8星期五

每个星期五,我们都会向您展示一些不错的教程风格的Java 8新功能,这些功能利用了lambda表达式,扩展方法和其他好东西。 您可以在GitHub上找到源代码


使用Streams API时的10个细微错误

我们已经完成了所有SQL错误列表:

但是我们还没有用Java 8列出十大错误列表! 对于今天的场合( 它的黑色星期五 ),我们会赶上什么会在你的应用程序出错,当你使用Java 8个工作(它不会发生在我们身上,因为我们坚持了Java 6的另一个而)。

1.意外重用流

想打赌,这至少每个人都会发生一次。 像现有的“流”(例如InputStream )一样,您只能使用一次流。 以下代码不起作用:

IntStream stream = IntStream.of(1, 2);
stream.forEach(System.out::println);

// That was fun! Let's do it again!
stream.forEach(System.out::println);

您会得到:

java.lang.IllegalStateException: 
  stream has already been operated upon or closed

因此,在使用流时要小心。 只能执行一次。

2.意外创建“无限”流

您可以很容易地创建无限流而无需注意。 请看以下示例:

// Will run indefinitely
IntStream.iterate(0, i -> i + 1)
         .forEach(System.out::println);

如果您将流设计为无限的,那么流的全部要点就是事实。 唯一的问题是,您可能不需要这样做。 因此,请确保始终设置适当的限制:

// That's better
IntStream.iterate(0, i -> i + 1)
         .limit(10)
         .forEach(System.out::println);

3.意外地创建“微妙”的无限流

我们不能这么说。 您最终意外地创建无限流。 以以下流为例:

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .distinct()
         .limit(10)
         .forEach(System.out::println);

所以…

  • 我们生成交替的0和1
  • 那么我们只保留不同的值,即单个0和单个1
  • 那么我们将流的大小限制为10
  • 然后我们消耗它

好吧…… distinct()操作不知道提供给iterate()方法的函数只会产生两个不同的值。 它可能会期望更多。 因此它将永远消耗流中的新值,并且永远不会达到limit(10) 不幸的是,您的应用程序停顿了。

4.意外地创建“微妙”的并行无限流

我们确实需要坚持,您可能会意外地尝试消耗无限的流。 让我们假设您认为 distinct()操作应该并行执行。 您可能正在写:

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

现在,我们已经看到,这种情况将永远发生。 但至少在以前,您仅消耗计算机上的一个CPU。 现在,您可能会消耗其中的四个,可能会意外地无限消耗流,从而几乎占据整个系统。 真不好 之后,您可能可以硬重启服务器/开发计算机。 在爆炸之前,最后查看一下我的笔记本电脑的外观:

Java 8 Friday:使用Streams API时的10个细微错误

如果我是笔记本电脑,这就是我想要的方式。

5.混合操作顺序

那么,为什么我们坚持要您绝对无意中创建无限流? 这很简单。 因为您可能只是偶然地这样做。 如果您切换limit()distinct()的顺序,则可以完美地使用上述流:

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .limit(10)
         .distinct()
         .forEach(System.out::println);

现在产生:

0
1

为什么? 因为我们首先将无限流限制为10个值(0 1 0 1 0 1 0 1 0 1),然后再将有限流减少为无限值包含在其中(0 1)。

当然,这在语义上可能不再正确,因为您确实希望从一组数据中获得前10个不同的值(您刚好“忘记”了数据是无限的)。 没有人真正想要10个随机值,然后才将它们减小到与众不同。

如果您来自SQL背景,那么您可能不会想到这样的差异。 以SQL Server 2012为例。 以下两个SQL语句相同:

-- Using TOP
SELECT DISTINCT TOP 10 *
FROM i
ORDER BY ..

-- Using FETCH
SELECT *
FROM i
ORDER BY ..
OFFSET 0 ROWS
FETCH NEXT 10 ROWS ONLY

因此,作为SQL专家,您可能没有意识到流操作顺序的重要性。

Java 8 Friday:使用Streams API时的10个细微错误

6.再次混合操作顺序

说到SQL,如果您是MySQL或PostgreSQL的人,则可能会习惯LIMIT .. OFFSET子句。 SQL充满了微妙的怪癖,这就是其中之一。 OFFSET子句应用首先 ,在SQL Server 2012中的(即建议的SQL:2008标准的)语法。

如果将MySQL / PostgreSQL的方言直接转换为流,则可能会出错:

IntStream.iterate(0, i -> i + 1)
         .limit(10) // LIMIT
         .skip(5)   // OFFSET
         .forEach(System.out::println);

以上收益

5
6
7
8
9

是。 它不会在9之后继续,因为现在应用limit() ,生成(0 1 2 3 4 5 6 7 8 9)。 之后应用skip() ,将流减少到(5 6 7 8 9)。 不是您可能想要的。

注意LIMIT .. OFFSET"OFFSET .. LIMIT"陷阱!

7.使用过滤器遍历文件系统

以前我们已经在博客上写过 似乎一个好主意是使用过滤器遍历文件系统:

Files.walk(Paths.get("."))
     .filter(p -> !p.toFile().getName().startsWith("."))
     .forEach(System.out::println);

上面的流似乎仅在非隐藏目录(即不以点开头的目录)中移动。 不幸的是,您又犯了#5和#6错误。 walk()已经产生了当前目录的整个子目录流。 虽然懒惰,但逻辑上包含所有子路径。 现在,过滤器将正确过滤出名称以点“。”开头的路径。 例如.git.idea将不属于结果流。 但是这些路径将是: .\.git\refs.\.idea\libraries 不是你想要的。

现在,不要通过编写以下内容解决此问题:

Files.walk(Paths.get("."))
     .filter(p -> !p.toString().contains(File.separator + "."))
     .forEach(System.out::println);

尽管这将产生正确的输出,但仍将通过遍历完整的目录子树,然后递归到“隐藏”目录的所有子目录中来实现。

我猜您将不得不再次使用旧的JDK 1.0 File.list() 好消息是, FilenameFilterFileFilter都是功能接口。

8.修改流的后备集合

在迭代List ,一定不能在迭代主体中修改相同的列表。 在Java 8之前确实如此,但是对于Java 8流,它可能变得更加棘手。 考虑以下来自0..9的列表:

// Of course, we create this list using streams:
List<Integer> list = 
IntStream.range(0, 10)
         .boxed()
         .collect(toCollection(ArrayList::new));

现在,假设我们要在使用每个元素时将其删除:

list.stream()
    // remove(Object), not remove(int)!
    .peek(list::remove)
    .forEach(System.out::println);

有趣的是,这将适用于某些元素! 您可能获得的输出是以下内容:

0
2
4
6
8
null
null
null
null
null
java.util.ConcurrentModificationException

如果我们在捕获到异常之后对列表进行内省,那么就会发现一个有趣的发现。 我们会得到:

[1, 3, 5, 7, 9]

嘿,它对所有奇数都有效。 这是错误吗? 不,它看起来像个功能。 如果您正在研究JDK代码,则可以在ArrayList.ArraListSpliterator找到以**释:

/*
 * If ArrayLists were immutable, or structurally immutable (no
 * adds, removes, etc), we could implement their spliterators
 * with Arrays.spliterator. Instead we detect as much
 * interference during traversal as practical without
 * sacrificing much performance. We rely primarily on
 * modCounts. These are not guaranteed to detect concurrency
 * violations, and are sometimes overly conservative about
 * within-thread interference, but detect enough problems to
 * be worthwhile in practice. To carry this out, we (1) lazily
 * initialize fence and expectedModCount until the latest
 * point that we need to commit to the state we are checking
 * against; thus improving precision.  (This doesn't apply to
 * SubLists, that create spliterators with current non-lazy
 * values).  (2) We perform only a single
 * ConcurrentModificationException check at the end of forEach
 * (the most performance-sensitive method). When using forEach
 * (as opposed to iterators), we can normally only detect
 * interference after actions, not before. Further
 * CME-triggering checks apply to all other possible
 * violations of assumptions for example null or too-small
 * elementData array given its size(), that could only have
 * occurred due to interference.  This allows the inner loop
 * of forEach to run without any further checks, and
 * simplifies lambda-resolution. While this does entail a
 * number of checks, note that in the common case of
 * list.stream().forEach(a), no checks or other computation
 * occur anywhere other than inside forEach itself.  The other
 * less-often-used methods cannot take advantage of most of
 * these streamlinings.
 */

现在,检查当我们告诉流产生sorted()结果时会发生什么:

list.stream()
    .sorted()
    .peek(list::remove)
    .forEach(System.out::println);

现在将产生以下“预期”输出

0
1
2
3
4
5
6
7
8
9

和流消费后的清单? 它是空的:

[]

因此,所有元素都被消耗并正确删除。 sorted()操作是“有状态中间操作” ,这意味着后续操作不再对后备集合进行操作,而是对内部状态进行操作。 现在从列表中删除元素是“安全的”!

好吧,我们真的可以吗? 让我们继续进行parallel()sorted()移除:

list.stream()
    .sorted()
    .parallel()
    .peek(list::remove)
    .forEach(System.out::println);

现在产生:

7
6
2
5
8
4
1
0
9
3

并且列表包含

[8]

真是的 我们没有删除所有元素! 解决此流难题的任何人都可以免费获得啤酒( 和jOOQ贴纸 )!

所有这些看起来都是非常随机和微妙的,我们只能建议您在使用流时不要真正修改后备集合。 就是行不通。

9.忘记实际消耗流

您如何看待以下信息流?

IntStream.range(1, 5)
         .peek(System.out::println)
         .peek(i -> { 
              if (i == 5) 
                  throw new RuntimeException("bang");
          });

阅读此内容时,您可能会认为它将打印(1 2 3 4 5),然后引发异常。 但这是不正确的。 它什么也不会做。 流只是坐在那里,从来没有被消耗过。

与任何流畅的API或DSL一样,您实际上可能会忘记调用“终端”操作。 当您使用peek()时尤其如此,因为peek()forEach()非常相似。

当您忘记调用execute()fetch()时, jOOQ可能会发生同样的情况:

DSL.using(configuration)
   .update(TABLE)
   .set(TABLE.COL1, 1)
   .set(TABLE.COL2, "abc")
   .where(TABLE.ID.eq(3));

哎呀。 没有execute()

Java 8 Friday:使用Streams API时的10个细微错误


是的,“最佳”方法-1-2次警告!

10.并行流死锁

现在这真是个好东西!

如果您未正确同步所有事物,则所有并发系统都可能陷入死锁。 虽然找不到真实的例子很明显,但找到强制的例子很明显。 保证以下parallel()流会陷入死锁:

Object[] locks = { new Object(), new Object() };

IntStream
    .range(1, 5)
    .parallel()
    .peek(Unchecked.intConsumer(i -> {
        synchronized (locks[i % locks.length]) {
            Thread.sleep(100);

            synchronized (locks[(i + 1) % locks.length]) {
                Thread.sleep(50);
            }
        }
    }))
    .forEach(System.out::println);

请注意Unchecked.intConsumer()的使用,该函数将功能性IntConsumer接口转换为org.jooq.lambda.fi.util.function.CheckedIntConsumer ,允许抛出已检查的异常。

好。 您的机器运气不好。 这些线程将永远被阻塞!

好消息是,用Java编写死锁的教科书示例从未如此简单!

有关更多详细信息,另请参见Brian Goetz对Stack Overflow的此问题的回答

结论

借助流和功能性思维,我们将遇到大量新的,细微的错误。 除了实践和保持专注之外,几乎无法避免这些错误。 您必须考虑如何订购您的手术。 您必须考虑流是否可能是无限的。

流(和lambda)是一个非常强大的工具。 但是首先我们需要掌握一种工具。

翻译自: https://www.javacodegeeks.com/2014/06/java-8-friday-10-subtle-mistakes-when-using-the-streams-api.html