2013-01-01 16 views
5

Sto scrivendo un'applicazione di controllo per Logitech Media Server (precedentemente noto come Squeezebox Server).Rimozione del codice imperativo dai flussi di lavoro asincroni F #

Una piccola parte di ciò sta rilevando quali server sono in esecuzione sulla rete locale. Questo viene fatto trasmettendo uno speciale pacchetto UDP alla porta 3483 e aspettando le risposte. Se nessun server risponde dopo un determinato periodo di tempo, (o un server preferito risponde) l'applicazione dovrebbe smettere di ascoltare.

Ce l'ho in C#, usando le funzioni asincrone/attese di C# 5, ma ero curioso di vedere come sarebbe stato in F #. Ho la seguente funzione (più o meno direttamente tradotto da C#):

let broadCast (timeout:TimeSpan) onServerDiscovered = async { 
    use udp = new UdpClient (EnableBroadcast = true) 
    let endPoint = new IPEndPoint(IPAddress.Broadcast, 3483) 
    let! _ = udp.SendAsync(discoveryPacket, discoveryPacket.Length, endPoint) 
      |> Async.AwaitTask 

    let timeoutTask = Task.Delay(timeout) 
    let finished = ref false 
    while not !finished do 
    let recvTask = udp.ReceiveAsync() 
    let! _ = Task.WhenAny(timeoutTask, recvTask) |> Async.AwaitTask 
    finished := if not recvTask.IsCompleted then true 
       else let udpResult = recvTask.Result 
        let hostName = udpResult.RemoteEndPoint.Address.ToString() 
        let serverName = udpResult.Buffer |> getServerName 
        onServerDiscovered serverName hostName 9090 
    } 

Il discoveryPacket è una matrice di byte contenente i dati trasmessi. getServerName è una funzione definita altrove, che estrae il nome del server leggibile dall'uomo dai dati di risposta del server.

Quindi l'applicazione chiama broadCast con due argomenti, un timeout e una funzione di richiamata che verrà chiamata quando un server risponde. Questa funzione di callback può quindi decidere di terminare l'ascolto o meno, restituendo true o false. Se nessun server risponde, o nessun callback restituisce true, la funzione ritorna al termine del timeout.

Questo codice funziona bene, ma sono vagamente disturbato dall'uso della cella di riferimento imperativa finished.

Quindi, ecco la domanda: c'è un modo idiomatico di F # per fare questo genere di cose senza passare al lato oscuro imperativo?

Aggiornamento

Sulla base della risposta accettato di sotto (che era quasi a destra), questo è il programma di test completo Ho finito con:

open System 
open System.Linq 
open System.Text 
open System.Net 
open System.Net.Sockets 
open System.Threading.Tasks 

let discoveryPacket = 
    [| byte 'd'; 0uy; 2uy; 23uy; 0uy; 0uy; 0uy; 0uy; 
     0uy; 0uy; 0uy; 0uy; 0uy; 1uy; 2uy; 3uy; 4uy; 5uy |] 

let getUTF8String data start length = 
    Encoding.UTF8.GetString(data, start, length) 

let getServerName data = 
    data |> Seq.skip 1 
     |> Seq.takeWhile ((<) 0uy) 
     |> Seq.length 
     |> getUTF8String data 1 


let broadCast (timeout : TimeSpan) onServerDiscovered = async { 
    use udp = new UdpClient (EnableBroadcast = true) 
    let endPoint = IPEndPoint (IPAddress.Broadcast, 3483) 
    do! udp.SendAsync (discoveryPacket, Array.length discoveryPacket, endPoint) 
     |> Async.AwaitTask 
     |> Async.Ignore 

    let timeoutTask = Task.Delay timeout 

    let rec loop() = async { 
     let recvTask = udp.ReceiveAsync() 

     do! Task.WhenAny(timeoutTask, recvTask) 
      |> Async.AwaitTask 
      |> Async.Ignore 

     if recvTask.IsCompleted then 
      let udpResult = recvTask.Result 
      let hostName = udpResult.RemoteEndPoint.Address.ToString() 
      let serverName = getServerName udpResult.Buffer 
      if onServerDiscovered serverName hostName 9090 then 
       return()  // bailout signalled from callback 
      else 
       return! loop() // we should keep listening 
    } 

    return! loop() 
    } 

[<EntryPoint>] 
let main argv = 
    let serverDiscovered serverName hostName hostPort = 
     printfn "%s @ %s : %d" serverName hostName hostPort 
     false 

    let timeout = TimeSpan.FromSeconds(5.0) 
    broadCast timeout serverDiscovered |> Async.RunSynchronously 
    printfn "Done listening" 
    0 // return an integer exit code 
+0

