Attualmente sto lavorando a un'applicazione client/server Delphi XE3 per trasferire file (con i componenti FTP di Indy). La parte client controlla una cartella, ottiene un elenco dei file all'interno, li carica sul server ed elimina gli originali. Il caricamento viene effettuato da un thread separato, che elabora i file uno per uno. I file possono variare da 0 a poche migliaia e anche le loro dimensioni variano molto.Sincronizzazione caricamento file multithread
È un'applicazione Firemonkey compilata per OSX e Windows, quindi ho dovuto utilizzare TThread anziché OmniThreadLibrary, che preferivo. Il mio cliente segnala che l'applicazione si blocca casualmente. Non potrei duplicarlo, ma poiché non ho tanta esperienza con TThread, potrei aver messo da qualche parte la condizione di deadlock. Ho letto un sacco di esempi, ma non sono ancora sicuro su alcune specifiche del multithread.
La struttura dell'app è semplice:
Un timer nel thread principale controlla la cartella e ottiene informazioni su ogni file in un record, che entra in un TList generico. Questo elenco mantiene informazioni sui nomi dei file, le dimensioni, i progressi, se il file è stato completamente caricato o deve essere ritentato. Tutto ciò che viene visualizzato in una griglia con barre di avanzamento, ecc. Questo elenco è accessibile solo dal thread principale. Successivamente gli elementi dall'elenco vengono inviati al thread chiamando il metodo AddFile (codice di seguito). Il thread memorizza tutti i file in una coda thread-safe come questa http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Quando il file viene caricato, il thread del caricatore notifica al thread principale una chiamata a Sincronizza.
Il thread principale chiama periodicamente il metodo Uploader.GetProgress per verificare l'avanzamento del file corrente e visualizzarlo. Questa funzione non è effettivamente thread-safe, ma potrebbe causare un deadlock o solo dati errati restituiti?
Quale sarebbe un modo sicuro ed efficiente per eseguire il controllo di avanzamento?
Quindi questo approccio è OK o mi è sfuggito qualcosa? come lo faresti?
Ad esempio, ho pensato di creare un nuovo thread solo per leggere il contenuto della cartella. Ciò significa che il TList che uso deve essere reso thread-safe, ma deve essere sempre accessibile per aggiornare le informazioni visualizzate nella griglia della GUI. Non tutta la sincronizzazione rallenterebbe la GUI?
Ho inserito il codice semplificato di seguito nel caso qualcuno volesse guardarlo. In caso contrario, sarei felice di sentire alcune opinioni su ciò che dovrei usare in generale. Gli obiettivi principali sono di lavorare sia su OSX che su Windows; essere in grado di visualizzare le informazioni su tutti i file e il progresso di quello corrente; e di essere reattivo indipendentemente dal numero e dalla dimensione dei file.
Questo è il codice del thread di caricamento. Ho rimosso alcuni di essi per facilitarne la lettura:
type
TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
TFileInfo = record
ID: Integer;
Path: String;
Size: Int64;
UploadedSize: Int64;
Status: TFileStatus;
end;
TUploader = class(TThread)
private
FTP: TIdFTP;
fQueue: TThreadedQueue<TFileInfo>;
fCurrentFile: TFileInfo;
FUploading: Boolean;
procedure ConnectFTP;
function UploadFile(aFileInfo: TFileInfo): String;
procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
procedure SignalComplete;
procedure SignalError(aError: String);
protected
procedure Execute; override;
public
property Uploading: Boolean read FUploading;
constructor Create;
destructor Destroy; override;
procedure Terminate;
procedure AddFile(const aFileInfo: TFileInfo);
function GetProgress: TFileInfo;
end;
procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
fQueue.Enqueue(aFileInfo);
end;
procedure TUploader.ConnectFTP;
begin
...
FTP.Connect;
end;
constructor TUploader.Create;
begin
inherited Create(false);
FreeOnTerminate := false;
fQueue := TThreadedQueue<TFileInfo>.Create;
// Create the TIdFTP and set ports and other params
...
end;
destructor TUploader.Destroy;
begin
fQueue.Close;
fQueue.Free;
FTP.Free;
inherited;
end;
// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
Temp: TFileInfo;
begin
try
ConnectFTP;
except
on E: Exception do
SignalError(E.Message);
end;
// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
while fQueue.Peek(fCurrentFile) = wrSignaled do
try
if UploadFile(fCurrentFile) = '' then
begin
fQueue.Dequeue(Temp); // Delete the item from the queue if succesful
SignalComplete;
end;
except
on E: Exception do
SignalError(E.Message);
end;
end;
// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
Result := fCurrentFile;
end;
// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
fCurrentFile.UploadedSize := AWorkCount;
end;
procedure TUploader.SignalComplete;
begin
Synchronize(
procedure
begin
frmClientMain.OnCompleteFile(fCurrentFile);
end);
end;
procedure TUploader.SignalError(aError: String);
begin
try
FTP.Disconnect;
except
end;
if fQueue.Closed then
Exit;
Synchronize(
procedure
begin
frmClientMain.OnUploadError(aError);
end);
end;
// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
fQueue.Close;
inherited;
end;
function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
Result := 'Error';
try
if not FTP.Connected then
ConnectFTP;
FUploading := true;
FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));
Result := '';
finally
FUploading := false;
end;
end;
e parti del thread principale che interagiscono con l'uploader:
......
// Main form
fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
fUploader: TUploader; // The uploader thread
fFiles: TList<TFileInfo>;
fCurrentFileName: String; // Used to display the progress
function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID
public
procedure OnCompleteFile(aFileInfo: TFileInfo);
procedure OnUploadError(aError: String);
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
// show and log the error
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
I: Integer;
begin
I := IndexOfFile(aFileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
aFileInfo.Status := fsUploaded;
aFileInfo.UploadedSize := aFileInfo.Size;
FFiles.Items[I] := aFileInfo;
Inc(FFilesUploaded);
TFile.Delete(aFileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
procedure TfrmClientMain.ProcessFolder;
var
NewFiles: TStringDynArray;
I, J: Integer;
FileInfo: TFileInfo;
begin
// Remove completed files from the list if it contains more than XX files
while FFiles.Count > 1000 do
if FFiles[0].Status = fsUploaded then
begin
Dec(FFilesUploaded);
FFiles.Delete(0);
end else
Break;
NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
for I := 0 to Length(NewFiles) - 1 do
begin
FileInfo.ID := FUniqueID;
Inc(FUniqueID);
FileInfo.Path := NewFiles[I];
FileInfo.Size := GetFileSizeByName(NewFiles[I]);
FileInfo.UploadedSize := 0;
FileInfo.Status := fsToBeQueued;
FFiles.Add(FileInfo);
if (I mod 100) = 0 then
begin
UpdateStatusLabel;
grFiles.RowCount := FFiles.Count;
Application.ProcessMessages;
if fUploader = nil then
break;
end;
end;
// Send the new files and resend failed to the uploader thread
for I := 0 to FFiles.Count - 1 do
if (FFiles[I].Status = fsToBeQueued) then
begin
if fUploader = nil then
Break;
FileInfo := FFiles[I];
FileInfo.Status := fsQueued;
FFiles[I] := FileInfo;
SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path));
FUploader.AddFile(FFiles[I]);
end;
end;
procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
FileInfo: TFileInfo;
I: Integer;
begin
if (fUploader = nil) or not fUploader.Uploading then
Exit;
FileInfo := fUploader.GetProgress;
I := IndexOfFile(FileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
fFiles.Items[I] := FileInfo;
fCurrentFileName := ExtractFileName(FileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
I: Integer;
begin
Result := -1;
for I := 0 to FFiles.Count - 1 do
if FFiles[I].ID = aID then
Exit(I);
end;
Non sono sicuro e non ho testato .. ma hai provato ad aggiungere un TIdAntiFreeze e controllato se il comportamento è lo stesso? (FMX.IdAntiFreeze) – Whiler
TIdAntiFreeze è progettato per impedire il congelamento della GUI quando si utilizza un componente di Indy dal thread principale (ad esempio rilasciato nel modulo). Lo uso in un thread separato, quindi non vedo come sarebbe utile. Almeno per quanto ne so ... – VGeorgiev
A prima vista, la gestione degli errori mi sembra sbagliata. Ad esempio, nel metodo Execute, se la chiamata ConnectFTP fallisce, si _eat_ l'eccezione (dopo aver informato l'errore) e si continuano a inviare chiamate a UploadFile. IMHO devi _clean_ that, e lascia che il thread muoia con FatalException o gestisca correttamente l'eccezione all'interno del metodo Execute, ad esempio riprovando la connessione un numero di volte, forse a seconda del tipo di errore. D'altra parte, se si dispone di un elenco nel thread principale, non riesco a capire perché è necessaria una coda nei singoli thread. – jachguate