6

Vorrei inserire i dati dei messaggi PubSub provenienti da un argomento in una tabella BigQuery utilizzando Google Cloud Dataflow. Tutto funziona alla grande ma nella tabella di BigQuery posso vedere stringhe illeggibili come "߈ ". Questa è la mia condotta:Inserire messaggi PubSub in BigQuery tramite Google Cloud Dataflow

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

e la mia semplice funzione StringToRowConverter è:

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

E questo è il messaggio che ho inviato tramite una richiesta POST:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

Che cosa mi manca ? Grazie!

+0

[Questo] (https://github.com/bomboradata/pubsub-to-bigquery) è un open source che è possibile utilizzare per indirizzare pub/sub a BQ – PUG

risposta

6

In base a https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage, il payload JSON del messaggio pubsub è codificato in base64. PubsubIO in Dataflow, per impostazione predefinita, utilizza il codificatore String UTF8. La stringa di esempio fornita "34gf5ert", quando decodificata in base64 e interpretata come una stringa UTF-8, fornisce esattamente "߈ ".

2

Questo è come mi sto disimballaggio miei messaggi PubSub:

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

Spero proprio utile a voi.

Problemi correlati