2014-11-20 24 views
31

Come può documenti essere spostati da una raccolta a un'altra raccolta in MongoDB ?? Ad esempio: ho molti documenti nella raccolta A e voglio spostare tutti i documenti più vecchi di 1 mese alla raccolta B (questi documenti più vecchi di 1 mese non dovrebbero essere nella raccolta A).MongoDB spostare i documenti da una raccolta a un'altra raccolta

Utilizzo dell'aggregazione copia. Ma quello che sto cercando di fare è spostando di documenti. Quale metodo può essere utilizzato per spostare i documenti?

risposta

50

Aggiornamento

This answer by @jasongarber è un approccio più sicuro e dovrebbe essere usato al posto mio.


Purché ti ho preso a destra e si desidera spostare tutti i documenti più vecchi di 1 mese, e si utilizza MongoDB 2.6, non v'è alcuna ragione per non utilizzare le operazioni di massa, che sono il modo più efficiente di fare più operazioni sono a conoscenza di:

> var bulkInsert = db.target.initializeUnorderedBulkOp() 
> var bulkRemove = db.source.initializeUnorderedBulkOp() 
> var date = new Date() 
> date.setMonth(date.getMonth() -1) 
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){ 
     bulkInsert.insert(doc); 
     bulkRemove.find({_id:doc._id}).removeOne(); 
    } 
) 
> bulkInsert.execute() 
> bulkRemove.execute() 

Questo dovrebbe essere piuttosto veloce e ha il vantaggio che nel caso in cui qualcosa va storto durante l'inserimento di massa, i dati originali esiste ancora.


Modifica

Al fine di evitare troppa memoria per essere utilizzato, è possibile eseguire l'operazione di massa su ogni x documenti elaborati:

> var bulkInsert = db.target.initializeUnorderedBulkOp() 
> var bulkRemove = db.source.initializeUnorderedBulkOp() 
> var x = 10000 
> var counter = 0 
> var date = new Date() 
> date.setMonth(date.getMonth() -1) 
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){ 
     bulkInsert.insert(doc); 
     bulkRemove.find({_id:doc._id}).removeOne(); 
     counter ++ 
     if(counter % x == 0){ 
     bulkInsert.execute() 
     bulkRemove.execute() 
     bulkInsert = db.target.initializeUnorderedBulkOp() 
     bulkRemove = db.source.initializeUnorderedBulkOp() 
     } 
    } 
) 
> bulkInsert.execute() 
> bulkRemove.execute() 
+0

O in strumenti dell'interfaccia utente come Robomongo db.getCollection ('source'). Find ({}). ForEach (function (doc) {db.getCollection ('target'). Insert (doc); db.getCollection (' fonte').remove (doc);}) – Arthur

+2

@Arthur: il tuo approccio presenta due principali svantaggi. È * molto * più lento * e * è possibile che le raccolte incomplete siano difficili da sincronizzare di nuovo nel peggiore dei casi. –

+0

Questo non ha funzionato per me. Ho provato questo su una collezione con record 50M, e ho cercato di spostare circa 25 milioni. Non è riuscito nella query di ricerca con l'errore 'Errore irreversibile in CALL_AND_RETRY_2 # Allocazione non riuscita - elaborazione esaurita della memoria'. Questo è su un server con 32 GB di memoria e i record avevano solo 5 campi. La dimensione totale dei dati della raccolta è solo di circa 5 GB. – UpTheCreek

1

è possibile utilizzare query di intervallo per ottenere dati da sourceCollection e conservare i dati del cursore nel ciclo variabile e su di esso e inserire a obiettivo di raccolta:

var doc = db.sourceCollection.find({ 
     "Timestamp":{ 
       $gte:ISODate("2014-09-01T00:00:00Z"), 
       $lt:ISODate("2014-10-01T00:00:00Z") 
     } 
}); 

doc.forEach(function(doc){ 
    db.targetCollection.insert(doc); 
}) 

Speranza in modo che aiuta !!

+0

Yeahhhh !! Insert() e remove() la soluzione ottimizzata da fare ?? – manojpt

+0

questo potrebbe aiutarti http://stackoverflow.com/questions/5942575/is-moving-documents-between-collections-a-good-way-to-represent-state-changes-in – Ninad

8

Inserire e rimuovere:

var documentsToMove = db.collectionA.find({}); 
documentsToMove.forEach(function(doc) { 
    db.collectionB.insert(doc); 
    db.collectionA.remove(doc); 
} 
+0

Is insert() e remove () la soluzione ottimizzata per fare ?? – manojpt

+0

Nessuna idea :) È anche possibile eseguire il dump e il ripristino utilizzando gli strumenti di amministrazione. –

+2

Questo non è atomico, c'è la possibilità che qualcosa venga inserito nella Raccolta B e non venga rimosso da A. – user1965449

3

Può essere dal punto di vista delle prestazioni è meglio rimuovere un sacco di documenti utilizzando un comando (soprattutto se si hanno gli indici per parte query) anziché eliminarne one by-one.

Ad esempio:

