Streaming processing of large csv file in R

Problem description:

I need to make some relatively simple changes to a very large csv file (c. 8.5GB). I initially tried using various reader functions: read.csv, readr::read.csv, data.table::fread. However: they all run out of memory.

I think I need to use a stream-processing approach instead: read a chunk, update it, write it out, repeat. I found this answer, which is along the right lines; however, I don't know how to terminate the loop (I'm relatively new to R).

So I have 2 questions:

  1. What is the right way to make the while loop work?
  2. Is there a better way (for some definition of 'better')? e.g. is there some way to do this using dplyr & pipes?

Current code as follows:

src_fname <- "testdata/model_input.csv" 
tgt_fname <- "testdata/model_output.csv" 

#Changes needed in file: rebase identifiers, set another col to constant value 
rebase_data <- function(data, offset) { 
    data$'Unique Member ID' <- data$'Unique Member ID' - offset 
    data$'Client Name' <- "TestClient2" 
    return(data) 
} 

CHUNK_SIZE <- 1000 
src_conn = file(src_fname, "r") 
data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE) 
cols <- colnames(data) 
offset <- data$'Unique Member ID'[1] - 1 

data <- rebase_data(data, offset) 
#1st time through, write the headers 
tgt_conn = file(tgt_fname, "w") 
write.csv(data,tgt_conn, row.names=FALSE) 

#loop over remaining data 
end = FALSE 
while(end == FALSE) { 
    data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols) 
    data <- rebase_data(data, offset) 
    #write.csv doesn't support col.names=FALSE; so use write.table which does 
    write.table(data, tgt_conn, row.names=FALSE, col.names=FALSE, sep=",") 
    # ??? How to test for EOF and set end = TRUE if so ??? 
    # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop? 
    if (nrow(data) < CHUNK_SIZE) {
        end <- TRUE
    }

} 
close(src_conn) 
close(tgt_conn) 

Thanks for any pointers.


Check out the CRAN package 'chunked'. It allows chunkwise reading from text files and, especially interesting, chunkwise processing with dplyr. There's no vignette, but an introduction to its use at https://github.com/edwindj/chunked/ I meant to try it myself but haven't found the time! –

OK, I found a way to do this, as follows:

# src_fname <- "testdata/model_input.csv" 
# tgt_fname <- "testdata/model_output.csv" 

CHUNK_SIZE <- 20000 

#Changes needed in file: rebase identifiers, set another col to constant value 
rebase_data <- function(data, offset) { 
    data$'Unique Member ID' <- data$'Unique Member ID' - offset 
    data$'Client Name' <- "TestClient2" 
    return(data) 
} 

#-------------------------------------------------------- 
# Get the structure first to speed things up 
#-------------------------------------------------------- 
structure <- read.csv(src_fname, nrows = 2, check.names = FALSE) 
cols <- colnames(structure) 
offset <- structure$'Unique Member ID'[1] - 1 

#Open the input & output files for reading & writing 
src_conn = file(src_fname, "r") 
tgt_conn = file(tgt_fname, "w") 

lines_read <- 0 
end <- FALSE 
read_header <- TRUE 
write_header <- TRUE 
while(end == FALSE) {
    data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols, header = read_header)
    if (nrow(data) > 0) {
        lines_read <- lines_read + nrow(data)
        print(paste0("lines read this chunk: ", nrow(data), ", lines read so far: ", lines_read))
        data <- rebase_data(data, offset)
        #write.csv doesn't support col.names=FALSE; so use write.table which does
        write.table(data, tgt_conn, row.names=FALSE, col.names=write_header, sep = ",")
    }
    if (nrow(data) < CHUNK_SIZE) {
        end <- TRUE
    }
    read_header <- FALSE
    write_header <- FALSE
}
close(src_conn) 
close(tgt_conn) 
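
One caveat: if the row count happens to be an exact multiple of CHUNK_SIZE, the final read.csv call finds no lines left on the connection and stops with an error ("no lines available in input") instead of returning a short chunk, so the nrow test never fires. A minimal sketch of one way to guard the read with tryCatch (untested against the real file):

# Sketch only: treat the end-of-connection error from read.csv as an empty chunk. 
# Caution: this swallows any read error, so a robust version should check 
# conditionMessage(e) before assuming EOF. 
data <- tryCatch( 
    read.csv(src_conn, nrows = CHUNK_SIZE, check.names = FALSE, 
             col.names = cols, header = FALSE), 
    error = function(e) data.frame() 
) 
if (nrow(data) == 0) { 
    end <- TRUE 
} 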

Try this:

library("chunked") 

read_chunkwise(src_fname, chunk_size=CHUNK_SIZE) %>% 
rebase_data(offset) %>% 
write_chunkwise(tgt_fname) 

You may have to fiddle a bit with the colnames to get exactly what you want.

(Disclaimer: haven't tried the code)

Note that there is no vignette with the package, but the standard usage is described on GitHub: https://github.com/edwindj/chunked/
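
If you want the chunkwise-processing-with-dplyr style mentioned in the comments, a sketch along these lines should also work (again untested; the dotted names Unique.Member.ID and Client.Name are an assumption, since the reader may convert the spaces in the original headers to dots - that's the colname fiddling referred to above):

library(chunked) 
library(dplyr) 

# Sketch: dotted column names below are assumed, not verified against the file. 
read_chunkwise(src_fname, chunk_size = CHUNK_SIZE) %>% 
    mutate(Unique.Member.ID = Unique.Member.ID - offset, 
           Client.Name = "TestClient2") %>% 
    write_chunkwise(tgt_fname) 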


Many thanks - hadn't come across chunked in my searching. Looks like just the thing. – sfinnie