使用Apache Camel插入大型CSV文件时的GC问题
我有一个包含大约5.2M行的大型CSV文件。我想解析这个文件并将数据插入到数据库中。我为此使用apache骆驼。使用Apache Camel插入大型CSV文件时的GC问题
路线是相当容易的(简化本示例)
from("file:work/customer/").id("customerDataRoute")
.split(body().tokenize("\n")).streaming()
.parallelProcessing()
.unmarshal(bindyCustomer)
.split(body())
.process(new CustomerProcessor())
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");
bindyCustomer是CSV文件和 CustomerProcessor一个BindyCsvDataFormat是返回Bindy客户对象的数据作为对象的阵列的处理器为SQL插入。实际的对象有39个字段(以上简化)。
这对第一个800.000到1.000.000行都可以,但是它会停下来。
我用JVisualVM和Visual GC插件监视了骆驼实例,我可以看到老一代填满了,当它达到最大值时,整个系统停止运行,但不会崩溃。 在这一点上,老一代已经满员了,伊甸园的空间几乎已经满了,两个幸存者空间都是空的(因为它不能将任何东西移动到我猜想的老一代)。
那么这里有什么问题?这看起来像是Camel SQL组件中的内存泄漏。 数据主要存储在ConcurrentHashMap对象中。
当我拿出SQL组件时,老一代几乎没有填充。
我正在使用骆驼2.15.1将尝试使用2.17.1看看是否解决了这个问题。
更新:我试过骆驼2.17.1(同样的问题),我试图用java.sql.Statement.executeUPdate在Java中插入插入。有了这个选项,我设法插入了大约2.6 M行,但随后它也停止了。有趣的是我没有收到内存错误。它只是停下来。
好吧我想通了这里出了什么问题。基本上读取部分与插入部分相比太快。这个例子有点过于简单,因为在阅读和插入之间有一个seda队列(因为我必须对内容做出选择,而这个内容在示例中没有显示)。 但即使没有seda队列,它从来没有完成。我意识到当我杀死骆驼时发生了什么错误,并得到一条消息,说明还有几千条机上消息。
因此,当插入端无法跟上时,并行处理读取没有意义。
from("file:work/customer/").id("customerDataRoute")
.onCompletion().log("Customer data processing finished").end()
.log("Processing customer data ${file:name}")
.split(body().tokenize("\n")).streaming() //no more parallel processing
.choice()
.when(simple("${body} contains 'HEADER TEXT'")) //strip out the header if it exists
.log("Skipping first line")
.endChoice()
.otherwise()
.to("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true")
.endChoice();
from("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true")
.unmarshal(bindyCustomer)
.split(body())
.process(new CustomerProcessor()).id("CustomProcessor") //converts one Notification into an array of values for the SQL insert
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");
我在SEDA队列上定义了一个大小(默认情况下它不受限制),并在队列满时调用线程块。
seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true
并行处理是通过在SEDA队列上使用20个并发消费者完成的。请注意,出于什么原因,您在调用路由时也必须指定队列大小(不仅在您定义它的位置)。
现在的内存消耗是最小的,它插入500万记录没有问题。
我没有测试你的代码,但是,我确实注意到你的第二个拆分语句不是流式。我建议尝试一下。如果你有太多并行的工作流,GC可能会在你释放资源之前填满。 SQL语句需要的时间可能是什么让GC得到了太多的建立时间,因为你正在对主要处理进行并行化。
from("file:work/customer/").id("customerDataRoute")
.split(body().tokenize("\n")).streaming().parallelProcessing()
.unmarshal(bindyCustomer)
.split(body()).streaming() //Add a streaming call here and see what happens
.process(new CustomerProcessor())
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)");
感谢您的提示。我试过了,但并没有解决问题。 .unmarchal(bindyCustomer)只返回一个元素的数组,所以在这种情况下流应该不会有太大的区别。你能想到其他可能会出错的东西吗?我将尝试在Java中执行插入以查看是否解决了问题。 – Ben
嗯,我有一些粗略的猜测。你能够添加id标签到你的路线,然后打开你的JConsole来确认所有线程是“挂起”的吗? –
这条路线已经作为一个ID(customerDataRoute)或者你指的是别的吗? – Ben