2015-01-03 8 views
7

A causa di un errore non risolto con System.Generics.Collections.TArray.Copy<T> (a seconda di already reported bug in System.CopyArray) a volte si genera un'eccezione utilizzando la libreria di threading.Come risolvere TSparseArray <T>?

L'eccezione è sollevata in modo System.Threading.TSparseArray<T>.Add:

function TSparseArray<T>.Add(const Item: T): Integer; 
var 
    I: Integer; 
    LArray, NewArray: TArray<T>; 
begin 
    ... 
      TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here 
    ... 
end; 

bene che è previsto con il bug in System.CopyArray. Così, quando si cerca di risolvere questo mio primo pensiero è stato quello di copiare semplicemente l'array con:

// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here 
for LIdx := Low(LArray) to High(LArray) do 
    NewArray[LIdx] := LArray[LIdx]; 

funziona come un fascino. Ma dopo che mi chiedevo, perché è necessaria la copia matrice:

LArray := FArray; // copy array reference from field 
... 
SetLength(NewArray, Length(LArray) * 2); 
TArray.Copy<T>(LArray, NewArray, I + 1); 
NewArray[I + 1] := Item; 
Exit(I + 1); 

Gli elementi vengono copiati (variabile locale) e questo è tutto. Non è possibile eseguire il trasferimento a FArray, quindi per me lo verrà finalizzato quando non è incluso.

Ora ho tre scelte Bugfix:

  1. basta sostituire TArray.Copy

    SetLength(NewArray, Length(LArray) * 2); 
    // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here 
    for LIdx := Low(LArray) to High(LArray) do 
        NewArray[LIdx] := LArray[LIdx]; 
    NewArray[I + 1] := Item; 
    Exit(I + 1); 
    
  2. Sostituire TArray.Copy e salvare

    SetLength(NewArray, Length(LArray) * 2); 
    // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here 
    for LIdx := Low(LArray) to High(LArray) do 
        NewArray[LIdx] := LArray[LIdx]; 
    NewArray[I + 1] := Item; 
    FArray := NewArray; 
    Exit(I + 1); 
    
  3. commento fuori tutte le parti di codice non necessarie (perché t hey sono solo perdendo tempo)

    // SetLength(NewArray, Length(LArray) * 2); 
    // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here 
    // NewArray[I + 1] := Item; 
    Exit(I + 1); 
    

Ho controllato tutte e tre le correzioni con un mucchio di compiti in cerca di thread di lavoro inutilizzati o compiti non eseguiti. Ma non ho trovato nessuno di loro. La libreria funziona come previsto (e ora senza eccezioni).

Potete per favore indicarmi cosa mi manca qui?


Per arrivare a questa eccezione, è stata eseguita una serie di compiti e lasciare che il TTaskPool creare sempre più TWorkerQueueThreads. Controllare il numero di thread da TaskManager e utilizzare un punto di interruzione sulla riga TArray.Copy nel metodo TSparseArray<T>.Add. Qui ottengo questa eccezione quando il numero di thread dell'applicazione supera 25 thread.

// Hit the button very fast until the debugger stops 
// at TSparseArray<T>.Add method to copy the array 
procedure TForm1.Button1Click(Sender : TObject); 
var 
    LIdx : Integer; 
begin 
    for LIdx := 1 to 20 do 
    TTask.Run( 
     procedure 
     begin 
     Sleep(50); 
     end); 
end; 
+0

Anche se 'TArray.Copy()' sono stati risolti, penserei l'assegnazione/modifica mancante di 'FArray' sarebbe un bug separato che deve essere corretto. Altrimenti, come è 'Add()' aggiungendo 'Item' alla matrice? Non ho ancora visto il codice sorgente di 'TSparseArray'. –

+0

'TSparseArray ' è strano. Perché sente la necessità di creare un oggetto di blocco separato? Cosa c'è di sbagliato nel blocco stesso? –

risposta

1

Non importa se gli articoli sono scritti su TSparseArray<T>, perché è necessario solo se un thread di lavoro ha terminato tutte le attività a lui delegate e un altro thread di lavoro non è ancora terminato. A questo punto il thread inattivo guarda le code degli altri gradini all'interno della piscina e tenta di rubare un po 'di lavoro.

Se una coda non è stata inserita in questo array, non è visibile ai thread inattivi e pertanto il carico di lavoro non può essere condiviso.

per risolvere questo ho scelto l'opzione 2