db.source.find({$gte: start, $lt: end}).forEach(function(doc){ 
    db.target.insert(doc); 
}); 
db.source.remove({$gte: start, $lt: end}); 
4

Questa è una riaffermazione della @ Markus W Mahlberg

Restituzione del favore - come funzione

function moveDocuments(sourceCollection,targetCollection,filter) { 
    var bulkInsert = targetCollection.initializeUnorderedBulkOp(); 
    var bulkRemove = sourceCollection.initializeUnorderedBulkOp(); 
    sourceCollection.find(filter) 
     .forEach(function(doc) { 
     bulkInsert.insert(doc); 
     bulkRemove.find({_id:doc._id}).removeOne(); 
     } 
) 
    bulkInsert.execute(); 
    bulkRemove.execute(); 
} 

Un esempio utilizzare

var x = {dsid:{$exists: true}}; 
moveDocuments(db.pictures,db.artifacts,x) 

per spostare tutti i documenti che hanno alto elemento di livello DSID dalle immagini alla raccolta manufatti

2

Da MongoDB 3.0, è possibile utilizzare il comando copyTo con la seguente sintassi:

db.source_collection.copyTo("target_collection") 

Quindi è possibile utilizzare il comando drop per rem Ove la vecchia collezione:

db.source_collection.drop() 
0

mi piace la risposta da @ markus-w-Mahlberg, però, a volte, ho visto la necessità di mantenere un po 'più semplice per le persone. Come tale ho un paio di funzioni che sono sotto. Si potrebbe naturalmente avvolgere la cosa qui con operatori di massa come ha fatto lui, ma questo codice funziona allo stesso modo con i vecchi e nuovi sistemi Mongo.

function parseNS(ns){ 
    //Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array. 
    if (ns instanceof Array){ 
     database = ns[0]; 
     collection = ns[1]; 
    } 
    else{ 
     tNS = ns.split("."); 
     if (tNS.length > 2){ 
      print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !'); 
      return false; 
     } 
     database = tNS[0]; 
     collection = tNS[1]; 
    } 
    return {database: database,collection: collection}; 
} 

function insertFromCollection(sourceNS, destNS, query, batchSize, pauseMS){ 
    //Parse and check namespaces 
    srcNS = parseNS(sourceNS); 
    destNS = parseNS(destNS); 
    if (srcNS == false || destNS == false){return false;} 

    batchBucket = new Array(); 
    totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count(); 
    currentCount = 0; 
    print("Processed "+currentCount+"/"+totalToProcess+"..."); 
    db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){ 
     batchBucket.push(doc); 
     if (batchBucket.length > batchSize){ 
      db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket); 
      currentCount += batchBucket.length; 
      batchBucket = []; 
      sleep (pauseMS); 
      print("Processed "+currentCount+"/"+totalToProcess+"...");  
     } 
    } 
    print("Completed"); 
} 

