2012-08-29 11 views
9

Sto utilizzando un'applicazione Clojure per accedere ai dati da un'API Web. Farò molte richieste e molte richieste porteranno a fare più richieste, quindi voglio mantenere gli URL delle richieste in una coda che lascerà 60 secondi tra i download successivi.Code di lavoro in Clojure

seguito this blog post ho messo questo insieme:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

Se includo un (future (process-queue-item (take! my-queue))) nel mio codice da qualche parte poi al REPL posso (offer! my-queue "something") e vedo il ">> qualcosa" stampato immediatamente. Fin qui tutto bene! Ma ho bisogno che la coda duri per tutto il tempo in cui il mio programma è attivo. La chiamata (future ...) che ho appena menzionato funziona per estrarre un elemento dalla coda, una volta disponibile, ma voglio qualcosa che guardi continuamente la coda e chiami process-queue-item ogni volta che qualcosa è disponibile.

Inoltre, contrariamente al solito amore Clojure per la concorrenza, voglio assicurarmi che venga fatta una sola richiesta alla volta e che il mio programma attenda 60 secondi per effettuare ogni richiesta successiva.

Penso che this Stack Overflow question sia rilevante, ma non sono sicuro di come adattarlo per fare ciò che voglio. Come si esegue il polling continuo della coda e ci si assicura che venga eseguita una sola richiesta in una sola volta?

+0

Perché si desidera eseguire il polling in modo continuo ma inviare solo ogni 60 secondi? Il polling solo una volta ogni 60 secondi porta a termine la stessa cosa? – mamboking

+0

@mamboking Quasi, si. L'unico svantaggio di questo approccio sarebbe l'aggiunta del primo elemento alla coda: se il programma impiega cinque secondi per capire quale sarà l'URL della prima richiesta, rimarrà seduto per 55 secondi fino a quando la coda non sarà spuntata. Il programma sarà comunque piuttosto lungo, quindi penso che non sia un problema. – bdesham

+0

stai evitando un task scheduler? Per esempio, questo, https://github.com/zcaudate/cronj (c'è anche un elenco di altre librerie nel readme di quel repository) – georgek

risposta

0

Ho finito con il rollare la mia piccola biblioteca, che ho chiamato simple-queue. Puoi leggere la documentazione completa su GitHub, ma qui è la fonte nella sua interezza. Non manterrò questa risposta aggiornata, quindi se vuoi usare questa libreria per favore prendi il codice sorgente da GitHub.

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

Un esempio di utilizzo questa coda per restituire valori:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

Le chiamate verso q/process bloccherà in modo che ci sarà un ritardo di 30 secondi fra i due def istruzioni.

Un esempio di utilizzo questa coda puramente per gli effetti collaterali:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

Ora le chiamate a q/add ritorno immediatamente.

2

Ecco uno snippet di codice da a project I did for fun. Non è perfetto, ma può darti un'idea di come sono riuscito a risolvere il problema "Aspetta 55 secondi per il primo oggetto". In pratica, scorre le promesse, usando i futures per elaborare le cose immediatamente o finché una promessa "diventa" disponibile.

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

Forse potresti fare qualcosa di simile? Il codice non è eccezionale e potrebbe probabilmente essere riscritto per utilizzare lazy-seq, ma questo è solo un esercizio che non ho ancora ottenuto!

0

Questo è probabilmente folle, ma si può sempre utilizzare una funzione come questa per creare una sequenza pigro rallentato:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

Questo sarà essenzialmente garantire un ritardo tra ciascuna delle invocazioni di funzione.

Si può usare con qualcosa di simile a quanto segue, fornendo un ritardo in millisecondi:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

Oppure, in alternativa si può mettere il chiamata di funzione all'interno della sequenza con qualcosa di simile:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

Ovviamente in entrambi i casi è possibile sostituire (rand-int 10) con qualsiasi codice che si sta utilizzando per eseguire/attivare un download.

+0

Se leggo bene, tutti gli elementi di 'coll' dovrebbero essere conosciuti prima di eseguire' slow-seq', giusto? Mi piacerebbe qualcosa che ti permettesse di aggiungere oggetti in modo dinamico senza problemi. In particolare, se il risultato di una chiamata API è che devo effettuare un'altra chiamata API, questa funzione consentirà di posizionare la seconda chiamata sulla coda? – bdesham