2010-01-21 23 views
15

Sono in esecuzione test ma voglio eseguire 2 funzioni contemporaneamente. Ho una macchina fotografica e sto dicendo che si muoverà via suds, quindi effettuo l'accesso alla videocamera tramite SSH per verificare la velocità impostata dalla fotocamera. Quando controllo la velocità, la fotocamera si è arrestata, quindi non è disponibile alcuna velocità. C'è un modo per far funzionare queste funzioni contemporaneamente per testare la velocità della fotocamera. Il codice di esempio è il seguente:Come eseguire contemporaneamente due funzioni

class VerifyPan(TestAbsoluteMove): 

    def runTest(self): 

     self.dest.PanTilt._x=350 

     # Runs soap move command 
     threading.Thread(target = SudsMove).start() 

     self.command = './ptzpanposition -c 0 -u degx10' 

     # Logs into camera and checks speed 
     TestAbsoluteMove.Ssh(self) 

     # Position of the camera verified through Ssh (No decimal point added to the Ssh value) 
     self.assertEqual(self.Value, '3500') 

Ho provato il modulo di threading come indicato di seguito. Il thread non viene eseguito in sincronia con la funzione TestAbsoluteMove.Ssh(). C'è qualche altro codice che ho bisogno per fare questo lavoro.

Ho cercato di inserire argomenti nell'istruzione thread che indica che il thread viene eseguito quando la funzione Ssh(). Qualcuno sa cosa entrare in questa affermazione?

Scusa se non ho spiegato correttamente. La funzione 'SudsMove' sposta la telecamera e i registri delle funzioni 'Ssh' nella telecamera e controlla la velocità alla quale la telecamera si sta attualmente muovendo. Il problema è che nel momento in cui la funzione "Ssh" accede alla videocamera si è arrestata. Ho bisogno di eseguire entrambe le funzioni in parallelo in modo da poter controllare la velocità della fotocamera mentre è ancora in movimento.

Grazie

+0

È possibile registrare la velocità mentre si sposta la fotocamera? forse in un file di registro. Cosa stai cercando di ottenere? Non penso che il test sopra sia comunque utile. – fabrizioM

+0

Modificato la risposta per rispondere ai problemi di sincronizzazione. – AndiDog

+0

Hai guardato la mia risposta modificata? Ha senso? – AndiDog

risposta

4

Se si desidera utilizzare l'applicazione comune Python (CPython), si può certamente utilizzare il modulo multiprocessing, che does wonders (è possibile passare argomenti non pickleable a sottoprocessi, uccidere i compiti, ...), offre un'interfaccia simile a quella di thread e non soffre del Global Interpreter Lock.

Lo svantaggio è che i sottoprocessi vengono generati, che richiede più tempo rispetto alla creazione di thread; questo dovrebbe essere un problema solo se hai molte, molte brevi attività.In situazioni in cui ogni attività richiede un tempo "lungo", il modulo multiprocessing dovrebbe essere eccezionale.

12

importare il modulo threading ed eseguire SudsMove() in questo modo:

threading.Thread(target = SudsMove).start() 

che creeranno e avviare un thread in background che fa il movimento.

risposta alla domanda Modificato:

Per quanto ho capito questo, TestAbsoluteMove.Ssh(self) sondaggi la velocità di una volta e memorizza il risultato in self.Value?! E stai testando l'inclinazione/rotazione/posizione finale prevista con self.assertEqual(self.Value, '3500') ?!

Se è corretto, è necessario attendere che la fotocamera inizi il suo movimento. Si potrebbe probabilmente polling la velocità in un certo intervallo:

velocità
# Move camera in background thread 
threading.Thread(target = SudsMove).start() 

# What does this do? 
self.command = './ptzpanposition -c 0 -u degx10' 

# Poll the current speed in an interval of 250 ms 
import time 
measuredSpeedsList = [] 

for i in xrange(20): 
    # Assuming that this call will put the result in self.Value 
    TestAbsoluteMove.Ssh(self) 
    measuredSpeedsList.append(self.Value) 
    time.sleep(0.25) 

print "Measured movement speeds: ", measuredSpeedsList 

Il movimento sarà il più grande valore nel measuredSpeedsList (vale a dire max(measuredSpeedsList)). Spero che abbia senso ...

2

Ci può essere solo un thread in esecuzione allo stesso tempo. Questo è stato ampiamente risposto here. Una soluzione sarà quella di utilizzare due processi separati. La risposta sopra fornisce alcuni suggerimenti.

1

Se è possibile eseguire il codice in Jython o IronPython, è possibile eseguire contemporaneamente; non hanno quella stupida cosa "Global Interpreter Lock" di CPython.

Problemi correlati