2016-06-01 5 views
9

Sto lavorando con Apache NiFi 0.5.1 su uno script Groovy per sostituire i valori Json in arrivo con quelli contenuti in un file di mapping. Il file di mapping simile a questa (si tratta di un semplice txt):Apache NiFi ExecuteScript: script Groovy per sostituire i valori Json tramite un file di mappatura

Header1;Header2;Header3 
A;some text;A2 

ho iniziato con il seguente:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = """ 
{ 
    "field1": "A" 
    "field2": "A", 
    "field3": "A" 

}""" 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      builder.content.field1 = "A" 
      builder.content.field2 = "some text" 
      builder.content.field3 = "A2" 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

Questo primo passo funziona bene, anche se è hardcoded ed è Lungi dall'essere l'ideale. Inizialmente, il mio pensiero era di usare ReplaceTextWithMapping per poter eseguire le sostituzioni, tuttavia non funziona bene con i file di mappatura complessi (ad esempio multi-colonne). Vorrei portarlo avanti, ma non sono sicuro di come farlo. Prima di tutto, invece di passare nell'intero JSON harcoded, mi piacerebbe leggere il file di flusso in arrivo. Com'è possibile in NiFi? Prima di eseguire lo script come parte di ExecuteScript, ho emesso un file .Json con contenuto tramite UpdateAttribute dove nomefile = myResultingJSON.json. Inoltre, io so come caricare un file .txt con Groovy (String mappingContent= new File('/path/to/file').getText('UTF-8'), ma come faccio ad usare il file caricato per eseguire le sostituzioni in modo che il mio risultante JSON sarebbe simile a questa:

{ 
    "field1": "A" 
    "field2": "some text", 
    "field3": "A2" 
} 

Grazie per la vostra aiutare,

I.

EDIT:

prima modifica allo script non mi permette di leggere dal InputStream:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8) 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      builder.content.field1 = "A" 
      builder.content.field2 = "some text" 
      builder.content.field3 = "A2" 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

ho poi trasferito a testare l'approccio con il ConfigSlurper e ha scritto una classe generica prima di iniettare la logica in Groovy ExecuteScript:

class TestLoadingMappings { 

    static void main(String[] args) { 

     def content = ''' 
     {"field2":"A", 
     "field3": "A" 
     } 
     ''' 

     println "This is the content of the JSON file" + content 

     def slurped = new JsonSlurper().parseText(content) 
     def builder = new JsonBuilder(slurped) 

     println "This is the content of my builder " + builder 

     def propertiesFile = new File("D:\\myFile.txt") 
     Properties props = new Properties() 
     props.load(new FileInputStream(propertiesFile)) 
     def config = new ConfigSlurper().parse(props).flatten() 

     println "This is the content of my config " + config 

     config.each { k, v -> 
      if (builder[k]) { 
       builder[k] = v 
      } 
     } 
     println(builder.toPrettyString()) 
    } 

} 

Io torno con un groovy.lang.MissinPropertyException e questo perché la mappatura non è così semplice. Tutti i campi/proprietà (da campo1 a campo3) entrano in InpuStream con lo stesso valore (ad es.) E ciò significa che ogni volta che field2, ad esempio, ha quel valore, puoi essere certo che sarà valido per le altre due proprietà. Tuttavia, non posso avere un campo di mappatura che mappa "field2": "someText" perché il mapping effettivo è guidato dal primo valore nel file di mapping. Ecco un esempio:

{ 
     "field1": "A" 
     "field2": "A", 
     "field3": "A" 

} 

Nel mio file di mapping ho:

A;some text;A2 

Tuttavia campo1 ha bisogno di mappatura per un (primo valore nel file) o rimanere lo stesso, se lo si desidera. Field2 richiede la mappatura del valore nell'ultima colonna (A2) e, infine, Field3 deve mappare "alcuni testi" nella colonna centrale.

Potete aiutarmi? È qualcosa che posso ottenere con Groovy ed ExecuteScript. Se necessario, posso dividere i file di configurazione in due.

