diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java index 167c8d5dbc..0f47fe50a4 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java @@ -185,6 +185,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig // functionality, we always use a timeout. This can't perfectly guarantee a total request // timeout, but can get as close as this timeout's value props.setProperty("consumer.timeout.ms", Integer.toString(iteratorTimeoutMs)); + // If the consumer.timeout.ms value is set higher than the default value + // for max.poll.interval.ms then it is possible the consumer will be + // considered failed by the brokers while it has yet to hit the timeout + // for the rest proxy. + props.setProperty("max.poll.interval.ms", Integer.toString(iteratorTimeoutMs)); + if (instanceConfig.getAutoCommitEnable() != null) { props.setProperty("auto.commit.enable", instanceConfig.getAutoCommitEnable()); } else { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java index f78997440b..12d4122cfc 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java @@ -216,6 +216,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig // and should not be propagated to the consumer props.setProperty("request.timeout.ms", "30000"); + // If the consumer.timeout.ms value is set higher than the default value + // for max.poll.interval.ms then it is possible the consumer will be + // considered failed by the brokers while it has yet to hit the timeout + // for the rest proxy. + props.setProperty("max.poll.interval.ms", props.getProperty("consumer.timeout.ms", "")); + props.setProperty( "schema.registry.url", config.getString(KafkaRestConfig.SCHEMA_REGISTRY_URL_CONFIG)