Ho un processo di elaborazione file che attualmente utilizza gli attori di akka con la contropressione gestita manualmente per gestire la pipeline di elaborazione, ma non sono mai stato in grado di gestire correttamente la contropressione all'ingresso fase di lettura dei file.Lavello per file linea per riga IO con contropressione
Questo lavoro prende un file di input e raggruppa le righe in base a un numero ID presente all'inizio di ogni riga, e quindi quando raggiunge una riga con un nuovo numero ID, invia le linee raggruppate a un attore di elaborazione tramite messaggio, e poi continua con il nuovo numero ID, fino a raggiungere la fine del file.
Questo sembra che sarebbe un caso d'uso buono per Akka Streams, utilizzando il file come un lavandino, ma non sono ancora sicuro di tre cose:
1) Come posso leggere il file riga per linea?
2) Come posso raggruppare per ID presente su ogni linea? Attualmente sto utilizzando un'elaborazione molto imperativa per questo, e non penso che avrò la stessa capacità in una pipeline di flusso.
3) Come posso applicare la contropressione, in modo tale da non continuare a leggere le righe nella memoria più rapidamente di quanto sia possibile elaborare i dati a valle?
domande: Come si fa a gestire contropressione in questo momento? Stai leggendo il file dal singolo nodo? Stai usando il cluster akka per l'elaborazione? – Aivean
Non gestisco la contropressione. Ho provato un po 'di cose ma erano tutte hacky (come il 'long' poll' ask'ing per i messaggi 'Continue' dall'attore di elaborazione, e manualmente iterando la lettura, che era incredibilmente fragile). Quindi ho scelto di modificarlo, dando alla mia app spazio sufficiente per consumare l'intero file di input in memoria. Non posso più farlo, perché devo distribuirlo su un server condiviso e non posso più divorare la memoria di tutti. – dannytoone