Java EE 7批处理和魔兽世界–第2部分

今天,我将把第二部分带到我以前关于Java EE 7批处理和《魔兽世界–第1部分》的帖子中。 在本文中,我们将了解如何从第1部分中获得的数据中汇总和提取指标。

Java EE 7批处理和魔兽世界–第2部分

概括

批处理目的是下载魔兽世界拍卖行的数据,处理拍卖并提取指标。 这些指标将建立拍卖项目价格随时间变化的历史记录。 第1部分中 ,我们已经下载了数据并将其插入数据库。

应用程序

处理作业

在将原始数据添加到数据库之后,我们将添加一个带有Chunk样式处理的步骤。 在块中,我们将读取聚合的数据,然后将其插入数据库中的另一个表中以便于访问。 这是在process-job.xml

process-job.xml

<step id="importStatistics">
    <chunk item-count="100">
        <reader ref="processedAuctionsReader"/>
        <processor ref="processedAuctionsProcessor"/>
        <writer ref="processedAuctionsWriter"/>
    </chunk>
</step>

块一次读取一个数据,并在事务内创建要写出的块。 ItemReader读入一项,交给ItemProcessor并进行聚合。 一旦读取的项目数等于提交间隔,就通过ItemWriter写入整个块,然后提交事务。

ProcessedAuctionsReader

在读者中,我们将使用数据库功能选择和汇总指标。

ProcessedAuctionsReader.java

@Named
public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader {
    @Resource(name = "java:comp/DefaultDataSource")
    protected DataSource dataSource;
 
    private PreparedStatement preparedStatement;
    private ResultSet resultSet;
 
    @Override
    public void open(Serializable checkpoint) throws Exception {
        Connection connection = dataSource.getConnection();
 
        preparedStatement = connection.prepareStatement(
                        "SELECT" +
                        "   itemid as itemId," +
                        "   sum(quantity)," +
                        "   sum(bid)," +
                        "   sum(buyout)," +
                        "   min(bid / quantity)," +
                        "   min(buyout / quantity)," +
                        "   max(bid / quantity)," +
                        "   max(buyout / quantity)" +
                        " FROM auction" +
                        " WHERE auctionfile_id = " +
                        getContext().getFileToProcess().getId() +
                        " GROUP BY itemid" +
                        " ORDER BY 1",
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT
        );
 
        // Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315
        //preparedStatement.setLong(1, getContext().getFileToProcess().getId());
 
        resultSet = preparedStatement.executeQuery();
    }
 
    @Override
    public void close() throws Exception {
        DbUtils.closeQuietly(resultSet);
        DbUtils.closeQuietly(preparedStatement);
    }
 
    @Override
    public Object readItem() throws Exception {
        return resultSet.next() ? resultSet : null;
    }
 
    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }

在此示例中,我们通过使用具有简单可滚动结果集的纯JDBC获得最佳性能结果。 这样,仅执行一个查询,并根据需要在readItem中提取结果。 您可能想探索其他替代方法。

Plain JPA在标准中没有可滚动的结果集,因此您需要对结果进行分页。 这将导致多个查询,这将减慢阅读速度。 另一个选择是使用新的Java 8 Streams API来执行聚合操作。 这些操作很快,但是您需要从数据库中选择整个数据集到流中。 最终,这会削弱您的性能。

我确实尝试了这两种方法,并通过使用数据库聚合功能获得了最佳结果。 我并不是说这始终是最好的选择,但是在这种情况下,这是最好的选择。

在实施过程中,我还发现了Batch中的错误。 您可以在这里检查。 在PreparedStatement中设置参数时会引发异常。 解决方法是将参数直接注入查询SQL中。 丑陋,我知道...

ProcessedAuctionsProcessor

在处理器中,让我们将所有聚合值存储在一个holder对象中,以存储在数据库中。

ProcessedAuctionsProcessor.java

