2011-01-12 13 views
25

Ho un ascoltatore:Multi-threading con .Net HttpListener

listener = new HttpListener(); 
listener.Prefixes.Add(@"http://+:8077/"); 
listener.Start(); 
listenerThread = new Thread(HandleRequests); 
listenerThread.Start(); 

E sono le richieste di movimentazione:

private void HandleRequests() 
{ 
    while (listener.IsListening) 
    { 
     var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); 
     context.AsyncWaitHandle.WaitOne(); 
    } 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 
} 

vorrei scrivere void Stop() in modo tale, che:

  1. Bloccherà fino a quando tutte le richieste attualmente gestite termineranno (cioè attenderà che tutti i thread "facciano un po 'di cose").
  2. Mentre attende le richieste già avviate, non consente più altre richieste (ad esempio, ritorno all'inizio di ListenerCallback).
  3. Dopodiché chiamerà listener.Stop() (listener.IsListening diventato falso).

Come potrebbe essere scrivere?

EDIT: Che cosa ne pensi di questa soluzione? È sicuro?

public void Stop() 
{ 
    lock (this) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (this) 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 

    lock (this) 
    { 
     if (--numberOfRequests == 0) 
      resetEvent.Set(); 
    } 
} 

risposta

2

Ho consultato il mio codice in EDIT parte della mia domanda e ho deciso di accettarlo con alcune modifiche:

public void Stop() 
{ 
    lock (locker) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (locker) //locking on this is a bad idea, but I forget about it before 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    try 
    { 
     var listener = ar.AsyncState as HttpListener; 

     var context = listener.EndGetContext(ar); 

     //do some stuff 
    } 
    finally //to make sure that bellow code will be executed 
    { 
     lock (locker) 
     { 
      if (--numberOfRequests == 0) 
       resetEvent.Set(); 
     } 
    } 
} 
0

Basta chiamare listener.Stop() dovrebbe fare il trucco. Ciò non risolverà alcuna connessione che è già stata stabilita ma impedirà qualsiasi nuova connessione.

+1

Questo non è vero. Se chiami 'listener.Stop()' durante l'esecuzione di 'ListenerCallback' otterrai un'eccezione es. quando si chiama 'EndGetContext' o anche più tardi, quando si utilizza il flusso di output. Naturalmente posso cogliere le eccezioni, ma preferirei non farlo. – prostynick

+0

Nel mio codice uso un flag e non mi riferisco più al listener più dopo aver chiamato stop su di esso, ma chiudendo il listener non si chiudono le connessioni già accettate, solo il listener. –

+0

Non so cosa intendi dicendo "Uso una bandiera". Il problema è che in 'ListenerCallback' sto usando il listener e se un altro thread lo chiude, mentre lo sto usando, finirò con delle eccezioni, che ho menzionato. – prostynick

4

Bene ci sono diversi modi per risolvere questo ... Questo è un semplice esempio che utilizza un semaforo per tenere traccia del lavoro in corso e un segnale che viene generato quando tutti i lavoratori sono finiti. Questo dovrebbe darti un'idea di base su cui lavorare.

La soluzione seguente non è l'ideale, idealmente dovremmo acquisire il semaforo prima di chiamare BeginGetContext. Ciò rende più difficile l'arresto, quindi ho scelto di utilizzare questo approccio più semplificato. Se lo stessi facendo per "reale" probabilmente scriverei la mia gestione dei thread piuttosto che affidarmi al ThreadPool. Ciò consentirebbe un arresto più affidabile.

Comunque qui è l'esempio completo:

class TestHttp 
{ 
    static void Main() 
    { 
     using (HttpServer srvr = new HttpServer(5)) 
     { 
      srvr.Start(8085); 
      Console.WriteLine("Press [Enter] to quit."); 
      Console.ReadLine(); 
     } 
    } 
} 


class HttpServer : IDisposable 
{ 
    private readonly int _maxThreads; 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly ManualResetEvent _stop, _idle; 
    private readonly Semaphore _busy; 

    public HttpServer(int maxThreads) 
    { 
     _maxThreads = maxThreads; 
     _stop = new ManualResetEvent(false); 
     _idle = new ManualResetEvent(false); 
     _busy = new Semaphore(maxThreads, maxThreads); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     _idle.Reset(); 

     //aquire and release the semaphore to see if anyone is running, wait for idle if they are. 
     _busy.WaitOne(); 
     if(_maxThreads != 1 + _busy.Release()) 
      _idle.WaitOne(); 

     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ListenerCallback, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ListenerCallback(IAsyncResult ar) 
    { 
     _busy.WaitOne(); 
     try 
     { 
      HttpListenerContext context; 
      try 
      { context = _listener.EndGetContext(ar); } 
      catch (HttpListenerException) 
      { return; } 

      if (_stop.WaitOne(0, false)) 
       return; 

      Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); 
      context.Response.SendChunked = true; 
      using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) 
      { 
       tw.WriteLine("<html><body><h1>Hello World</h1>"); 
       for (int i = 0; i < 5; i++) 
       { 
        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now); 
        tw.Flush(); 
        Thread.Sleep(1000); 
       } 
       tw.WriteLine("</body></html>"); 
      } 
     } 
     finally 
     { 
      if (_maxThreads == 1 + _busy.Release()) 
       _idle.Set(); 
     } 
    } 
} 
56

Per completezza, ecco quello che potrebbe apparire come se a gestire i propri thread di lavoro:

class HttpServer : IDisposable 
{ 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly Thread[] _workers; 
    private readonly ManualResetEvent _stop, _ready; 
    private Queue<HttpListenerContext> _queue; 

