2016-02-25 25 views
11

Sto cercando di inserire e aggiornare alcuni dati su MySql utilizzando Spark SQL DataFrame e connessione JDBC.SQL SPARK - aggiorna la tabella MySql utilizzando DataFrames e JDBC

Sono riuscito a inserire nuovi dati utilizzando SaveMode.Append. C'è un modo per aggiornare i dati già esistenti in MySql Table da Spark SQL?

mio codice da inserire è:

myDataFrame.write.mode(SaveMode.Append).jdbc(JDBCurl,mySqlTable,connectionProperties)

Se cambio a SaveMode.Overwrite cancella la tabella completa e ne crea uno nuovo, sto cercando qualcosa di simile al "UPDATE KEY ON DUPLICATE "disponibile in MySql

risposta

14

Non è possibile. Come per ora (Spark 1.6.0/2.2.0 SNAPSHOT) Spark DataFrameWriter supporta solo quattro modi di scrittura:

  • SaveMode.Overwrite: sovrascrivere i dati esistenti.
  • SaveMode.Append: aggiungere i dati.
  • SaveMode.Ignore: ignorare l'operazione (ovvero no-op).
  • SaveMode.ErrorIfExists: opzione predefinita, genera un'eccezione in fase di esecuzione.

È possibile inserire manualmente per esempio usando mapPartitions (poiché si desidera un'operazione UPSERT dovrebbe essere idempotente e come tale facile da implementare), scrivere a tabella temporanea ed eseguire upsert manualmente, o utilizzare i trigger.

In generale, ottenere un comportamento aggressivo per le operazioni batch e mantenere prestazioni decenti è tutt'altro che banale. È necessario ricordare che, in generale, ci saranno più transazioni simultanee (una per ogni partizione), quindi è necessario assicurarsi che non ci siano conflitti di scrittura (in genere utilizzando il partizionamento specifico dell'applicazione) o fornire procedure di recupero appropriate. In pratica, potrebbe essere meglio eseguire e scrivere in batch su una tabella temporanea e risolvere la parte di upsert direttamente nel database. risposta

0

zero323 di ragione, volevo solo aggiungere che si potrebbe usare il pacchetto JayDeBeApi per risolvere questo: https://pypi.python.org/pypi/JayDeBeApi/

per aggiornare i dati nella tabella mysql. Potrebbe essere un frutto a basso impatto poiché hai già installato il driver jdbc mysql.

Il modulo JayDeBeApi consente di connettersi dal codice Python ai database utilizzando JDBC Java. Fornisce una Python DB-API v2.0 a quel database .

Utilizziamo la distribuzione Anaconda di Python e il pacchetto python di JayDeBeApi viene fornito di serie.

Vedere gli esempi in questo link sopra.

0

Un peccato che non ci sia la modalità SaveMode.Upsert in Spark per casi abbastanza comuni come l'upserting.

zero322 ha ragione in generale, ma penso che dovrebbe essere possibile (con compromessi in termini di prestazioni) offrire tale funzione di sostituzione.

Volevo anche fornire un codice java per questo caso. Ovviamente non è così performante come quello integrato dalla scintilla, ma dovrebbe essere una buona base per le vostre esigenze. Basta modificarlo in base alle tue esigenze:

myDF.repartition(20); //one connection per partition, see below 

myDF.foreachPartition((Iterator<Row> t) -> { 
      Connection conn = DriverManager.getConnection(
        Constants.DB_JDBC_CONN, 
        Constants.DB_JDBC_USER, 
        Constants.DB_JDBC_PASS); 

      conn.setAutoCommit(true); 
      Statement statement = conn.createStatement(); 

      final int batchSize = 100000; 
      int i = 0; 
      while (t.hasNext()) { 
       Row row = t.next(); 
       try { 
        // better than REPLACE INTO, less cycles 
        statement.addBatch(("INSERT INTO mytable " + "VALUES (" 
          + "'" + row.getAs("_id") + "', 
          + "'" + row.getStruct(1).get(0) + "' 
          + "') ON DUPLICATE KEY UPDATE _id='" + row.getAs("_id") + "';")); 
        //conn.commit(); 

        if (++i % batchSize == 0) { 
         statement.executeBatch(); 
        } 
       } catch (SQLIntegrityConstraintViolationException e) { 
        //should not occur, nevertheless 
        //conn.commit(); 
       } catch (SQLException e) { 
        e.printStackTrace(); 
       } finally { 
        //conn.commit(); 
        statement.executeBatch(); 
       } 
      } 
      int[] ret = statement.executeBatch(); 

      System.out.println("Ret val: " + Arrays.toString(ret)); 
      System.out.println("Update count: " + statement.getUpdateCount()); 
      conn.commit(); 

      statement.close(); 
      conn.close(); 
Problemi correlati