2016-01-19 14 views
6

Supponiamo che l'Oracle schema ha seguito le tabelle e colonne:Aggiornamento complesso documento nidificato elasticsearch utilizzando logstash e JDBC

 

    Country 
     country_id; (Primary Key) 
     country_name; 

    Department 
     department_id; (Primary Key) 
     department_name; 
     country_id; (Foreign key to Country:country_id) 

    Employee 
     employee_id; (Primary Key) 
     employee_name; 
     department_id; (Foreign key to Department:department_id) 

E io ho il mio documento elasticsearch in cui l'elemento principale è un Paese e contiene Tutti i reparti in quel Paese che a sua volta contiene tutti i Dipendenti nei rispettivi Dipartimenti.

Quindi la struttura del documento si presenta così:

 

    { 
     "mappings": { 
     "country": { 
      "properties": { 
      "country_id": { "type": "string"}, 
      "country_name": { "type": "string"},   
      "department": { 
       "type": "nested", 
       "properties": { 
       "department_id": { "type": "string"}, 
       "department_name": { "type": "string"}, 
       "employee": { 
        "type": "nested", 
        "properties": { 
        "employee_id": { "type": "string"}, 
        "employee_name": { "type": "string"} 
        } 
       } 
       } 
      } 
      } 
     } 
     } 
    }   

Voglio essere in grado di avere le query JDBC ingresso separato in esecuzione su ogni tavolo e dovrebbero creare/aggiornare/cancellare i dati nel documento elasticsearch ogni volta che il i dati nella tabella di base sono aggiunti/aggiornati/cancellati.

Questo è un problema di esempio e le tabelle e la struttura dati effettive sono più complesse. Quindi non sto cercando la soluzione limitata a questo.

C'è un modo per raggiungere questo obiettivo?

Grazie.

+0

Sto indovinando potrebbe essere già risolto questo, tuttavia, non si poteva utilizzare una Vista Oracle per combinare i dati richiesti nel formato della struttura del documento (Paese, Dipartimento, Dipendente) e avere come un'unica query JDBC, in questo modo si sarebbe in grado di creare l'ID del documento elasticsearch come il livello univoco più basso (id_dipendente in questo caso) e gestire le modifiche lì? –

risposta

0

Per il livello uno, è diretto in avanti utilizzando aggregate filter. È necessario avere un ID comune tra di loro per fare riferimento.

filter {  

    aggregate { 
    task_id => "%{id}" 

    code => "  
     map['id'] = event.get('id') 
     map['department'] ||= [] 
     map['department'] << event.to_hash.each do |key,value| { key => value } end  
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated']  
    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 
} 

Importante: L'azione di uscita dovrebbe essere modificare

output { 
     elasticsearch { 
      action => "update" 
      ... 
      } 
     } 

Un modo per risolvere livello 2 è interrogare il documento già indicizzato e aggiornarlo con la scheda nidificata . Ancora usando aggregate filter; ci dovrebbe essere un id comune per il documento in modo da poter cercare e inserire nel documento corretto.

filter {  
    #get the document from elastic based on id and store it in 'emp' 
    elasticsearch { 
      hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"] 
      query => "id:%{id}" 
      fields => { "employee" => "emp" } 
     } 



    aggregate { 
    task_id => "%{id}" 
    code => "  
       map['id'] = event.get('id') 
       map['employee'] = [] 
       employeeArr = [] 
       temp_emp = {} 

       event.to_hash.each do |key,value|      
        temp_emp[key] = value 
       end  

       #push the objects into an array 
       employeeArr.push(temp_emp) 

       empArr = event.get('emp')     

       for emp in empArr 
        emp['employee'] = employeeArr      
        map['employee'].push(emp) 
       end 
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated'] 

    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 

} 

output { 

elasticsearch { 
     action => "update" #important 
     ... 
     } 
} 

Inoltre, al fine di eseguire il debug del codice Ruby, utilizzare il sotto nell'output

output{ 
    stdout { codec => dots } 
} 
Problemi correlati