阅读使用阿卡流

问题描述:

我尝试阿卡流和这里的大型文件是一小段,我有:阅读使用阿卡流

override def main(args: Array[String]) { 
    val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0) 

    val file = new File(filePath) 
    println(file.getAbsolutePath) 
    // read 1MB of file as a stream 
    val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024) 
    val shaFlow = fileSource.map(chunk => { 
     println(s"the string obtained is ${chunk.toString}") 
    }) 
    shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer 

    def sha256(s: String) = { 
     val messageDigest = MessageDigest.getInstance("SHA-256") 
     messageDigest.digest(s.getBytes("UTF-8")) 
    } 
    } 

当我跑了这个片段,我得到:

Exception in thread "main" java.lang.NullPointerException 
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365) 
    at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30) 
    at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala) 

在我看来,它不是fileSource只是空的?为什么是这样?有任何想法吗? FoodFacts.csv如果大小为40MB,我所要做的就是创建一个1MB的数据流!

即使使用defaultChunkSize 8192也不行!

+0

您使用的是什么版本的阿卡流?我认为SynchronousFileSource现在已被弃用 – Jatin

+0

我使用1.0。我应该使用哪一个阅读一个大文件并将这些块作为流传递?任何线索? – sparkr

以及1.0已弃用。如果可以,请使用2.x

当我使用2.0.1版本使用FileIO.fromFile(file)而不是SynchronousFileSource进行尝试时,它是编译失败,消息为fails with null pointer。这仅仅是因为它的范围没有ActorMaterializer。包括它,使其工作:

object TImpl extends App { 
import java.io.File 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    val file = new File("somefile.csv") 
    val fileSource = FileIO.fromFile(file,1 * 1024 * 1024) 
    val shaFlow: Source[String, Future[Long]] = fileSource.map(chunk => { 
    s"the string obtained is ${chunk.toString()}" 
    }) 

    shaFlow.runForeach(println(_))  
} 

这适用于任何大小的文件。有关调度程序配置的更多信息,请参阅here