2012-12-01 19 views
18

Attualmente sto lavorando a un'applicazione client/server Delphi XE3 per trasferire file (con i componenti FTP di Indy). La parte client controlla una cartella, ottiene un elenco dei file all'interno, li carica sul server ed elimina gli originali. Il caricamento viene effettuato da un thread separato, che elabora i file uno per uno. I file possono variare da 0 a poche migliaia e anche le loro dimensioni variano molto.Sincronizzazione caricamento file multithread

È un'applicazione Firemonkey compilata per OSX e Windows, quindi ho dovuto utilizzare TThread anziché OmniThreadLibrary, che preferivo. Il mio cliente segnala che l'applicazione si blocca casualmente. Non potrei duplicarlo, ma poiché non ho tanta esperienza con TThread, potrei aver messo da qualche parte la condizione di deadlock. Ho letto un sacco di esempi, ma non sono ancora sicuro su alcune specifiche del multithread.

La struttura dell'app è semplice:
Un timer nel thread principale controlla la cartella e ottiene informazioni su ogni file in un record, che entra in un TList generico. Questo elenco mantiene informazioni sui nomi dei file, le dimensioni, i progressi, se il file è stato completamente caricato o deve essere ritentato. Tutto ciò che viene visualizzato in una griglia con barre di avanzamento, ecc. Questo elenco è accessibile solo dal thread principale. Successivamente gli elementi dall'elenco vengono inviati al thread chiamando il metodo AddFile (codice di seguito). Il thread memorizza tutti i file in una coda thread-safe come questa http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Quando il file viene caricato, il thread del caricatore notifica al thread principale una chiamata a Sincronizza.
Il thread principale chiama periodicamente il metodo Uploader.GetProgress per verificare l'avanzamento del file corrente e visualizzarlo. Questa funzione non è effettivamente thread-safe, ma potrebbe causare un deadlock o solo dati errati restituiti?

Quale sarebbe un modo sicuro ed efficiente per eseguire il controllo di avanzamento?

Quindi questo approccio è OK o mi è sfuggito qualcosa? come lo faresti?
Ad esempio, ho pensato di creare un nuovo thread solo per leggere il contenuto della cartella. Ciò significa che il TList che uso deve essere reso thread-safe, ma deve essere sempre accessibile per aggiornare le informazioni visualizzate nella griglia della GUI. Non tutta la sincronizzazione rallenterebbe la GUI?

Ho inserito il codice semplificato di seguito nel caso qualcuno volesse guardarlo. In caso contrario, sarei felice di sentire alcune opinioni su ciò che dovrei usare in generale. Gli obiettivi principali sono di lavorare sia su OSX che su Windows; essere in grado di visualizzare le informazioni su tutti i file e il progresso di quello corrente; e di essere reattivo indipendentemente dal numero e dalla dimensione dei file.

Questo è il codice del thread di caricamento. Ho rimosso alcuni di essi per facilitarne la lettura:

type 
    TFileStatus = (fsToBeQueued, fsUploaded, fsQueued); 
    TFileInfo = record 
    ID: Integer; 
    Path: String; 
    Size: Int64; 
    UploadedSize: Int64; 
    Status: TFileStatus; 
    end; 

    TUploader = class(TThread) 
    private 
    FTP: TIdFTP; 
    fQueue: TThreadedQueue<TFileInfo>; 
    fCurrentFile: TFileInfo; 
    FUploading: Boolean; 
    procedure ConnectFTP; 
    function UploadFile(aFileInfo: TFileInfo): String; 
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    procedure SignalComplete; 
    procedure SignalError(aError: String); 
    protected 
    procedure Execute; override; 
    public 
    property Uploading: Boolean read FUploading; 
    constructor Create; 
    destructor Destroy; override; 
    procedure Terminate; 
    procedure AddFile(const aFileInfo: TFileInfo); 
    function GetProgress: TFileInfo; 
    end; 

procedure TUploader.AddFile(const aFileInfo: TFileInfo); 
begin 
    fQueue.Enqueue(aFileInfo); 
end; 

procedure TUploader.ConnectFTP; 
begin 
    ... 
    FTP.Connect; 
end; 

constructor TUploader.Create; 
begin 
    inherited Create(false); 
    FreeOnTerminate := false; 
    fQueue := TThreadedQueue<TFileInfo>.Create; 
    // Create the TIdFTP and set ports and other params 
    ... 
end; 

destructor TUploader.Destroy; 
begin 
    fQueue.Close; 
    fQueue.Free; 
    FTP.Free; 
    inherited; 
end; 

// Process the whole queue and inform the main thread of the progress 
procedure TUploader.Execute; 
var 
    Temp: TFileInfo; 
