Eclipse Kapua经纪人:未授权订阅主题

问题描述:

我试图订阅来自Eclipse Paho MQTT客户端的简单主题“foo”。Eclipse Kapua经纪人:未授权订阅主题

代理由Eclipse Kapua管理,可通过tcp:// localhost:1883使用凭证“kapua-broker”和“kapua-password”进行访问。

我发布的值是这样的:

send(new Payload.Builder().put("testKey","testVal"),"foo");

这基本上发送一个地图( “密押”, “名为testVal的”)的主题为 “foo”。要订阅这个话题,我有以下代码(主机= “localhost”,则端口= 1883):

String topic = "foo"; 
    String broker = "tcp://"+host+":"+Integer.toString(port); 
    String clientId = "supply-chain-control-simulation-listener"; 
    String username = "kapua-broker"; 
    String password = "kapua-password"; 

    try { 
     MqttClient client = new MqttClient(broker, clientId); 
     MqttConnectOptions connOpts = new MqttConnectOptions(); 
     connOpts.setCleanSession(true); 
     connOpts.setUserName(username); 
     connOpts.setPassword(password.toCharArray()); 
     connOpts.setCleanSession(true); 
     logger.info("Connecting to broker: "+broker); 
     client.setCallback(new MqttCallback() { 
      @Override 
      public void connectionLost(Throwable throwable) { 
       logger.info("Subscriptions stopped"); 
      } 

      @Override 
      public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
       logger.info(s); 
       logger.info(mqttMessage.getPayload().toString()); 
      } 

      @Override 
      public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 

      } 
     }); 
     client.connect(connOpts); 
     if (client.isConnected()) 
      logger.info("Connected"); 
     else 
      logger.error(client.getDebug().toString()); 
     client.subscribe(topic,2); 
    } catch(MqttException me) { 
     logger.error("reason "+me.getReasonCode()); 
     logger.error("msg "+me.getMessage()); 
     logger.error("loc "+me.getLocalizedMessage()); 
     logger.error("cause "+me.getCause()); 
     logger.error("excep "+me); 
     me.printStackTrace(); 
    } 

的连接工作,但认购输出这个错误:

15:40 :03.240 [ActiveMQ NIO Worker 0]警告oekbcpKapuaSecurityBrokerFilter - 用户1:kapua-broker(供应链控制模拟侦听器 - tcp://172.17.0.1:40888 - conn id 1734706196170193882)未被授权读取:话题://VirtualTopic.foo

在卡普阿,您可以根据您的用户许可进行发布/订阅。

如果您的用户只能发布broker:connect许可/订阅只在题目:

{account-name}/{connectionClientId}/{semanticTopic} 

在特定情况下,你应该发布/订阅的话题:

kapus-sys/supply-chain-control-simulation-listener/foo 

kapua-sys是账户用户kapua-broker所属的名称, ,而supply-chain-control-simulation-listener是用于创建连接的clientId。

请注意,用于连接和帐户名称的用户名是卡普阿两种不同的东西。一个帐户有多个用户。

不要调用connect后马上,而是移到该呼叫到connectComplete回调:

IMqttAsyncClient client = new MqttAsyncClient(broker, clientId); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setCleanSession(true); 
connOpts.setUserName(username); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleanSession(true); 
logger.info("Connecting to broker: "+broker); 
client.setCallback(new MqttCallbackExtended() { 
    @Override 
    public void connectComplete(boolean reconnect, String brokerAddress) { 
     logger.info("Connected"); 
     client.subscribe(topic,2); 
    } 
    @Override 
    public void connectionLost(Throwable throwable) { 
     logger.info("Subscriptions stopped"); 
    } 

    @Override 
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
     logger.info(s); 
     logger.info(mqttMessage.getPayload().toString()); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 

    } 
}); 
client.connect(connOpts); 

这就是说,你的错误可能是从您使用MQTT经纪人来了,你需要配置它以允许访问该主题。

+1

感谢您的回答。你说得对,这种方式更清洁,但不幸的是我仍然得到相同的错误。问题是我不控制经纪人,它是Kapua项目的一部分,并使用它,我只是启动一个码头集装箱。 –