Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set consumer setting max.poll.interval.ms to rest proxy setting of co… #515

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig
// functionality, we always use a timeout. This can't perfectly guarantee a total request
// timeout, but can get as close as this timeout's value
props.setProperty("consumer.timeout.ms", Integer.toString(iteratorTimeoutMs));
// If the consumer.timeout.ms value is set higher than the default value
// for max.poll.interval.ms then it is possible the consumer will be
// considered failed by the brokers while it has yet to hit the timeout
// for the rest proxy.
props.setProperty("max.poll.interval.ms", Integer.toString(iteratorTimeoutMs));
Copy link
Contributor

@stanislavkozlovski stanislavkozlovski Mar 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old consumer did not have the max.poll.interval.ms config at all - this line shouldn't have any effect


if (instanceConfig.getAutoCommitEnable() != null) {
props.setProperty("auto.commit.enable", instanceConfig.getAutoCommitEnable());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig
// and should not be propagated to the consumer
props.setProperty("request.timeout.ms", "30000");

// If the consumer.timeout.ms value is set higher than the default value
// for max.poll.interval.ms then it is possible the consumer will be
// considered failed by the brokers while it has yet to hit the timeout
// for the rest proxy.
props.setProperty("max.poll.interval.ms", props.getProperty("consumer.timeout.ms", ""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer should not be considered failed by the brokers if it violates the max.poll.interval.ms. The validation is done in the client side (here: https://github.com/apache/kafka/blob/0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1052)

In the case of the v2 consumer, max.poll.interval.ms should be close to request.timeout.ms. For what it's worth, I don't think we should set this property in the code for the v2 consumer at all. Users could configure it in their proxy config via adding the consumer. prefix to it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, consumer.timeout.ms is not something we have in any config nor is it documented anywhere in the proxy. It is part of the old consumer configs (https://kafka.apache.org/20/documentation.html#oldconsumerconfigs)


props.setProperty(
"schema.registry.url",
config.getString(KafkaRestConfig.SCHEMA_REGISTRY_URL_CONFIG)
Expand Down