function TSparseArray<T>.Add(const Item: T): Integer; 
... 
SetLength(NewArray, Length(LArray) * 2); 
TArray.Copy<T>(LArray, NewArray, I + 1); // <- No Exception here with XE7U1 
NewArray[I + 1] := Item; 
{$IFDEF USE_BUGFIX} 
FArray := NewArray; 
{$ENDIF} 
Exit(I + 1); 

Ma che rubare parte è rischioso attuata senza alcun blocco

procedure TThreadPool.TQueueWorkerThread.Execute; 

... 

if Signaled then 
begin 
    I := 0; 
    while I < Length(ThreadPool.FQueues.Current) do 
    begin 
    if (ThreadPool.FQueues.Current[I] <> nil) 
     and (ThreadPool.FQueues.Current[I] <> WorkQueue) 
     and ThreadPool.FQueues.Current[I].TrySteal(Item) 
    then 
     Break; 
    Inc(I); 
    end; 
    if I <> Length(ThreadPool.FQueues.Current) then 
    Break; 
    LookedForSteals := True; 
end 

La lunghezza array è solo crescendo così

while I < Length(ThreadPool.FQueues.Current) do 

e

if I <> Length(ThreadPool.FQueues.Current) then 

dovrebbe essere abbastanza sicuro.

if Signaled then 
begin 
    I := 0; 
    while I < Length(ThreadPool.FQueues.Current) do 
    begin 
    {$IFDEF USE_BUGFIX} 
    TMonitor.Enter(ThreadPool.FQueues); 
    try 
    {$ENDIF} 
     if (ThreadPool.FQueues.Current[I] <> nil) and (ThreadPool.FQueues.Current[I] <> WorkQueue) and ThreadPool.FQueues.Current[I].TrySteal(Item) then 
     Break; 
    {$IFDEF USE_BUGFIX} 
    finally 
     TMonitor.Exit(ThreadPool.FQueues); 
    end; 
    {$ENDIF} 
    Inc(I); 
    end; 
    if I <> Length(ThreadPool.FQueues.Current) then 
    Break; 
    LookedForSteals := True; 
end 

Ora abbiamo bisogno di un ambiente di test per vedere il furto:

program WatchStealingTasks; 

{$APPTYPE CONSOLE} 
{$R *.res} 

uses 
    Winapi.Windows, 
    System.SysUtils, 
    System.Threading, 
    System.Classes, 
    System.Math; 

procedure OutputDebugStr(const AStr: string); overload; 
begin 
    OutputDebugString(PChar(AStr)); 
end; 

procedure OutputDebugStr(const AFormat: string; const AParams: array of const); overload; 
begin 
    OutputDebugStr(Format(AFormat, AParams)); 
end; 

function CreateInnerTask(AThreadId: Cardinal; AValue: Integer; APool: TThreadPool): ITask; 
begin 
    Result := TTask.Run(
     procedure 
    begin 
     Sleep(AValue); 
     if AThreadId <> TThread.CurrentThread.ThreadID 
     then 
     OutputDebugStr('[%d] executed stolen task from [%d]', [TThread.CurrentThread.ThreadID, AThreadId]) 
     else 
     OutputDebugStr('[%d] executed task', [TThread.CurrentThread.ThreadID]); 
    end, APool); 
end; 

function CreateTask(AValue: Integer; APool: TThreadPool): ITask; 
begin 
    Result := TTask.Run(
    procedure 
    var 
     LIdx: Integer; 
     LTasks: TArray<ITask>; 
    begin 
     // Create three inner tasks per task 
     SetLength(LTasks, 3); 
     for LIdx := Low(LTasks) to High(LTasks) do 
     begin 
      LTasks[LIdx] := CreateInnerTask(TThread.CurrentThread.ThreadID, AValue, APool); 
     end; 
     OutputDebugStr('[%d] waiting for tasks completion', [TThread.CurrentThread.ThreadID]); 
     TTask.WaitForAll(LTasks); 
     OutputDebugStr('[%d] task finished', [TThread.CurrentThread.ThreadID]); 
    end, APool); 
end; 

procedure Test; 
var 
    LPool: TThreadPool; 
    LIdx: Integer; 
    LTasks: TArray<ITask>; 
begin 
    OutputDebugStr('Test started'); 
    try 
    LPool := TThreadPool.Create; 
    try 
     // Create three tasks 
     SetLength(LTasks, 3); 
     for LIdx := Low(LTasks) to High(LTasks) do 
     begin 
      // Let's put some heavy work (200ms) on the first tasks shoulder 
      // and the other tasks just some light work (20ms) to do 
      LTasks[LIdx] := CreateTask(IfThen(LIdx = 0, 200, 20), LPool); 
     end; 
     TTask.WaitForAll(LTasks); 
    finally 
     LPool.Free; 
    end; 
    finally 
    OutputDebugStr('Test completed'); 
    end; 