begin 
    try 
    ConnectFTP; 
    except 
    on E: Exception do 
     SignalError(E.Message); 
    end; 

    // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while fQueue.Peek(fCurrentFile) = wrSignaled do 
    try 
     if UploadFile(fCurrentFile) = '' then 
     begin 
     fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
     SignalComplete; 
     end; 
    except 
     on E: Exception do 
     SignalError(E.Message); 
    end; 
end; 

// Return the current file's info to the main thread. Used to update the progress indicators 
function TUploader.GetProgress: TFileInfo; 
begin 
    Result := fCurrentFile; 
end; 

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar 
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
begin 
    fCurrentFile.UploadedSize := AWorkCount; 
end; 

procedure TUploader.SignalComplete; 
begin 
    Synchronize(
    procedure 
    begin 
     frmClientMain.OnCompleteFile(fCurrentFile); 
    end); 
end; 

procedure TUploader.SignalError(aError: String); 
begin 
    try 
    FTP.Disconnect; 
    except 
    end; 
    if fQueue.Closed then 
    Exit; 

    Synchronize(
    procedure 
    begin 
     frmClientMain.OnUploadError(aError); 
    end); 
end; 

// Clear the queue and terminate the thread 
procedure TUploader.Terminate; 
begin 
    fQueue.Close; 
    inherited; 
end; 

function TUploader.UploadFile(aFileInfo: TFileInfo): String; 
begin 
    Result := 'Error'; 
    try 
    if not FTP.Connected then 
     ConnectFTP; 
    FUploading := true; 
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));  
    Result := ''; 
    finally 
    FUploading := false; 
    end; 
end; 

e parti del thread principale che interagiscono con l'uploader:

...... 
// Main form 
    fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted) 
    fUploader: TUploader;   // The uploader thread 
    fFiles: TList<TFileInfo>; 
    fCurrentFileName: String;  // Used to display the progress 
    function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID 
    public 
    procedure OnCompleteFile(aFileInfo: TFileInfo); 
    procedure OnUploadError(aError: String); 
    end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnUploadError(aError: String); 
begin 
    // show and log the error 
end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo); 
var 
    I: Integer; 
begin 
    I := IndexOfFile(aFileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    aFileInfo.Status := fsUploaded; 
    aFileInfo.UploadedSize := aFileInfo.Size; 
    FFiles.Items[I] := aFileInfo; 
    Inc(FFilesUploaded); 
    TFile.Delete(aFileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

procedure TfrmClientMain.ProcessFolder; 
var 
    NewFiles: TStringDynArray; 
    I, J: Integer; 
    FileInfo: TFileInfo; 
begin 
    // Remove completed files from the list if it contains more than XX files 
    while FFiles.Count > 1000 do 
     if FFiles[0].Status = fsUploaded then 
     begin 
     Dec(FFilesUploaded); 
     FFiles.Delete(0); 
     end else 
     Break; 

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories); 
    for I := 0 to Length(NewFiles) - 1 do 
    begin 
      FileInfo.ID := FUniqueID; 
      Inc(FUniqueID); 
      FileInfo.Path := NewFiles[I]; 
      FileInfo.Size := GetFileSizeByName(NewFiles[I]); 
      FileInfo.UploadedSize := 0; 
      FileInfo.Status := fsToBeQueued; 
      FFiles.Add(FileInfo); 

     if (I mod 100) = 0 then 
     begin 
     UpdateStatusLabel; 
     grFiles.RowCount := FFiles.Count; 
     Application.ProcessMessages; 
     if fUploader = nil then 
      break; 
     end; 
    end; 

    // Send the new files and resend failed to the uploader thread 
    for I := 0 to FFiles.Count - 1 do 
     if (FFiles[I].Status = fsToBeQueued) then 
     begin 
     if fUploader = nil then 
      Break; 
     FileInfo := FFiles[I]; 
     FileInfo.Status := fsQueued; 
     FFiles[I] := FileInfo; 
     SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path)); 
     FUploader.AddFile(FFiles[I]); 
     end; 
end; 

procedure TfrmClientMain.tmrGUITimer(Sender: TObject); 
var 
    FileInfo: TFileInfo; 
    I: Integer; 
