2012-02-29 12 views
29

Sto cercando di esplorare il Protocol Buffer (PB) nella piattaforma Linux e il mio linguaggio di codifica è C++. Ho trovato degli esempi nei protocolli online dei protocolli del protocollo ma nulla di specifico per il socket invia e ricevi (o l'ho perso completamente :)). Così ho deciso di aggiungere il messaggio Length prima del messaggio effettivo e inviarlo attraverso il socket. Gradirei se qualcuno potesse suggerire una soluzione migliore di quello che sto pensando di fare e inoltre c'è qualcosa di pronto in PB per la creazione di tali pacchetti.Protocol Buffer su socket in C++

Ma continuo a riscontrare un problema sul lato server in cui devo decodificare il pacchetto. Dì se il client invia un pacchetto di 10 byte in cui il primo 4 byte è la lunghezza del pacchetto; Ma è impossibile conoscere la lunghezza prima di decodificare il pacchetto. Quindi, anche se leggo il primo 4 byte, come deduco il valore con il pacchetto a metà lettura usando il Protocol Buffer.

risposta

25

Finalmente ho potuto farlo funzionare. Sto postando il codice qui in modo che si possa rivedere e commentare su di esso così come se qualcuno vuole implementarlo in C++, questo pezzo di codice può aiutare. È un codice shabby che la mia intenzione era di far funzionare Protobuf in modo prefissato. Ho preso il codice del server client da un sito che non ricordo e l'ho modificato per ospitare protobuf. Qui il server guarda prima nel socket e ottiene la lunghezza del pacchetto totale e quindi la lettura effettiva del socket viene eseguita per leggere l'intero pacchetto. Ci possono essere zillion modi per farlo, ma per una soluzione rapida l'ho fatto in questo modo. Ma ho bisogno di trovare un modo migliore per evitare 2 recv per pacchetti, ma nella mia condizione tutti i messaggi sono di dimensioni diverse, quindi questo è l'unico modo che immagino.

file di Proto

message log_packet { 
    required fixed64 log_time =1; 
    required fixed32 log_micro_sec =2; 
    required fixed32 sequence_no =3; 
    required fixed32 shm_app_id =4; 
    required string packet_id =5; 
    required string log_level=6; 
    required string log_msg=7; 
    } 

protocollo codice di buffer client

#include <unistd.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/message.h> 
#include <google/protobuf/descriptor.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 


using namespace google::protobuf::io; 

using namespace std; 
int main(int argv, char** argc){ 

/* Coded output stram */ 

log_packet payload ; 

payload.set_log_time(10); 
payload.set_log_micro_sec(10); 
payload.set_sequence_no(1); 
payload.set_shm_app_id(101); 
payload.set_packet_id("TST"); 
payload.set_log_level("DEBUG"); 
payload.set_log_msg("What shall we say then"); 

cout<<"size after serilizing is "<<payload.ByteSize()<<endl; 
int siz = payload.ByteSize()+4; 
char *pkt = new char [siz]; 
google::protobuf::io::ArrayOutputStream aos(pkt,siz); 
CodedOutputStream *coded_output = new CodedOutputStream(&aos); 
coded_output->WriteVarint32(payload.ByteSize()); 
payload.SerializeToCodedStream(coded_output); 

     int host_port= 1101; 
     char* host_name="127.0.0.1"; 

     struct sockaddr_in my_addr; 

     char buffer[1024]; 
     int bytecount; 
     int buffer_len=0; 

     int hsock; 
     int * p_int; 
     int err; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n",errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n",errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = inet_addr(host_name); 
     if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       if((err = errno) != EINPROGRESS){ 
         fprintf(stderr, "Error connecting socket %d\n", errno); 
         goto FINISH; 
       } 
     } 




     for (int i =0;i<10000;i++){ 
      for (int j = 0 ;j<10;j++) { 

       if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) { 
         fprintf(stderr, "Error sending data %d\n", errno); 
         goto FINISH; 
       } 
       printf("Sent bytes %d\n", bytecount); 
       usleep(1); 
     } 
     } 
     delete pkt; 

FINISH: 
     close(hsock); 

} 

buffer di protocollo Codice Server

