如何在Java中对Hive进行异步调用?

问题描述:

我想以异步方式在服务器上执行Hive查询。 Hive查询可能需要很长时间才能完成,因此我不希望阻止该呼叫。我目前使用Thirft进行阻塞调用(在client.execute()上的阻塞),但我还没有看到如何进行非阻塞调用的示例。这里是阻止代码:如何在Java中对Hive进行异步调用?

 TSocket transport = new TSocket("hive.example.com", 10000); 
     transport.setTimeout(999999999); 
     TBinaryProtocol protocol = new TBinaryProtocol(transport); 
     Client client = new ThriftHive.Client(protocol); 
     transport.open(); 
     client.execute(hql); // Omitted HQL 

     List<String> rows; 
     while ((rows = client.fetchN(1000)) != null) { 
      for (String row : rows) { 
       // Do stuff with row 
      } 
     } 

     transport.close(); 

上面的代码缺少try/catch块来保持它简短。

有没有人有任何想法如何做异步调用? Hive/Thrift可以支持它吗?有没有更好的办法?

谢谢!

+0

我现在对Thrift不是很了解,但不能将它包装在可运行的程序中并创建一个新线程? – brindy 2010-02-02 02:10:44

+0

是的,我很清楚自己可以完成这项工作,但是有些事情让我觉得它已经嵌入到Thrift中,比如TNonblockingSocket。我找不到如何使用它的任何示例,或者即使Hive支持它。 – 2010-02-02 17:38:58

说话蜂巢邮件列表后,蜂房不支持使用Thirft异步调用。

我不知道Hive特别是任何阻塞调用可以通过产生一个新线程并使用回调来调用异步调用。你可以看看java.util.concurrent.FutureTask,它被设计为允许轻松处理这种异步操作。

我一无所知蜂巢,但作为最后的手段,你可以使用Java的并发库:

Callable<SomeResult> c = new Callable<SomeResult>(){public SomeResult call(){ 

    // your Hive code here 

}}; 

Future<SomeResult> result = executorService.submit(c); 

// when you need the result, this will block 
result.get(); 

或者,如果您不需要等待结果,使用的Runnable代替可调用

我们启动了对AWS Elastic MapReduce的异步调用。通过调用AWS MapReduce Web服务,AWS MapReduce可以在亚马逊的云上运行hadoop/hive作业。

您也可以监控您的作业的状态,一旦工作完成抓取结果关S3。

由于调用Web服务在本质上是异步的,我们从来没有阻止我们的其他业务。我们继续在一个单独的线程中监控我们的工作状态,并在工作完成时获得结果。

AFAIK,在撰写本文时,Thrift不生成异步客户端。在此链接here(“异步”的搜索文本)中解释的原因是Thrift专为数据中心设计,其中延迟被认为是低的。

不幸的是,你知道有经验的呼叫和结果之间不总是由网络引起的延迟,但通过逻辑执行!我们从Java应用程序服务器调用Cassandra数据库时遇到了这个问题,我们希望限制总线程数。总结:现在你所能做的就是确保你有足够的资源来处理被阻塞的并发线程所需的数量,并等待更高效的实现。

现在有可能使在Java节俭客户端异步调用放入后,该修补程序: https://issues.apache.org/jira/browse/THRIFT-768

使用新节俭生成异步Java客户端和初始化客户端如下:

TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9160); 
TAsyncClientManager clientManager = new TAsyncClientManager(); 
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 
Hive.AsyncClient client = new Hive.AsyncClient(protocolFactory, clientManager, transport); 

现在您可以像在同步接口上一样在此客户端上执行方法。唯一的变化是所有的方法都需要一个回调的附加参数。