2016-03-10 8 views
7

Ho uno stream di input e voglio fare 2 richieste di rete per HTTPS prima di passare il risultato ad un'altra parte del programma. Il throughput tipico è 50 al secondo.Come faccio a fare un gran numero di richieste HTTPS simultanee in Clojure (/ Java)

for each input: 
    HTTP request A 
    HTTP request B 
    pass event on with (A.body and B.body) 

Sto usando il cliente http-kit, che è asincrono per impostazione predefinita. Restituisce una promessa e può anche richiedere una richiamata. Http-kit utilizza Java NIO (vedi here e here)

La velocità delle richieste in arrivo, combinata con il tempo necessario per effettuare una richiesta, è sufficientemente elevata da dover essere eseguita in modo asincrono.

Ho provato 3 approcci:

  1. Quando un evento entra, metterlo su un canale. Un numero di routine go che escono dal canale. Ogni richiesta di esecuzione che "blocca" il goblock da parte di deref promesse da richieste HTTP. Questo non funziona perché non penso che la promessa funzioni bene con i thread.
  2. Quando arriva un evento, avvia immediatamente un future, che "blocca" sulle promesse asincrone. Ciò si traduce in molto alto utilizzo della CPU. In più, la fame di risorse di rete in qualche modo.
  3. Quando arriva un evento, attivare immediatamente la richiesta http-kit per la richiesta A, passando una richiamata che effettua la richiesta B, passando una richiamata che passa l'evento. Ciò ha causato un errore di memoria insufficiente dopo alcune ore.

Questi funzionano e gestiscono la capacità per un po '. Alla fine tutti si schiantano. L'incidente più recente, dopo circa 12 ore:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending 
tasks! 
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status: 
     Managed Threads: 3 
     Active Threads: 1 
     Active Tasks: 
       com[email protected]65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0) 
     Pending Tasks: 
       [email protected]0d 
Pool thread stack traces: 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main] 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen 
java.lang.OutOfMemoryError: Java heap space 
     at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) 
     at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76) 
     at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65) 
     at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63) 
     at sun.security.ssl.Handshaker.activate(Handshaker.java:514) 
     at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717) 
     at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743) 
     at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310) 
     at org.httpkit.client.HttpClient.run(HttpClient.java:375) 
     at java.lang.Thread.run(Thread.java:745) 
Mar 10, 2016 4:56:34 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 5:00:43 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 4:58:25 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 

Non so quale sia la causa del fallimento. Potrebbe darsi che ci siano troppe chiusure a cui si è aggrappata, o una graduale perdita di risorse, o la fame di thread.

Domande

  1. Does rendendo 50 richieste HTTP al secondo, ognuno dei quali potrebbe prendere 200ms, il che significa che ci potrebbero essere 100 richieste in volo in un dato momento, suona come un onere eccessivo?

  2. Come faccio a fare questo in modo che gestisca il throughput ed è robusto?

EDIT

YourKit profiler mi dice che ho circa 2 GB di char[] s tramite org.httpkit.client.Handler s tramite java.util.concurrent.FutureTask s che suggerisce che i riferimenti a vecchi gestori (cioè richieste) vengono mantenuti in qualche modo. L'intera ragione per cercare di usare le callback era evitare (anche se in qualche modo potrebbero essere bloccati nelle chiusure)

+1

OutOfMemoryError indica che c'è un problema nella memorizzazione della memoria ... ma non possiamo fare a meno di vedere il codice o scrivere una soluzione completa da zero. Cercherei di aggrapparmi alla testa di una sequenza infinita, o non pulire risorse come le connessioni. –

+0

Mi chiedevo se potesse essere conservato un buffer, ma per quanto posso dire la garbage collection dovrebbe gestire la liberazione di memoria/buffer esterni che, ad es. NIO aveva assegnato. Quello che succede a valle è praticamente solo un inserimento nel database e un inserimento su un canale. – Joe

+0

Ho pensato di postare il codice ma è abbastanza complicato e ci vorrebbe circa un giorno per sapere se avevo replicato il problema in una versione semplificata. – Joe

risposta

0

Un'alternativa al metodo A (deref il kit HTTP restituito futures all'interno di un go-block) potrebbe essere una possibilità, solo facendo così in un modo che non causa il blocco del nucleo.le discussioni gestore asincrono sul futuro, che si può fare unendo callback e core.async di httpkit:

(defn handle-event 
"Return a core.async channel that will contain the result of making both HTTP call A and B." 
    [event-data] 
    (let [event-a-chan (clojure.core.async/chan) 
     event-b-chan (clojure.core.async/chan) 
     return-chan (clojure.core.async/chan)] 
    (org.httpkit.client/request "https://event-a-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-a-chan resp))) 
    (org.httpkit.client/request "https://event-b-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-b-chan resp))) 
    (clojure.core.async/go 
     (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan) 
              :event-b-response (clojure.core.async/<! event-b-chan)})) 
    return-chan)) 
