2012-12-03 16 views
6

Sto calcolando un futuro per avere un timeout in attesa di un evento di serie per accadere:Come fermare timeout di un futuro

Future<Response> future = executor.submit(new CommunicationTask(this, request)); 
response = new Response("timeout"); 
try { 
    response = future.get(timeoutMilliseconds, TimeUnit.MILLISECONDS); 
} catch (InterruptedException | TimeoutException e) { 
    future.cancel(true); 
    log.info("Execution time out." + e); 
} catch (ExecutionException e) { 
    future.cancel(true); 
    log.error("Encountered problem communicating with device: " + e); 
} 

La classe CommunicationTask ha implementato l'interfaccia Observer per ascoltare un cambiamento da la porta seriale

Il problema è che la lettura dalla porta seriale è relativamente lenta e anche quando si verifica un evento seriale il tempo scade e viene lanciato un TimeoutException. Cosa posso fare per fermare il timeout del mio futuro quando si verifica un evento seriale?

ho provato con un AtomicReference ma questo non cambia nulla:

public class CommunicationTask implements Callable<Response>, Observer { 
    private AtomicReference atomicResponse = new AtomicReference(new Response("timeout")); 
    private CountDownLatch latch = new CountDownLatch(1); 
    private SerialPort port; 

    CommunicationTask(SerialCommunicator communicator, Request request) { 
    this.communicator = communicator; 
    this.message = request.serialize(); 
    this.port = communicator.getPort(); 
    } 

    @Override 
    public Response call() throws Exception { 
    return query(message); 
    } 

    public Response query(String message) { 
    communicator.getListener().addObserver(this); 
    message = message + "\r\n"; 
    try { 
     port.writeString(message); 
    } catch (Exception e) { 
     log.warn("Could not write to port: " + e); 
     communicator.disconnect(); 
    } 
    try { 
     latch.await(); 
    } catch (InterruptedException e) { 
     log.info("Execution time out."); 
    } 
    communicator.getListener().deleteObserver(this); 
    return (Response)atomicResponse.get(); 
    } 

    @Override 
    public void update(Observable o, Object arg) { 
    atomicResponse.set((Response)arg); 
    latch.countDown(); 
    } 
} 

Cosa posso fare per risolvere questo problema?

MODIFICA:

Ok, ho avuto un errore. Stavo eseguendo il conto alla rovescia prima di impostare atomicResponse nella mia funzione update. Ora sembra funzionare, ma c'è ancora la domanda se questo approccio è il modo giusto per farlo?

+0

Si potrebbe aggiungere un metodo 'isCommunicationStarted' al tuo CommunicationTask - quando rilevi 'TimeoutException', controlla' isCommunicationStarted' - se restituisce false => cancel, altrimenti prova 'future.get()' di nuovo (magari con un nuovo timeout). – assylias

+0

Puoi dividere la tua classe di attività in due: in primo luogo attenderò fino all'avvio dell'evento con timeout e invierà l'attività di 2 ° tipo all'avvio dell'evento. La seconda attività elaborerà l'evento di conseguenza. –

+0

Non è abbastanza chiaro cosa si vuole raggiungere. L'impostazione di un timeout implica che gli eventi arrivati ​​dopo il timeout possano essere ignorati. In tal caso, perché è importante il caso in questione? – axtavt

risposta

0

Spero che questo possa essere d'aiuto. Non lo commenterò nella speranza che tutto sia chiaro dal codice.

class CommunicationTask implements Callable<String>, Observer { 
    volatile boolean ignoreTimeoutException; 

    public CommunicationTask(SerialCommunicator communicator, Request request) { 
    } 

    public String call() throws Exception { 
     Thread.sleep(1000); 
     return "done"; 
    } 

    public void update(Observable o, Object arg) { 
     ignoreTimeoutException = true; 
    } 
} 

class FutureCommunicationTask extends FutureTask<String> { 
    private CommunicationTask ct; 

    public FutureCommunicationTask(CommunicationTask ct) { 
     super(ct); 
     this.ct = ct; 
    } 

    public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     try { 
      return super.get(timeout, unit); 
     } catch (TimeoutException e) { 
      if (ct.ignoreTimeoutException) { 
       return get(); // no timeout wait 
      } 
      throw e; 
     } 
    } 
} 

public class Test { 

    public static void main(String[] args) throws Exception { 
     CommunicationTask ct = new CommunicationTask(null, null); 
     FutureTask<String> fct = new FutureCommunicationTask(ct); 
     ExecutorService ex = Executors.newSingleThreadExecutor(); 
     ex.execute(fct); 
//  uncomment this line and timeout will be cancelled 
     ct.update(null, null); 
     String res = fct.get(1, TimeUnit.MILLISECONDS); 
     System.out.println(res); 
    } 
} 
1

hanno esplorato voi di Google Guava 'futuro ascoltatore', si basa su futuro Async, si spera seguente frammento di codice aiuta ....

import java.util.concurrent.Callable; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

import com.google.common.util.concurrent.FutureCallback; 
import com.google.common.util.concurrent.Futures; 
import com.google.common.util.concurrent.ListenableFuture; 
import com.google.common.util.concurrent.ListeningExecutorService; 
import com.google.common.util.concurrent.MoreExecutors; 

public class SyncFutureExample { 
    public static void main(String[] args) { 
     ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); 
     ListenableFuture<String> lf = service.submit(new CommuncationTask()); 

     //no need for future.get() or future.get(10,time minutes) 


     //add callbacks(= async future listeners) .... 
     Futures.addCallback(lf, new FutureCallback<String>() { 
       public void onSuccess(String input) { 
       System.out.println(input + " >>> success");//gets a callback once task is success 
       } 
       public void onFailure(Throwable thrown) { 
        System.out.println(thrown + " >>> failure");//gets a callback if task is failed 
       } 
      }); 
     service.shutdown(); 
    } 

} 

class CommuncationTask implements Callable<String>{ 

    public String call() throws Exception { 
     TimeUnit.SECONDS.sleep(15);// some dummy serious task ............. 
     return "TaskDone"; 
    } 


}