Limiting concurrency to some number of concurrent processes #125

Open
lvh opened this issue Feb 14, 2017 · 4 comments

@lvh
Contributor

lvh commented Feb 14, 2017

throttle does this elegantly for a stream that should carry a limited number of events per unit of time, but it doesn't help when there should be at most n requests in flight at any one time. There may be an idiom for this, but I haven't quite figured it out :)

Twisted does this with a thing called a deferred semaphore, which works just like you'd expect a mutex-like semaphore to work: it takes a fn that may or may not return a deferred, and returns a deferred that fires when that function's result is available; the function is called only after the semaphore allows it to do so.

http://twistedmatrix.com/documents/current/api/twisted.internet.defer.DeferredSemaphore.html

@dm3
Contributor

dm3 commented Feb 15, 2017

@lvh, do you mean something like

(require '[manifold.deferred :as m.d])

(defprotocol DeferredSemaphore
  (acquire [_])   ; returns a deferred that fires once a token is granted
  (release [_]))  ; gives a token back, waking the oldest waiter if there is one

(defn deferred-semaphore [tokens]
  (let [^java.util.LinkedList waiting (java.util.LinkedList.)
        tokens-remaining (volatile! tokens)
        ;; take a token and fire the given deferred
        acquired! #(do (vswap! tokens-remaining dec)
                       (m.d/success! % ::acquired))]
    (reify DeferredSemaphore
      (acquire [_]
        ;; all state changes happen while holding the lock on `waiting`
        (locking waiting
          (let [d (m.d/deferred)]
            (if (zero? @tokens-remaining)
              (.add waiting d)    ; no tokens left: queue until a release
              (acquired! d))
            d)))
      (release [_]
        (locking waiting
          (assert (< @tokens-remaining tokens))
          (vswap! tokens-remaining inc)
          (when-let [d (.poll waiting)]
            (acquired! d)))))))

That's pretty much a translation of what the Python class that you linked to is doing. I'm not sure whether this can be done relying purely on primitives provided by Manifold, or whether it makes sense in the domain of Streams at all (a stream has no release operation).

@lvh
Contributor Author

lvh commented Feb 16, 2017

@dm3 yup; almost. The important method you usually use is run, so basically you get something like (semaphore 3), which I guess acts like a stream that will produce at most 3 success-deferreds when first read from, and then won't let further deferreds get a value until one of those 3 finishes.
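For illustration, here is a minimal sketch of what a run-style wrapper could look like on top of Manifold deferreds. It is not part of Manifold's API: the names limiter and run are hypothetical, and the token bookkeeping is just one way to do it, under the assumption that the wrapped fn takes no arguments and may or may not return a deferred.

```clojure
(require '[manifold.deferred :as d])

(defn limiter
  "Hypothetical helper, not a Manifold function. Returns a function `run`
  that calls the no-arg fn `f` only once one of `n` tokens is free, and
  hands the token back when the result of `f` is realized."
  [n]
  (let [lock (Object.)
        free (volatile! n)
        ^java.util.ArrayDeque waiting (java.util.ArrayDeque.)]
    (letfn [(acquire []
              (locking lock
                (if (pos? @free)
                  (do (vswap! free dec)
                      (d/success-deferred ::acquired))
                  ;; no token free: park a deferred until a release
                  (let [w (d/deferred)]
                    (.add waiting w)
                    w))))
            (release []
              ;; pass the token straight to the oldest waiter if there is
              ;; one, otherwise put it back; the waiter is fired outside
              ;; the lock so its callbacks never run while we hold it
              (when-let [w (locking lock
                             (or (.poll waiting)
                                 (do (vswap! free inc) nil)))]
                (d/success! w ::acquired)))]
      (fn run [f]
        (-> (acquire)
            (d/chain (fn [_] (f)))   ; a deferred returned by f is flattened
            (d/finally release)))))) ; release on success and on error
```

Usage would be along the lines of (def run (limiter 3)) followed by (run #(d/future (fetch url))), where fetch stands in for whatever request function you have; each call returns a deferred for the result, and at most 3 of the wrapped fns are in flight at once.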

@dm3
Contributor

dm3 commented Feb 16, 2017

@lvh so a (semaphore-stream n-tokens) would return a Source stream which could produce at most n-tokens deferreds at any given time. Once a consumer realizes a token (deferred), the stream can produce another one, right?

If my interpretation is correct - it seems like Streams is an unnecessary abstraction for this task, what do you think?

@lvh
Contributor Author

lvh commented Feb 22, 2017

I have no particular opinion on whether streams are useful here or not. I think they're convenient if you're building a stream that ends up being sent to consume-async or something. Your interpretation is what I'd expect, though.