Perché non si scrive un ciclo infinito ma lo si esegue utilizzando 'Async.RunSynchronously' specificando un timeout di 5 secondi? –

+0

@Jon Harrop - perché nel programma reale (questo è solo un esempio semplice) non voglio eseguire la scoperta in modo sincrono. – corvuscorax

risposta

4

è possibile implementare questa "funzionalmente" con una funzione ricorsiva che produce anche un valore 'T> Async < (in questo caso, Async). Questo codice dovrebbe funzionare - è basato sul codice che hai fornito - anche se non ho potuto testarlo in quanto dipende da altre parti del tuo codice.

open System 
open System.Net 
open System.Net.Sockets 
open System.Threading.Tasks 
open Microsoft.FSharp.Control 

let broadCast (timeout : TimeSpan) onServerDiscovered = async { 
    use udp = new UdpClient (EnableBroadcast = true) 
    let endPoint = IPEndPoint (IPAddress.Broadcast, 3483) 
    do! udp.SendAsync (discoveryPacket, Array.length discoveryPacket, endPoint) 
     |> Async.AwaitTask 
     |> Async.Ignore 

    let rec loop() = 
     async { 
     let timeoutTask = Task.Delay timeout 
     let recvTask = udp.ReceiveAsync() 

     do! Task.WhenAny (timeoutTask, recvTask) 
      |> Async.AwaitTask 
      |> Async.Ignore 

     if recvTask.IsCompleted then 
      let udpResult = recvTask.Result 
      let hostName = udpResult.RemoteEndPoint.Address.ToString() 
      let serverName = getServerName udpResult.Buffer 
      onServerDiscovered serverName hostName 9090 
      return! loop() 
     } 

    return! loop() 
    } 
+1

Poiché i task vengono avviati immediatamente al momento della creazione, a differenza dei flussi di lavoro asincroni, sospetto che dovresti spostare la dichiarazione di 'timeoutTask' all'interno del blocco asincrono' loop'. In questo modo si ottiene un nuovo 'timeoutTask' per andare con ogni nuovo' recvTask'. –

+0

Grazie Joel, è riparato ora. –

+0

In realtà, avere il 'timeoutTask' fuori dal ciclo è ciò che cerco - in questo modo c'è solo un periodo di 5 secondi (ad esempio) dove ascolto i server, indipendentemente da quando arrivano le risposte. Ma Il codice Jacks era quasi corretto, pubblicherò la soluzione completa in un minuto, grazie. – corvuscorax

2

avrei disinfettare le chiamate asincrone e utilizzare un timeout asincrono, più simile a questo:

open System.Net 

let discoveryPacket = 
    [|'d'B; 0uy; 2uy; 23uy; 0uy; 0uy; 0uy; 0uy; 
    0uy; 0uy; 0uy; 0uy; 0uy; 1uy; 2uy; 3uy; 4uy; 5uy|] 

let getUTF8String data start length = 
    System.Text.Encoding.UTF8.GetString(data, start, length) 

let getServerName data = 
    data 
    |> Seq.skip 1 
    |> Seq.takeWhile ((<) 0uy) 
    |> Seq.length 
    |> getUTF8String data 1 

type Sockets.UdpClient with 
    member client.AsyncSend(bytes, length, ep) = 
    let beginSend(f, o) = client.BeginSend(bytes, length, ep, f, o) 
    Async.FromBeginEnd(beginSend, client.EndSend) 

    member client.AsyncReceive() = 
    async { let ep = ref null 
      let endRecv res = 
       client.EndReceive(res, ep) 
      let! bytes = Async.FromBeginEnd(client.BeginReceive, endRecv) 
      return bytes, !ep } 

let broadCast onServerDiscovered = 
    async { use udp = new Sockets.UdpClient (EnableBroadcast = true) 
      let endPoint = IPEndPoint (IPAddress.Broadcast, 3483) 
      let! _ = udp.AsyncSend(discoveryPacket, discoveryPacket.Length, endPoint) 
      while true do 
      let! bytes, ep = udp.AsyncReceive() 
      let hostName = ep.Address.ToString() 
      let serverName = getServerName bytes 
      onServerDiscovered serverName hostName 9090 } 

do 
    let serverDiscovered serverName hostName hostPort = 
    printfn "%s @ %s : %d" serverName hostName hostPort 

    let timeout = 5000 
    try 
    Async.RunSynchronously(broadCast serverDiscovered, timeout) 
    with _ ->() 
    printfn "Done listening" 

Mi piacerebbe anche Sostituire la funzione effetto collaterale a base di serverDiscovered con un'architettura diversa, come un agente asincrono che raccoglie le risposte per 5 secondi.

Problemi correlati