From d6e84268ccfba90d8a8e0885b4afd09a34934f89 Mon Sep 17 00:00:00 2001 From: mmercedes Date: Wed, 21 Nov 2018 22:18:04 +0000 Subject: [PATCH 1/3] Set consumer setting max.poll.interval.ms to rest proxy setting of consumer.timeout.ms If `consumer.timeout.ms` has been set to a value greater than the default value of `max.poll.interval.ms` and a consumer has set `auto.commit.enable=false` then it is possible the kafka brokers will consider a consumer as failed and release its partition assignments, while the rest proxy maintains a consumer instance handle. This then leads to an exception on the next call to poll, commitSync, or similar. This commit sets max.poll.interval.ms equal to consumer.timeout.ms to ensure that kafka will not consider the consumer failed until the rest proxy does as well. --- .../main/java/io/confluent/kafkarest/ConsumerManager.java | 6 ++++++ .../io/confluent/kafkarest/v2/KafkaConsumerManager.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java index 167c8d5dbc..7cbafe51fa 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java @@ -185,6 +185,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig // functionality, we always use a timeout. This can't perfectly guarantee a total request // timeout, but can get as close as this timeout's value props.setProperty("consumer.timeout.ms", Integer.toString(iteratorTimeoutMs)); + // If the consumer.timeout.ms value is set higher than the default value + // for max.poll.interval.ms then it is possible the consumer will be + // considered failed by the brokers while it has yet to hit the timeout + // for the rest proxy. + props.setProperty("max.poll.interval.ms", ((Integer) iteratorTimeoutMs).toString()); + if (instanceConfig.getAutoCommitEnable() != null) { props.setProperty("auto.commit.enable", instanceConfig.getAutoCommitEnable()); } else { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java index f78997440b..12d4122cfc 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java @@ -216,6 +216,12 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig // and should not be propagated to the consumer props.setProperty("request.timeout.ms", "30000"); + // If the consumer.timeout.ms value is set higher than the default value + // for max.poll.interval.ms then it is possible the consumer will be + // considered failed by the brokers while it has yet to hit the timeout + // for the rest proxy. + props.setProperty("max.poll.interval.ms", props.getProperty("consumer.timeout.ms", "")); + props.setProperty( "schema.registry.url", config.getString(KafkaRestConfig.SCHEMA_REGISTRY_URL_CONFIG) From 81f30ce7e1335d3f5d992af8b25c057f7c33ce35 Mon Sep 17 00:00:00 2001 From: Matt Mercedes Date: Tue, 4 Dec 2018 11:19:38 -0500 Subject: [PATCH 2/3] Update ConsumerManager.java dont allocate just to call toString() --- .../src/main/java/io/confluent/kafkarest/ConsumerManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java index 7cbafe51fa..0f47fe50a4 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/ConsumerManager.java @@ -189,7 +189,7 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig // for max.poll.interval.ms then it is possible the consumer will be // considered failed by the brokers while it has yet to hit the timeout // for the rest proxy. - props.setProperty("max.poll.interval.ms", ((Integer) iteratorTimeoutMs).toString()); + props.setProperty("max.poll.interval.ms", Integer.toString(iteratorTimeoutMs)); if (instanceConfig.getAutoCommitEnable() != null) { props.setProperty("auto.commit.enable", instanceConfig.getAutoCommitEnable()); From 8c3f7a332ba864ef021718b4215e27d161e818b3 Mon Sep 17 00:00:00 2001 From: Matt Mercedes Date: Sat, 2 Feb 2019 20:46:48 +0000 Subject: [PATCH 3/3] retrigger ci