Inoltre, ho dato una rapida occhiata all'altra opzione (PutDistributedMapCache) e non sono sicuro di aver capito come caricare coppie di valori-chiave in una cache di mappe distribuita. Sembra che tu debba avere un DistributedMapCacheClient e non sono sicuro che sia facile da implementare.

Grazie!

EDIT 2:

Alcuni altri progressi, ora ho il lavoro di mappatura, ma non so perché non riesce quando si legge la seconda linea delle proprietà del file:

"A" someText 
"A2" anotherText 

class TestLoadingMappings { 

    static void main(String[] args) { 

     def content = ''' 
     {"field2":"A", 
     "field3":"A" 
     } 
     ''' 

     println "This is the content of the JSON file" + content 

     def slurper = new JsonSlurper().parseText(content) 
     def builder = new JsonBuilder(slurper) 

     println "This is the content of my builder " + builder 

     assert builder.content.field2 == "A" 
     assert builder.content.field3 == "A" 

     def propertiesFile = new File('D:\\myTest.txt') 
     Properties props = new Properties() 
     props.load(new FileInputStream(propertiesFile)) 
     println "This is the content of the properties " + props 
     def config = new ConfigSlurper().parse(props).flatten() 

     config.each { k, v -> 
      if (builder.content.field2) { 

       builder.content.field2 = config[k] 
      } 
      if (builder.content.field3) { 

       builder.content.field3 = config[k] 
      } 

      println(builder.toPrettyString()) 
      println "This is my builder " + builder 
     } 
    } 
} 

Io sono tornato con: This is my builder {"field2":"someText","field3":"someText"}

Qualche idea sul perché?

Grazie mille

EDIT 3 (Spostato dal basso)

ho scritto il seguente:

import groovy.json.JsonBuilder 
    import groovy.json.JsonSlurper 

    class TestLoadingMappings { 

     static void main(String[] args) { 

      def content = 
      ''' 
      {"field2":"A", 
      "field3":"A" 
      } 
      ''' 
      def slurper = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurper) 

      println "This is the content of my builder " + builder 

      def propertiesFile = new File('D:\\properties.txt') 
      Properties props = new Properties() 
      props.load(new FileInputStream(propertiesFile)) 
      def conf = new ConfigSlurper().parse(props).flatten() 

      conf.each { k, v -> 
      if (builder.content[k]) { 
       builder.content[k] = v 
      } 
      println("This prints the resulting JSON :" + builder.toPrettyString()) 
     } 
    } 
} 

Tuttavia, ho dovuto cambiare la struttura del file di mapping come segue :

"field1"="substitutionText" 
"field2"="substitutionText2" 

Ho quindi "incorporato" ConfigSlurper nell'eseguibile sceneggiatura eScript, come segue:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import org.apache.commons.io.IOUtils 
import org.apache.nifi.processor.io.StreamCallback 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

flowFile = session.write(flowFile, 
     { inputStream, outputStream -> 

      def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 

      def slurped = new JsonSlurper().parseText(content) 
      def builder = new JsonBuilder(slurped) 
      outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8)) 

      def propertiesFile = new File(''D:\\properties.txt') 
      Properties props = new Properties() 
      props.load(new FileInputStream(propertiesFile)) 
      def conf = new ConfigSlurper().parse(props).flatten(); 

      conf.each { k, v -> 
       if (builder.content[k]) { 
        builder.content[k] = v 
       } 
      } 
      outputStream.write(content.toString().getBytes(StandardCharsets.UTF_8)) 
     } as StreamCallback) 
session.transfer(flowFile, ExecuteScript.REL_SUCCESS) 

Il problema sembra essere il fatto che non posso davvero replicare la logica nel file di mapping originale utilizzando qualcosa di simile a quello creato per le mie TestLoadingMappings. Come accennato nel mio precedente commenti/modifiche, la mappatura dovrebbe funzionare in questo modo:

field2 = se A allora sostituto "del testo"

field3 = se A allora sostituto A2

.. .

field2 = B allora sostituire a "qualche altro testo"

field3 = B allora sostituire alla B2

e figlio.