0
  1. Vuol fare 50 richieste HTTP al secondo, ognuno dei quali potrebbe richiedere 200ms, il che significa che ci potrebbe essere 100 richieste in volo in un dato momento, sembra un carico eccessivo?

Questo è sicuramente non eccessiva su hardware moderno.

  1. Come si fa in modo che gestisca il throughput ed è robusto?

È possibile combinare condutture core.async e callback di http-kit per raggiungere questo obiettivo. Non è necessario creare una routine go per ogni richiesta (anche se ciò non dovrebbe far male), perché è possibile utilizzare async put! dal callback http-kit.

Utilizzare buffer limitati per ciascun passo della pipeline per limitare il numero di connessioni attive, che saranno (almeno) vincolate dal numero di porte TCP effimere disponibili sul sistema.

Ecco un esempio di un piccolo programma che fa qualcosa di simile a quello che hai descritto. Legge "eventi" da un canale — in questo caso, ogni evento è l'ID "1" — e cerca quegli ID su un servizio HTTP. Prende la risposta da quella prima chiamata, cerca il tasto JSON "next" e accoda quello come l'URL per il passaggio 2. Infine, quando questa ricerca è completa, aggiunge un evento al canale out che una routine go monitora per segnalare le statistiche.

(ns concur-req.core 
    (require [clojure.core.async :as async] 
      [cheshire.core :refer [decode]] 
      [org.httpkit.client :as http])) 

(defn url-of 
    [id] 
    ;; this service responds within 100-200ms 
    (str "http://localhost:28080/" id ".json")) 

(defn retrieve-json-async 
    [url c] 
    (http/get url nil 
      (fn [{body :body status :status :as resp}] 
       (if (= 200 status) 
       (async/put! c (decode body true)) 
       (println "ERROR:" resp)) 
       (async/close! c)))) 

(defn run [parallelism stop-chan] 
    (let [;; allocate half of the parallelism to each step 
     step1-n (int (max (/ parallelism 2) 1)) 
     step2-n step1-n 
     ;; buffer to take ids, transform them into urls 
     step1-chan (async/chan step1-n (map url-of)) 
     ;; buffer for result of pulling urls from step1, xform by extracting :next url 
     step2-chan (async/chan step2-n (map :next)) 
     ;; buffer to count completed results 
     out-chan (async/chan 1 (map (constantly 1))) 
     ;; for delivering the final result 
     final-chan (async/chan) 
     start-time (System/currentTimeMillis)] 

    ;; process URLs from step1 and put the result in step2 
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan) 
    ;; process URLs from step2 and put the result in out 
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan) 

    ;; keep the input channel full until stop-chan is closed. 
    (async/go-loop [] 
     (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])] 
     (if (= c stop-chan) 
      (async/close! step1-chan) 
      (recur)))) 

    ;; count messages on out-chan until the pipeline is closed, printing 
    ;; status message every second 
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0] 
     (let [[v c] (async/alts! [status-timer out-chan])] 
     (cond (= c status-timer) 
       (do (println subt "records...") 
        (recur (async/timeout 1000) 0 (+ subt accu))) 

       (nil? v) 
       (async/>! final-chan (+ subt accu)) 

       :else 
       (recur status-timer (+ v subt) accu)))) 

    ;; block until done, then emit final report. 
    (let [final-total (async/<!! final-chan) 
      elapsed-ms (- (System/currentTimeMillis) start-time) 
      elapsed-s (/ elapsed-ms 1000.0)] 
     (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n" 
        final-total parallelism elapsed-s 
        (int (/ final-total elapsed-s))))))) 

(defn run-for 
    [seconds parallelism] 
    (let [stop-chan (async/chan)] 
    (future 
     (Thread/sleep (* seconds 1000)) 
     (async/close! stop-chan)) 
    (run parallelism stop-chan))) 

(do 
    ;; Warm up the connection pool, avoid somaxconn problems... 
    (doseq [p (map #(* 20 (inc %)) (range 25))] 
    (run-for 1 p)) 
    (run-for (* 60 60 6) 500)) 

Per verificare ciò, ho istituito un servizio di HTTP che risponde solo dopo aver dormito un tempo casuale tra 100-200ms. Poi ho eseguito questo programma per 6 ore sul mio Macbook Pro.

Con il parallelismo impostato su 500, ho ottenuto 1155 transazioni completate al secondo in media (2310 richieste HTTP completate al secondo). Sono sicuro che questo potrebbe essere molto più alto con alcune accordature (e soprattutto spostando il servizio HTTP su una macchina diversa). La memoria JVM è aumentata fino a 1,5 GB nei primi 30 minuti e ha mantenuto le dimensioni. Sto usando Oracle JVM a 64 bit a 64 bit.

Problemi correlati