Akka流 - 将一串ByteString分割为多个文件

Akka流 - 将一串ByteString分割为多个文件

问题描述:

我试图将一个传入的Akka字节流(来自http请求的正文,但也可能来自一个文件)分成多个定义的文件尺寸。Akka流 - 将一串ByteString分割为多个文件

例如,如果我正在上传10Gb文件,它会创建类似10G的1个文件。这些文件会有随机生成的名称。我的问题是,我不知道从哪里开始,因为我读过的所有响应和示例都将整个块存储到内存中,或者使用基于字符串的分隔符。除非我真的不能拥有1Gb的“块”,然后就把它们写到磁盘上。

有没有容易方法来执行那种操作?我唯一的想法是使用类似http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings的东西,但转换为FlowShape[ByteString, File]之类的东西,将自己写入文件大块直到达到最大文件大小,然后创建一个新文件等,然后流回创建的文件。它看起来像没有使用正确阿卡一个穷凶极恶的想法..提前

感谢

我经常回到纯粹的功能,非阿卡,对于像这样的问题的技术,然后“升降机”这些功能集成到阿卡结构。我的意思是我尽量只使用Scala的“东西”,然后尝试换阿卡以后里面的东西......

文件创建

基于“随机生成FileOutputStream创建启动名称“:

def randomFileNameGenerator : String = ??? //not specified in question 

import java.io.FileOutputStream 

val randomFileOutGenerator :() => FileOutputStream = 
() => new FileOutputStream(randomFileNameGenerator) 

国家

需要有存储的某种方式‘当前文件的状态’,例如,字节数已经写好:

case class FileState(byteCount : Int = 0, 
        fileOut : FileOutputStream = randomFileOutGenerator()) 

文件写入

首先我们确定我们是否会违反与指定ByteString的最大文件大小阈值:

import akka.util.ByteString 

val isEndOfChunk : (FileState, ByteString, Int) => Boolean = 
    (state, byteString, maxBytes) => 
    state.byteCount + byteString.length > maxBytes 

那么我们有编写新的函数,如果我们已经清除了当前函数的容量,或者返回当前状态(如果它仍然低于容量),则创建新的FileState

val closeFileInState : FileState => Unit = 
    (_ : FileState).fileOut.close() 

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
    (state, byteString, maxBytes) => 
    if(isEndOfChunk(maxBytes, state, byteString)) { 
     closeFileInState(state) 
     FileState() 
    } 
    else 
     state 

剩下的唯一一件事就是写FileOutputStream

val writeToFileAndReturn(FileState, ByteString) => FileState = 
    (fileState, byteString) => { 
    fileState.fileOut write byteString.toArray 
    fileState copy (byteCount = fileState.byteCount + byteString.size) 
    } 

//the signature ordering will become useful 
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =  
    writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)  

折任何GenTraversableOnce

Scala中的一个GenTraversableOnce的任何集合,平行与否,具有折叠运算符。这些包括迭代器,矢量,数组,Seq,Scala流......个最终writeToChunkedFile功能完全匹配的GenTraversableOnce#fold签名:

val anyIterable : Iterable = ??? 

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes)) 

最后一个松散端;最后的FileOutputStream也需要关闭。由于折只会发出,去年FileState我们可以关闭一个:

closeFileInState(finalFileState) 

阿卡流

阿卡流量获取其foldFlowOps#fold这恰好符合GenTraversableOnce签名。因此,我们可以“提升”我们的常规功能为类似的方式流值,我们使用Iterable倍:

import akka.stream.scaladsl.Flow 

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
    Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes)) 

约定期功能处理问题的好处是,他们可以超越流以外的异步框架内使用,例如期货或演员。你也不需要在单元测试中使用akka ActorSystem,只需要定期的语言数据结构。

import akka.stream.scaladsl.Sink 
import scala.concurrent.Future 

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
    chunkerFlow(maxBytes) to (Sink foreach closeFileInState) 

然后,您可以使用此SinkHttpEntityHttpRequest到来。

+0

