2012-06-11 22 views
11

L'idea è di avere un numero variabile di canali in una sezione, di inviare ogni valore ricevuto attraverso di essi in un singolo canale e di chiudere questo canale di uscita quando l'ultimo dei canali di input viene chiuso. Qualcosa di simile, ma per un numero di canali più di due:È possibile multiplexare più canali in uno?

func multiplex(cin1, cin2, cout chan int) { 
    n := 2 
    for { 
     select { 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 

     case v, ok := <-cin2: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 
     } 

     if n == 0 { 
      close(cout) 
      break 
     } 
    } 
} 

Il codice di cui sopra evita loop occupato dal momento che non v'è alcuna default caso, che è buono (EDIT: sembra che la presenza di "ok" rende l'istruzione select non-blocking e il ciclo è occupato dopo tutto, ma per l'esempio, pensate al codice come se fosse bloccato). È possibile ottenere lo stesso tipo di funzionalità anche con un numero arbitrario di canali di input? Ovviamente, ciò potrebbe essere fatto riducendo la slice a coppie su un singolo canale, ma sarei più interessato a una soluzione più semplice, se possibile.

risposta

24

Credo che questo frammento di codice fa quello che stai cercando. Ho cambiato la firma in modo che sia chiaro che gli input e l'output dovrebbero essere usati solo per la comunicazione in una direzione. Notare l'aggiunta di uno sync.WaitGroup, è necessario un modo per tutti gli ingressi per segnalare che hanno completato, e questo è abbastanza facile.

func combine(inputs []<-chan int, output chan<- int) { 
    var group sync.WaitGroup 
    for i := range inputs { 
    group.Add(1) 
    go func(input <-chan int) { 
     for val := range input { 
     output <- val 
     } 
     group.Done() 
    } (inputs[i]) 
    } 
    go func() { 
    group.Wait() 
    close(output) 
    }() 
} 
+1

Ah, soluzione molto bella, chiara e concisa. Grazie! – elpres

+0

V'è ora un pacchetto con una funzione (https://godoc.org/github.com/eapache/channels#Multiplex) che risolve il problema utilizzando la riflessione invece di più goroutines. – Evan

0

Utilizzo di goroutine Ho prodotto questo. È quello che vuoi?

package main 

import (
    "fmt" 
) 

func multiplex(cin []chan int, cout chan int) { 
    n := len(cin) 
    for _, ch := range cin { 
     go func(src chan int) { 
      for { 
       v, ok := <-src 
       if ok { 
        cout <- v 
       } else { 
        n-- // a little dangerous. Maybe use a channel to avoid missed decrements 
        if n == 0 { 
         close(cout) 
        } 
        break 
       } 
      } 
     }(ch) 
    } 
} 

// a main to test the multiplex 
func main() { 
    cin := make([]chan int, 3) 
    cin[0] = make(chan int, 2) 
    cin[1] = make(chan int, 2) 
    cin[2] = make(chan int, 2) 
    cout := make(chan int, 2) 
    multiplex(cin, cout) 
    cin[1] <- 1 
    cin[0] <- 2 
    cin[2] <- 3 
    cin[1] <- 4 
    cin[0] <- 5 
    close(cin[1]) 
    close(cin[0]) 
    close(cin[2]) 
    for { 
     v, ok := <-cout 
     if ok { 
      fmt.Println(v) 
     } else { 
      break 
     } 
    } 
} 

EDIT: Bibliografia:

http://golang.org/ref/spec#Receive_operator

http://golang.org/ref/spec#Close

+0

I documenti dicono che se si legge un valore da un canale con ", ok", l'operazione non viene bloccata. Il valore di 'ok' è quindi semplicemente' false' e ​​l'esecuzione continua.Se questo è corretto (sono nuovo per andare e non posso dirlo), se il canale è vuoto ma non ancora chiuso, la riga 'if ok' valuterà come' false' ed eseguirà il ramo 'else'. Ma se si sostituisce "v, ok: = <- src" e il 'if' con un'istruzione select, allora potrebbe funzionare. Devo testarlo. Grazie per la risposta, btw. – elpres