/** Example Usage: 
     insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);  

Si potrebbe, ovviamente, aggiungere un db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true) Se si voleva rimuovere anche i record dopo che sono stati copiati nella nuova posizione. Il codice può essere facilmente costruito in questo modo per renderlo riavviabile.

2

$ out è utilizzare per creare la nuova collezione con i dati, in modo da utilizzare $ fuori

db.oldCollection.aggregate([{$out : "newCollection"}]) 

quindi utilizzare cadere

db.oldCollection.drop() 
0

ho pensato di arhieve 1000 record alla volta utilizzando bulkinsert e bulkdelete metodi di pymongo.

Per entrambi fonte e la destinazione

  1. creare oggetti mongodb per la connessione al database.

  2. istanziare gli oggetti di grandi dimensioni. Nota: ho creato anche un backup di oggetti bulk. Questo mi aiuterà a ripristinare l'inserimento o la rimozione quando si verifica un errore. esempio:

    Per la sorgente // replace this with mongodb object creation logic source_db_obj = db_help.create_db_obj(source_db, source_col) source_bulk = source_db_obj.initialize_ordered_bulk_op() source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    Per obiettivo // replace this with mogodb object creation logic target_db_obj = db_help.create_db_obj(target_db, target_col) target_bulk = target_db_obj.initialize_ordered_bulk_op() target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()

  3. Ottenere i record di origine che corrisponde ai criteri di filtro

    source_find_results = source_db_obj.find (filtro)

  4. Loop attraverso la fonte registra

    Crea target e sfusi fonte operazioni

    accodamento archived_at campo con il datetime corrente alla raccolta di destinazione

    //replace this with the logic to obtain the UTCtime. doc['archived_at'] = db_help.getUTCTime() target_bulk.insert(document) source_bulk.remove(document)

    per ripristino in caso di errori o eccezioni, creare target_bulk_bak e operazioni source_bulk_bak.

    target_bulk_bak.find({'_id':doc['_id']}).remove_one() source_bulk_bak.insert(doc) //remove the extra column doc.pop('archieved_at', None)

  5. Quando il conteggio dei record a 1000, eseguire il bersaglio - inserimento e fonte bulk - rimozione bulk. Nota: questo metodo richiede oggetti target_bulk e source_bulk per l'esecuzione.

    execute_bulk_insert_remove (source_bulk, target_bulk)

  6. Quando si verifica un'eccezione, eseguire la rimozione target_bulk_bak e inesertions source_bulk_bak. Ciò ripristinerebbe le modifiche. Dal momento che MongoDB non ha rollback, sono arrivato fino a questo hack

    execute_bulk_insert_remove (source_bulk_bak, target_bulk_bak)

  7. Infine reinizializzare gli oggetti di origine e di destinazione e alla rinfusa bulk_bak. Questo è necessario perché puoi usarli solo una volta.

  8. completo del codice

    def execute_bulk_insert_remove(source_bulk, target_bulk): 
         try: 
          target_bulk.execute() 
          source_bulk.execute() 
         except BulkWriteError as bwe: 
          raise Exception(
           "could not archive document, reason: {}".format(bwe.details)) 
    
        def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col): 
         """ 
         filter: filter criteria for backup 
         source_db: source database name 
         source_col: source collection name 
         target_db: target database name 
         target_col: target collection name 
         """ 
         count = 0 
         bulk_count = 1000 
    
         source_db_obj = db_help.create_db_obj(source_db, source_col) 
         source_bulk = source_db_obj.initialize_ordered_bulk_op() 
         source_bulk_bak = source_db_obj.initialize_ordered_bulk_op() 
    
         target_db_obj = db_help.create_db_obj(target_db, target_col) 
         target_bulk = target_db_obj.initialize_ordered_bulk_op() 
         target_bulk_bak = target_db_obj.initialize_ordered_bulk_op() 
    
         source_find_results = source_db_obj.find(filter) 
    
         start = datetime.now() 
    
         for doc in source_find_results: 
          doc['archived_at'] = db_help.getUTCTime() 
    
          target_bulk.insert(doc) 
          source_bulk.find({'_id': doc['_id']}).remove_one() 
          target_bulk_bak.find({'_id': doc['_id']}).remove_one() 
          doc.pop('archieved_at', None) 
          source_bulk_bak.insert(doc) 
    
          count += 1 
    
          if count % 1000 == 0: 
           logger.info("count: {}".format(count)) 
           try: 
            execute_bulk_insert_remove(source_bulk, target_bulk) 
           except BulkWriteError as bwe: 
            execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak) 
            logger.info("Bulk Write Error: {}".format(bwe.details)) 
            raise 
    
           source_bulk = source_db_obj.initialize_ordered_bulk_op() 
           source_bulk_bak = source_db_obj.initialize_ordered_bulk_op() 
    
           target_bulk = target_db_obj.initialize_ordered_bulk_op() 
           target_bulk_bak = target_db_obj.initialize_ordered_bulk_op() 
    
         end = datetime.now() 
    
         logger.info("archived {} documents to {} in ms.".format(
          count, target_col, (end - start))) 
    
+1

Ciao e benvenuto su Stack Overflow! Prenditi un minuto per leggere [risposta] - questo sembra utile ma trarrebbe beneficio da alcune spiegazioni su cosa fa il codice, considera [modifica] -ing che? – whrrgarbl

7

Le operazioni di massa @ Markus-w-Mahlberg ha mostrato (e @ mark-Mullin raffinato) sono efficienti, ma non sicuro come scritto. Se bulkInsert ha esito negativo, bulkRemove continuerà comunque. Per essere sicuri di non perdere tutti i record quando in movimento, utilizzare questo invece:

function insertBatch(collection, documents) { 
 
    var bulkInsert = collection.initializeUnorderedBulkOp(); 
 
    var insertedIds = []; 
 
    var id; 
 
    documents.forEach(function(doc) { 
 
    id = doc._id; 
 
    // Insert without raising an error for duplicates 
 
    bulkInsert.find({_id: id}).upsert().replaceOne(doc); 
 
    insertedIds.push(id); 
 
    }); 
 
    bulkInsert.execute(); 
 
    return insertedIds; 
 
} 
 

 
function deleteBatch(collection, documents) { 
 
    var bulkRemove = collection.initializeUnorderedBulkOp(); 
 
    documents.forEach(function(doc) { 
 
    bulkRemove.find({_id: doc._id}).removeOne(); 
 
    }); 
 
    bulkRemove.execute(); 
 
} 
 

 
function moveDocuments(sourceCollection, targetCollection, filter, batchSize) { 
 
    print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection); 
 
    var count; 
 
    while ((count = sourceCollection.find(filter).count()) > 0) { 
 
    print(count + " documents remaining"); 
 
    sourceDocs = sourceCollection.find(filter).limit(batchSize); 
 
    idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs); 
 

 
    targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}}); 
 
    deleteBatch(sourceCollection, targetDocs); 
 
    } 
 
    print("Done!") 
 
}

+1

Hai assolutamente ragione! * facepalm * –

+0

Questa procedura mantiene l'ID nella destinazione e nella fonte. (il mio caso è un po 'diverso dal post e sono incappato in questa funzione). –

Problemi correlati