end; 

begin 
    try 
    Test; 
    except 
    on E: Exception do 
     Writeln(E.ClassName, ': ', E.Message); 
    end; 
    ReadLn; 

end. 

E il registro di debug è

 
Debug-Ausgabe: Test started Prozess WatchStealingTasks.exe (4532) 
Thread-Start: Thread-ID: 2104. Prozess WatchStealingTasks.exe (4532) 
Thread-Start: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532) 
Thread-Start: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2104] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) 
Thread-Start: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] task finished Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] task finished Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2104] executed task Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2188] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [4948] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: [2104] task finished Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: Thread Exiting: 2188 Prozess WatchStealingTasks.exe (4532) 
Debug-Ausgabe: Thread Exiting: 4948 Prozess WatchStealingTasks.exe (4532) 
Thread-Ende: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532) 
Thread-Ende: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532) 
Thread-Ende: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532) 

Ok, rubando dovrebbe essere ora di lavoro con qualsiasi numero di thread di lavoro , quindi tutto va bene?

No

Questa piccola applicazione di prova non sarà giunta al termine, perché ora si blocca all'interno del distruttore del pool di thread. L'ultimo thread di lavoro non terminerà causato da

procedure TThreadPool.TQueueWorkerThread.Execute; 

... 

if ThreadPool.FWorkerThreadCount = 1 then 
begin 
    // it is the last thread after all tasks executed, but 
    // FQueuedRequestCount is still on 7 - WTF 
    if ThreadPool.FQueuedRequestCount = 0 then 
    begin 

Un altro bug da risolvere qui ... perché quando in attesa di attività con Task.WaitForAll poi tutte le attività che si sta ora aspettando, sono stati eseguiti internamente, ma non diminuirà lo FQueuedRequestCount.

fissaggio che

function TThreadPool.TryRemoveWorkItem(const WorkerData: IThreadPoolWorkItem): Boolean; 
begin 
    Result := (QueueThread <> nil) and (QueueThread.WorkQueue <> nil); 
    if Result then 
    Result := QueueThread.WorkQueue.LocalFindAndRemove(WorkerData); 
    {$IFDEF USE_BUGFIX} 
    if Result then 
    DecWorkRequestCount; 
    {$ENDIF} 
end; 

e ora corre come avrebbe dovuto fare in una sola volta.


Aggiornamento

Come un commento di Uwe abbiamo anche bisogno di risolvere il System.Generics.Collections.TArray.Copy<T>

class procedure TArray.Copy<T>(const Source, Destination: array of T; SourceIndex, DestIndex, Count: NativeInt); 
{$IFDEF USE_BUGFIX} 
begin 
    CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count); 
    if IsManagedType(T) then 
    System.CopyArray(Pointer(@Destination[DestIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count) 
    else 
    System.Move(Pointer(@Source[SourceIndex])^,Pointer(@Destination[DestIndex])^, Count * SizeOf(T)); 
end; 
{$ELSE} 
begin 
    CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count); 
    if IsManagedType(T) then 
    System.CopyArray(Pointer(@Destination[SourceIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count) 
    else 
    System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, Count * SizeOf(T)); 
end; 
{$ENDIF} 

un semplice controllo fissata alla prova:

procedure TestArrayCopy; 
var 
    LArr1, LArr2: TArray<Integer>; 
begin 
    LArr1 := TArray<Integer>.Create(10, 11, 12, 13); 
    LArr2 := TArray<Integer>.Create(20, 21); 
    // copy the last 2 elements from LArr1 to LArr2 
    TArray.Copy<Integer>(LArr1, LArr2, 2, 0, 2); 
end; 
  • con XE7 otterrete un'eccezione
  • con XE7 Update1 otterrete
     
    LArr1 = (10, 11, 0, 0) 
    LArr2 = (20, 21) 
    
  • con quella modifica di cui sopra avranno
     
    LArr1 = (10, 11, 12, 13) 
    LArr2 = (12, 13) 
    
+2

Il takeaway di tutto questo è che il codice non è al momento attendibile. Quando Emba assumerà alcuni sviluppatori che hanno la capacità di scrivere il codice multi-thread corretto? Questo è 'TMonitor' tutto da capo. –

+1

Sì, la maggior parte delle volte vedremo le buone idee implementate male e non realmente testate o testate sulla piattaforma sbagliata o semplicemente un test "compila". –

4

Questo non è un bug in System.CopyArray. Di progettazione supporta solo i tipi gestiti. Il bug è infatti in TArray.Copy<T>. Ciò equivale a chiamare System.CopyArray senza discriminare se T è un tipo gestito.

Tuttavia, l'ultima versione di TArray.Copy<T>, dall'aggiornamento XE7 1 non sembra soffrire del problema descritto.Il codice è simile al seguente:

class procedure TArray.Copy<T>(const Source, Destination: array of T; 
    SourceIndex, DestIndex, Count: NativeInt); 
begin 
    CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, 
    Length(Source), DestIndex, Length(Destination), Count); 
    if IsManagedType(T) then 
    System.CopyArray(Pointer(@Destination[SourceIndex]), 
     Pointer(@Source[SourceIndex]), TypeInfo(T), Count) 
    else 
    System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, 
     Count * SizeOf(T)); 