+1

Dove hai letto che l'operazione non blocca? Non lo trovo e non sembra corrispondere a ciò che osservo. Ho letto dal documento che non blocca * una volta chiuso il canale *. –

+1

Questo sembra provenire da una versione precedente di specifiche, ad es. [qui] (http://go.googlecode.com/hg/doc/go_spec.html?r=c64e293#Communication_operators), dai uno sguardo all'ultimo paragrafo prima di "Espressioni di metodo". Nella versione corrente questo passaggio è cambiato un po 'e dice che "un valore zero è ritornato perché il canale è _chiuso e vuoto_ (falso)". Sembra che 'false' venga restituito solo dopo che i canali sono stati vuotati e chiusi, giusto? Ciò significherebbe che mi sbaglio. – elpres

2

Edit: aggiunto a coppie codice di esempio la riduzione e parti riordinati della risposta.

La soluzione preferita è la non risposta di "ristrutturazione in modo da non avere una porzione di canali". La ristrutturazione può spesso utilizzare la funzione che più goroutine possono inviare a un singolo canale. Quindi, invece di far inviare ciascuna delle tue fonti su canali separati e poi dover gestire la ricezione da una serie di canali, basta creare un canale e lasciare che tutte le fonti inviino su quel canale.

Go non offre una funzionalità per ricevere da una porzione di canali. È una domanda frequente, e mentre la soluzione appena data è preferita, ci sono modi per programmarla. Una soluzione che pensavo avessi suggerito nella tua domanda originale dicendo "ridurre la fetta a coppie" è una soluzione binaria di divisione e conquista. Funziona bene, a patto che tu abbia una soluzione per multiplexare due canali in uno. Il tuo codice di esempio per questo è molto vicino al lavoro.

Ti manca solo un piccolo trucco per far funzionare il tuo codice di esempio. Dove decrementi n, aggiungi una linea per impostare la variabile canale su zero. Ad esempio, ho fatto leggere il codice

case v, ok := <-cin1: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin1 = nil 
     } 
    case v, ok := <-cin2: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin2 = nil 
     } 
    } 

Questa soluzione fa quello che vuoi e non è occupato in attesa.

Allora, un esempio completo che incorpora questa soluzione in una funzione che effettua la multiplazione una fetta:

package main 

import (
    "fmt" 
    "time" 
) 

func multiplex(cin []chan int, cout chan int) { 
    var cin0, cin1 chan int 
    switch len(cin) { 
    case 2: 
     cin1 = cin[1] 
     fallthrough 
    case 1: 
     cin0 = cin[0] 
    case 0: 
    default: 
     cin0 = make(chan int) 
     cin1 = make(chan int) 
     half := len(cin)/2 
     go multiplex(cin[:half], cin0) 
     go multiplex(cin[half:], cin1) 
    } 
    for cin0 != nil || cin1 != nil { 
     select { 
     case v, ok := <-cin0: 
      if ok { 
       cout <- v 
      } else { 
       cin0 = nil 
      } 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       cin1 = nil 
      } 
     } 
    } 
    close(cout) 
} 

func main() { 
    cin := []chan int{ 
     make(chan int), 
     make(chan int), 
     make(chan int), 
    } 
    cout := make(chan int) 
    for i, c := range cin { 
     go func(x int, cx chan int) { 
      for i := 1; i <= 3; i++ { 
       time.Sleep(100 * time.Millisecond) 
       cx <- x*10 + i 
      } 
      close(cx) 
     }(i, c) 
    } 
    go multiplex(cin, cout) 
    for { 
     select { 
     case v, ok := <-cout: 
      if ok { 
       fmt.Println("main gets", v) 
      } else { 
       return 
      } 
     } 
    } 
} 
+1

No, non proprio. Quello che sto cercando funziona con la firma 'func multiplex (cin [] chan int, cout chan int)', cioè una che può operare su un numero arbitrario di canali di input invece di essere codificata a due. – elpres

Problemi correlati