begin 
    if (fUploader = nil) or not fUploader.Uploading then 
    Exit; 
    FileInfo := fUploader.GetProgress; 
    I := IndexOfFile(FileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    fFiles.Items[I] := FileInfo; 
    fCurrentFileName := ExtractFileName(FileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

function TfrmClientMain.IndexOfFile(aID: Integer): Integer; 
var 
    I: Integer; 
begin 
    Result := -1; 
    for I := 0 to FFiles.Count - 1 do 
    if FFiles[I].ID = aID then 
     Exit(I); 
end; 
+0

Non sono sicuro e non ho testato .. ma hai provato ad aggiungere un TIdAntiFreeze e controllato se il comportamento è lo stesso? (FMX.IdAntiFreeze) – Whiler

+2

TIdAntiFreeze è progettato per impedire il congelamento della GUI quando si utilizza un componente di Indy dal thread principale (ad esempio rilasciato nel modulo). Lo uso in un thread separato, quindi non vedo come sarebbe utile. Almeno per quanto ne so ... – VGeorgiev

+0

A prima vista, la gestione degli errori mi sembra sbagliata. Ad esempio, nel metodo Execute, se la chiamata ConnectFTP fallisce, si _eat_ l'eccezione (dopo aver informato l'errore) e si continuano a inviare chiamate a UploadFile. IMHO devi _clean_ that, e lascia che il thread muoia con FatalException o gestisca correttamente l'eccezione all'interno del metodo Execute, ad esempio riprovando la connessione un numero di volte, forse a seconda del tipo di errore. D'altra parte, se si dispone di un elenco nel thread principale, non riesco a capire perché è necessaria una coda nei singoli thread. – jachguate

risposta

0

questo potrebbe non essere il problema, ma TFileInfo è un record.

Ciò significa che quando viene passato come parametro (non const/var), viene copiato. Ciò può causare problemi con elementi come le stringhe nel record che non ottengono il conteggio dei riferimenti aggiornato quando il record viene copiato.

Una cosa da provare sarebbe quella di renderlo una classe e passare un'istanza come parametro (cioè un puntatore ai dati sull'heap).

Qualcos'altro da tenere d'occhio è l'Int64 condiviso (ad esempio i valori delle dimensioni) sui sistemi a 32 bit con thread.

L'aggiornamento/lettura non viene eseguito in modo atomico & non si dispone di alcuna protezione specifica, quindi è possibile che una lettura del valore ottenga corrispondenze tra i 32 bit superiori e inferiori dovuti alla filettatura. (Ad esempio, Leggi 32 bit superiori, Scrivi 32 bit superiori, Scrivi 32 bit inferiori, Leggi 32 bit inferiori, con letture & scrivi in ​​thread diversi). Probabilmente questo non causa i problemi che stai vedendo e a meno che tu non stia lavorando con trasferimenti di file> 4GB, è improbabile che ti causino problemi.

0

I deadlock sono decisamente difficili da individuare, ma questo potrebbe essere il problema. Nel tuo codice, non ho visto che hai aggiunto alcun timeout al file di accodamento, sbirciatina o deseleziona, il che significa che prenderà il valore predefinito di Infinite.

L'enqueue ha questa linea in esso - significato, come ogni oggetto di sincronizzazione, bloccherà fino a quando il Invio Completa (si blocca il monitor) o si verifica il timeout (in quanto non si dispone di un timeout, si attenderà per sempre)

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult; 
...  
if not TMonitor.Enter(FQueue, Timeout) 

che sto anche andando a fare l'ipotesi che si implementato PEEK con l'aiuto delle Dequeue - solo che in realtà non rimuove l'elemento.

che appare di eseguire il proprio timeout - tuttavia, è ancora il seguente:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult; 
... 
if not TMonitor.Enter(FQueue, Timeout) 

Dove timeout è Infinito - quindi, se siete nel metodo peek in attesa che venga segnalato con un infinito timeout, quindi non è possibile accodare qualcosa da un secondo thread senza bloccare quel thread in attesa che il metodo peek si completi in un timeout infinito.

Ecco un frammento del commento da TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout. 

Poiché l'implementazione utilizza infinito di default, e un valore TMonitor.Spinlock non è previsto, che possa bloccare il filo finché può acquisire l'oggetto FQueue .

Il mio suggerimento sarebbe quello di modificare il codice come segue:

// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while true do 
    case fQueue.Peek(fCurrentFile,10) 
     wrSignaled: 
     try 
      if UploadFile(fCurrentFile) = '' then 
      begin 
      fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
      SignalComplete; 
      end; 
     except 
      on E: Exception do 
      SignalError(E.Message); 
     end; 
     wrTimeout: sleep(10); 
     wrIOCompletion, 
     wrAbandoned, 
     wrError: break; 
    end; //case 

In questo modo, sbirciatina non terrà la serratura FQueue a tempo indeterminato, lasciando una finestra per l'Enqueue di acquisire e aggiungere il file da il thread principale (UI).

+0

