2015-07-01 12 views
6

Sto cercando di implementare alcune funzionalità su un client MQTT in Java con Eclipse Paho. L'obiettivo è iscriversi a un argomento e quando viene ricevuto un messaggio, il client invia un altro messaggio su un altro argomento.Come pubblicare un messaggio mentre si riceve su un client Java MQTT utilizzando Eclipse Paho

Sembra molto facile, ma ho uno strano problema che non riesco a risolvere. Ecco il mio codice:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

public class MqttOperations implements MqttCallback { 

    MqttClient sampleClient; 
    MqttConnectOptions connOpts; 

    public MqttOperations() { 
    } 

    public static void main(String[] args) throws InterruptedException { 
     new MqttOperations().launchMqttClient(); 
    } 


    public void launchMqttClient() throws InterruptedException { 
     try { 
       MemoryPersistence persistence = new MemoryPersistence(); 
       sampleClient = new MqttClient("tcp://broker.mqttdashboard.com:1883", "iamaclient", persistence); 
       connOpts = new MqttConnectOptions(); 
       connOpts.setCleanSession(true); 
       sampleClient.connect(connOpts); 
       sampleClient.subscribe("topic/example/ofmessage"); 
       sampleClient.setCallback(this); 

      } catch(MqttException me) { 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
      } 
    } 


    @Override 
    public void connectionLost(Throwable cause) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws MqttException 
    { 
     System.out.println("Received: " + message.toString()); 
     try{ 
      System.out.println("Publishing message: i am the answer"); 
      MqttMessage ans = new MqttMessage("i am the answer".getBytes()); 
      ans.setQos(2); 
      sampleClient.publish("topic/example/ofanswer", ans); 
      System.out.println("Message published"); 

     }catch(MqttException me){ 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
     } 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 

    } 

} 

Il fatto è che questo programma funziona solo una volta. Quando il messaggio viene ricevuto, la risposta a questo messaggio viene inviata, ma sembra che il messaggio "messaggio pubblicato" non venga mai visualizzato sullo schermo e il client non riceve altri messaggi. Ho l'impressione che la linea sampleClient.publish("topic/example/ofanswer", ans); non finisca mai la sua esecuzione. Qualcuno sa come viene e come risolvere il mio problema, per favore?

+0

Un'altra precisione: ho trovato alcune fonti in cui è spiegato che dovrei fare attenzione a non rispondere alla mia risposta altrimenti non può funzionare ovviamente. Ma penso di non essere preoccupato da questo problema poiché gli argomenti che uso per iscriversi e pubblicare la risposta sono diversi – tben

+0

Penso che sia un problema che si blocca nel messageArrived callback. Puoi provare a pubblicare in un altro thread (ad esempio, utilizzare un Executor e inviare solo un comando di pubblicazione nella funzione messageArrived callback)? –

risposta

2

Ho avuto un problema simile oggi. Quando ho letto an other question with two connections ho capito: sono necessarie due istanze MqttClient. Uno per la pubblicazione e uno per la sottoscrizione. Purtroppo non ho trovato documentazione per questo fatto.

A proposito. Nella mia prima implementazione con due client, ho dato loro gli stessi ID (logicamente dovrebbe essere la stessa connessione). Ma la seconda connessione disconnette il primo. Quando ho iniziato a utilizzare due ID diversi, inizia a funzionare.

+0

Mi ha davvero aiutato. :) –

+0

Questo mi infastidisce, se leggi il doc http://www.eclipse.org/paho/files/javadoc/index.html per messageArrived(), perché non dovresti avere due connessioni, tuttavia, funziona così applaudito! – Clocker

0

Dominik Obermaier ha ragione: il problema è che si blocca in messageArrived. In particolare, MqttClient.publish attende fino a quando non è stato ricevuto un avviso di consegna per il messaggio, ma il thread di lavoro MqttClient non ottiene mai il recupero, poiché è seduto in attesa dell'avviso stesso in messageArrived!

I due clienti soluzione funziona perché discussione il lavoro di un altro client è libero di recuperare l'avviso dalla presa, ma la soluzione corretta è quella di uno pubblicare con QoS 0 dall'interno messageArrived (come QoS 0 messaggi non hanno bisogno di conferma della consegna) o utilizzare un'API che non attende il recapito del messaggio, ad esempio MqttTopic.publish.

Problemi correlati