android - Paho MQTT服务发布

问题描述:

我是 Android 和服务(Service)方面的新手。我的目标是能够建立订阅,并向指定的主题字符串发布消息。主题字符串和客户端 ID 是在解析文本字段的输入之后设置的。我正在使用 Paho MQTT service(下载了源代码并构建了 JAR)。

下面的代码会在 c.publish() 处抛出空指针异常。logcat 显示异常出现在 MqttAndroidClient 的方法中,即处理投递令牌(delivery token)的地方。

/**
 * Activity from the question: installs a click listener on the submit button
 * which creates an MQTT client, calls connect() and then immediately publishes
 * one message.
 *
 * NOTE(review): the snippet is abridged (see the "// ..." markers) and the
 * listener, method and class bodies are never closed, so it does not compile
 * as shown.
 */
public class MainActivity extends Activity { 
    /** Inflates the main layout and reads the current locale from the resources configuration. */
    @Override 
    protected void onCreate(Bundle savedInstanceState) { 
     super.onCreate(savedInstanceState); 
     setContentView(R.layout.activity_main); 

     // Set locale; 
     // NOTE(review): `l` is not declared anywhere in this snippet - presumably a Locale field; confirm.
     l = getResources().getConfiguration().locale; 
    } 

    /** Registers the submit-button listener each time the activity is resumed. */
    @Override 
    protected void onResume() { 
     super.onResume(); 
     addButtonListener();   
    } 

