
Add sequential flag to mqtrigger #1814

Open · wants to merge 5 commits into master
Conversation

@daniel-shuy commented Oct 7, 2020

Fixes #1569

Adds a sequential flag to the mqtrigger function. If set to true, consumes messages sequentially instead of concurrently.

I wasn't sure whether to add support for all 3 MQs (Kafka, Azure Queue Storage, NATS), as they currently have different behaviors: the Kafka and Azure Queue Storage MQs consume messages concurrently, while the NATS MQ consumes messages sequentially. I therefore separated the implementation for each MQ into its own commit, so that commits can be dropped if we decide not to support them.

Note that if support for the sequential flag is added to the NATS MQ, it is a breaking change, as it would mean changing the default behavior from sequential to concurrent.



@codecov bot commented Oct 7, 2020

Codecov Report

Merging #1814 (514479c) into master (ec22e22) will decrease coverage by 7.24%.
The diff coverage is 33.33%.

❗ Current head 514479c differs from pull request most recent head 2a3f88c. Consider uploading reports for the commit 2a3f88c to get more accurate results

@@            Coverage Diff             @@
##           master    #1814      +/-   ##
==========================================
- Coverage   28.41%   21.17%   -7.25%     
==========================================
  Files          69       52      -17     
  Lines        5040     3708    -1332     
==========================================
- Hits         1432      785     -647     
+ Misses       3390     2850     -540     
+ Partials      218       73     -145     
Impacted Files Coverage Δ
pkg/mqtrigger/scalermanager.go 26.66% <0.00%> (-0.24%) ⬇️
...kg/mqtrigger/messageQueue/azurequeuestorage/asq.go 54.59% <50.00%> (-0.70%) ⬇️
pkg/crd/crdvalidations.go 0.00% <0.00%> (-100.00%) ⬇️
pkg/crd/crd.go 0.00% <0.00%> (-93.24%) ⬇️
pkg/crd/client.go 0.00% <0.00%> (-32.00%) ⬇️
pkg/controller/storagesvc.go
pkg/controller/controller.go
pkg/controller/httpTriggerApi.go
pkg/controller/api.go
pkg/controller/packageApi.go
... and 12 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 154fe0d...2a3f88c. Read the comment docs.


