2016-04-15 11 views
5

Ho un processo di elaborazione file che attualmente utilizza gli attori di akka con la contropressione gestita manualmente per gestire la pipeline di elaborazione, ma non sono mai stato in grado di gestire correttamente la contropressione all'ingresso fase di lettura dei file.Lavello per file linea per riga IO con contropressione

Questo lavoro prende un file di input e raggruppa le righe in base a un numero ID presente all'inizio di ogni riga, e quindi quando raggiunge una riga con un nuovo numero ID, invia le linee raggruppate a un attore di elaborazione tramite messaggio, e poi continua con il nuovo numero ID, fino a raggiungere la fine del file.

Questo sembra che sarebbe un caso d'uso buono per Akka Streams, utilizzando il file come un lavandino, ma non sono ancora sicuro di tre cose:

1) Come posso leggere il file riga per linea?

2) Come posso raggruppare per ID presente su ogni linea? Attualmente sto utilizzando un'elaborazione molto imperativa per questo, e non penso che avrò la stessa capacità in una pipeline di flusso.

3) Come posso applicare la contropressione, in modo tale da non continuare a leggere le righe nella memoria più rapidamente di quanto sia possibile elaborare i dati a valle?

+0

domande: Come si fa a gestire contropressione in questo momento? Stai leggendo il file dal singolo nodo? Stai usando il cluster akka per l'elaborazione? – Aivean

+0

Non gestisco la contropressione. Ho provato un po 'di cose ma erano tutte hacky (come il 'long' poll' ask'ing per i messaggi 'Continue' dall'attore di elaborazione, e manualmente iterando la lettura, che era incredibilmente fragile). Quindi ho scelto di modificarlo, dando alla mia app spazio sufficiente per consumare l'intero file di input in memoria. Non posso più farlo, perché devo distribuirlo su un server condiviso e non posso più divorare la memoria di tutti. – dannytoone

risposta

7

Akka streaming 'groupBy è un approccio. Ma groupBy ha un parametro maxSubstreams che richiede che tu sappia che il numero massimo di intervalli di ID in anticipo. Quindi: la soluzione qui di seguito utilizza scan per identificare i blocchi dello stesso ID, e splitWhen a dividersi in substreams:

object Main extends App { 
    implicit val system = ActorSystem("system") 
    implicit val materializer = ActorMaterializer() 

    def extractId(s: String) = { 
    val a = s.split(",") 
    a(0) -> a(1) 
    } 

    val file = new File("/tmp/example.csv") 

    private val lineByLineSource = FileIO.fromFile(file) 
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
    .map(_.utf8String) 

    val future: Future[Done] = lineByLineSource 
    .map(extractId) 
    .scan((false,"",""))((l,r) => (l._2 != r._1, r._1, r._2)) 
    .drop(1) 
    .splitWhen(_._1) 
    .fold(("",Seq[String]()))((l,r) => (r._2, l._2 ++ Seq(r._3))) 
    .concatSubstreams 
    .runForeach(println) 

    private val reply = Await.result(future, 10 seconds) 
    println(s"Received $reply") 
    Await.ready(system.terminate(), 10 seconds) 
} 

extractId linee divide in id -> tuple di dati. scan id preimpending -> tuple di dati con un flag di inizio area ID. drop rilascia l'elemento di adescamento su scan. splitwhen avvia un nuovo substream per ciascun avvio di gamma. fold concatena i substream agli elenchi e rimuove il valore booleano dell'intervallo iniziale, in modo che ogni substream produca un singolo elemento.Al posto della piega probabilmente vuoi un custom SubFlow che elabora uno stream di righe per un singolo ID ed emette qualche risultato per l'intervallo di ID. concatSubstreams unisce i sottosistemi della gamma per-ID prodotti da split, tornando in un singolo flusso stampato da runForEach.

Run con:

$ cat /tmp/example.csv 
ID1,some input 
ID1,some more input 
ID1,last of ID1 
ID2,one line of ID2 
ID3,2nd before eof 
ID3,eof 

uscita è:

(ID1,List(some input, some more input, last of ID1)) 
(ID2,List(one line of ID2)) 
(ID3,List(2nd before eof, eof)) 
+0

Penso di seguire la logica qui, ma puoi spiegare cosa fa 'mergeSubstreams'? Non riesco a trovare la sua definizione nei documenti API. – dannytoone

+0

Non ho trovato documenti oltre agli scaladoc, ma ho aggiunto commenti per il palco. Inoltre, ho sostituito 'mergeSubstreams' con' concatSubstreams' dato che in questo caso i substream vengono avviati e terminati in modo sequenziale (in base agli intervalli di ID) e concat genererà l'output nello stesso ordine degli intervalli ID indipendentemente da qualsiasi elaborazione asincrona nei flussi secondari. – tariksbl

0

Sembra che il modo più semplice per aggiungere "contropressione" al sistema senza introdurre enormi modifiche sia semplicemente cambiare il tipo di cassetta postale dei gruppi di input che consumano l'attore su BoundedMailbox.

  1. Cambiare il tipo di attore che consuma le linee a BoundedMailbox con elevata mailbox-push-timeout-time:

    bounded-mailbox { 
        mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" 
        mailbox-capacity = 1 
        mailbox-push-timeout-time = 1h 
    } 
    
    val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox")) 
    
  2. Crea iteratore dal file, creare raggruppati (da id) iteratore da quella iteratore. Quindi basta scorrere i dati, inviando gruppi a consumare attore. Nota che l'invio si bloccherà in questo caso, quando la casella vocale di Actor si riempie.

    def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = { 
        def rec(s: Stream[A]): Stream[Seq[A]] = 
        if (s.isEmpty) Stream.empty else { 
         s.span(keyFun(s.head) == keyFun(_)) match { 
         case (prefix, suffix) => prefix.toList #:: rec(suffix) 
        } 
        } 
        rec(iter.toStream).toIterator 
    } 
    
    val lines = Source.fromFile("input.file").getLines() 
    
    iterGroupBy(lines){l => l.headOption}.foreach { 
        lines:Seq[String] => 
         actor.tell(lines, ActorRef.noSender) 
    } 
    

Questo è tutto! Probabilmente vorrai spostare il materiale di lettura del file in un thread separato, poiché bloccherà. Inoltre, regolando mailbox-capacity è possibile regolare la quantità di memoria consumata. Ma se la lettura batch dal file è sempre più veloce di elaborazione, sembra ragionevole mantenere la capacità piccola, come 1 o 2.

upditerGroupBy realizzato con Stream, non testato per produrre StackOverflow.

Problemi correlati