Java EE 7批处理和魔兽世界–第2部分
今天,我将把第二部分带到我以前关于Java EE 7批处理和《魔兽世界–第1部分》的帖子中。 在本文中,我们将了解如何从第1部分中获得的数据中汇总和提取指标。
概括
批处理目的是下载魔兽世界拍卖行的数据,处理拍卖并提取指标。 这些指标将建立拍卖项目价格随时间变化的历史记录。 在第1部分中 ,我们已经下载了数据并将其插入数据库。
应用程序
处理作业
在将原始数据添加到数据库之后,我们将添加一个带有Chunk样式处理的步骤。 在块中,我们将读取聚合的数据,然后将其插入数据库中的另一个表中以便于访问。 这是在process-job.xml
:
process-job.xml
<step id="importStatistics"> <chunk item-count="100"> <reader ref="processedAuctionsReader"/> <processor ref="processedAuctionsProcessor"/> <writer ref="processedAuctionsWriter"/> </chunk> </step>
块一次读取一个数据,并在事务内创建要写出的块。 从ItemReader
读入一项,交给ItemProcessor
并进行聚合。 一旦读取的项目数等于提交间隔,就通过ItemWriter
写入整个块,然后提交事务。
ProcessedAuctionsReader
在读者中,我们将使用数据库功能选择和汇总指标。
ProcessedAuctionsReader.java
@Named public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader { @Resource(name = "java:comp/DefaultDataSource") protected DataSource dataSource; private PreparedStatement preparedStatement; private ResultSet resultSet; @Override public void open(Serializable checkpoint) throws Exception { Connection connection = dataSource.getConnection(); preparedStatement = connection.prepareStatement( "SELECT" + " itemid as itemId," + " sum(quantity)," + " sum(bid)," + " sum(buyout)," + " min(bid / quantity)," + " min(buyout / quantity)," + " max(bid / quantity)," + " max(buyout / quantity)" + " FROM auction" + " WHERE auctionfile_id = " + getContext().getFileToProcess().getId() + " GROUP BY itemid" + " ORDER BY 1", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT ); // Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315 //preparedStatement.setLong(1, getContext().getFileToProcess().getId()); resultSet = preparedStatement.executeQuery(); } @Override public void close() throws Exception { DbUtils.closeQuietly(resultSet); DbUtils.closeQuietly(preparedStatement); } @Override public Object readItem() throws Exception { return resultSet.next() ? resultSet : null; } @Override public Serializable checkpointInfo() throws Exception { return null; }
在此示例中,我们通过使用具有简单可滚动结果集的纯JDBC获得最佳性能结果。 这样,仅执行一个查询,并根据需要在readItem
中提取结果。 您可能想探索其他替代方法。
Plain JPA在标准中没有可滚动的结果集,因此您需要对结果进行分页。 这将导致多个查询,这将减慢阅读速度。 另一个选择是使用新的Java 8 Streams API来执行聚合操作。 这些操作很快,但是您需要从数据库中选择整个数据集到流中。 最终,这会削弱您的性能。
我确实尝试了这两种方法,并通过使用数据库聚合功能获得了最佳结果。 我并不是说这始终是最好的选择,但是在这种情况下,这是最好的选择。
在实施过程中,我还发现了Batch中的错误。 您可以在这里检查。 在PreparedStatement中设置参数时会引发异常。 解决方法是将参数直接注入查询SQL中。 丑陋,我知道...
ProcessedAuctionsProcessor
在处理器中,让我们将所有聚合值存储在一个holder对象中,以存储在数据库中。
ProcessedAuctionsProcessor.java
@Named public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor { @Override @SuppressWarnings("unchecked") public Object processItem(Object item) throws Exception { ResultSet resultSet = (ResultSet) item; AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics(); auctionItemStatistics.setItemId(resultSet.getInt(1)); auctionItemStatistics.setQuantity(resultSet.getLong(2)); auctionItemStatistics.setBid(resultSet.getLong(3)); auctionItemStatistics.setBuyout(resultSet.getLong(4)); auctionItemStatistics.setMinBid(resultSet.getLong(5)); auctionItemStatistics.setMinBuyout(resultSet.getLong(6)); auctionItemStatistics.setMaxBid(resultSet.getLong(7)); auctionItemStatistics.setMaxBuyout(resultSet.getLong(8)); auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified()); auctionItemStatistics.setAvgBid( (double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity())); auctionItemStatistics.setAvgBuyout( (double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity())); auctionItemStatistics.setRealm(getContext().getRealm()); return auctionItemStatistics; } }
由于指标会及时记录数据的准确快照,因此计算仅需执行一次。 这就是为什么我们要保存汇总指标。 它们永远不会改变,我们可以轻松地检查历史。
如果您知道源数据是不可变的,并且需要对其进行操作,那么建议您将结果保留在某处。 这样可以节省您的时间。 当然,如果将来要多次访问此数据,则需要平衡。 如果不是这样,也许您就不需要经历持久化数据的麻烦了。
ProcessedAuctionsWriter
最后,我们只需要将数据写到数据库中即可:
ProcessedAuctionsWriter.java
@Named public class ProcessedAuctionsWriter extends AbstractItemWriter { @PersistenceContext protected EntityManager em; @Override @SuppressWarnings("unchecked") public void writeItems(List items) throws Exception { List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items; statistis.forEach(em::persist); } }
指标
现在,为了对数据做一些有用的事情,我们将公开一个REST端点,以对所计算的指标执行查询。 方法如下:
WowBusinessBean.java
@Override @GET @Path("items") public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId, @QueryParam("itemId") Integer itemId) { Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById") .setParameter("id", realmId) .getSingleResult(); // Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLink List<Realm> connectedRealms = new ArrayList<>(); connectedRealms.addAll(realm.getConnectedRealms()); List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList()); ids.add(realmId); return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem") .setParameter("realmIds", ids) .setParameter("itemId", itemId) .getResultList(); }
如果您还记得第1部分中的一些细节,那么魔兽世界服务器称为Realms 。 这些领域可以相互链接并共享同一拍卖行 。 为此,我们还拥有有关领域之间如何相互联系的信息。 这很重要,因为我们可以在所有连接的领域中搜索拍卖品 。 其余的逻辑只是简单的查询以获取数据。
在开发过程中,我还发现了Eclipse Link (如果在Glassfish中运行)和Java 8的错误。显然, Eclipse Link返回的基础Collection的元素计数设置为0。如果您使用Streams,则此方法效果不佳尝试内联查询调用以及Stream操作。 流将认为它为空,并且不会返回任何结果。 您可以在这里阅读更多有关此的内容。
接口
我还使用Angular和Google Charts开发了一个小界面来显示指标。 看一看:
在这里,我在寻找一个名为“Aggra(葡萄牙语)”的境界与拍卖项目编号72092对应于鬼铁矿石 。 如您所见,我们可以检查待售数量,出价和买断值以及价格随时间的波动。 整齐? 我可能会写另一篇关于将来构建Web Interface的文章。
资源资源
您可以从我的github存储库中克隆完整的工作副本,然后将其部署到Wildfly或Glassfish中 。 您可以在那里找到部署说明: 魔兽世界拍卖
也请检查Java EE示例项目,其中包含大量完整的批处理示例。
翻译自: https://www.javacodegeeks.com/2015/01/java-ee-7-batch-processing-and-world-of-warcraft-part-2.html