Akka流 - 将一串ByteString分割为多个文件
我试图将一个传入的Akka字节流(来自http请求的正文,但也可能来自一个文件)分成多个定义的文件尺寸。Akka流 - 将一串ByteString分割为多个文件
例如,如果我正在上传10Gb文件,它会创建类似10G的1个文件。这些文件会有随机生成的名称。我的问题是,我不知道从哪里开始,因为我读过的所有响应和示例都将整个块存储到内存中,或者使用基于字符串的分隔符。除非我真的不能拥有1Gb的“块”,然后就把它们写到磁盘上。
有没有容易方法来执行那种操作?我唯一的想法是使用类似http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings的东西,但转换为FlowShape[ByteString, File]
之类的东西,将自己写入文件大块直到达到最大文件大小,然后创建一个新文件等,然后流回创建的文件。它看起来像没有使用正确阿卡一个穷凶极恶的想法..提前
感谢
我经常回到纯粹的功能,非阿卡,对于像这样的问题的技术,然后“升降机”这些功能集成到阿卡结构。我的意思是我尽量只使用Scala的“东西”,然后尝试换阿卡以后里面的东西......
文件创建
基于“随机生成FileOutputStream
创建启动名称“:
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator :() => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
国家
需要有存储的某种方式‘当前文件的状态’,例如,字节数已经写好:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
文件写入
首先我们确定我们是否会违反与指定ByteString
的最大文件大小阈值:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
那么我们有编写新的函数,如果我们已经清除了当前函数的容量,或者返回当前状态(如果它仍然低于容量),则创建新的FileState
:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
剩下的唯一一件事就是写FileOutputStream
:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
折任何GenTraversableOnce
Scala中的一个GenTraversableOnce
的任何集合,平行与否,具有折叠运算符。这些包括迭代器,矢量,数组,Seq,Scala流......个最终writeToChunkedFile
功能完全匹配的GenTraversableOnce#fold签名:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
最后一个松散端;最后的FileOutputStream
也需要关闭。由于折只会发出,去年FileState
我们可以关闭一个:
closeFileInState(finalFileState)
阿卡流
阿卡流量获取其fold
从FlowOps#fold这恰好符合GenTraversableOnce
签名。因此,我们可以“提升”我们的常规功能为类似的方式流值,我们使用Iterable
倍:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
约定期功能处理问题的好处是,他们可以超越流以外的异步框架内使用,例如期货或演员。你也不需要在单元测试中使用akka ActorSystem
,只需要定期的语言数据结构。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
然后,您可以使用此Sink
排HttpEntity
从HttpRequest
到来。
您可以编写自定义图形阶段。 您的问题类似于在上传到亚马逊S3期间在alpakka中遇到的问题。 (谷歌alpakka s3连接器..他们不会让我发布超过2个链接)
但由于某些原因,s3连接器DiskBuffer写入整个传入来源的字节串到文件,然后发出该块进行进一步的流处理..
我们想要的东西类似于limit a source of byte strings to specific length。在这个例子中,他们通过维护一个内存缓冲区,将传入的Source [ByteString,_]限制为一个固定大小的byteStrings的源。我采用它来处理文件。 这样做的好处是您可以使用专用线程池来执行阻塞IO。要获得良好的反应流,您希望阻止IO在actor系统中的单独线程池中。 PS:这不会尝试生成确切大小的文件..所以如果我们在100MB文件中读取2KB的额外文件,我们会将这些额外的字节写入当前文件,而不是尝试实现确切的大小。
import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Path
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream._
import akka.util.ByteString
case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int)
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes.
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int)
extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] {
assert(maxSize > partSize, "Max size should be larger than part size. ")
val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in")
val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out")
override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
var partNumber: Int = 0
var length: Int = 0
var currentBuffer: Option[PartBuffer] = None
override def onPull(): Unit =
if (isClosed(in)) {
emitPart(currentBuffer, length)
} else {
pull(in)
}
override def onPush(): Unit = {
val elem = grab(in)
length += elem.size
val currentPart: PartBuffer = currentBuffer match {
case Some(part) => part
case None =>
val newPart = createPart(partNumber)
currentBuffer = Some(newPart)
newPart
}
currentPart.fileChannel.write(elem.asByteBuffer)
if (length > partSize) {
emitPart(currentBuffer, length)
//3. .increment part number, reset length.
partNumber += 1
length = 0
} else {
pull(in)
}
}
override def onUpstreamFinish(): Unit =
if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer.
private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match {
case Some(part) =>
//1. flush the part buffer and truncate the file.
part.fileChannel.force(false)
// not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do.
// val ch = new FileOutputStream(part.path.toFile).getChannel
// try {
// println(s"truncating to size $size")
// ch.truncate(size)
// } finally {
// ch.close()
// }
//2emit the part
val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber)
push(out, chunk)
part.fileChannel.close() // TODO: probably close elsewhere.
currentBuffer = None
//complete stage if in is closed.
if (isClosed(in)) completeStage()
case None => if (isClosed(in)) completeStage()
}
private def createPart(partNum: Int): PartBuffer = {
val path: Path = partFile(partNum)
//currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies.
PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel)
}
/**
* Creates a file in the temp directory with name bmcs-buffer-part-$partNumber
* @param partNumber the part number in multipart upload.
* @return
* TODO:add unique id to the file name. for multiple
*/
private def partFile(partNumber: Int): Path =
tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin")
setHandlers(in, out, this)
}
case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO: see if you need mapped byte buffer. might be ok with just output stream/channel.
}
哇谢谢,这是一个非常详细的回应!没有那样做。在'GraphStageLogic'中,这种方法与'fold'之间的任何区别,并且具有相同的逻辑(保持状态并手动创建输出流)?例如(根据您的评论+我以前的链接)http://pastebin.com/tzLFAmzk?最小的优点是,它允许创建文件时尽快创建文件(但代码更长,更容易出错)。 – Vuzi
@Vuzi欢迎您。我在“真实世界”中看到的方法和“GraphStageLogic”之间的主要区别在于测试。通过我的函数,所有的测试都可以在没有'ActorSystem','ActorMaterializer'和'ExecutionContext'的情况下完成。如果你把你的逻辑放在akka流构造中,那么你需要所有的akka框架来测试这个逻辑。快乐的黑客攻击。 –