    /** Looks up the submit button and attaches the click listener that performs the MQTT publish. */
    private void addButtonListener() { 
     Button submitButton = (Button) findViewById(R.id.buttonSubmit); 

     submitButton.setOnClickListener(new OnClickListener() { 
// ... 
// validation code for fields in layout 
// ... 
// Finally, this. 

        // A fresh client with a random client id is created for this click.
        MemoryPersistence mPer = new MemoryPersistence(); 
        String clientId = UUID.randomUUID().toString(); 
        String brokerUrl = "tcp://m2m.eclipse.org:1883"; 
        MqttAndroidClient c = new MqttAndroidClient(getApplicationContext(), brokerUrl, clientId, mPer); 
        try { 
         // NOTE(review): publish() below is called right after connect() returns,
         // without waiting for the connection to complete. According to the answer
         // in this thread, connect() is asynchronous and this is the cause of the
         // reported NullPointerException.
         c.connect(); 
         String topic = "transfers/topic"; 
         // NOTE(review): the next statement is missing its terminating ';'.
         String msg = "topic payload" 
         MqttMessage m = new MqttMessage(); 
         m.setPayload(msg.getBytes()); 
         m.setQos(2); 
         m.setRetained(false); 
         c.publish(topic, m); 
        } catch (MqttException e) { 
         // The failure reason is shown to the user as a short toast.
         Toast.makeText(getApplicationContext(), e.getMessage(), Toast.LENGTH_SHORT).show(); 
        } 

您能否告诉我如何使用该服务进行发布和订阅?我确实浏览过示例项目(来自 Paho Android)。其中 LWT 的布局(activity_publish.xml)似乎也被用于发布,LWT 和发布功能似乎被合并在了一起。

出现 NullPointerException 是因为 connect() 是异步方法:调用返回时连接尚未建立。您需要实现 IMqttActionListener,并在连接成功的回调(onSuccess)中再发送消息。

// Connects to the broker asynchronously and publishes a single message once the
// connection has been established. Publishing happens inside onSuccess() so that
// publish() is never called on a client that is not connected yet.
// LOGTAG, context and username are expected to be supplied by the enclosing code.
Log.i(LOGTAG, "MQTT Start");

MemoryPersistence memPer = new MemoryPersistence();

final MqttAndroidClient client = new MqttAndroidClient(
    context, "tcp://192.168.0.13:1883", username, memPer);

try {
    client.connect(null, new IMqttActionListener() {

        /** Invoked once the connect action has succeeded; publishes and then disconnects. */
        @Override
        public void onSuccess(IMqttToken mqttToken) {
            Log.i(LOGTAG, "Client connected");
            // getTopics() returns an array; format it explicitly instead of
            // concatenating the array reference (which prints its identity hash).
            Log.i(LOGTAG, "Topics=" + java.util.Arrays.toString(mqttToken.getTopics()));

            MqttMessage message = new MqttMessage("Hello, I am Android Mqtt Client.".getBytes());
            message.setQos(2);
            message.setRetained(false);

            try {
                client.publish("messages", message);
                Log.i(LOGTAG, "Message published");

                client.disconnect();
                Log.i(LOGTAG, "client disconnected");

            } catch (MqttException e) {
                // MqttPersistenceException is a subclass of MqttException, so a
                // single handler covers both; log with the stack trace attached.
                Log.e(LOGTAG, "Publish or disconnect failed", e);
            }
        }

        /** Invoked when the connect action fails; logs the reason. */
        @Override
        public void onFailure(IMqttToken token, Throwable cause) {
            // Guard against a missing cause so the failure handler itself cannot throw.
            String reason = (cause != null) ? cause.getMessage() : "unknown";
            Log.e(LOGTAG, "Client connection failed: " + reason, cause);
        }
    });
} catch (MqttException e) {
    // The original try block had no catch/finally and therefore did not compile.
    Log.e(LOGTAG, "Could not start the MQTT connect", e);
}
+0

这真的很有趣,但是怎么能看出这个方法是异步执行的呢?在 C# 中不可能有这样的实现,因为 connect() 方法需要被标记为 async 才能避免这种错误。 –

请务必明白:为了接收您所订阅主题上的消息,您需要调用 client.setCallback,并实现 MqttCallbackHandler,这一点非常重要。

下面是代码的例子,可以发布和订阅等

下面的代码首先发布如下 MQTT 主题和有效载荷:

  • 主题: AndroidPhone
  • 有效载荷:你好,我是一个Android Mqtt客户端。

该代码订阅主题“tester”。如果它收到主题为“tester”、有效载荷为“Alarm Activated”的消息,就会(通过上面提到的回调)发布以下主题和有效载荷:

  • 主题: Fitlet
  • 有效载荷:你好,Mosquitto 代理(Broker)收到了你的消息,说报警已被激活。

如果您使用的是 Mosquitto,在终端中输入以下命令即可让该消息被发送出去:

mosquitto_pub -h 192.168.9.100 -t tester -m "Alarm Activated" -u fred -P 1234 

其中我的 Mosquitto 用户名是 fred,密码是 1234。

代码:

package colin.android.mqtt;  
import android.support.v7.app.AppCompatActivity; 
import android.os.Bundle; 
import android.util.Log; 
import android.widget.Toast; 
import org.eclipse.paho.android.service.MqttAndroidClient; 
import org.eclipse.paho.client.mqttv3.IMqttActionListener; 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 
import java.io.UnsupportedEncodingException; 

/**
 * Sample activity that connects to a Mosquitto broker, publishes a greeting on
 * the "AndroidPhone" topic and subscribes to the "tester" topic.
 *
 * Incoming messages are handled by the {@link MqttCallbackHandler} registered
 * on the client. Publishing and subscribing are started only from the connect
 * listener's onSuccess(), i.e. after the connection has been established.
 */
public class MainActivity extends AppCompatActivity {

    /** Log tag shared by all MQTT related log lines. */
    private static final String TAG = "mqtt";

    //The URL of the Mosquitto Broker is 192.168.9.100:1883
    private static final String BROKER_URI = "tcp://192.168.9.100:1883";

    private static final String PUBLISH_TOPIC = "AndroidPhone";
    private static final String SUBSCRIBE_TOPIC = "tester";
    private static final int SUBSCRIBE_QOS = 1;

    /**
     * Creates the MQTT client, registers the message callback and starts the
     * asynchronous connect.
     *
     * @param savedInstanceState state handed over by the framework; passed to super unchanged
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        String clientId = MqttClient.generateClientId();
        final MqttAndroidClient client =
                new MqttAndroidClient(this.getApplicationContext(), BROKER_URI, clientId);

        //This is here for when a message is received
        client.setCallback(new MqttCallbackHandler(client));

        // NOTE(review): sample credentials are hard-coded as in the original example;
        // do not ship real credentials in source code.
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("fred");
        options.setPassword("1234".toCharArray());

        try {
            // The listener is handed to connect() directly (as the first snippet in
            // this thread does) instead of being attached to the returned token afterwards.
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // We are connected
                    Log.d(TAG, "onSuccess");
                    publishGreeting(client);
                    subscribeToTester(client);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Something went wrong e.g. connection timeout or firewall problems
                    Log.e(TAG, "onFailure", exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Could not start the MQTT connect", e);
        }
    }

    /** Publishes the greeting message (QoS 2, not retained) on {@link #PUBLISH_TOPIC}. */
    private static void publishGreeting(MqttAndroidClient client) {
        MqttMessage message = new MqttMessage("Hello, I am an Android Mqtt Client.".getBytes());
        message.setQos(2);
        message.setRetained(false);

        try {
            client.publish(PUBLISH_TOPIC, message);
            Log.i(TAG, "Message published");
        } catch (MqttException e) {
            // Also covers MqttPersistenceException, which is a subclass.
            Log.e(TAG, "Publishing to " + PUBLISH_TOPIC + " failed", e);
        }
    }

    /** Subscribes to {@link #SUBSCRIBE_TOPIC} and logs the outcome of the subscription. */
    private static void subscribeToTester(MqttAndroidClient client) {
        try {
            client.subscribe(SUBSCRIBE_TOPIC, SUBSCRIBE_QOS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // The subscription was established
                    Log.i(TAG, "subscription success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // The subscription could not be performed, maybe the user was not
                    // authorized to subscribe on the specified topic e.g. using wildcards
                    Log.e(TAG, "subscription failed", exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Subscribing to " + SUBSCRIBE_TOPIC + " failed", e);
        }
    }

}//End of Activity class

//----------------------------------------------------------------------------- 

/**
 * Client-wide MQTT callback: logs connection events and incoming messages and
 * answers an "Alarm Activated" message by publishing a confirmation on the
 * "Fitlet" topic through the same client.
 */
class MqttCallbackHandler implements MqttCallbackExtended {

    /** Log tag shared by all MQTT related log lines. */
    private static final String TAG = "mqtt";

    /** Client used to publish the confirmation message. */
    private final MqttAndroidClient client;

    /**
     * @param client the client this handler is registered on; used for publishing replies
     */
    public MqttCallbackHandler (MqttAndroidClient client)
    {
        this.client = client;
    }

    /**
     * Logs the string passed by the library when a connection completes.
     * NOTE(review): per the Paho MqttCallbackExtended contract, b is presumably the
     * reconnect flag and s the server URI - confirm against the Paho docs.
     */
    @Override
    public void connectComplete(boolean b, String s) {
        Log.w(TAG, s);
    }

    /** Logs the loss of the connection instead of silently ignoring it. */
    @Override
    public void connectionLost(Throwable throwable) {
        Log.w(TAG, "Connection lost", throwable);
    }

    /** Publishes the confirmation message on the "Fitlet" topic. */
    public void AlarmActivatedMessageReceived()
    {
        MqttMessage msg = new MqttMessage("Hello, the Mosquitto Broker got your message saying that the Alarm is Activated.".getBytes());
        try {
            this.client.publish("Fitlet", msg);
            Log.i(TAG, "Message published");
        } catch (MqttException e) {
            // Also covers MqttPersistenceException, which is a subclass.
            Log.e(TAG, "Publishing the alarm confirmation failed", e);
        }
    }

    /**
     * Logs every incoming message and triggers the confirmation when the message
     * text contains "Alarm Activated". The topic is not inspected.
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Evaluate the textual form once and reuse it for logging and matching.
        String text = mqttMessage.toString();
        Log.w(TAG, text);

        if (text.contains("Alarm Activated"))
        {
            AlarmActivatedMessageReceived();
        }
    }

    /** Intentionally a no-op: delivery confirmations are not used by this example. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
+0

仅供参考:如果您使用 Processing,那么 MQTT 更容易实现:http://processing-mqtt.blogspot.com/。Processing 代码可以在 Android 手机上运行... – CMP