var resp *http.Response
for attempt := 0; attempt <= trigger.Spec.MaxRetries; attempt++ {
	// Make the request
Member:

Hey @daniel-shuy, can you please set the headers inside the for loop? This is to avoid an error I have encountered in other places: https://stackoverflow.com/questions/31337891/net-http-http-contentlength-222-with-body-length-0

Author:

@harshthakur9030 should this be in a separate PR? I'm actually just moving the entire code block into an anonymous function so that I can conditionally call it as a goroutine; I didn't change any of the existing logic.

Member:

Yes, that bug existed before you touched the code base. Since you have already touched this part of the code base, I was hoping you could do it. Everything remains the same; just the for loop that sets the headers needs to go inside the loop that makes the requests.

Author:

Ok sure, I can do it

Author:

Done!

Member:

Thanks !!

}
if err == nil && resp.StatusCode == http.StatusOK {
	// Success, quit retrying
	break
Member:

Can you please change this to an appropriate return statement?

Author:

Why? If we return here, the rest of the function (closing the response body, checking whether the response returned an error, triggering the ack message, publishing to the response topic) won't execute.

Member:

Oh yes! My bad. One quick question though: shouldn't we consider status codes between 200 and 300 to be successful, rather than just 200?

Author:

I don't think the router can return anything other than HTTP 200 for a successful request? Maybe @vishal-biyani can confirm

@vishal-biyani (Member):

Hey @therahulbhati This will also impact all Keda connectors I believe?

@daniel-shuy (Author):

@vishal-biyani I don't think so, don't Keda connectors have their own implementation (https://github.com/fission/keda-connectors)?

Looking at the implementation of the Keda Kafka HTTP Connector (https://github.com/fission/keda-connectors/tree/master/kafka-http-connector), I don't think it will have this issue, as it consumes messages sequentially, unlike the mqtrigger Kafka connector, which consumes messages concurrently.

@vishal-biyani (Member) left a comment:

Hi @daniel-shuy, thanks for this PR. It looks great except for 2 really minor comments. Please let me know your thoughts and I will merge this one.

Comment on lines +188 to +193
msgHandler := func() { kafkaMsgHandler(&kafka, producer, trigger, msg, consumer) }
if trigger.Spec.Sequential {
	msgHandler()
} else {
	go msgHandler()
}
Member:

Won't we need a wg.Add & wg.Done similar to AzureQueue here?

Comment on lines +242 to +245
if trigger.Spec.Sequential {
	cb()
} else {
	go cb()
Member:

Just checking if wg.Add and wg.Done similar to AzureQueue would be needed here?

Author:

I took a look at the AzureQueue mqtrigger implementation, and it uses a sync.WaitGroup to notify AzureQueueSubscription.done (a chan bool) (see asq.go#L275).

The other mqtrigger implementations have nothing equivalent, so it shouldn't be needed.

@vishal-biyani (Member):

Thanks @daniel-shuy, this looks good. I will merge in a day or so; again, thanks for your effort on this one 🎉

@chetanpdeshmukh:

Hello @daniel-shuy,
Thanks again for the PR!

Thank you for contributing to the Fission project. Could you please fill out this form so we can send you the well-deserved awesome swag :)

Team Fission

@daniel-shuy (Author):

@chetanpdeshmukh yay, thanks! The form is restricted though

@chetanpdeshmukh:

> @chetanpdeshmukh yay, thanks! The form is restricted though

Hey @daniel-shuy can you give it a try now? Should work smoothly

@daniel-shuy (Author):

@chetanpdeshmukh It works now, I've filled it in, thanks!

@daniel-shuy (Author):

@vishal-biyani just noticed that this PR hasn't been merged!

@AnatoliyYakimov:

Hello, @daniel-shuy! Nice work! Any updates on this pull request?
We would like to use Fission in our project, and this feature would help us a lot to keep our system consistent.

@daniel-shuy (Author):

@AnatoliyYakimov Unfortunately I don't have permission to merge; I'm also wondering why the PR hasn't been merged after so long 😅

@AnatoliyYakimov:

Maybe they just forgot :)
@vishal-biyani please merge this feature if it's ready. We would be happy to see it in the next release of Fission!

@daniel-shuy (Author):

I've resolved the conflicts with the latest master branch

@AnatoliyYakimov:

Also, looking at the code, I saw that we continue to consume messages from the topic if the lambda returned an error. So if the lambda returns an error, we still lose ordering, if I get it right.
For cases where ordering is strictly required, it would be better to have an option that stops processing entirely when an error occurs.
What do you think?

And a personal question: I'm not very strong in Go and don't know the Fission code that deeply, so I don't understand what happens if some polled messages weren't committed. Does Sarama (the Go Kafka library, if I spelled it right) return these messages from a cache, commit all polled messages anyway, or will it try to poll again from the previous commit?

@daniel-shuy (Author) commented Jun 30, 2021:

@AnatoliyYakimov Oh wow, you're right, I didn't think of that. We should break out of the loop on error and poll Kafka again (since the failed message is not committed, it will be polled again). Good catch!

@AnatoliyYakimov:

If the error is not transient and we poll again and again, we will DDoS our lambdas. So maybe we need some kind of error retry count, after which the trigger stops. I think in Kubeless you can set a retry count before the message is sent to a DLQ. Maybe the spec could look like this:
onError: SendToErrorTopic | StopProcessing
retryCount: 5
SendToErrorTopic and StopProcessing are error-handling strategies. They could implement an abstract interface, so we could easily create new strategies for our mqtriggers. Also, if we abstract away from the MQ implementation, we can reuse these strategies for any available MQ.

@daniel-shuy (Author):

@AnatoliyYakimov actually, let me get back to you on this. Sarama returns a channel, which is looped over sequentially. I'll need to look into Sarama's code to see if it's caching messages.


Successfully merging this pull request may close these issues.

Sequential function invokations when using Kafka triggers