Error: Message from Actor to Actor was not delivered. [1] dead letters encountered. Distributed pub-sub does not work across clusters

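Problem description:

I am trying to set up distributed pub-sub across two different cluster systems, but it does not work. The receiving side logs "Message from Actor to Actor was not delivered. [1] dead letters encountered."

All I want to do is create a simple example:

1) I create a topic, say "content".

2) A node in, say, JVM A creates the topic, a subscriber to that topic, and a publisher that publishes to it.

3) On a different node, say JVM B on a different port, I create a subscriber.

4) When I send a message to the topic from JVM A, I want the subscriber on JVM B to receive it as well, since it is subscribed to the same topic.

Any help would be greatly appreciated, as would a simple working Java example of distributed pub-sub with subscribers and publishers in different cluster systems.

Here is the code for app1 and its configuration file.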

public class App1{ 

    public static void main(String[] args) { 

    System.setProperty("akka.remote.netty.tcp.port", "2551"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem"); 
    ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem); 
    ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1"); 
    clusterClientReceptionist1.registerSubscriber("content", subcriber1); 
    ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1"); 
    clusterClientReceptionist1.registerSubscriber("content", publisher1); 
    publisher1.tell("testMessage1", ActorRef.noSender()); 

    } 
} 

app1.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2551 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://ClusterSystem@127.0.0.1:2551" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

Here is the code for app2 and its configuration file.

public class App 
{ 
    public static Set<ActorPath> initialContacts() { 
    return new HashSet<ActorPath>(Arrays.asList(   
    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"))); 
} 

public static void main(String[] args) { 
    System.setProperty("akka.remote.netty.tcp.port", "2553"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2"); 
    ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem); 
    final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
      clusterSystem).withInitialContacts(initialContacts())), "client"); 
    ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2"); 
    clusterClientReceptionist2.registerSubscriber("content", subcriber2); 
    ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2"); 
    publisher2.tell("testMessage2", ActorRef.noSender()); 
    clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null); 

} 
}    

app2.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2553 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://ClusterSystem2@127.0.0.1:2553" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

The Publisher and Subscriber classes are the same in both applications and are given below.

Publisher:

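import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;

public class Publisher extends UntypedActor {
    // The mediator is the local entry point to distributed pub-sub on this node
    private final ActorRef mediator =
        DistributedPubSub.get(getContext().system()).mediator();

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof String) {
            // Forward every String to all subscribers of the "events" topic
            mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
        } else {
            unhandled(msg);
        }
    }
}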

Subscriber:

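import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Subscriber extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public Subscriber() {
        // Subscribe this actor to the "events" topic as soon as it is created
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());
    }

    @Override
    public void onReceive(Object msg) throws Throwable {
        if (msg instanceof String) {
            log.info("Got: {}", msg);
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            // The mediator confirms the subscription with a SubscribeAck
            log.info("subscribing");
        } else {
            unhandled(msg);
        }
    }
}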

When I run both applications at the same time, the receiving application logs this dead-letters error:

[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

On the sending application, the log shows the message being sent successfully:

[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist 

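The way you use ClusterClient does not really make sense and has nothing to do with distributed pub-sub. Since both of your nodes are part of the cluster, you can use the distributed pub-sub API directly.

Here is a simple main, including config, that creates a two-node cluster using your exact Publisher and Subscriber actors and works as expected: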

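import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// The wrapper class name is arbitrary
public class TwoNodePubSub {

    public static void main(String[] args) throws Exception {

        // The hostname is pinned here so each node's address matches the
        // seed-node entry; without it the default local host address is used
        final Config config = ConfigFactory.parseString(
            "akka.actor.provider=cluster\n" +
            "akka.remote.netty.tcp.hostname=127.0.0.1\n" +
            "akka.remote.netty.tcp.port=2551\n" +
            "akka.cluster.seed-nodes = [ \"akka.tcp://ClusterSystem@127.0.0.1:2551\"]\n");

        // Both nodes use the same system name, so they form one cluster
        ActorSystem node1 = ActorSystem.create("ClusterSystem", config);
        ActorSystem node2 = ActorSystem.create("ClusterSystem",
            ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
                .withFallback(config));

        // wait a bit for the cluster to form
        Thread.sleep(3000);

        ActorRef subscriber = node1.actorOf(
            Props.create(Subscriber.class),
            "subscriber");

        ActorRef publisher = node2.actorOf(
            Props.create(Publisher.class),
            "publisher");

        // wait a bit for the subscription to be gossiped
        Thread.sleep(3000);

        publisher.tell("testMessage1", ActorRef.noSender());
    }
}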

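Note that distributed pub-sub gives no delivery guarantees, so if you send a message before the mediators have made contact with each other, the message is simply lost (hence the Thread.sleep calls, which you should of course not use in real code).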
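
To make the point about lost messages and fixed sleeps concrete, here is a minimal plain-Java sketch. It does not use Akka at all: `ToyMediator` and `Await` are hypothetical names invented for illustration, and the toy mediator only imitates the "deliver to whoever is subscribed right now, otherwise drop" behaviour described above. The idea it shows is waiting on an observable condition with a timeout instead of sleeping for a fixed time. In a real application the condition would be something like "the subscription has been acknowledged", and how you observe that is up to your own code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Toy stand-in for a pub-sub mediator (NOT Akka): no buffering, no retries. */
final class ToyMediator {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    synchronized void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(subscriber);
    }

    synchronized int subscriberCount(String topic) {
        return subscribers.getOrDefault(topic, Collections.emptyList()).size();
    }

    /** Delivers only to current subscribers and returns how many received the message. */
    synchronized int publish(String topic, String message) {
        List<Consumer<String>> targets =
            subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> target : targets) {
            target.accept(message);
        }
        return targets.size(); // 0 means the message was silently dropped
    }
}

/** Hypothetical helper: bounded polling as an alternative to a fixed Thread.sleep. */
final class Await {
    private Await() {}

    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the caller decides how to handle the timeout
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With this toy model, a publish issued before any subscription reaches nobody, while a publish issued after `Await.until(() -> mediator.subscriberCount("events") == 1, 1000, 10)` returns true reaches the subscriber. Unlike a fixed sleep, the wait ends as soon as the condition holds and reports a timeout explicitly when it never does.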