Grazie per la risposta dettagliata. Sono d'accordo che le due linee TMonitor.Enter() potrebbero causare un deadlock. TMonitor.Enter() in TSimpleThreadedQueue.Peek/Dequeue è seguito da un TMonitor.Wait(). Se ho capito bene, l'attesa rilascia temporaneamente il blocco e consente all'altro thread di inserire un blocco nel metodo Enqueue, pertanto non dovrebbe causare un deadlock. L'attesa quindi tenta di posizionare nuovamente un blocco. La situazione di stallo che mi era capitata molto raramente, mentre se fosse il caso che descrivessi accadrebbe quasi ogni volta perché il thread inizia prima che ci siano dei dati in coda. – VGeorgiev

+0

Hummm .. Guardando all'origine di TMonitor.Enter, non penso che sia il caso se non si imposta uno SpinCount - la maggior parte del codice per lo spin viene saltata dove SpinCount = 0 Dove alla fine si arriva a questo riga: Risultato: = MonitorSupport.WaitOrSignalObject (nil, GetEvent, Timeout) = WAIT_OBJECT_0; – SilverKnight

+0

Credo che sia il caso - tuttavia, dalla mia lettura e cercando di capire cosa fa il monitor, esso gira per un determinato periodo di tempo (che dovrebbe essere molto breve) - quando questo diventa più lungo, allora hai la possibilità di stallo - date un'occhiata a questo articolo Wiki su SpinLock - http://en.wikipedia.org/wiki/Spinlock – SilverKnight

0

Questo potrebbe essere un tentativo lungo, ma ecco un'altra possibilità [la precedente risposta potrebbe essere più probabile] (qualcosa che ho appena trovato, ma che avevo già conosciuto prima): L'uso di Synchronize potrebbe causare lo stallo.Qui è un blog sul perché questo accade: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

Il punto pertinente dall'articolo:

Discussione A chiama Sincronizza (MethodA)

Discussione B chiama Sincronizza (MethodB)

Quindi, all'interno del contesto della discussione principale:

Chiamate di thread principale CheckSynchronize() durante l'elaborazione dei messaggi

CheckSynchronize è implementato per l'elaborazione batch di tutte le chiamate in attesa (*). Quindi raccoglie la coda di chiamate in attesa (contenente MethodA e MethodB) e loop attraverso di loro uno per uno.

Il metodoA viene eseguito nel contesto del thread principale. Assumere MethodA chiama ThreadB.WaitFor

WaitFor chiama CheckSynchronize di elaborare qualsiasi chiamate in attesa per la sincronizzazione

In teoria, questo dovrebbe quindi elaborare di ThreadB Sincronizza (MethodB), permettendo Discussione B per completare. Tuttavia, MethodB è già un possesso della prima chiamata CheckSynchronize, quindi non riceve mai il numero chiamato.

DEADLOCK!

Embarcadero QC article che descrive il problema in modo più dettagliato.

Mentre non vedo alcuna chiamata ProcessMessages nel codice precedente o, peraltro, un WaitFor che verrebbe chiamato durante un Synchronize, potrebbe comunque essere un problema che nel punto in cui viene chiamata una sincronizzazione, un'altra discussione chiama anche la sincronizzazione, ma il thread principale è già sincronizzato e sta bloccando.

Questo non ha fatto clic su di me all'inizio, perché tendenzialmente evito Sincronizzare le chiamate come la piaga e di solito progettare gli aggiornamenti dell'interfaccia utente dai thread utilizzando altri metodi come il passaggio dei messaggi e gli elenchi di thread sicuro con la notifica dei messaggi anziché sincronizzare le chiamate.

+0

Grazie ancora per entrare in così tanti dettagli su questo. E mi dispiace per le risposte ritardate, sto viaggiando in questi giorni ... Quello che hai descritto qui mi è venuto in mente e pensavo che il problema fosse Synchronize. L'ho usato perché non c'è SendMessage/PostMessage su OSX o almeno non so se c'è un'alternativa. Quindi Synchronize era una soluzione facile in quel momento. Qualche tempo fa ho riscritto gran parte del codice e non ho più questo blocco, ma non so dove fosse il problema. Potrebbe essere stato correlato ai componenti di Indy TCP che ho usato, perché non erano molto stabili su OSX ... – VGeorgiev

+0

Nessun problema. Stavo cercando qualcos'altro e ho trovato questo post che non aveva una risposta. L'ho usato come esercizio di apprendimento per vedere cosa avrebbe fatto la classe monitor (non l'ho mai usata). Sono sempre interessato a diverse tecniche che potrebbero migliorare il mio codice threaded (principalmente nel ridurre l'utilizzo della CPU, ma anche in diversi metodi di comunicazione). È stato un tuffo interessante nella classe, e spero che anche qualcun altro trarrà beneficio dalla discussione. – SilverKnight