2016-01-08 26 views
5

Sto cercando di inserire un paio di milioni di record (con circa 6 campi/colonne) ricevendo richieste dai client 10.000 record per tentativo di inserimento di massa (utilizzando sequelize .js e bulkCreate())Come eseguire lo streaming di un array JSON da NodeJS a postgres

Questo ovviamente era una cattiva idea, così ho provato a guardare in node-pg-copy-streams

Tuttavia, io non voglio avviare un cambiamento sul lato client, in cui una matrice jSON viene inviato come tale

# python 
data = [ 
    { 
    "column a":"a values", 
    "column b":"b values", 
    }, 
    ... 
    # 10,000 items 
    ... 
] 
request.post(data=json.dumps(data), url=url) 

Sul lato server in nodejs, come faccio a trasmettere lo request.body ricevuto nello scheletro seguente?

.post(function(req, res){ 

    // old sequelize code 
    /* table5.bulkCreate(
     req.body, {raw:true} 
    ).then(function(){ 
     return table5.findAll(); 
    }).then(function(result){ 
     res.json(result.count); 
    });*/ 

    // new pg-copy-streams code 
    pg.connect(function(err, client, done) { 
    var stream = client.query(copyFrom('COPY my_table FROM STDIN')); 
    // My question is here, how would I stream or pipe the request body ? 
    // ?.on('error', done); 
    // ?.pipe(stream).on('finish', done).on('error', done); 
    }); 
}); 
+0

prega fatemi sapere come posso migliorare questa domanda, questo probabilmente è la mia prima domanda o prima domanda sempre – Sri

+0

Attualmente sto cercando in come trasmettere una stringa o un array. – Sri

+1

postgres copy definisce tre formati ammessi sull'input: text, csv e binary. Non sono un esperto ma i formati sono descritti qui: http://www.postgresql.org/docs/9.3/static/sql-copy.html (Ricerca per "Formati di file" titolo) – thst

risposta

2

Ecco come ho risolto il mio problema,

Prima una funzione per convertire il mio req.body dict in un TSV (non una parte del problema iniziale)

/** 
* Converts a dictionary and set of keys to a Tab Separated Value blob of text 
* @param {Dictionary object} dict 
* @param {Array of Keys} keys 
* @return {Concatenated Tab Separated Values} String 
*/ 
function convertDictsToTSV(dicts, keys){ 
    // ... 
} 

In secondo luogo il resto della mia funzione .post originale

.post(function(req, res){ 
    // ... 
    /* requires 'stream' as 
    * var stream = require('stream'); 
    * var copyFrom = require('pg-copy-streams').from; 
    */ 
    var read_stream_string = new stream.Readable(); 
    read_stream_string.read = function noop() {}; 
    var keys = [...]; // set of dictionary keys to extract from req.body 
    read_stream_string.push(convertDictsToTSV(req.body, keys)); 
    read_stream_string.push(null); 
    pg.connect(connectionString, function(err, client, done) { 
     // ... 
     // error handling 
     // ... 
     var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN' 
     var pg_copy_stream = client.query(copyFrom(copy_string)); 
     read_stream_string.pipe(pg_copy_stream).on('finish', function(finished){ 
      // handle finished and done appropriately 
     }).on('error', function(errored){ 
      // handle errored and done appropriately 
     }); 
    }); 
    pg.end(); 
}); 
0

Tecnicamente, non c'è streaming qui, non in termini di funzionamento dello streaming NodeJS.

Si sta inviando una porzione di 10.000 record ogni volta e si aspetta che il proprio lato server li inserisca e restituisca un OK al client per inviare altri 10.000 record. Si tratta di dati di limitazione/paging, non di streaming.

Una volta che il server ha ricevuto i successivi 10.000 record, inserirli (solitamente come una transazione), quindi rispondere di nuovo con OK al client in modo che possa inviare i successivi 10.000 record.

Scrivere transazioni con il nodo Postgres non è un compito facile, poiché è troppo basso per quello.

Di seguito è un esempio di come farlo con l'aiuto di pg-promise:

function insertRecords(records) { 
    return db.tx(t=> { 
     var inserts = []; 
     records.forEach(r=> { 
      var query = t.none("INSERT INTO table(fieldA, ...) VALUES(${propA}, ...)", r); 
      inserts.push(query); 
     }); 
     return t.batch(inserts); 
    }); 
} 

Poi all'interno del vostro gestore HTTP, si può scrivere:

function myPostHandler(req, res) {   
    // var records = get records from the request;  
    insertRecords(records) 
     .then(data=> { 
      // set response as success; 
     }) 
     .catch(error=> { 
      // set response as error; 
     });  
} 
+1

Grazie, questo aiuta ma è ancora un po 'inefficiente sul quadro generale. Ho confrontato questo con la velocità di COPY FROM, ed è più lento, quindi vado con quello invece di INSERT – Sri

+1

Se vuoi farlo in modo efficiente, dovresti eseguire lo streaming dei dati direttamente dal client nel database, tramite TCP-IP, usando web IO. –

+0

Sì, è decisamente più efficiente! Tuttavia, la modifica del codice del client non è fattibile e anche l'introduzione dell'accesso al database non è pratica nella nostra attuale configurazione in cui client e database si trovano su reti diverse.Cambiare il codice del server era il nostro minimo percorso di resistenza. – Sri

Problemi correlati