Akka流:读取多个文件
我有一个文件列表。我想要:Akka流:读取多个文件
- 将它们全部作为单个来源读取。
- 应按顺序读取文件。 (不循环)
- 任何文件都不应该被要求完全在内存中。
- 从文件中读取错误应该折叠流。
这感觉就像这应该工作:(斯卡拉,阿卡流v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但是,由于在一个FileIO
编译错误结果已与其相关联的物化价值,并没有按Source.combine
不支持。
映射物化价值远让我不知道文件的读取错误是如何被处理,但并编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时会抛出IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
为了清楚地模块化不同的问题,下面的代码并不尽可能简洁。
// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)
// given as stream of Paths we read those files and count the number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right)
// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))
// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
testFiles runWith lineCounter foreach println
我有一个答案走出大门 - 不要使用akka.FileIO
。这似乎工作正常,例如:
val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _)
val source = Source.fromIterator[String](() => sources)
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
我还想知道是否有更好的解决方案。
通过使用'io.Source'你失去了很多的权力。对于小文件,这可能会起作用,但它不适用于大文件。 – jarandaf
@jarandaf你能澄清吗?我的印象是,io.Source只是使用了BufferedReader,而getLines迭代器不会立即加载整个文件或类似的东西。 – randomstatistic
更好的想法,你可能是对的(虽然'FileIO'处理'ByteString'而不是'String',这意味着更高性能)。另一方面,使用'io.Source'时,总是要记住关闭源代码(默认情况下不会这样做)。 – jarandaf
更新哦,我没看到接受的答案,因为我没有刷新页面> _ <。因为我还添加了一些关于错误处理的注释,所以我会在这里留下它。
我相信下面的程序你想要做什么:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._
object TestMain extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
implicit def ec = actorSystem.dispatcher
val sources = Vector("build.sbt", ".gitignore")
.map(Paths.get(_))
.map(p =>
FileIO.fromPath(p)
.viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
.mapMaterializedValue { f =>
f.onComplete {
case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
}
NotUsed
}
)
val finalSource = Source(sources).flatMapConcat(identity)
val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
result.onComplete {
case Success(n) => println(s"Read $n lines total")
case Failure(e) => println(s"Reading failed: $e")
}
Await.ready(result, 10.seconds)
actorSystem.terminate()
}
这里的关键是flatMapConcat()
方法:它改变流的每个元素为源,并返回这些资源如果得到元素的流它们按顺序运行。
至于处理错误,您可以在mapMaterializedValue
参数中为未来添加处理程序,也可以通过将处理程序置于物化未来值上来处理运行流的最终错误。我在上面的例子中都做过了,如果你测试它,比如在一个不存在的文件上,你会看到相同的错误信息会被打印两次。不幸的是,flatMapConcat()
没有收集物化值,坦率地说,我看不出它能够做到这一点的方式,因此如有必要,您必须单独处理它们。
我正在寻找模块,所以我明白这一点。我使用行数作为我可以对文件进行处理的一个例子,并且将'lineCounter'写为文件读取。 (它是一个水槽)但是如果我将折叠和其他所有东西都移动到其他地方,我会留下一个Flow [Path,String,NotUsed],这正是我所寻找的。 – randomstatistic
能否请您提供您的示例的导入,他们是代码的重要组成部分。 –
@OsskarWerrewka它应该都在akka.stream.scaladsl和java IO/NIO中。你有问题吗? –