SubmissionPublisher on submit not invoking onNext

Problem description:

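Every so often I retrieve tweets with a certain query. These tweets have to be passed to services that calculate statistics on them and process them. Those services are therefore subscribed to my publisher, so publisher.hasSubscribers() returns true. But the submit and offer methods do not invoke my subscribers' onNext. As a "fix", I loop through my subscribers and call it myself, but that should not be necessary.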

This is my publisher's constructor.

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){ 
    super(executor, maxBufferCapacity); 
    this.searchQuery = searchQuery; 
    scheduler = new ScheduledThreadPoolExecutor(1); 
    this.tweetGetter = scheduler.scheduleAtFixedRate(
      () -> { 
       List<String> tweets = getTweets(searchQuery); 
       /* this.lastCall = LocalDateTime.now(); 
       for(Flow.Subscriber sub : this.getSubscribers()){ 
        sub.onNext(tweets); 
       }*/ 
       this.submit(tweets); 
       if(tweets.size() >= 20) this.close(); 
      }, 0, period, unit); 
} 

This is my subscriber.

package myFlowAPI; 

import Interfaces.IProcess; 
import Services.LogToFileService; 

import java.util.List; 
import java.util.concurrent.Flow; 
import java.util.concurrent.atomic.AtomicInteger; 

public class MySubscriber implements Flow.Subscriber<List<String>> { 
private Flow.Subscription subscription; 
private AtomicInteger count; 

private IProcess processor; 

private String name; 
private int DEMAND = 0; 

public MySubscriber(String name, IProcess processor){ 
    this.name = name; 
    this.processor = processor; 
} 

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
} 


@Override 
public void onNext(List<String> item) { 
    Object result = this.processor.process(item); 
    this.readResult(result); 

    switch (this.processor.getClass().getSimpleName()){ 
     case "CalculateTweetStatsService": 
      if((Integer) result >= 20){ 
       this.subscription.cancel(); 
      } 
      break; 
    } 
} 

@Override 
public void onError(Throwable throwable) { 
    System.out.println("Error is thrown " + throwable.getMessage()); 
} 

@Override 
public void onComplete() { 
    if(this.processor instanceof LogToFileService){ 
     ((LogToFileService) processor).closeResource(); 
    } 
    System.out.println("complete"); 
} 

private void readResult(Object result){ 
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString()); 
} 
} 

And this is the main method, where I subscribe to the publisher.

public static void main(String[] args) { 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 

    String searchQuery; 
    try{ 
     searchQuery = args[0] != null ? args[0] : "#capgemini50"; 
    }catch (ArrayIndexOutOfBoundsException ex){ 
     searchQuery = "#capgemini50"; 
    } 

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery); 

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt")); 
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService()); 
    streamer.subscribe(subscriber1); 
    streamer.subscribe(subscriber2); 

} 

Your subscriber needs to request data explicitly, for example in onSubscribe (see http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
    this.subscription.request(1); 
} 

Likewise, request the next item in onNext() once the current one has been processed.
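To see the request/onNext handshake in isolation, here is a minimal, self-contained sketch. It is not the asker's code: the names RequestDemo, CollectingSubscriber and publishAll are made up for illustration, plain strings stand in for the tweet lists, and the publisher uses the default executor and buffer size.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RequestDemo {

    /** Hypothetical subscriber: collects items and requests them one at a time. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // without this, onNext is never called
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns what the subscriber received. */
    static List<String> publishAll(List<String> items) throws InterruptedException {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete to the subscriber
        // Delivery is asynchronous, so wait (with a timeout) for completion.
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAll(List.of("tweet 1", "tweet 2", "tweet 3")));
    }
}
```

If you comment out both request(1) calls, the subscriber still counts as subscribed, but the submitted items just sit in the publisher's buffer and onNext never fires, which matches the behaviour described in the question.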

My hero. Thanks a lot!! '@Override public void onNext(List<String> item){ Object result = this.processor.process(item); this.readResult(result); switch(this.processor.getClass().getSimpleName()){ case "CalculateTweetStatsService": if((Integer) result >= 20){ this.subscription.cancel(); } break; } this.subscription.request(1); }' – user1008531