A way to signal errors on a stream #95

Open
dm3 opened this issue Aug 28, 2016 · 6 comments
Comments

@dm3
Contributor

dm3 commented Aug 28, 2016

Currently there's no mechanism to propagate errors that occur in stream-transforming functions. The only option is to log the error and close the output stream.

I often find myself attaching an error-carrying deferred at each stream transformation point, deferred/connected to error-carrying deferreds downstream. This way I have a way to react to the abnormal stream termination.
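The pattern described above might look roughly like the following minimal sketch. `map-with-error` is a hypothetical helper, not part of Manifold. It assumes, as stated above, that an exception thrown inside `s/map` is logged and closes the output stream.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical helper, not part of Manifold: behaves like s/map, but also
;; returns a deferred that is put into an error state when `f` throws.
(defn map-with-error
  [f src upstream-err]
  (let [err (d/deferred)
        out (s/map (fn [x]
                     (try
                       (f x)
                       (catch Throwable t
                         (d/error! err t) ; record the failure for observers
                         (throw t))))     ; rethrow to keep the default handling
                   src)]
    ;; Chain the error deferreds: a failure upstream also fails this stage.
    (when upstream-err
      (d/connect upstream-err err))
    [out err]))

(comment
  ;; Usage: only the last error deferred needs a handler.
  (let [src       (s/stream)
        [s1 err1] (map-with-error inc src nil)
        [s2 err2] (map-with-error #(/ 10 %) s1 err1)]
    (d/catch err2 (fn [e] (println "stream failed:" (.getMessage e))))
    (s/consume println s2)
    (s/put! src 1)
    (s/put! src -1)))
```

Every stage registers a callback on the upstream error deferred, which is where the concern about leaks in long-lived pipelines comes from.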

What do you think about adding a stream/catch or stream/on-error or possibly stream/error-deferred which would give access to the error?

@dm3
Contributor Author

dm3 commented Aug 29, 2016

I took a stab at implementing error propagation; you can take a look at the commit here. The meat of it is the propagate-error function.
I'm not sure about the performance implications, or whether chaining errors this way introduces memory leaks. It needs to be reasoned through more carefully.

Would appreciate the comments/advice you have regarding this approach!

@ztellman
Collaborator

ztellman commented Sep 6, 2016

Since put! and take! can yield errors, I'd rather have the error state surfaced there, rather than as a separate side-channel which people may or may not pay attention to. More importantly, though, why can't we just put catch clauses inside the functions themselves? I had error states associated with channels in Lamina, and ended up concluding that it wasn't worth the trouble.
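For context, `put!` and `take!` already return deferreds, so a caller can observe closure through their values and could attach `d/catch` to the same deferreds. A small sketch of what those results look like (the `::drained` default is only an illustrative sentinel):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(def st (s/stream 1)) ; buffer of one, so the first put! completes immediately

(def results
  (let [first-put  @(s/put! st :a)          ; accepted by the buffer
        _          (s/close! st)
        second-put @(s/put! st :b)          ; rejected: the stream is closed
        first-take @(s/take! st ::drained)  ; buffered message is still delivered
        last-take  @(s/take! st ::drained)] ; closed and empty: default value
    [first-put second-put first-take last-take]))
;; results => [true false :a ::drained]

;; An error surfaced on these deferreds could be handled at the call site:
(-> (s/take! st ::drained)
    (d/chain (fn [msg] (println "took" msg)))
    (d/catch (fn [e] (println "take! failed:" e))))
```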

@dm3
Contributor Author

dm3 commented Sep 8, 2016

Since put! and take! can yield errors, I'd rather have the error state surfaced there, rather than as a separate side-channel which people may or may not pay attention to.

Completely agree with that. However, I couldn't find a way to solve the propagation of errors from manifold.deferred/loop based stream combinators, like manifold.stream/zip. I have a few of those which take N streams as input and produce a stream as output. If something fails inside the combinator, due to a bug or an invalid input - in the best case the output stream gets closed and the error is logged. The put! part succeeds and the take! part never has a chance to fail.
I can't find a way to signal the error downstream except by chaining a separate error-deferred-like thing through the combinator as a separate parameter. Am I missing anything here?
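To make the situation concrete, here is a minimal sketch of a `d/loop`-based combinator with the error deferred passed in as a separate parameter. `combine-pairs` and its `err` argument are hypothetical and only illustrate the workaround described above. They are not part of Manifold.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical combinator, for illustration only: takes one message from each
;; input, puts (f x y) on the output, and repeats until an input is drained.
;; `err` is the separately chained error deferred.
(defn combine-pairs
  [f a b err]
  (let [out (s/stream)]
    (d/loop []
      (-> (d/zip (s/take! a ::drained) (s/take! b ::drained))
          (d/chain
            (fn [[x y]]
              (if (or (identical? ::drained x) (identical? ::drained y))
                (do (s/close! out) nil)              ; an input ended: stop
                (d/chain (s/put! out (f x y))        ; (f x y) may throw
                         (fn [accepted?]
                           (if accepted? (d/recur) nil))))))
          (d/catch
            (fn [e]
              (d/error! err e) ; the only signal a consumer gets about the cause
              (s/close! out)
              nil))))
    out))
```

Without `err`, a consumer of `out` sees the stream close and cannot tell a failure from normal completion.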

More importantly, though, why can't we just put catch clauses inside the functions themselves?

This seems very clunky in practice, consider:

(let [a (s/consume fn-a s-a), b (s/consume fn-b s-b)]
  (-> (d/zip a b)
      (d/chain (fn [_] (println "Done")))
      (d/catch (fn [e] (println e "Couldn't consume, restarting ...") ...))))

vs

(let [wrap (fn [f] #(try (f %)
                         (catch Throwable t (println t "Couldn't consume, restarting ...") ...)))
      a (s/consume (wrap fn-a) s-a), b (s/consume (wrap fn-b) s-b)]
  (-> (d/zip a b)
      (d/chain (fn [_] (println "Done")))))

It's not too bad, but there's some inconsistency in losing the ability to d/catch on a deferred even though an obviously related error happens. What do you think?

@jeroenvandijk

I missed being able to easily wrap my stream with a catch while I was developing an SSE endpoint. I didn't know exactly where to look for the exception. In the end I figured it out with the logging functionality. However, I can see how this would be problematic in production, where you often want errors to end up elsewhere (e.g. an exception notifier) so one can react to them more quickly and monitor how often a specific error happens.

Maybe this can also be done via clojure.tools.logging or timbre; however, that breaks other architectural approaches, e.g. when working with the component library (if your exception notifier is wrapped as a component).

@dm3
Contributor Author

dm3 commented Oct 1, 2017

Some graphics I made on the #aleph Slack in relation to a discussion about error handling - didn't get any response from @ztellman at the time, so attaching here.

@mccraigmccraig

i'm currently having some issues with error propagation - in particular i'd like to be able to have errors which happen during a map thrown when a reduce finally happens. this is straightforward to implement with a catch and signal values, but quite easy to forget to do

i'm currently looking at creating some chunked versions of stream/map, stream/reduce and friends to improve throughput on some large stream ops, so maybe i'll build in error recognition and propagation too
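The "catch and signal values" approach mentioned above could be sketched as follows. `Failure`, `catching` and `reduce-failing` are hypothetical names. The sketch assumes that an exception thrown by the reducing function puts the deferred returned by `s/reduce` into an error state.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical marker type for an element whose transformation failed.
(defrecord Failure [cause])

;; Wraps `f` so that an exception becomes a Failure value on the stream
;; instead of closing it.
(defn catching [f]
  (fn [x]
    (try
      (f x)
      (catch Throwable t
        (->Failure t)))))

;; Like s/reduce, but rethrows the cause of the first Failure it sees, so the
;; error surfaces on the deferred that the reduce returns.
(defn reduce-failing [f init src]
  (s/reduce (fn [acc x]
              (if (instance? Failure x)
                (throw (:cause x))
                (f acc x)))
            init
            src))

(def total
  (->> (s/->source [1 2 :oops 4])
       (s/map (catching inc))
       (reduce-failing + 0)))

(d/catch total (fn [e] (println "reduce failed:" e)))
```

The part that is easy to forget is wrapping every mapping function with `catching`. An unwrapped function falls back to the log-and-close behaviour.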

4 participants