@Named
public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    @SuppressWarnings("unchecked")
    public Object processItem(Object item) throws Exception {
        ResultSet resultSet = (ResultSet) item;
 
        AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics();
        auctionItemStatistics.setItemId(resultSet.getInt(1));
        auctionItemStatistics.setQuantity(resultSet.getLong(2));
        auctionItemStatistics.setBid(resultSet.getLong(3));
        auctionItemStatistics.setBuyout(resultSet.getLong(4));
        auctionItemStatistics.setMinBid(resultSet.getLong(5));
        auctionItemStatistics.setMinBuyout(resultSet.getLong(6));
        auctionItemStatistics.setMaxBid(resultSet.getLong(7));
        auctionItemStatistics.setMaxBuyout(resultSet.getLong(8));
 
        auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified());
 
        auctionItemStatistics.setAvgBid(
                (double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity()));
        auctionItemStatistics.setAvgBuyout(
                (double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity()));
 
        auctionItemStatistics.setRealm(getContext().getRealm());
 
        return auctionItemStatistics;
    }
}

由于指标会及时记录数据的准确快照,因此计算仅需执行一次。 这就是为什么我们要保存汇总指标。 它们永远不会改变,我们可以轻松地检查历史。

如果您知道源数据是不可变的,并且需要对其进行操作,那么建议您将结果保留在某处。 这样可以节省您的时间。 当然,如果将来要多次访问此数据,则需要平衡。 如果不是这样,也许您就不需要经历持久化数据的麻烦了。

ProcessedAuctionsWriter

最后,我们只需要将数据写到数据库中即可:

ProcessedAuctionsWriter.java

@Named
public class ProcessedAuctionsWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;
 
    @Override
    @SuppressWarnings("unchecked")
    public void writeItems(List items) throws Exception {
        List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items;
        statistis.forEach(em::persist);
    }
}

指标

现在,为了对数据做一些有用的事情,我们将公开一个REST端点,以对所计算的指标执行查询。 方法如下:

WowBusinessBean.java

@Override @GET
    @Path("items")
    public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId,
                                                                               @QueryParam("itemId") Integer itemId) {
 
        Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById")
                                .setParameter("id", realmId)
                                .getSingleResult();
 
        // Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLink
        List<Realm> connectedRealms = new ArrayList<>();
        connectedRealms.addAll(realm.getConnectedRealms());
        List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList());
        ids.add(realmId);
 
        return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem")
                 .setParameter("realmIds", ids)
                 .setParameter("itemId", itemId)
                 .getResultList();
    }

如果您还记得第1部分中的一些细节,那么魔兽世界服务器称为Realms 这些领域可以相互链接并共享同一拍卖行 为此,我们还拥有有关领域之间如何相互联系的信息。 这很重要,因为我们可以在所有连接的领域中搜索拍卖品 其余的逻辑只是简单的查询以获取数据。

在开发过程中,我还发现了Eclipse Link (如果在Glassfish中运行)和Java 8的错误。显然, Eclipse Link返回的基础Collection的元素计数设置为0。如果您使用Streams,则此方法效果不佳尝试内联查询调用以及Stream操作。 流将认为它为空,并且不会返回任何结果。 您可以在这里阅读更多有关此的内容。

接口

我还使用AngularGoogle Charts开发了一个小界面来显示指标。 看一看:

Java EE 7批处理和魔兽世界–第2部分

在这里,我在寻找一个名为“Aggra(葡萄牙语)”的境界拍卖项目编号72092对应于鬼铁矿石 如您所见,我们可以检查待售数量,出价和买断值以及价格随时间的波动。 整齐? 我可能会写另一篇关于将来构建Web Interface的文章。

资源资源

您可以从我的github存储库中克隆完整的工作副本,然后将其部署到WildflyGlassfish中 您可以在那里找到部署说明: 魔兽世界拍卖

也请检查Java EE示例项目,其中包含大量完整的批处理示例。

翻译自: https://www.javacodegeeks.com/2015/01/java-ee-7-batch-processing-and-world-of-warcraft-part-2.html