Java EE 7批处理和魔兽世界–第1部分
这是我在上一个JavaOne上的会议之一。 这篇文章将扩展主题并使用Batch JSR-352 API进入一个实际的应用程序。 此应用程序与MMORPG 魔兽世界集成。
由于JSR-352是Java EE世界中的新规范,所以我认为许多人不知道如何正确使用它。 确定本规范适用的用例也可能是一个挑战。 希望该示例可以帮助您更好地理解用例。
抽象
《魔兽世界》是一款全球超过800万玩家玩的游戏。 该服务按地区提供:美国(US) ,欧洲(EU) ,中国和韩国。 每个区域都有一组称为Realm的服务器,您可以使用这些服务器进行连接以玩游戏。 对于此示例,我们仅研究美国和欧盟地区。
该游戏最有趣的功能之一是允许您使用拍卖行买卖称为“ 物品”的游戏内商品。 每个领域都有两个拍卖行 。 平均每个领域交易约70.000 项 。 让我们计算一些数字:
- 512 境界 ( 美国和欧盟 )
- 每个领域 7万个 物品
- 整个商品超过3500万
数据
《魔兽世界》的另一个有趣之处在于,开发人员提供了REST API来访问大多数游戏内信息,包括拍卖行的数据。 在此处检查完整的API。
拍卖行的数据分两步获得。 首先,我们需要查询对应的Auction House Realm REST端点,以获取对JSON文件的引用。 接下来,我们需要访问该URL并下载包含所有拍卖行 物品信息的文件。 这是一个例子:
http://eu.battle.net/api/wow/auction/data/aggra-portugues
应用程序
我们的目标是建立一个下载拍卖行的应用程序,对其进行处理并提取指标。 这些指标将建立商品价格随时间变化的历史记录。 谁知道? 也许借助这些信息,我们可以预测价格波动并在最佳时间购买或出售商品 。
设置
对于设置,我们将在Java EE 7中使用一些其他功能:
职位
批处理JSR-352作业将执行主要工作。 作业是封装整个批处理过程的实体。 作业将通过作业规范语言连接在一起。 使用JSR-352 ,作业只是这些步骤的容器。 它组合了逻辑上属于流程的多个步骤。
我们将把业务登录分为三个工作:
- 准备 –创建所需的所有支持数据。 列出领域 ,创建文件夹以复制文件。
- 文件 –查询领域以检查是否有新文件要处理。
- 处理 –下载文件,处理数据,提取指标。
编码
后端–具有Java 8的Java EE 7
大多数代码将在后端。 我们需要Batch JSR-352 ,但我们还将使用Java EE的许多其他技术:例如JPA , JAX-RS , CDI和JSON-P 。
由于“ 准备工作”仅用于初始化应用程序资源以进行处理,因此我将跳过它,而深入到最有趣的部分。
文件作业
文件作业是AbstractBatchlet
的实现。 批处理是批处理规范中可用的最简单的处理样式。 这是一个面向任务的步骤,其中任务被调用一次,执行并返回退出状态。 对于执行各种非面向项目的任务,例如执行命令或执行文件传输,此类型最有用。 在这种情况下,我们的Batchlet
将在每个Realm上对每个域发出REST请求,以进行迭代,并使用包含要处理的数据的文件检索URL。 这是代码:
LoadAuctionFilesBatchlet
@Named public class LoadAuctionFilesBatchlet extends AbstractBatchlet { @Inject private WoWBusiness woWBusiness; @Inject @BatchProperty(name = "region") private String region; @Inject @BatchProperty(name = "target") private String target; @Override public String process() throws Exception { List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region)); realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation); return "COMPLETED"; } void getRealmAuctionFileInformation(Realm realm) { try { Client client = ClientBuilder.newClient(); Files files = client.target(target + realm.getSlug()) .request(MediaType.TEXT_PLAIN).async() .get(Files.class) .get(2, TimeUnit.SECONDS); files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile)); } catch (Exception e) { getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail()); } } void createAuctionFile(Realm realm, AuctionFile auctionFile) { auctionFile.setRealm(realm); auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json"); auctionFile.setFileStatus(FileStatus.LOADED); if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) { woWBusiness.createAuctionFile(auctionFile); } } }
关于此的一个很酷的事情是Java 8的使用parallelStream()
一次调用多个REST请求很容易! 您真的可以注意到其中的区别。 如果您想尝试一下,只需运行示例,然后用stream()
替换parallelStream()
stream()
并检出即可。 在我的机器上,使用parallelStream()
可使任务执行速度提高约5或6倍。
更新资料
通常,我不会使用这种方法。 我这样做了,因为部分逻辑涉及调用慢速的REST请求,而parallelStreams确实在这里闪耀。 可以使用批处理分区执行此操作,但是很难实现。 我们还需要每次都在服务器池中收集新数据,因此,如果跳过一个或两个文件,这并不可怕。 请记住,如果您不想错过任何一条记录,块处理样式将更适合。 感谢Simon Simonelli引起我的注意。
由于美国和欧盟的领域要求调用不同的REST端点,因此它们非常适合分区。 分区意味着该任务将运行到多个线程中。 每个分区一个线程。 在这种情况下,我们有两个分区。
要完成作业定义,我们需要提供一个JoB XML文件。 这需要放置在META-INF/batch-jobs
目录中。 这是此作业的files-job.xml
:
files-job.xml
<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="loadRealmAuctionFileStep"> <batchlet ref="loadAuctionFilesBatchlet"> <properties> <property name="region" value="#{partitionPlan['region']}"/> <property name="target" value="#{partitionPlan['target']}"/> </properties> </batchlet> <partition> <plan partitions="2"> <properties partition="0"> <property name="region" value="US"/> <property name="target" value="http://us.battle.net/api/wow/auction/data/"/> </properties> <properties partition="1"> <property name="region" value="EU"/> <property name="target" value="http://eu.battle.net/api/wow/auction/data/"/> </properties> </plan> </partition> </step> </job>
在files-job.xml
我们需要定义我们Batchlet
在batchlet
元素。 对于分区,只需定义partition
元素并为每个plan
分配不同的properties
。 然后,可以使用这些properties
使用表达式#{partitionPlan['region']}
和#{partitionPlan['target']}
将值后期绑定到LoadAuctionFilesBatchlet
。 这是一种非常简单的表达式绑定机制,仅适用于简单的属性和字符串。
处理作业
现在,我们要处理领域拍卖数据文件。 使用上一份工作中的信息,我们现在可以下载文件并对数据进行某些处理。 JSON文件具有以下结构:
item-auctions-sample.json
{ "realm": { "name": "Grim Batol", "slug": "grim-batol" }, "alliance": { "auctions": [ { "auc": 279573567, // Auction Id "item": 22792, // Item for sale Id "owner": "Miljanko", // Seller Name "ownerRealm": "GrimBatol", // Realm "bid": 3800000, // Bid Value "buyout": 4000000, // Buyout Value "quantity": 20, // Numbers of items in the Auction "timeLeft": "LONG", // Time left for the Auction "rand": 0, "seed": 1069994368 }, { "auc": 278907544, "item": 40195, "owner": "Mongobank", "ownerRealm": "GrimBatol", "bid": 38000, "buyout": 40000, "quantity": 1, "timeLeft": "VERY_LONG", "rand": 0, "seed": 1978036736 } ] }, "horde": { "auctions": [ { "auc": 278268046, "item": 4306, "owner": "Thuglifer", "ownerRealm": "GrimBatol", "bid": 570000, "buyout": 600000, "quantity": 20, "timeLeft": "VERY_LONG", "rand": 0, "seed": 1757531904 }, { "auc": 278698948, "item": 4340, "owner": "Celticpala", "ownerRealm": "Aggra(Português)", "bid": 1000000, "buyout": 1000000, "quantity": 10, "timeLeft": "LONG", "rand": 0, "seed": 0 } ] } }
该文件包含从其下载的领域的拍卖列表。 在每个记录中,我们可以检查待售物品,价格,卖方和拍卖结束前的剩余时间。 拍卖的算法按拍卖行类型进行汇总: Alliance和Horde 。
对于process-job
我们要读取JSON文件,转换数据并将其保存到数据库。 这可以通过块处理来实现。 块是一种ETL(提取–转换–加载)样式的处理,适合处理大量数据。 块一次读取一个数据,并在事务内创建要写出的块。 从ItemReader
读入一项,交给ItemProcessor
并进行聚合。 一旦读取的项目数等于提交间隔,就通过ItemWriter
写入整个块,然后提交事务。
ItemReader
实际文件太大,以致无法将它们完全加载到内存中,否则可能会耗尽它。 相反,我们使用JSON-P API以流方式解析数据。
AuctionDataItemReader
@Named public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader { private JsonParser parser; private AuctionHouse auctionHouse; @Inject private JobContext jobContext; @Inject private WoWBusiness woWBusiness; @Override public void open(Serializable checkpoint) throws Exception { setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP)))); AuctionFile fileToProcess = getContext().getFileToProcess(); fileToProcess.setFileStatus(FileStatus.PROCESSING); woWBusiness.updateAuctionFile(fileToProcess); } @Override public void close() throws Exception { AuctionFile fileToProcess = getContext().getFileToProcess(); fileToProcess.setFileStatus(FileStatus.PROCESSED); woWBusiness.updateAuctionFile(fileToProcess); } @Override public Object readItem() throws Exception { while (parser.hasNext()) { JsonParser.Event event = parser.next(); Auction auction = new Auction(); switch (event) { case KEY_NAME: updateAuctionHouseIfNeeded(auction); if (readAuctionItem(auction)) { return auction; } break; } } return null; } @Override public Serializable checkpointInfo() throws Exception { return null; } protected void updateAuctionHouseIfNeeded(Auction auction) { if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) { auctionHouse = AuctionHouse.ALLIANCE; } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) { auctionHouse = AuctionHouse.HORDE; } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) { auctionHouse = AuctionHouse.NEUTRAL; } auction.setAuctionHouse(auctionHouse); } protected boolean readAuctionItem(Auction auction) { if (parser.getString().equalsIgnoreCase("auc")) { parser.next(); auction.setAuctionId(parser.getLong()); parser.next(); parser.next(); auction.setItemId(parser.getInt()); parser.next(); parser.next(); parser.next(); parser.next(); auction.setOwnerRealm(parser.getString()); parser.next(); parser.next(); auction.setBid(parser.getInt()); parser.next(); parser.next(); auction.setBuyout(parser.getInt()); parser.next(); parser.next(); auction.setQuantity(parser.getInt()); return true; } return false; } public void setParser(JsonParser parser) { this.parser = parser; } }
要打开JSON Parse流,我们需要Json.createParser
并传递输入流的引用。 要读取元素,我们只需要调用hasNext()
和next()
方法。 这将返回一个JsonParser.Event
,它使我们能够检查解析器在流中的位置。 从Batch API ItemReader
的readItem()
方法中读取并返回元素。 当没有更多元素可读取时,返回null
以完成处理。 注意,我们还实现了从ItemReader
open
和close
的方法。 这些用于初始化和清理资源。 它们只执行一次。
ItemProcessor
ItemProcessor
是可选的。 它用于转换读取的数据。 在这种情况下,我们需要向竞价添加其他信息。
AuctionDataItemProcessor
@Named public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor { @Override public Object processItem(Object item) throws Exception { Auction auction = (Auction) item; auction.setRealm(getContext().getRealm()); auction.setAuctionFile(getContext().getFileToProcess()); return auction; } }
ItemWriter
最后,我们只需要将数据写到数据库中即可:
AuctionDataItemWriter
@Named public class AuctionDataItemWriter extends AbstractItemWriter { @PersistenceContext protected EntityManager em; @Override public void writeItems(List<Object> items) throws Exception { items.forEach(em::persist); } }
在我的计算机上,具有70 k记录文件的整个过程大约需要20秒。 我确实注意到了一些非常有趣的事情。 在编写此代码之前,我使用的是注入的EJB,它通过persist操作来调用方法。 这总共花费了30秒,因此注入EntityManager并执行持久操作可以直接为我节省三分之一的处理时间。 我只能推测该延迟是由于堆栈调用的增加而造成的,其中EJB拦截器位于中间。 这是在Wildfly中发生的。 我将对此进行进一步调查。
要定义块,我们需要将其添加到process-job.xml文件中:
process-job.xml
<step id="processFile" next="moveFileToProcessed"> <chunk item-count="100"> <reader ref="auctionDataItemReader"/> <processor ref="auctionDataItemProcessor"/> <writer ref="auctionDataItemWriter"/> </chunk> </step>
在item-count
属性中,我们定义每个处理块中可以容纳多少个元素。 这意味着每100个事务就会提交一次。 这对于保持较小的事务大小和检查数据很有用。 如果我们需要停止然后重新开始操作,我们可以这样做而不必再次处理每个项目。 我们必须自己编写逻辑代码。 该示例中不包括此功能,但以后会做。
跑步
要运行作业,我们需要获得JobOperator
的引用。 JobOperator
提供了一个界面来管理作业处理的所有方面,包括操作命令,例如开始,重新启动和停止,以及与作业存储库相关的命令,例如检索作业和步骤执行。
要运行先前的files-job.xml
Job,我们执行:
执行工作
JobOperator jobOperator = BatchRuntime.getJobOperator(); jobOperator.start("files-job", new Properties());
请注意,我们使用Job xml文件的名称,而没有扩展名到JobOperator
。
下一步
我们仍然需要汇总数据以提取指标并将其显示在网页中。 这篇文章已经很长了,因此我将在以后的文章中介绍以下步骤。 无论如何,该部分的代码已经在Github存储库中。 检查资源部分。
资源资源
您可以从我的github存储库中克隆完整的工作副本,然后将其部署到Wildfly。 您可以在此处找到说明进行部署。
翻译自: https://www.javacodegeeks.com/2014/10/java-ee-7-batch-processing-and-world-of-warcraft-part-1.html