Non importa se gli articoli sono scritti su TSparseArray<T>
, perché è necessario solo se un thread di lavoro ha terminato tutte le attività a lui delegate e un altro thread di lavoro non è ancora terminato. A questo punto il thread inattivo guarda le code degli altri gradini all'interno della piscina e tenta di rubare un po 'di lavoro.
Se una coda non è stata inserita in questo array, non è visibile ai thread inattivi e pertanto il carico di lavoro non può essere condiviso.
per risolvere questo ho scelto l'opzione 2
function TSparseArray<T>.Add(const Item: T): Integer;
...
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1); // <- No Exception here with XE7U1
NewArray[I + 1] := Item;
{$IFDEF USE_BUGFIX}
FArray := NewArray;
{$ENDIF}
Exit(I + 1);
Ma che rubare parte è rischioso attuata senza alcun blocco
procedure TThreadPool.TQueueWorkerThread.Execute;
...
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
if (ThreadPool.FQueues.Current[I] <> nil)
and (ThreadPool.FQueues.Current[I] <> WorkQueue)
and ThreadPool.FQueues.Current[I].TrySteal(Item)
then
Break;
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
La lunghezza array è solo crescendo così
while I < Length(ThreadPool.FQueues.Current) do
e
if I <> Length(ThreadPool.FQueues.Current) then
dovrebbe essere abbastanza sicuro.
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
{$IFDEF USE_BUGFIX}
TMonitor.Enter(ThreadPool.FQueues);
try
{$ENDIF}
if (ThreadPool.FQueues.Current[I] <> nil) and (ThreadPool.FQueues.Current[I] <> WorkQueue) and ThreadPool.FQueues.Current[I].TrySteal(Item) then
Break;
{$IFDEF USE_BUGFIX}
finally
TMonitor.Exit(ThreadPool.FQueues);
end;
{$ENDIF}
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
Ora abbiamo bisogno di un ambiente di test per vedere il furto:
program WatchStealingTasks;
{$APPTYPE CONSOLE}
{$R *.res}
uses
Winapi.Windows,
System.SysUtils,
System.Threading,
System.Classes,
System.Math;
procedure OutputDebugStr(const AStr: string); overload;
begin
OutputDebugString(PChar(AStr));
end;
procedure OutputDebugStr(const AFormat: string; const AParams: array of const); overload;
begin
OutputDebugStr(Format(AFormat, AParams));
end;
function CreateInnerTask(AThreadId: Cardinal; AValue: Integer; APool: TThreadPool): ITask;
begin
Result := TTask.Run(
procedure
begin
Sleep(AValue);
if AThreadId <> TThread.CurrentThread.ThreadID
then
OutputDebugStr('[%d] executed stolen task from [%d]', [TThread.CurrentThread.ThreadID, AThreadId])
else
OutputDebugStr('[%d] executed task', [TThread.CurrentThread.ThreadID]);
end, APool);
end;
function CreateTask(AValue: Integer; APool: TThreadPool): ITask;
begin
Result := TTask.Run(
procedure
var
LIdx: Integer;
LTasks: TArray<ITask>;
begin
// Create three inner tasks per task
SetLength(LTasks, 3);
for LIdx := Low(LTasks) to High(LTasks) do
begin
LTasks[LIdx] := CreateInnerTask(TThread.CurrentThread.ThreadID, AValue, APool);
end;
OutputDebugStr('[%d] waiting for tasks completion', [TThread.CurrentThread.ThreadID]);
TTask.WaitForAll(LTasks);
OutputDebugStr('[%d] task finished', [TThread.CurrentThread.ThreadID]);
end, APool);
end;
procedure Test;
var
LPool: TThreadPool;
LIdx: Integer;
LTasks: TArray<ITask>;
begin
OutputDebugStr('Test started');
try
LPool := TThreadPool.Create;
try
// Create three tasks
SetLength(LTasks, 3);
for LIdx := Low(LTasks) to High(LTasks) do
begin
// Let's put some heavy work (200ms) on the first tasks shoulder
// and the other tasks just some light work (20ms) to do
LTasks[LIdx] := CreateTask(IfThen(LIdx = 0, 200, 20), LPool);
end;
TTask.WaitForAll(LTasks);
finally
LPool.Free;
end;
finally
OutputDebugStr('Test completed');
end;
end;
begin
try
Test;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
ReadLn;
end.
E il registro di debug è
Debug-Ausgabe: Test started Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2104. Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: Thread Exiting: 2188 Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: Thread Exiting: 4948 Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532)
Ok, rubando dovrebbe essere ora di lavoro con qualsiasi numero di thread di lavoro , quindi tutto va bene?
No
Questa piccola applicazione di prova non sarà giunta al termine, perché ora si blocca all'interno del distruttore del pool di thread. L'ultimo thread di lavoro non terminerà causato da
procedure TThreadPool.TQueueWorkerThread.Execute;
...
if ThreadPool.FWorkerThreadCount = 1 then
begin
// it is the last thread after all tasks executed, but
// FQueuedRequestCount is still on 7 - WTF
if ThreadPool.FQueuedRequestCount = 0 then
begin
Un altro bug da risolvere qui ... perché quando in attesa di attività con Task.WaitForAll
poi tutte le attività che si sta ora aspettando, sono stati eseguiti internamente, ma non diminuirà lo FQueuedRequestCount
.
fissaggio che
function TThreadPool.TryRemoveWorkItem(const WorkerData: IThreadPoolWorkItem): Boolean;
begin
Result := (QueueThread <> nil) and (QueueThread.WorkQueue <> nil);
if Result then
Result := QueueThread.WorkQueue.LocalFindAndRemove(WorkerData);
{$IFDEF USE_BUGFIX}
if Result then
DecWorkRequestCount;
{$ENDIF}
end;
e ora corre come avrebbe dovuto fare in una sola volta.
Aggiornamento
Come un commento di Uwe abbiamo anche bisogno di risolvere il System.Generics.Collections.TArray.Copy<T>
class procedure TArray.Copy<T>(const Source, Destination: array of T; SourceIndex, DestIndex, Count: NativeInt);
{$IFDEF USE_BUGFIX}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[DestIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Source[SourceIndex])^,Pointer(@Destination[DestIndex])^, Count * SizeOf(T));
end;
{$ELSE}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[SourceIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, Count * SizeOf(T));
end;
{$ENDIF}
un semplice controllo fissata alla prova:
procedure TestArrayCopy;
var
LArr1, LArr2: TArray<Integer>;
begin
LArr1 := TArray<Integer>.Create(10, 11, 12, 13);
LArr2 := TArray<Integer>.Create(20, 21);
// copy the last 2 elements from LArr1 to LArr2
TArray.Copy<Integer>(LArr1, LArr2, 2, 0, 2);
end;
- con XE7 otterrete un'eccezione
- con XE7 Update1 otterrete
LArr1 = (10, 11, 0, 0)
LArr2 = (20, 21)
- con quella modifica di cui sopra avranno
LArr1 = (10, 11, 12, 13)
LArr2 = (12, 13)
Anche se 'TArray.Copy()' sono stati risolti, penserei l'assegnazione/modifica mancante di 'FArray' sarebbe un bug separato che deve essere corretto. Altrimenti, come è 'Add()' aggiungendo 'Item' alla matrice? Non ho ancora visto il codice sorgente di 'TSparseArray'. –
'TSparseArray' è strano. Perché sente la necessità di creare un oggetto di blocco separato? Cosa c'è di sbagliato nel blocco stesso? –