2012-08-26 17 views
5

Sto scrivendo una libreria Mercurial completamente gestita (da utilizzare in un Mercurial Server for Windows completamente gestito, disponibile a breve) e uno dei più severi problemi di prestazioni che sto incontrando è, abbastanza stranamente, dividere un array in parti.Il modo più veloce (portatile) per dividere un array in C#

L'idea è la seguente: c'è un array di byte con dimensioni che vanno da diverse centinaia di byte a un massimo di un megabyte e tutto ciò che devo fare è dividerlo in parti delimitate da, nel mio caso specifico, \n caratteri .

Ora quello che dotTrace mi mostra è che il mio "optimized" version della Split (il codice è corretta, here's the naive versione ho cominciato con) occupa 11 secondi per 2.300 chiamate (c'è un successo evidente le prestazioni introdotto dal dotTrace in sé, ma tutto di fino alla scala).

Ecco i numeri:

  • unsafe versione: 11 297 ms per 2 312 chiamate
  • gestito ("naive") Versione: 20 001 ms per 2 312 chiamate

quindi ecco qui: che cosa sarà il modo più veloce (preferibilmente portatile, che significa supportare sia x86 che x64) per dividere un array in C#.

+3

Possiamo vedere il tuo * versione "ottimizzata" di Spalato * in modo che possiamo pensare a come migliorarlo? –

+0

Portatile nel senso di una libreria di classi portatile come descritto qui: http://msdn.microsoft.com/en-us/library/gg597391.aspx? –

+2

Puoi trovare il suo codice nel link "versione ottimizzata". –

risposta

3

Per Split, la gestione ulong sulla macchina a 32 bit è molto lenta, quindi riduci decisamente a uint. Se si desidera veramente ulong, implementare due versioni, una per 32 bit, una per 64 bit.

È inoltre necessario misurare se il byte di gestione alla volta è più veloce.

È necessario definire il costo dell'allocazione di memoria. Se è abbastanza grande, prova a riutilizzare la memoria attraverso più chiamate.

Altro:

ToString: è più veloce da usare "(" + Offset.ToString() + "" + Length.ToString() + ")";

GetHashCode: provare fisso (byte * b = & tampone [offset])


Questa versione dovrebbe essere molto veloce, se utilizzato più volte. Punto chiave: nessuna nuova allocazione di memoria dopo che l'array interno si è espanso alle dimensioni corrette, copia minima dei dati.

class ArraySplitter 
{ 
    private byte[] m_data; 
    private int m_count; 
    private int[] m_stops; 

    private void AddRange(int start, int stop) 
    { 
     // Skip empty range 
     if (start > stop) 
     { 
      return; 
     } 

     // Grow array if needed 
     if ((m_stops == null) || (m_stops.Length < (m_count + 2))) 
     { 
      int[] old = m_stops; 

      m_stops = new int[m_count * 3/2 + 4]; 

      if (old != null) 
      { 
       old.CopyTo(m_stops, 0); 
      } 
     } 

     m_stops[m_count++] = start; 
     m_stops[m_count++] = stop; 
    } 

    public int Split(byte[] data, byte sep) 
    { 
     m_data = data; 
     m_count = 0;  // reuse m_stops 

     int last = 0; 

     for (int i = 0; i < data.Length; i ++) 
     { 
      if (data[i] == sep) 
      { 
       AddRange(last, i - 1); 
       last = i + 1; 
      } 
     } 

     AddRange(last, data.Length - 1); 

     return m_count/2; 
    } 

    public ArraySegment<byte> this[int index] 
    { 
     get 
     { 
      index *= 2; 
      int start = m_stops[index]; 

      return new ArraySegment<byte>(m_data, start, m_stops[index + 1] - start + 1); 
     } 
    } 
} 

programma di test:

static void Main(string[] args) 
    { 
     int count = 1000 * 1000; 

     byte[] data = new byte[count]; 

     for (int i = 0; i < count; i++) 
     { 
      data[i] = (byte) i; 
     } 

     Stopwatch watch = new Stopwatch(); 

     for (int r = 0; r < 10; r++) 
     { 
      watch.Reset(); 
      watch.Start(); 

      int len = 0; 

      foreach (var seg in data.MySplit(13)) 
      { 
       len += seg.Count; 
      } 

      watch.Stop(); 

      Console.WriteLine("MySplit  : {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds); 

      watch.Reset(); 
      watch.Start(); 

      ArraySplitter splitter = new ArraySplitter(); 

      int parts = splitter.Split(data, 13); 

      len = 0; 

      for (int i = 0; i < parts; i++) 
      { 
       len += splitter[i].Count; 
      } 

      watch.Stop(); 
      Console.WriteLine("ArraySplitter: {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds); 
     } 
    } 

Risultato:

MySplit  : 996093 9.514 ms 
ArraySplitter: 996093 4.754 ms 
MySplit  : 996093 7.760 ms 
ArraySplitter: 996093 2.710 ms 
MySplit  : 996093 8.391 ms 
ArraySplitter: 996093 3.510 ms 
MySplit  : 996093 9.677 ms 
ArraySplitter: 996093 3.468 ms 
MySplit  : 996093 9.685 ms 
ArraySplitter: 996093 3.370 ms 
MySplit  : 996093 9.700 ms 
ArraySplitter: 996093 3.425 ms 
MySplit  : 996093 9.669 ms 
ArraySplitter: 996093 3.519 ms 
MySplit  : 996093 9.844 ms 
ArraySplitter: 996093 3.416 ms 
MySplit  : 996093 9.721 ms 
ArraySplitter: 996093 3.685 ms 
MySplit  : 996093 9.703 ms 
ArraySplitter: 996093 3.470 ms 
+0

Un'ultima cosa: il tuo ciclo interno ha troppe variabili. –

+0

Quanti segmenti diversi hanno i dati di test? –

+0

1000 * 1000/256 = 3906 –

4

Credo che il problema è, che si sta facendo un sacco di operazioni complesse in loop. Questo codice rimuove tutte le operazioni tranne l'aggiunta singola e il confronto all'interno di un ciclo. Altre cose complesse si verificano solo quando viene rilevata una divisione o alla fine di una matrice.

Inoltre, è difficile dire con che tipo di dati si eseguono i test, quindi questa è solo una congettura.

public static unsafe Segment[] Split2(byte[] _src, byte value) 
{ 
    var _ln = _src.Length; 

    if (_ln == 0) return new Segment[] { }; 

    fixed (byte* src = _src) 
    { 
     var segments = new LinkedList<Segment>(); // Segment[c]; 

     byte* last = src; 
     byte* end = src + _ln - 1; 
     byte lastValue = *end; 
     *end = value; // value-termination 

     var cur = src; 
     while (true) 
     { 
      if (*cur == value) 
      { 
       int begin = (int) (last - src); 
       int length = (int) (cur - last + 1); 
       segments.AddLast(new Segment(_src, begin, length)); 

       last = cur + 1; 

       if (cur == end) 
       { 
        if (lastValue != value) 
        { 
         *end = lastValue; 
        } 
        break; 
       } 
      } 
      cur++; 
     } 

     return segments.ToArray(); 
    } 
} 

Edit: Codice fisso, quindi restituisce risultati corretti.

+0

Questo è un uso molto astuto/intelligente della sentinella .. tuttavia, solo il ridimensionamento di un condizionale/byte ... sarebbe bello vedere i tempi. –

+0

Bello! '4 696' ms per chiamate' 2 312' sotto dotTrace. –

+0

@pst: è quasi tre volte più veloce della mia versione "non sicura". –

2

Anton,

Non so se siete ancora interessati a ottimizzare questo poiché questa discussione è piuttosto vecchio, ma ho visto che il codice era praticamente lo stesso nel vostro archivio online così ho pensato che sarebbe dagli Un colpo. Ho esaminato il tuo codice HgSharp su bitbucket.org durante la valutazione della tua applicazione HgLab. Ho riscritto la funzione utilizzando i costrutti nativi che lo hanno semplificato notevolmente. I miei test hanno prodotto risultati migliori della metà rispetto alla routine originale. L'ho provato caricando un file sorgente di diversi megabyte e confrontando i tempi con le prestazioni della stessa operazione usando la tua routine originale.

Oltre a riscrivere la logica di base, ho deciso di utilizzare il codice nativo ArraySegment<> integrato nel framework anziché l'implementazione personalizzata. L'unica differenza significativa è che ArraySegment<> espone una proprietà Count anziché una proprietà Length. Il codice seguente non richiede la parola chiave non sicura perché non sto usando i puntatori, tuttavia sembra che ci sia un leggero miglioramento delle prestazioni se viene modificato per farlo.


public static ArraySegment<byte>[] SplitEx(this byte[] source, byte value) { 
     var _previousIndex = -1; 
     var _segments = new List<ArraySegment<byte>>(); 
     var _length = source.Length; 

     if (_length > 0) { 
      int _index; 

      for (_index = 0; _index < _length; _index++) { 
       var _value = source[_index]; 
       if (_value == value) { 
        _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex)); 
        _previousIndex = _index; 
       } 
      } 

      if (--_index != _previousIndex) { 
       _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex)); 
      } 
     } 

     return _segments.ToArray(); 
    } 
Problemi correlati