Highland.js CSV解析
问题描述:
我试图写一个非常实用的方式。我们使用Highland.js来管理流处理,但是因为我太新了,我想我对如何处理这种独特的情况感到困惑。Highland.js CSV解析
这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是头,我们希望将其存储到内存中,然后压缩流中的所有行。
这是我第一次去吧:
var _ = require('highland');
var fs = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var headers = [];
var through = _.pipeline(
_.split(),
_.head(),
_.doto(function(col) {
headers = col.split(',');
return headers;
}),
......
_.splitBy(','),
_.zip(headers),
_.wrapCallback(process)
);
_(stream)
.pipe(through)
.pipe(output);
在管道中的第一个命令是由行文件分割。下一个抓取标题,doto声明它是一个全局变量。问题是流中接下来的几行不存在,所以进程被阻塞......可能是因为它上面的head()命令。
我试过其他一些变化,但我觉得这个例子给你一个我需要去的地方的感觉。
有关这方面的任何指导都会有所帮助 - 它也提出了我的每行中是否有不同值的问题,我如何在多种不同长度/复杂度的流操作之间分割流程流。
谢谢。
编辑:我产生了一个更好的结果,但我质疑它的效率 - 有没有一种方法我可以优化这个,所以在每次运行我不检查头是否被记录?这仍然感觉不稳定。
var through = _.pipeline(
_.split(),
_.filter(function(row) {
// Filter out bogus values
if (! row || headers) {
return true;
}
headers = row.split(',');
return false;
}),
_.map(function(row) {
return row.split(',')
}),
_.batch(500),
_.compact(),
_.map(function(row) {
return JSON.stringify(row) + "\n";
})
);
_(stream)
.pipe(through)
答
您可以使用Stream.observe()
或Stream.fork()
分裂流。
var _ = require('highland');
var fs = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var through = highland.pipeline(function(s) {
var headerStream, headers;
// setup a shared variable to store the headers
headers = [];
// setup the csv processing
s = s
// split input into lines
.split()
// remove empty lines
.compact()
// split lines into arrays
.map(function(row) {
return row.split(',');
});
// create a new stream to grab the header
headerStream = s.observe();
// pause the original stream
s.pause();
// setup processing of the non-header rows
s = s
// drop the header row
.drop(1)
// convert the rest of the rows to objects
.map(function(row) {
var obj = headers.reduce(function(obj, key, i) {
obj[key] = row[i];
return obj;
}, {});
return JSON.stringify(obj) + "\n";
});
// grab the first row from the header stream
// save the headers and then resume the normal stream
headerStream.head().toArray(function(rows) {
headers = rows[0];
s.resume();
});
return s;
});
_(stream)
.pipe(through)
.pipe(output);
也就是说,您的csv解析不会在您的值中转义换行符和逗号。通常情况下,这通过在双引号中包装值来在csv文件中完成。然后双引号通过将两个相邻放在一起来逃脱。这样做有点棘手,所以我建议使用一个处理它的包,比如fast-csv。
那么你的代码看起来是这样的:
var _ = require('highland');
var fs = require('fs');
var csv = require('fast-csv');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
.map(function(row) {
return JSON.stringify(row) + "\n";
})
.pipe(output);