    public HttpServer(int maxThreads) 
    { 
     _workers = new Thread[maxThreads]; 
     _queue = new Queue<HttpListenerContext>(); 
     _stop = new ManualResetEvent(false); 
     _ready = new ManualResetEvent(false); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 

     for (int i = 0; i < _workers.Length; i++) 
     { 
      _workers[i] = new Thread(Worker); 
      _workers[i].Start(); 
     } 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     foreach (Thread worker in _workers) 
      worker.Join(); 
     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ContextReady, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ContextReady(IAsyncResult ar) 
    { 
     try 
     { 
      lock (_queue) 
      { 
       _queue.Enqueue(_listener.EndGetContext(ar)); 
       _ready.Set(); 
      } 
     } 
     catch { return; } 
    } 

    private void Worker() 
    { 
     WaitHandle[] wait = new[] { _ready, _stop }; 
     while (0 == WaitHandle.WaitAny(wait)) 
     { 
      HttpListenerContext context; 
      lock (_queue) 
      { 
       if (_queue.Count > 0) 
        context = _queue.Dequeue(); 
       else 
       { 
        _ready.Reset(); 
        continue; 
       } 
      } 

      try { ProcessRequest(context); } 
      catch (Exception e) { Console.Error.WriteLine(e); } 
     } 
    } 

    public event Action<HttpListenerContext> ProcessRequest; 
} 
+0

Questo è fantastico: è un ottimo candidato per testare il throughput di HttpListener. – Jonno

+0

Grazie mille per quel pezzo di codice! Esistono due piccoli problemi: 1. ProcessRequest potrebbe essere nullo 2. HttpListenerContext non è thread-safe a meno che non sia statico –

+0

@MartinMeeser grazie per il commento. per 1. invece di avvolgerlo in try catch block potremmo usare questo 'ProcessRequest? .Invoke (context);'. Per 2. tuttavia se l'opzione statica non è un'opzione, cosa consiglia? – JohnTube

0

Questo utilizza la coda digitato BlockingCollection per le richieste di assistenza. È utilizzabile così com'è. Dovresti ricavare una classe da questa risposta di override.

using System; 
using System.Collections.Concurrent; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class HttpServer : IDisposable 
    { 
     private HttpListener httpListener; 
     private Thread listenerLoop; 
     private Thread[] requestProcessors; 
     private BlockingCollection<HttpListenerContext> messages; 

     public HttpServer(int threadCount) 
     { 
      requestProcessors = new Thread[threadCount]; 
      messages = new BlockingCollection<HttpListenerContext>(); 
      httpListener = new HttpListener(); 
     } 

     public virtual int Port { get; set; } = 80; 

     public virtual string[] Prefixes 
     { 
      get { return new string[] {string.Format(@"http://+:{0}/", Port)}; } 
     } 

     public void Start(int port) 
     { 
      listenerLoop = new Thread(HandleRequests); 

      foreach(string prefix in Prefixes) httpListener.Prefixes.Add(prefix); 

      listenerLoop.Start(); 

      for (int i = 0; i < requestProcessors.Length; i++) 
      { 
       requestProcessors[i] = StartProcessor(i, messages); 
      } 
     } 

     public void Dispose() { Stop(); } 

     public void Stop() 
     { 
      messages.CompleteAdding(); 

      foreach (Thread worker in requestProcessors) worker.Join(); 

      httpListener.Stop(); 
      listenerLoop.Join(); 
     } 

     private void HandleRequests() 
     { 
      httpListener.Start(); 
      try 
      { 
       while (httpListener.IsListening) 
       { 
        Console.WriteLine("The Linstener Is Listening!"); 
        HttpListenerContext context = httpListener.GetContext(); 

        messages.Add(context); 
        Console.WriteLine("The Linstener has added a message!"); 
       } 
      } 
      catch(Exception e) 
      { 
       Console.WriteLine (e.Message); 
      } 
     } 

     private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Thread thread = new Thread(() => Processor(number, messages)); 
      thread.Start(); 
      return thread; 
     } 

     private void Processor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Console.WriteLine ("Processor {0} started.", number); 
      try 
      { 
       for (;;) 
       { 
        Console.WriteLine ("Processor {0} awoken.", number); 
        HttpListenerContext context = messages.Take(); 
        Console.WriteLine ("Processor {0} dequeued message.", number); 
        Response (context); 
       } 
      } catch { } 

      Console.WriteLine ("Processor {0} terminated.", number); 
     } 

     public virtual void Response(HttpListenerContext context) 
     { 
      SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>")); 
     } 

     public static void SendReply(HttpListenerContext context, StringBuilder responseString) 
     { 
      byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); 
      context.Response.ContentLength64 = buffer.Length; 
      System.IO.Stream output = context.Response.OutputStream; 
      output.Write(buffer, 0, buffer.Length); 
      output.Close(); 
     } 
    } 
} 

Questo è un esempio di come utilizzarlo. Non è necessario utilizzare eventi o blocchi di blocco. BlockingCollection risolve tutti questi problemi.

using System; 
using System.Collections.Concurrent; 
using System.IO; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class Server 
    { 
    public static void Main (string[] args) 
    { 
     HttpServer Service = new QuizzServer (8); 
     Service.Start (80); 
     for (bool coninute = true; coninute ;) 
     { 
      string input = Console.ReadLine().ToLower(); 
      switch (input) 
      { 
       case "stop": 
        Console.WriteLine ("Stop command accepted."); 
        Service.Stop(); 
        coninute = false; 
        break; 
       default: 
        Console.WriteLine ("Unknown Command: '{0}'.",input); 
        break; 
      } 
     } 
    } 
    } 
}