end; 

Se non mi sbaglio nella mia analisi, è sufficiente applicare l'aggiornamento 1 per risolvere i problemi con System.CopyArray.


Ma come sottolinea Uwe nei commenti di seguito, questo codice è ancora falso. Utilizza SourceIndex erroneamente dove deve essere utilizzato DestIndex. E i parametri di origine e destinazione sono passati nell'ordine sbagliato. Ci si chiede anche perché l'autore abbia scritto Pointer(@Destination[SourceIndex])^ anziché Destination[SourceIndex]. Trovo che questa situazione sia terribilmente deprimente. Come può Embarcadero rilasciare il codice di tale qualità spaventosa?


Più di quanto sopra sono i problemi con TSparseArray<T>. Che assomiglia a questo:

function TSparseArray<T>.Add(const Item: T): Integer; 
var 
    I: Integer; 
    LArray, NewArray: TArray<T>; 
begin 
    while True do 
    begin 
    LArray := FArray; 
    TMonitor.Enter(FLock); 
    try 
     for I := 0 to Length(LArray) - 1 do 
     begin 
     if LArray[I] = nil then 
     begin 
      FArray[I] := Item; 
      Exit(I); 
     end else if I = Length(LArray) - 1 then 
     begin 
      if LArray <> FArray then 
      Continue; 
      SetLength(NewArray, Length(LArray) * 2); 
      TArray.Copy<T>(LArray, NewArray, I + 1); 
      NewArray[I + 1] := Item; 
      Exit(I + 1); 
     end; 
     end; 
    finally 
     TMonitor.Exit(FLock); 
    end; 
    end; 
end; 

L'unica volta FArray viene inizializzato è nel costruttore TSparseArray<T>. Ciò significa che se l'array diventa pieno, gli elementi vengono aggiunti e persi. Presumibilmente, lo I = Length(LArray) - 1 ha lo scopo di estendere la lunghezza di FArray e acquisire il nuovo elemento. Tuttavia, si noti anche che espone FArray tramite la proprietà Current. E questa esposizione non è protetta dalla serratura. Quindi, non vedo come questa classe possa comportarsi in modo utile una volta che FArray diventa pieno.

Suggerisco di costruire un esempio in cui FArray diventa pieno e dimostrare che gli elementi aggiunti vengono persi. Invia un bug report dimostrativo e collegando a questa domanda.

+0

Ciò non * risolve veramente * 'TSparseArray '. L'eccezione non aumenterà più e tutto * sembra * va bene, ma la copia strana e inutile è ancora lì e lascia un cattivo odore. Proverò a scavare più nel codice perché funzioni senza tutta la parte di copia. –

+0

BTW Hai ragione, "System.CopyArray" funziona come documentato. La mia colpa è stata la stessa cosa dello scrittore di 'TArray.Copy ' - credere/assumere invece di conoscere/leggere i documenti: o) –

+2

@SirRufo Penso che sia molto più profondo di questo. Non è sufficiente usare l'opzione 2 nella tua domanda perché 'FArray' è esposto pubblicamente senza blocco. Penso che non ci sia una soluzione facile da applicare, anche perché non possiamo conoscere il design di questa classe.Personalmente mi aspetto che Emba impieghi circa 5 anni per sistemare questa nuova libreria di threading, così come ci è voluto tanto tempo per sistemare 'TMonitor'. Non credo che abbiano l'abilità di scrivere una libreria di threading pulita. –

Problemi correlati