Finalmente ho potuto farlo funzionare. Sto postando il codice qui in modo che si possa rivedere e commentare su di esso così come se qualcuno vuole implementarlo in C++, questo pezzo di codice può aiutare. È un codice shabby che la mia intenzione era di far funzionare Protobuf in modo prefissato. Ho preso il codice del server client da un sito che non ricordo e l'ho modificato per ospitare protobuf. Qui il server guarda prima nel socket e ottiene la lunghezza del pacchetto totale e quindi la lettura effettiva del socket viene eseguita per leggere l'intero pacchetto. Ci possono essere zillion modi per farlo, ma per una soluzione rapida l'ho fatto in questo modo. Ma ho bisogno di trovare un modo migliore per evitare 2 recv per pacchetti, ma nella mia condizione tutti i messaggi sono di dimensioni diverse, quindi questo è l'unico modo che immagino.
file di Proto
message log_packet {
required fixed64 log_time =1;
required fixed32 log_micro_sec =2;
required fixed32 sequence_no =3;
required fixed32 shm_app_id =4;
required string packet_id =5;
required string log_level=6;
required string log_msg=7;
}
protocollo codice di buffer client
#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
using namespace std;
int main(int argv, char** argc){
/* Coded output stram */
log_packet payload ;
payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");
cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);
int host_port= 1101;
char* host_name="127.0.0.1";
struct sockaddr_in my_addr;
char buffer[1024];
int bytecount;
int buffer_len=0;
int hsock;
int * p_int;
int err;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n",errno);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){
printf("Error setting options %d\n",errno);
free(p_int);
goto FINISH;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = inet_addr(host_name);
if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){
if((err = errno) != EINPROGRESS){
fprintf(stderr, "Error connecting socket %d\n", errno);
goto FINISH;
}
}
for (int i =0;i<10000;i++){
for (int j = 0 ;j<10;j++) {
if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) {
fprintf(stderr, "Error sending data %d\n", errno);
goto FINISH;
}
printf("Sent bytes %d\n", bytecount);
usleep(1);
}
}
delete pkt;
FINISH:
close(hsock);
}
buffer di protocollo Codice Server
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
using namespace std;
using namespace google::protobuf::io;
void* SocketHandler(void*);
int main(int argv, char** argc){
int host_port= 1101;
struct sockaddr_in my_addr;
int hsock;
int * p_int ;
int err;
socklen_t addr_size = 0;
int* csock;
sockaddr_in sadr;
pthread_t thread_id=0;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n", errno);
goto FINISH;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)||
(setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){
printf("Error setting options %d\n", errno);
free(p_int);
goto FINISH;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = INADDR_ANY ;
if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){
fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
goto FINISH;
}
if(listen(hsock, 10) == -1){
fprintf(stderr, "Error listening %d\n",errno);
goto FINISH;
}
//Now lets do the server stuff
addr_size = sizeof(sockaddr_in);
while(true){
printf("waiting for a connection\n");
csock = (int*)malloc(sizeof(int));
if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){
printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
pthread_create(&thread_id,0,&SocketHandler, (void*)csock);
pthread_detach(thread_id);
}
else{
fprintf(stderr, "Error accepting %d\n", errno);
}
}
FINISH:
;//oops
}
google::protobuf::uint32 readHdr(char *buf)
{
google::protobuf::uint32 size;
google::protobuf::io::ArrayInputStream ais(buf,4);
CodedInputStream coded_input(&ais);
coded_input.ReadVarint32(&size);//Decode the HDR and get the size
cout<<"size of payload is "<<size<<endl;
return size;
}
void readBody(int csock,google::protobuf::uint32 siz)
{
int bytecount;
log_packet payload;
char buffer [siz+4];//size of the payload and hdr
//Read the entire buffer including the hdr
if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}
cout<<"Second read byte count is "<<bytecount<<endl;
//Assign ArrayInputStream with enough memory
google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
CodedInputStream coded_input(&ais);
//Read an unsigned integer with Varint encoding, truncating to 32 bits.
coded_input.ReadVarint32(&siz);
//After the message's length is read, PushLimit() is used to prevent the CodedInputStream
//from reading beyond that length.Limits are used when parsing length-delimited
//embedded messages
google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
//De-Serialize
payload.ParseFromCodedStream(&coded_input);
//Once the embedded message has been parsed, PopLimit() is called to undo the limit
coded_input.PopLimit(msgLimit);
//Print the message
cout<<"Message is "<<payload.DebugString();
}
void* SocketHandler(void* lp){
int *csock = (int*)lp;
char buffer[4];
int bytecount=0;
string output,pl;
log_packet logp;
memset(buffer, '\0', 4);
while (1) {
//Peek into the socket and get the packet size
if((bytecount = recv(*csock,
buffer,
4, MSG_PEEK))== -1){
fprintf(stderr, "Error receiving data %d\n", errno);
}else if (bytecount == 0)
break;
cout<<"First read byte count is "<<bytecount<<endl;
readBody(*csock,readHdr(buffer));
}
FINISH:
free(csock);
return 0;
}
Grazie un compagno molto. –
+1 per la domanda e +1 per la risposta, questo è stato utile. –
Grazie! Questo è stato molto utile con un problema che stavo avendo –