2014-11-07 32 views
7

Sono un po 'nuovo a Google Cloud Messaging. Ci lavoriamo da un paio di mesi, ma solo di recente abbiamo ricevuto messaggi di "Connection Draining". Quando ciò accade, tutte le comunicazioni si fermano.Apertura della nuova connessione dopo Connection Draining. Google Cloud Messaging

Google dice: https://developer.android.com/google/gcm/ccs.html#response

Quando si riceve un messaggio CONNECTION_DRAINING, si dovrebbe iniziare immediatamente l'invio di messaggi a un altro collegamento CCS, aprendo una nuova connessione, se necessario. Tuttavia, è necessario mantenere la connessione originale aperta e continuare a ricevere i messaggi che potrebbero passare attraverso la connessione (e inserirli). CCS gestirà l'avvio di una connessione chiusa quando è pronta.

La mia domanda è

  1. Se apro una nuova connessione manualmente, come fa a sapere quale connessione da utilizzare se non chiudo la connessione esistente?
  2. Se vengono inviati contemporaneamente 6 messaggi, come posso impedire al metodo di aprire 6 connessioni? O sono confuso su questo?
  3. Perché si verifica il drenaggio della connessione?

Sono sorpreso che questo non sia già stato messo in gioco nel loro codice di esempio. Sembra che sia praticamente tutto ciò di cui hai bisogno. È già fatto per me nel codice e mi manca?

Non ho un metodo principale nel mio codice, ho invece i servlet utente come trigger. La mia connessione è inizializzata in questo modo

@PostConstruct 
    public void init() throws Exception{ 
     try { 
      smackCcsClient.connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key")); 
     }catch (IOException e){ 
      e.printStackTrace(); 
     }catch(SmackException e){ 
      e.printStackTrace(); 
     }catch(XMPPException e){ 
      e.printStackTrace(); 
     } 
    } 

tuttavia dopo questo non tocco mai più la connessione. Sto gestendo questo sbagliato, è la connessione qualcosa che dovrei toccare più frequentemente o qualcosa di cui ho bisogno per tenere traccia di?

_______________________ADDED DOPO LA QUESTION_________________________

ho aggiunto un collegamento all'interno del loro codice di esempio per provare a reinizializzare una connessione. Ecco come si presenta:

