如何使用Scala Stream类读取大型CSV文件?
如何用Scala Stream读取大型CSV文件(> 1 Gb)?你有代码示例吗?或者你会用不同的方式来读取一个大的CSV文件,而不是先把它加载到内存中?如何使用Scala Stream类读取大型CSV文件?
只要使用Source.fromFile(...).getLines
就像你已经说过的那样。
返回一个Iterator,这已经是懒惰(你会使用流作为在那里你就想以前提取的值进行memoized一个懒惰的集合,这样你就可以再次阅读)
如果您收到内存问题,那么问题将出现在你正在做的之后 getLines。像toList
这样的任何操作都会导致严重的收集问题。
我希望你不是指斯卡拉的collection.immutable.Stream
与流。这是而不是你想要什么。流是懒惰的,但会记忆。
我不知道你打算做什么,但只是逐行阅读文件应该工作得很好,而不使用大量的内存。
getLines
应评估懒惰,不应该崩溃(只要你的文件没有超过2³²线,afaik)。如果是这样,请询问#scala或提交错误消息(或两者兼而有之)。
如果您希望一行一行地处理大文件,同时避免要求将整个文件的内容一次加载到内存中,则可以使用由scala.io.Source
返回的Iterator
。
我有一个小功能,tryProcessSource
,(包含两个子功能),我正是用这些类型的用例。该功能最多需要四个参数,其中只有第一个参数是必需的。其他参数提供了理智的默认值。
这里的功能模式(全功能的实现是在底部):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
第一个参数,file: File
,是必需的。它只是java.io.File
的任何有效实例,它指向一个面向行的文本文件,如CSV。
第二个参数parseLine: (Int, String) => Option[List[String]]
是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: Int
,unparsedLine: String
。然后返回Option[List[String]]
。该函数可能会返回由有效列值组成的包装List[String]
Some
。或者它可能会返回一个None
,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, line) => Some(List(line))
。该缺省结果将整个行作为单个String
值返回。
第三个参数filterLine: (Int, List[String]) => Option[Boolean]
是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: Int
,parsedValues: List[String]
。然后返回Option[Boolean]
。该函数可能会返回一个Some
包装Boolean
,指示是否应将此特定行包含在输出中。或者它可能会返回一个None
,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, values) => Some(true)
。此默认结果包含所有行。
第四个也是最后一个参数retainValues: (Int, List[String]) => Option[List[String]]
是可选的。如果提供,它必须是一个函数,期望接收两个输入参数; index: Int
,parsedValues: List[String]
。然后返回Option[List[String]]
。该函数可能会返回一个包含List[String]
的包装List[String]
,其中包含一些子集和/或现有列值的更改。或者它可能会返回一个None
,表示整个流式处理过程正在中止。如果未提供此参数,则提供缺省值(index, values) => Some(values)
。此默认值导致由第二个参数parseLine
解析的值。
考虑采用以下内容的文件(4号线):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
下调用资料...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
......结果这个输出tryLinesDefaults
(文件内容不变):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
以下主叫轮廓...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
...结果在此输出tryLinesParseOnly
(解析成各个列值的每一行):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
的以下呼叫简介...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
......结果这个输出tryLinesIrvingTxNoHeader
(每行解析成单独的列值,没有头,只有在欧文的两排,TX):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
这里就是整个tryProcessSource
功能的实现:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
虽然这种解决方案是比较简洁的,我花了大量的时间和许多重构经过我终于能去这里之前。请让我知道,如果你看到任何可能改进的方式。
更新:我刚刚问过下面的问题为it's own StackOverflow question。现在它在has an answer fixing the error下面提到。
我有这样的想法,试图使这个更具通用性,将retainValues
参数更改为transformLine
,下面是新的泛型函数定义。但是,我不断得到IntelliJ中的突出显示错误“表达式类型Some [List [String]]不符合预期类型Option [A]”并且无法弄清楚如何更改默认值,因此错误消失了。
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
任何有关如何使这项工作的援助将不胜感激。
你的意思是流如在懒惰评估功能?这大概可能,但不是必需的? - 逐行读取文件实质上已经是了。我对Scala io的速度还不是很快,但getLines(从源代码快速浏览)也是以懒惰的方式实现的 - 是否将所有文件读入内存? – 2010-11-23 10:46:36
我相信它会读入内存,因为当使用scala.Source.fromFile(),然后getLines()时会出现OutOfMemoryException。所以使用Stream类听起来像是一个有效的选择,对吧? – 2010-11-23 10:49:17