Skip to content

Commit

Permalink
fix: consumer.disconnect should stop reconnection attempts
Browse files Browse the repository at this point in the history
updated version of tulios#1148 and tulios#1462
  • Loading branch information
everhardt committed Nov 13, 2023
1 parent 600ec04 commit 31ed6f2
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ module.exports = ({
)
}

let allowCrashReconnect = true

/** @type {import("../../types").Consumer["connect"]} */
const connect = async () => {
allowCrashReconnect = true
await cluster.connect()
instrumentationEmitter.emit(CONNECT)
}

/** @type {import("../../types").Consumer["disconnect"]} */
const disconnect = async () => {
const disconnect = async (lAllowCrashReconnect = false) => {
allowCrashReconnect = lAllowCrashReconnect
try {
await stop()
logger.debug('consumer has stopped, disconnecting', { groupId })
Expand Down Expand Up @@ -254,7 +258,7 @@ module.exports = ({
cluster.removeBroker({ host: e.host, port: e.port })
}

await disconnect()
await disconnect(allowCrashReconnect)

const getOriginalCause = error => {
if (error.cause) {
Expand All @@ -267,6 +271,7 @@ module.exports = ({
const isErrorRetriable =
e.name === 'KafkaJSNumberOfRetriesExceeded' || getOriginalCause(e).retriable === true
const shouldRestart =
allowCrashReconnect &&
isErrorRetriable &&
(!retry ||
!retry.restartOnFailure ||
Expand Down

0 comments on commit 31ed6f2

Please sign in to comment.