2012-06-10 11 views
15

Hanno un tipo di archivio dati GAE con diversi 100'000 di oggetti al loro interno. Vuoi fare diverse query coinvolte (coinvolgendo query di conteggio). Big Query sembra un dio adatto per questo.Google App Engine: utilizzo di Big Query su datastore?

C'è attualmente un modo semplice per interrogare un Datastore di AppEngine in tempo reale usando Big Query?

risposta

17

Non è possibile eseguire un BigQuery direttamente su entità DataStore, ma è possibile scrivere una pipeline di Mapper che legge le entità da DataStore, le scrive in CSV in Google Cloud Storage e quindi le ingloba in BigQuery - è anche possibile automatizzarle il processo. Ecco un esempio di utilizzo delle Mapper API classi solo per il DataStore in formato CSV passo:

import re 
import time 
from datetime import datetime 
import urllib 
import httplib2 
import pickle 

from google.appengine.ext import blobstore 
from google.appengine.ext import db 
from google.appengine.ext import webapp 

from google.appengine.ext.webapp.util import run_wsgi_app 
from google.appengine.ext.webapp import blobstore_handlers 
from google.appengine.ext.webapp import util 
from google.appengine.ext.webapp import template 

from mapreduce.lib import files 
from google.appengine.api import taskqueue 
from google.appengine.api import users 

from mapreduce import base_handler 
from mapreduce import mapreduce_pipeline 
from mapreduce import operation as op 

from apiclient.discovery import build 
from google.appengine.api import memcache 
from oauth2client.appengine import AppAssertionCredentials 


#Number of shards to use in the Mapper pipeline 
SHARDS = 20 

# Name of the project's Google Cloud Storage Bucket 
GS_BUCKET = 'your bucket' 

# DataStore Model 
class YourEntity(db.Expando): 
    field1 = db.StringProperty() # etc, etc 

ENTITY_KIND = 'main.YourEntity' 


class MapReduceStart(webapp.RequestHandler): 
    """Handler that provides link for user to start MapReduce pipeline. 
    """ 
    def get(self): 
    pipeline = IteratorPipeline(ENTITY_KIND) 
    pipeline.start() 
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id 
    logging.info('Redirecting to: %s' % path) 
    self.redirect(path) 


class IteratorPipeline(base_handler.PipelineBase): 
    """ A pipeline that iterates through datastore 
    """ 
    def run(self, entity_type): 
    output = yield mapreduce_pipeline.MapperPipeline(
     "DataStore_to_Google_Storage_Pipeline", 
     "main.datastore_map", 
     "mapreduce.input_readers.DatastoreInputReader", 
     output_writer_spec="mapreduce.output_writers.FileOutputWriter", 
     params={ 
      "input_reader":{ 
       "entity_kind": entity_type, 
       }, 
      "output_writer":{ 
       "filesystem": "gs", 
       "gs_bucket_name": GS_BUCKET, 
       "output_sharding":"none", 
       } 
      }, 
      shards=SHARDS) 


def datastore_map(entity_type): 
    props = GetPropsFor(entity_type) 
    data = db.to_dict(entity_type) 
    result = ','.join(['"%s"' % str(data.get(k)) for k in props]) 
    yield('%s\n' % result) 


def GetPropsFor(entity_or_kind): 
    if (isinstance(entity_or_kind, basestring)): 
    kind = entity_or_kind 
    else: 
    kind = entity_or_kind.kind() 
    cls = globals().get(kind) 
    return cls.properties() 


application = webapp.WSGIApplication(
            [('/start', MapReduceStart)], 
            debug=True) 

def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main() 

Se si aggiunge questo alla fine della tua classe IteratorPipeline: yield CloudStorageToBigQuery(output), è possibile reindirizzare la risultante csv filehandle in un tubo di ingestione BigQuery. .. come questo:

class CloudStorageToBigQuery(base_handler.PipelineBase): 
    """A Pipeline that kicks off a BigQuery ingestion job. 
    """ 
    def run(self, output): 

# BigQuery API Settings 
SCOPE = 'https://www.googleapis.com/auth/bigquery' 
PROJECT_ID = 'Some_ProjectXXXX' 
DATASET_ID = 'Some_DATASET' 

# Create a new API service for interacting with BigQuery 
credentials = AppAssertionCredentials(scope=SCOPE) 
http = credentials.authorize(httplib2.Http()) 
bigquery_service = build("bigquery", "v2", http=http) 

jobs = bigquery_service.jobs() 
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
    '%m%d%Y_%H%M%S') 
files = [str(f.replace('/gs/', 'gs://')) for f in output] 
result = jobs.insert(projectId=PROJECT_ID, 
        body=build_job_data(table_name,files)).execute() 
logging.info(result) 

def build_job_data(table_name, files): 
    return {"projectId": PROJECT_ID, 
      "configuration":{ 
       "load": { 
        "sourceUris": files, 
        "schema":{ 
         # put your schema here 
         "fields": fields 
         }, 
        "destinationTable":{ 
         "projectId": PROJECT_ID, 
         "datasetId": DATASET_ID, 
         "tableId": table_name, 
         }, 
        } 
       } 
      } 
2

No, BigQuery è un prodotto diverso che richiede il caricamento dei dati. Non può funzionare sul datastore. È possibile utilizzare GQL per interrogare il datastore.

3

Per BigQuery devi esportare questi tipi in una struttura record CSV o delimitata, caricarli in BigQuery e puoi eseguire query. Non c'è alcuna struttura che io sappia di che permetta di interrogare il live GAE Datastore.

Biquery è un motore di query analitico che significa che non è possibile modificare il record. Nessun aggiornamento o eliminazione consentiti, è possibile solo aggiungere.

5

stiamo facendo un programma di Trusted Tester per passare da archivio dati di BigQuery in due semplici operazioni:

  1. Backup t egli datastore utilizzando la funzionalità di backup di archivio dati Admin
  2. di backup importare direttamente in BigQuery

Si prende automaticamente la cura dello schema per voi.

saperne di più (a pagamento): https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ

+0

Quindi cosa è successo a questo? Qualche aggiornamento sul destino del TTP? – gae123

+0

sì, è stato un po 'di tempo – ZiglioUK

+0

Interessato anche a – Omri

6

Con la nuova (da settembre 2013) streaming inserts api è possibile importare i record dalla tua app in BigQuery.

I dati sono disponibili immediatamente in BigQuery, quindi questo dovrebbe soddisfare i requisiti del live.

Mentre questo problema ora è un po 'vecchio, questo può essere una soluzione più facile per chiunque imbattersi in questa domanda

Al momento però sempre questo lavoro da un server locale dev è irregolare al meglio.

1

A partire dal 2016, questo è molto possibile ora! È necessario effettuare le seguenti operazioni:

  1. fare un nuovo secchio in deposito google
  2. entità di backup utilizzando utilizzando l'amministratore del database a console.developers.google.com Ho un tutorial completo
  3. Testa a BigQuery Web UI e importa i file generati nel passaggio 1.

Vedere this post per un esempio completo di questo flusso di lavoro!

Problemi correlati