Ignite将数据异步放入缓存
问题描述:
我正在处理重载应用程序,并希望能够将数据异步放入缓存。我的意思是像调用缓存中的操作,并收到代表结果的IgniteFuture<V>
。Ignite将数据异步放入缓存
我的意思是我需要能够从持久性存储中平行抽取数据并将它们放入缓存中。如果发生什么事情,我可以尝试再次提取数据(罚款不太多的数据,相当细致)。
答
如果你没有对IgniteFuture
任何硬性要求(我相信不应该是这样)和所有你想要的是某种机制来将数据放入高速缓存,并把它异步处理,然后处理结果通过该操作返回,那么您可以使用Java's Executor Service。
如果您不太了解Java的执行程序服务,那么您可能需要阅读文档或this answer,其中强调了快速点,下面的示例中我还添加了注释。
下面是关于线程的数量很少其他快速点:
- 本例创建一个
ExecutorService
与“ThreadPoolExecutor
”的实施,也有你确定有多少,它可以让喜欢“Executors.newSingleThreadExecutor()
”和“Executors.newFixedThreadPool(10)
”其他选项你想在JVM中使用线程。 - 您也可以选择直接创建
ThreadPoolExecutor
的对象,如return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
,并且您可以更多地控制队列的线程数和类型。您需要了解更多关于这一点,如果你想创建自己ThreadPoolExecutor
对象,而不是通过与您实现ExecutorService
一些其他问题:
- 当你正在处理的
Future
对象则是因为Future.get()
是一个阻塞调用,所以您的主线程将被阻塞,直到所有Future对象被返回并处理。 - 如果你不想阻塞,那么有几个选项,你可以创建一个新的线程来完成所有这些处理,从而释放你的主线程。另一种选择可能是不处理Future对象,所以尽快你做
executorService.submit(callableTask1)
,你的线程将是免费的,然后在你的CallableTask
对象中,你可以将结果推送到队列中(你可以根据需要选择Java's queue implementation),然后从其他线程处理该队列。还有一种选择可能是专用另一个线程来处理你的Future对象。
示例代码:
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceFutureCallableExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Future<String>> futuresList = new ArrayList<>();
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorServiceFutureCallableExample.CallableTask callableTask1 = new ExecutorServiceFutureCallableExample.CallableTask(2000);
ExecutorServiceFutureCallableExample.CallableTask callableTask2 = new ExecutorServiceFutureCallableExample.CallableTask(1000);
ExecutorServiceFutureCallableExample.CallableTask callableTask3 = new ExecutorServiceFutureCallableExample.CallableTask(3000);
System.out.println("### Starting submitting tasks");
// submit the callable and register the returned future object so that it can be processed later.
futuresList.add(executorService.submit(callableTask1));
futuresList.add(executorService.submit(callableTask2));
futuresList.add(executorService.submit(callableTask3));
System.out.println("### Finished submitting tasks");
for (int i = 0; i < futuresList.size(); i++) {
// here "get()" waits for the future tasks to be returned.
System.out.println(futuresList.get(i).get());
}
System.out.println("### Finished.");
}
static class CallableTask implements Callable<String>{
private long timeToSleep;
CallableTask(long _timeToSleep){
this.timeToSleep = _timeToSleep;
}
@Override
public String call() throws Exception {
String str = new Date() + ": Processing - " + this.hashCode() + " | " + Thread.currentThread() + ", slept for seconds - " + timeToSleep;
System.out.println(str);
Thread.sleep(timeToSleep);
return str + " ||||| completed at: " + new Date();
}
public long getTimeToSleep() {
return timeToSleep;
}
public void setTimeToSleep(long timeToSleep) {
this.timeToSleep = timeToSleep;
}
}
}
什么是你的问题?看起来你做了一个介绍,但忘了问题的最后一部分? –