#include <fcntl.h> 
#include <string.h> 
#include <stdlib.h> 
#include <errno.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <resolv.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 

using namespace std; 
using namespace google::protobuf::io; 



void* SocketHandler(void*); 

int main(int argv, char** argc){ 

     int host_port= 1101; 

     struct sockaddr_in my_addr; 

     int hsock; 
     int * p_int ; 
     int err; 

     socklen_t addr_size = 0; 
     int* csock; 
     sockaddr_in sadr; 
     pthread_t thread_id=0; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n", errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n", errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = INADDR_ANY ; 

     if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); 
       goto FINISH; 
     } 
     if(listen(hsock, 10) == -1){ 
       fprintf(stderr, "Error listening %d\n",errno); 
       goto FINISH; 
     } 

     //Now lets do the server stuff 

     addr_size = sizeof(sockaddr_in); 

     while(true){ 
       printf("waiting for a connection\n"); 
       csock = (int*)malloc(sizeof(int)); 
       if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){ 
         printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); 
         pthread_create(&thread_id,0,&SocketHandler, (void*)csock); 
         pthread_detach(thread_id); 
       } 
       else{ 
         fprintf(stderr, "Error accepting %d\n", errno); 
       } 
     } 

FINISH: 
;//oops 
} 

google::protobuf::uint32 readHdr(char *buf) 
{ 
    google::protobuf::uint32 size; 
    google::protobuf::io::ArrayInputStream ais(buf,4); 
    CodedInputStream coded_input(&ais); 
    coded_input.ReadVarint32(&size);//Decode the HDR and get the size 
    cout<<"size of payload is "<<size<<endl; 
    return size; 
} 

void readBody(int csock,google::protobuf::uint32 siz) 
{ 
    int bytecount; 
    log_packet payload; 
    char buffer [siz+4];//size of the payload and hdr 
    //Read the entire buffer including the hdr 
    if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     } 
    cout<<"Second read byte count is "<<bytecount<<endl; 
    //Assign ArrayInputStream with enough memory 
    google::protobuf::io::ArrayInputStream ais(buffer,siz+4); 
    CodedInputStream coded_input(&ais); 
    //Read an unsigned integer with Varint encoding, truncating to 32 bits. 
    coded_input.ReadVarint32(&siz); 
    //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
    //from reading beyond that length.Limits are used when parsing length-delimited 
    //embedded messages 
    google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); 
    //De-Serialize 
    payload.ParseFromCodedStream(&coded_input); 
    //Once the embedded message has been parsed, PopLimit() is called to undo the limit 
    coded_input.PopLimit(msgLimit); 
    //Print the message 
    cout<<"Message is "<<payload.DebugString(); 

} 

void* SocketHandler(void* lp){ 
    int *csock = (int*)lp; 

     char buffer[4]; 
     int bytecount=0; 
     string output,pl; 
     log_packet logp; 

     memset(buffer, '\0', 4); 

     while (1) { 
     //Peek into the socket and get the packet size 
     if((bytecount = recv(*csock, 
         buffer, 
           4, MSG_PEEK))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     }else if (bytecount == 0) 
       break; 
     cout<<"First read byte count is "<<bytecount<<endl; 
     readBody(*csock,readHdr(buffer)); 
     } 

FINISH: 
     free(csock); 
    return 0; 
} 
+0

Grazie un compagno molto. –

+0

+1 per la domanda e +1 per la risposta, questo è stato utile. –

+1

Grazie! Questo è stato molto utile con un problema che stavo avendo –

10

Purtroppo, protobuf non prevede un modo per "imballaggio" (di delimitazione) Messaggi:

Se si desidera scrivere più messaggi in un singolo file o un flusso, si è a voi per mantenere traccia di dove finisce un messaggio e inizia il prossimo . Il formato del protocollo del protocollo buffer non è auto-delimitando, pertanto i parser del buffer del protocollo non possono determinare dove un messaggio termina con il proprio . Il modo più semplice per risolvere questo problema è scrivere la dimensione di ogni messaggio prima di scrivere il messaggio stesso.

(dal loro documentation)

Quindi, si consiglia fondamentalmente la stessa soluzione si è arrivati ​​al.

+0

Grazie Tamas così ho ll andare con il mio approccio – punith