哇谢谢,这是一个非常详细的回应!没有那样做。在'GraphStageLogic'中,这种方法与'fold'之间的任何区别,并且具有相同的逻辑(保持状态并手动创建输出流)?例如(根据您的评论+我以前的链接)http://pastebin.com/tzLFAmzk?最小的优点是,它允许创建文件时尽快创建文件(但代码更长,更容易出错)。 – Vuzi

+0

@Vuzi欢迎您。我在“真实世界”中看到的方法和“GraphStageLogic”之间的主要区别在于测试。通过我的函数,所有的测试都可以在没有'ActorSystem','ActorMaterializer'和'ExecutionContext'的情况下完成。如果你把你的逻辑放在akka流构造中,那么你需要所有的akka​​框架来测试这个逻辑。快乐的黑客攻击。 –

您可以编写自定义图形阶段。 您的问题类似于在上传到亚马逊S3期间在alpakka中遇到的问题。 (谷歌alpakka s3连接器..他们不会让我发布超过2个链接)

但由于某些原因,s3连接器DiskBuffer写入整个传入来源的字节串到文件,然后发出该块进行进一步的流处理..

我们想要的东西类似于limit a source of byte strings to specific length。在这个例子中,他们通过维护一个内存缓冲区,将传入的Source [ByteString,_]限制为一个固定大小的byteStrings的源。我采用它来处理文件。 这样做的好处是您可以使用专用线程池来执行阻塞IO。要获得良好的反应流,您希望阻止IO在actor系统中的单独线程池中。 PS:这不会尝试生成确切大小的文件..所以如果我们在100MB文件中读取2KB的额外文件,我们会将这些额外的字节写入当前文件,而不是尝试实现确切的大小。

import java.io.{FileOutputStream, RandomAccessFile} 
import java.nio.channels.FileChannel 
import java.nio.file.Path 

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 
import akka.stream._ 
import akka.util.ByteString 

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int) 
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes. 
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int) 
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] { 

    assert(maxSize > partSize, "Max size should be larger than part size. ") 

    val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in") 
    val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out") 

    override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) with OutHandler with InHandler { 

     var partNumber: Int = 0 
     var length: Int = 0 
     var currentBuffer: Option[PartBuffer] = None 

     override def onPull(): Unit = 
     if (isClosed(in)) { 
      emitPart(currentBuffer, length) 
     } else { 
      pull(in) 
     } 

     override def onPush(): Unit = { 
     val elem = grab(in) 
     length += elem.size 
     val currentPart: PartBuffer = currentBuffer match { 
      case Some(part) => part 
      case None => 
      val newPart = createPart(partNumber) 
      currentBuffer = Some(newPart) 
      newPart 
     } 
     currentPart.fileChannel.write(elem.asByteBuffer) 
     if (length > partSize) { 
      emitPart(currentBuffer, length) 
      //3. .increment part number, reset length. 
      partNumber += 1 
      length = 0 
     } else { 
      pull(in) 
     } 
     } 

     override def onUpstreamFinish(): Unit = 
     if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer. 

     private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match { 
     case Some(part) => 
      //1. flush the part buffer and truncate the file. 
      part.fileChannel.force(false) 
      //   not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do. 
//     val ch = new FileOutputStream(part.path.toFile).getChannel 
//   try { 
//   println(s"truncating to size $size") 
//   ch.truncate(size) 
//   } finally { 
//   ch.close() 
//   } 
      //2emit the part 
      val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber) 
      push(out, chunk) 
      part.fileChannel.close() // TODO: probably close elsewhere. 
      currentBuffer = None 
      //complete stage if in is closed. 
      if (isClosed(in)) completeStage() 
     case None => if (isClosed(in)) completeStage() 
     } 

     private def createPart(partNum: Int): PartBuffer = { 
     val path: Path = partFile(partNum) 
     //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies. 
     PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel) 
     } 

     /** 
     * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber 
     * @param partNumber the part number in multipart upload. 
     * @return 
     * TODO:add unique id to the file name. for multiple 
     */ 
     private def partFile(partNumber: Int): Path = 
     tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin") 
     setHandlers(in, out, this) 
    } 

    case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO: see if you need mapped byte buffer. might be ok with just output stream/channel. 

}