2012-08-29 11 views
9

Używam aplikacji Clojure do uzyskiwania dostępu do danych z internetowego interfejsu API. Będę wysyłać wiele żądań, a wiele żądań będzie prowadzić do kolejnych żądań, więc chcę zachować adresy URL żądań w kolejce, która pozostawi 60 sekund między kolejnymi pobraniami.Kolejki pracy w Clojure

Po this blog post Ja to razem:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

Gdybym obejmują (future (process-queue-item (take! my-queue))) w moim kodu gdzieś potem na REPL mogę (offer! my-queue "something") i widzę coś ">>" drukowane natychmiast. Jak na razie dobrze! Ale potrzebuję, aby kolejka trwała przez cały czas, gdy mój program jest aktywny. Wywołane wcześniej wywołanie (future ...) działa w celu wyciągnięcia jednego elementu z kolejki, gdy tylko będzie dostępny, ale chcę czegoś, co będzie stale oglądać kolejkę i zadzwonić pod numer process-queue-item, gdy tylko coś będzie dostępne.

Ponadto, w przeciwieństwie do zwykłego zamiłowania Clojure do współbieżności, chcę się upewnić, że tylko jedna prośba jest wykonywana w tym samym czasie i że mój program czeka 60 sekund na wykonanie każdego kolejnego żądania.

Myślę, że jest to ważne, ale nie wiem, jak to dostosować, aby robić to, co chcę. Jak bez przerwy odpytywać kolejkę i upewnić się, że tylko jedno żądanie jest uruchamiane jednocześnie?

+0

Dlaczego chcesz sondować ciągle, ale wysyłać co 60 sekund? Czy odpytywanie raz na 60 sekund dokona tego samego? – mamboking

+0

@mamboking Prawie, tak. Jedynym minusem tego podejścia byłoby dodanie pierwszego elementu do kolejki: jeśli program zajmie pięć sekund, aby dowiedzieć się, jaki będzie pierwszy adres URL żądania, po prostu będzie siedział tam przez 55 sekund, dopóki kolejka nie zostanie sprawdzona. Program i tak będzie dość długi, więc myślę, że to nie jest problem. – bdesham

+0

Czy unikasz planowania zadań? Na przykład ten, https://github.com/zcaudate/cronj (jest tam również lista innych bibliotek w readme tego repo) – georgek

Odpowiedz

0

Skończyłem przetaczać moją małą bibliotekę, którą nazwałem simple-queue. Możesz przeczytać pełną dokumentację na GitHub, ale tutaj jest źródło w całości. Nie zamierzam aktualizować tej odpowiedzi, więc jeśli chcesz korzystać z tej biblioteki, pobierz źródło z GitHub.

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

Przykładem zastosowania tej kolejki do zwracania wartości:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

połączenia do q/process blokuje tak, że nie będzie 30 sekund opóźnienia pomiędzy dwoma def instrukcji.

Przykładem wykorzystania tej kolejki wyłącznie do skutków ubocznych:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

Teraz wywołania q/add powrocie natychmiast.

2

Oto fragment kodu z a project I did for fun. Nie jest doskonały, ale może dać ci wyobrażenie, jak obejść problem "poczekaj 55 sekund na pierwszy przedmiot". W gruncie rzeczy obieguje obietnice, wykorzystując futures do natychmiastowego przetwarzania rzeczy lub do momentu, gdy obietnica "stanie się" dostępna.

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

Może mógłbyś zrobić coś podobnego? Kod nie jest świetny i prawdopodobnie zostanie przepisany ponownie, aby użyć lazy-seq, ale to tylko ćwiczenie, do którego jeszcze nie dotarłem!

0

Jest to całkiem możliwe, szalony ale zawsze można użyć funkcji tak, aby utworzyć spowolnione-down leniwe sekwencję:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

To będzie w zasadzie zapewnić opóźnienie pomiędzy każdą z wywołań funkcji.

Można go używać z coś jak następuje, zapewniając opóźnienia w milisekundach:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

Lub alternatywnie można umieścić wywołanie funkcji wewnątrz sekwencji z czymś takim:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

oczywiście , w obu powyższych przypadkach można zastąpić (rand-int 10) dowolnym kodem, który jest używany do wykonania/wyzwalania pobierania.

+0

Jeśli czytam to prawo, wszystkie elementy 'coll' muszą być znane zanim uruchomisz' slow-seq', prawda? Chciałbym czegoś, co pozwoliłoby ci dynamicznie dodawać przedmioty bez problemu. W szczególności, jeśli wynik jednego wywołania API jest taki, że muszę wykonać inne wywołanie API, czy ta funkcja zezwoli na umieszczenie drugiego połączenia w kolejce? – bdesham