if ("CONNECTION_DRAINING".equals(controlType)) { 
      connectionDraining = true; 
      //Open new connection because old connection will be closing or is already closed. 
      try { 
       connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key")); 
      } catch (XMPPException e) { 
       e.printStackTrace(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } catch (SmackException e) { 
       e.printStackTrace(); 
      } 

     } else { 
      logger.log(Level.INFO, "Unrecognized control type: %s. This could happen if new features are " + "added to the CCS protocol.", 
        controlType); 
     } 
+0

Qualche idea? Per favore? –

risposta

1

Sono anche di nuovo in GCM e di fronte lo stesso problema ... ho risolto con la creazione di nuovi SmackCcsClient() sul messaggio CONNECTION_DRAINING. La connessione meno recente dovrebbe ancora esistere e ricevere messaggi, ma non inviare perché:

protetto collegamento booleano volatileDraining = true;

Google dice che il collegamento sarà chiuso da CCS:

CCS gestiranno l'avvio di una stretta connessione quando è pronto.

Fino a quando la connessione non viene chiusa da CCS, sarà possibile ricevere messaggi da entrambe le connessioni, ma in grado di inviare messaggi solo con una nuova.Quando la vecchia connessione è chiusa dovrebbe essere distrutta, non sono sicuro se il garbage collector viene chiamato o meno ... cercando di risolvere questo problema

PS: Non sono sicuro al 100% con questa risposta, ma forse è aprirà più spazio per la discussione.

+0

Grazie per il punto di partenza. Aggiornerò la parte inferiore della mia domanda con ciò che ho implementato alcuni giorni fa come tentativo di capirlo. La mia paura è che con più connessioni aperte ha la capacità di causare messaggi di tag mancanti e così via. –

+0

Sì, sono arrivato con la stessa soluzione e sto ancora testando il comportamento, ho aggiunto anche heartbeat dal mio server a CCS, perché la connessione è stata chiusa dopo qualche tempo in idle e ora non ricevo alcun messaggio CONNECTION_DRAINING (comunque probabilmente non ha nulla da fare con la tua domanda) ... quando ti riconnetti immediatamente, la cosa peggiore che potrebbe accadere è che il server non riceverà i messaggi ACK in arrivo e il server invierà di nuovo i messaggi, quindi potrebbero apparire alcuni duplicati ... Proverò qualcos'altro e ti faccia sapere se c'è una soluzione migliore – cheou

2

Ho scritto un codice per la gestione di questi casi (in pratica deviando i nuovi messaggi a valle di una nuova connessione) ... non accuratamente testato ...

import java.util.Deque; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.ConcurrentLinkedDeque; 

import javax.net.ssl.SSLSocketFactory; 

import org.jivesoftware.smack.ConnectionConfiguration; 
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 
import org.jivesoftware.smack.ConnectionListener; 
import org.jivesoftware.smack.PacketInterceptor; 
import org.jivesoftware.smack.PacketListener; 
import org.jivesoftware.smack.SmackException.NotConnectedException; 
import org.jivesoftware.smack.XMPPConnection; 
import org.jivesoftware.smack.filter.PacketTypeFilter; 
import org.jivesoftware.smack.packet.DefaultPacketExtension; 
import org.jivesoftware.smack.packet.Message; 
import org.jivesoftware.smack.packet.Packet; 
import org.jivesoftware.smack.packet.PacketExtension; 
import org.jivesoftware.smack.provider.PacketExtensionProvider; 
import org.jivesoftware.smack.provider.ProviderManager; 
import org.jivesoftware.smack.tcp.XMPPTCPConnection; 
import org.jivesoftware.smack.util.StringUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.xmlpull.v1.XmlPullParser; 

import com.fasterxml.jackson.core.type.TypeReference; 


/** 
* Based on https://developer.android.com/google/gcm/ccs.html#smack 
* 
* @author Abhinav.Dwivedi 
* 
*/ 
public class SmackCcsClient implements CcsClient { 
    private static final Logger logger = LoggerFactory.getLogger(SmackCcsClient.class); 
    private static final String GCM_SERVER = "gcm.googleapis.com"; 
    private static final int GCM_PORT = 5235; 
    private static final String GCM_ELEMENT_NAME = "gcm"; 
    private static final String GCM_NAMESPACE = "google:mobile:data"; 
    private static volatile SmackCcsClient instance; 
    static { 
     ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, new PacketExtensionProvider() { 
      @Override 
      public PacketExtension parseExtension(XmlPullParser parser) throws Exception { 
       String json = parser.nextText(); 
       return new GcmPacketExtension(json); 
      } 
     }); 
    } 
    private final Deque<Channel> channels; 

    public static SmackCcsClient instance() { 
     if (instance == null) { 
      synchronized (SmackCcsClient.class) { 
       if (instance == null) { 
        instance = new SmackCcsClient(); 
       } 
      } 
     } 
     return instance; 
    } 

    private SmackCcsClient() { 
     channels = new ConcurrentLinkedDeque<Channel>(); 
     channels.addFirst(connect()); 
    } 

    private class Channel { 
     private XMPPConnection connection; 
     /** 
     * Indicates whether the connection is in draining state, which means that it will not accept any new downstream 
     * messages. 
     */ 
     private volatile boolean connectionDraining = false; 

     /** 
     * Sends a packet with contents provided. 
     */ 
     private void send(String jsonRequest) throws NotConnectedException { 
      Packet request = new GcmPacketExtension(jsonRequest).toPacket(); 
      connection.sendPacket(request); 
     } 

     private void handleControlMessage(Map<String, Object> jsonObject) { 
      logger.debug("handleControlMessage(): {}", jsonObject); 
      String controlType = (String) jsonObject.get("control_type"); 
      if ("CONNECTION_DRAINING".equals(controlType)) { 
       connectionDraining = true; 
      } else { 
       logger.info("Unrecognized control type: {}. This could happen if new features are " 
         + "added to the CCS protocol.", controlType); 
      } 
     } 
    } 

    /** 
    * Sends a downstream message to GCM. 
    * 
    */ 
    @Override 
    public void sendDownstreamMessage(String message) throws Exception { 
     Channel channel = channels.peekFirst(); 
     if (channel.connectionDraining) { 
      synchronized (channels) { 
       channel = channels.peekFirst(); 
       if (channel.connectionDraining) { 
        channels.addFirst(connect()); 
        channel = channels.peekFirst(); 
       } 
      } 
     } 
     channel.send(message); 
     logger.debug("Message Sent via CSS: ({})", message); 
    } 

    /** 
    * Handles an upstream data message from a device application. 
    * 
    */ 
    protected void handleUpstreamMessage(Map<String, Object> jsonObject) { 
     // PackageName of the application that sent this message. 
     String category = (String) jsonObject.get("category"); 
     String from = (String) jsonObject.get("from"); 
     @SuppressWarnings("unchecked") 
     Map<String, String> payload = (Map<String, String>) jsonObject.get("data"); 
     logger.info("Message received from device: category ({}), from ({}), payload: ({})", category, from, 
       JsonUtil.toJson(payload)); 
    } 

    /** 
    * Handles an ACK. 
    * 
    * <p> 
    * Logs a INFO message, but subclasses could override it to properly handle ACKs. 
    */ 
    public void handleAckReceipt(Map<String, Object> jsonObject) { 
     String messageId = (String) jsonObject.get("message_id"); 
     String from = (String) jsonObject.get("from"); 
     logger.debug("handleAckReceipt() from: {}, messageId: {}", from, messageId); 
    } 

    /** 
    * Handles a NACK. 
    * 
    * <p> 
    * Logs a INFO message, but subclasses could override it to properly handle NACKs. 
    */ 
    protected void handleNackReceipt(Map<String, Object> jsonObject) { 
     String messageId = (String) jsonObject.get("message_id"); 
     String from = (String) jsonObject.get("from"); 
     logger.debug("handleNackReceipt() from: {}, messageId: ", from, messageId); 
    } 

    /** 
    * Creates a JSON encoded ACK message for an upstream message received from an application. 
    * 
    * @param to 
    *   RegistrationId of the device who sent the upstream message. 
    * @param messageId 
    *   messageId of the upstream message to be acknowledged to CCS. 
    * @return JSON encoded ack. 
    */ 
    protected static String createJsonAck(String to, String messageId) { 
     Map<String, Object> message = new HashMap<String, Object>(); 
     message.put("message_type", "ack"); 
     message.put("to", to); 
     message.put("message_id", messageId); 
     return JsonUtil.toJson(message); 
    } 

    /** 
    * Connects to GCM Cloud Connection Server using the supplied credentials. 
    * 
    * @return 
    */ 
    @Override 
    public Channel connect() { 
     try { 
      Channel channel = new Channel(); 
      ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT); 
      config.setSecurityMode(SecurityMode.enabled); 
      config.setReconnectionAllowed(true); 
      config.setRosterLoadedAtLogin(false); 
      config.setSendPresence(false); 
      config.setSocketFactory(SSLSocketFactory.getDefault()); 

      channel.connection = new XMPPTCPConnection(config); 
      channel.connection.connect(); 

      channel.connection.addConnectionListener(new LoggingConnectionListener()); 

      // Handle incoming packets 
      channel.connection.addPacketListener(new PacketListener() { 
       @Override 
       public void processPacket(Packet packet) { 
        logger.debug("Received: ({})", packet.toXML()); 
        Message incomingMessage = (Message) packet; 
        GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE); 
        String json = gcmPacket.getJson(); 
        try { 
         Map<String, Object> jsonObject = JacksonUtil.DEFAULT.mapper().readValue(json, 
           new TypeReference<Map<String, Object>>() {}); 
         // present for ack, nack and control, null otherwise 
         Object messageType = jsonObject.get("message_type"); 
         if (messageType == null) { 
          // Normal upstream data message 
          handleUpstreamMessage(jsonObject); 
          // Send ACK to CCS 
          String messageId = (String) jsonObject.get("message_id"); 
          String from = (String) jsonObject.get("from"); 
          String ack = createJsonAck(from, messageId); 
          channel.send(ack); 
         } else if ("ack".equals(messageType.toString())) { 
          // Process Ack 
          handleAckReceipt(jsonObject); 
         } else if ("nack".equals(messageType.toString())) { 
          // Process Nack 
          handleNackReceipt(jsonObject); 
         } else if ("control".equals(messageType.toString())) { 
          // Process control message 
          channel.handleControlMessage(jsonObject); 
         } else { 
          logger.error("Unrecognized message type ({})", messageType.toString()); 
         } 
        } catch (Exception e) { 
         logger.error("Failed to process packet ({})", packet.toXML(), e); 
        } 
       } 
      }, new PacketTypeFilter(Message.class)); 

      // Log all outgoing packets 
      channel.connection.addPacketInterceptor(new PacketInterceptor() { 
       @Override 
       public void interceptPacket(Packet packet) { 
        logger.debug("Sent: {}", packet.toXML()); 
       } 
      }, new PacketTypeFilter(Message.class)); 

      channel.connection.login(ExternalConfig.gcmSenderId() + "@gcm.googleapis.com", ExternalConfig.gcmApiKey()); 
      return channel; 
     } catch (Exception e) { 
      logger.error(Logging.FATAL, "Error in creating channel for GCM communication", e); 
      throw new RuntimeException(e); 
     } 
    } 

    /** 
    * XMPP Packet Extension for GCM Cloud Connection Server. 
    */ 
    private static final class GcmPacketExtension extends DefaultPacketExtension { 

     private final String json; 

     public GcmPacketExtension(String json) { 
      super(GCM_ELEMENT_NAME, GCM_NAMESPACE); 
      this.json = json; 
     } 

     public String getJson() { 
      return json; 
     } 

     @Override 
     public String toXML() { 
      return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE, 
        StringUtils.escapeForXML(json), GCM_ELEMENT_NAME); 
     } 

     public Packet toPacket() { 
      Message message = new Message(); 
      message.addExtension(this); 
      return message; 
     } 
    } 

    private static final class LoggingConnectionListener implements ConnectionListener { 

     @Override 
     public void connected(XMPPConnection xmppConnection) { 
      logger.info("Connected."); 
     } 

     @Override 
     public void authenticated(XMPPConnection xmppConnection) { 
      logger.info("Authenticated."); 
     } 

     @Override 
     public void reconnectionSuccessful() { 
      logger.info("Reconnecting.."); 
     } 

     @Override 
     public void reconnectionFailed(Exception e) { 
      logger.error("Reconnection failed.. ", e); 
     } 

     @Override 
     public void reconnectingIn(int seconds) { 
      logger.info("Reconnecting in {} secs", seconds); 
     } 

     @Override 
     public void connectionClosedOnError(Exception e) { 
      logger.info("Connection closed on error."); 
     } 

     @Override 
     public void connectionClosed() { 
      logger.info("Connection closed."); 
     } 
    } 
} 
0

ho solo spinto il codice per il FCM Connessione Drenante al mio esempio di server XMPP FCM.

Progetto: Server di connessione XMPP per FCM che utilizza l'ultima versione della libreria Smack (4.2.2) + Implementazione di drenaggio della connessione.

GitHub link: https://github.com/carlosCharz/fcmxmppserverv2

Youtube link: https://youtu.be/KVKEj6PeLTc

Se avete avuto qualche problema controllare la mia sezione di risoluzione dei problemi. Spero che tu possa trovare utile. Saluti!

Problemi correlati