5

È possibile creare una pipeline che legge dati da Pub/Sub e scrive su Datastore? Nel mio codice specificherò PubsubIO come input e applicherò la windowing per ottenere una PColection limitata, ma sembra che non sia possibile usare DatastoreIO.writeTo con options.setStreaming come true, mentre è necessario per usare PubsubIO come input. C'è un modo per aggirare questo? O semplicemente non è possibile leggere da pubsub e scrivere su datastore?Lettura da PubsubIO scrittura su DatastoreIO

Ecco il mio codice:

DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 

    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming")); 
    PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))).triggering(AfterPane.elementCountAtLeast(1)).discardingFiredPanes().withAllowedLateness(Duration.standardHours(1))); 
    PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() { 
     private static final long serialVersionUID = 1L; 
     public void processElement(ProcessContext c) { 
      String msg = c.element(); 
      byte[] decoded = Base64.decodeBase64(msg.getBytes()); 
      String outmsg = new String(decoded); 
      c.output(outmsg); 
     } 
    })); 
    PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events"))); 

    inputEntity.apply(DatastoreIO.writeTo(datasetid)); 


    p.run(); 

E questa è l'eccezione ottengo:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner. 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159) 
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104) 

risposta

5

Il lavello DatastoreIO non è attualmente supportato nel corridore streaming. Per scrivere su Datastore da una pipeline di streaming, è possibile effettuare chiamate dirette all'API Datastore da un DoFn.

+0

Grazie, questo è stato utile. Ma ora sto affrontando problemi con la chiamata dell'API Datastore dall'app Dataflow, che non è un'app AppEngine, e apparentemente l'API del datastore si basa molto sulla funzionalità AppEngine disponibile solo per le app in esecuzione su AppEngine. Poi ho trovato l'API Remote che sembra fornire esattamente ciò di cui ho bisogno, ma ho ancora difficoltà ad usarlo. Devo autenticarmi con un account di servizio? Ho seguito il codice di esempio su questa [pagina] (https://cloud.google.com/appengine/docs/java/tools/remoteapi) ma sto ricevendo una HttpResponseException, 302 – lilline

+0

Stai provando a scrivere su un'istanza di Datastore appartiene a un progetto diverso dalla tua pipeline Dataflow? In tal caso, dai un'occhiata a https://cloud.google.com/dataflow/security-and-permissions#cross-project per come configurarlo – danielm

+0

No, l'istanza del datastore fa parte dello stesso progetto del flusso di dati, Ho superato il problema 302. MA, come è possibile utilizzare l'API remota in un ParDo, quando (sto indovinando qui) ParDo esegue la funzione DoFn in thread o istanze diverse rispetto alla pipeline padre e l'installer dell'API remota non è serializzabile e l'installer è disponibile solo sul thread in cui è stato creato? Non sono sicuro se questo è il problema, ma in ogni caso, sto ricevendo diverse eccezioni a seconda di dove provo a creare e ad accedere all'installer .. – lilline

4

Ok, dopo aver sbattuto la testa contro il muro, finalmente l'ho fatto funzionare. Come suggerito da danielm, sto effettuando chiamate all'API Datastore da un DoDo DoDo. Un problema era che non mi rendevo conto che esiste un'API separata per l'utilizzo di Cloud Datastore all'esterno di AppEngine. (com.google.api.services.datastore ... vs. com.google.appengine.api.datastore ...). Un altro problema è che apparentemente c'è un qualche tipo di bug nell'ultima versione dell'API Cloud Datastore (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0, ho ricevuto un errore IllegalAccess), ho risolto che usando un versione precedente (v1beta2-rev1-2.1.2).

Quindi, ecco il mio codice di lavoro:

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.api.services.datastore.DatastoreV1.*; 
import com.google.api.services.datastore.client.Datastore; 
import com.google.api.services.datastore.client.DatastoreException; 
import com.google.api.services.datastore.client.DatastoreFactory; 
import static com.google.api.services.datastore.client.DatastoreHelper.*; 
import java.security.GeneralSecurityException; 
import java.io.IOException; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 

//-------------------- 

public static void main(String[] args) { 
    DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 
    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/my-topic-name")); 

    input.apply(ParDo.of(new DoFn<String, String>() { 
     private static final long serialVersionUID = 1L; 
     public void processElement(ProcessContext c) throws ParseException, DatastoreException { 

      JSONObject json = (JSONObject)new JSONParser().parse(c.element()); 

      Datastore datastore = null; 
      try { 
       datastore = DatastoreFactory.get().create(getOptionsFromEnv() 
         .dataset(datasetid).build()); 
      } catch (GeneralSecurityException exception) { 
       System.err.println("Security error connecting to the datastore: " + exception.getMessage()); 
      } catch (IOException exception) { 
       System.err.println("I/O error connecting to the datastore: " + exception.getMessage()); 
      } 

      Key.Builder keyBuilder = makeKey("my-kind"); 
      keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace"); 
      Entity.Builder event = Entity.newBuilder() 
        .setKey(keyBuilder); 

      event.addProperty(makeProperty("my-prop",makeValue((String)json.get("my-prop")))); 

      CommitRequest commitRequest = CommitRequest.newBuilder() 
        .setMode(CommitRequest.Mode.NON_TRANSACTIONAL) 
        .setMutation(Mutation.newBuilder().addInsertAutoId(event)) 
        .build(); 
      if(datastore!=null){ 
       datastore.commit(commitRequest); 
      } 

     } 
    })); 


    p.run(); 
} 

E le dipendenze in pom.xml:

<dependency> 
    <groupId>com.google.cloud.dataflow</groupId> 
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> 
    <version>[1.0.0,2.0.0)</version> 
</dependency> 
<dependency> 
    <groupId>com.google.apis</groupId> 
    <artifactId>google-api-services-datastore-protobuf</artifactId> 
    <version>v1beta2-rev1-2.1.2</version> 
</dependency> 
<dependency> 
    <groupId>com.google.http-client</groupId> 
    <artifactId>google-http-client</artifactId> 
    <version>1.17.0-rc</version> 
</dependency> 
<!-- Some more.. like JUnit etc.. -->