Don't re-join when consumption has stopped #1571

Open: wants to merge 1 commit into base: master

ascandella commented May 8, 2023

We're seeing an issue where consumers erroneously take part in a rebalance even after we have asked them to shut down during deploys. Below are some debug logs. I've tested this branch with my consumers and verified that, if they detect a rebalance while shutting down, they no longer try to participate in it and instead correctly leave the group, so the deploy can continue.

2023/05/08 09:22:24 [consumer-2] info: stop consumer group {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:24.532Z","logger":"kafkajs","message":"stop consumer group","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:24.532Z"}

# some time passes, they finish processing their batch

2023/05/08 09:22:41 [consumer-2] info: Request Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.628Z","logger":"kafkajs","message":"Request Heartbeat(key: 12, version: 3)","broker":"localhost:9092","correlationId":12,"expectResponse":true,"size":157},"timestamp":"2023-05-08T16:22:41.628Z"}
2023/05/08 09:22:41 [consumer-2] warn: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"ERROR","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"size":10},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"payload":{"type":"Buffer","data":"[filtered]"}},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopping fetchers... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopping fetchers..."},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: waiting for consumer to finish... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"waiting for consumer to finish...","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] warn: The group is rebalancing, re-joining {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"WARN","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"The group is rebalancing, re-joining","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","error":"The group is rebalancing, so a rejoin is needed"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: consumer.rebalancing {"name":"analytics-kafka-consumer","className":"Kafka","methodName":"logEvent","data":{"groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","eventId":10},"timestamp":"2023-05-08T16:22:41.632Z"}

I'm not sure if this should be a flag that consumers can specify, but I don't know why you'd want your consumers to re-join a group if they're in the process of exiting.
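
For anyone skimming the thread, the intended behaviour can be sketched roughly as below. This is a simplified model and not the actual diff or KafkaJS's internal Runner code: `SketchRunner`, `onHeartbeat`, and the `"rejoin"` / `"leave"` action names are hypothetical, and `REBALANCE_IN_PROGRESS` is assumed here to stand for the "The group is rebalancing, so a rejoin is needed" error seen in the logs above.

```typescript
// Hypothetical model of the decision made after a heartbeat response.
// None of these names are taken from KafkaJS; they only illustrate the idea.
type HeartbeatResult = "ok" | "REBALANCE_IN_PROGRESS";
type Action = "continue" | "rejoin" | "leave";

class SketchRunner {
  // True while the consumer is still meant to be consuming.
  private running: boolean = true;

  // Called when shutdown starts, e.g. on SIGTERM during a deploy.
  stop(): void {
    this.running = false;
  }

  // Decide what to do with a heartbeat response.
  onHeartbeat(result: HeartbeatResult): Action {
    if (result === "ok") {
      return "continue";
    }
    // The group is rebalancing. If consumption has already stopped,
    // do not re-join; leave the group so the other members can rebalance.
    if (!this.running) {
      return "leave";
    }
    return "rejoin";
  }
}

// Walk through the scenario from the logs: a rebalance is signalled
// once while running and once after stop() has been requested.
function demo(): Action[] {
  const runner = new SketchRunner();
  const actions: Action[] = [];
  actions.push(runner.onHeartbeat("ok"));
  actions.push(runner.onHeartbeat("REBALANCE_IN_PROGRESS"));
  runner.stop();
  actions.push(runner.onHeartbeat("REBALANCE_IN_PROGRESS"));
  return actions;
}
```

In this model the only change from "always re-join on rebalance" is the check on the running flag, which is why it is not obvious that a separate opt-in flag would be needed.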

lkakol commented Aug 14, 2023

Would love to see this functionality in the mainline branch!