In breve, le mappature sono guidate dal valore in ingresso in InputStream (che varia), che esegue la mappatura condizionale a valori diversi a seconda dell'attributo JSON. Potete raccomandare per favore un modo migliore per ottenere questa mappatura tramite Groovy/ExecuteScript? Ho la flessibilità nel modificare il file di mappatura, puoi vedere un modo in cui posso modificarlo per ottenere i mapping desiderati?

Grazie

risposta

9

Ho alcuni esempi su come leggere un file contenente flusso JSON:

http://funnifi.blogspot.com/2016/02/executescript-explained-split-fields.html http://funnifi.blogspot.com/2016/05/validating-json-in-nifi-with.html http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html

Hai la struttura proprio sopra; in pratica puoi usare quella variabile "inputStream" nella chiusura per leggere il contenuto del file di flusso in entrata. Se vuoi leggerlo tutto in una volta (cosa che probabilmente dovrai fare per JSON), puoi usare IOUtils.toString() seguito da un JsonSlurper, come si fa negli esempi nei link sopra.

Per il vostro file di mapping, soprattutto se il JSON è "flat", si potrebbe avere un file di proprietà Java, la mappatura del nome del campo per il nuovo valore:

field2 = un testo

field3 = A2

Verificare ConfigSlurper per la lettura nei file delle proprietà.

Dopo aver slurpato il file JSON in entrata e letto il file di mapping, è possibile accedere ai singoli campi del JSON utilizzando la notazione di matrice anziché la notazione diretta dei membri. Quindi diciamo che leggo le proprietà in un ConfigSlurper e voglio sovrascrivere qualsiasi proprietà esistente nel mio input JSON (chiamato "json" per l'esempio) con quello del file delle proprietà. Questo potrebbe essere simile al seguente:

config.parse(props).flatten().each { k,v -> 
    if(json[k]) { 
    json[k] = v 
    } 
} 

Si può poi proseguire con il tuo outputStream.write().

Invece di leggere i mapping da un file, è anche possibile caricarlo in una cache distribuita tramite il processore PutDistributedMapCache. È possibile leggere da un DistributedCacheMapServer nel vostro ExecuteScript, ho un esempio qui:

http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html

Se la mappatura è complesso, si consiglia di utilizzare il processore TransformJSON, che sarà disponibile nella prossima release di NiFi (0.7.0). Il caso Jira associato è qui:

https://issues.apache.org/jira/browse/NIFI-361

EDIT:

In risposta alle modifiche, non mi rendevo conto aveste più regole per i diversi valori. In questo caso, un file delle proprietà probabilmente non è il modo migliore per rappresentare i mapping. Invece si potrebbe usare JSON:

{ 
    "field2": { 
     "A": "some text", 
     "B": "some other text" 
     }, 
    "field3": { 
     "A": "A2", 
     "B": "B2" 
     } 
} 

quindi è possibile utilizzare un JSONSlurper per leggere nel file di mapping. Ecco un esempio di utilizzo del file di mapping sopra riportato:

import groovy.json.JsonBuilder 
import groovy.json.JsonSlurper 
import org.apache.commons.io.IOUtils 
import org.apache.nifi.processor.io.StreamCallback 

import java.nio.charset.StandardCharsets 

def flowFile = session.get(); 
if (flowFile == null) { 
    return; 
} 

def mappingJson = new File('/Users/mburgess/mappings.json').text 

flowFile = session.write(flowFile, { inputStream, outputStream -> 

    def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
    def inJson = new JsonSlurper().parseText(content) 
    def mappings = new JsonSlurper().parseText(mappingJson) 

    inJson.each {k,v -> 
     inJson[k] = mappings[k][v] 
    } 
    outputStream.write(inJson.toString().getBytes(StandardCharsets.UTF_8)) 
} as StreamCallback) 

session.transfer(flowFile, REL_SUCCESS) 
+0

Matt, grazie mille per il vostro aiuto. Si prega di dare un'occhiata alla mia risposta/modifica nel post originale. Ho fatto qualche tentativo con lo script di Groovy + ConfigSlurper e sto lottando per avere le mappature del desiderio. Grazie – paranza

+0

Grazie Matt, ho finito per riscrivere le mappature da solo e sì avere un formato JSON aiuta. Grazie ancora. – paranza

Problemi correlati