Does this support KRaft mode? #1563

Closed
leppaott opened this issue Apr 24, 2023 · 5 comments

Comments

@leppaott

Does this support KRaft mode without ZooKeeper? I just configured Kafka to use controller.quorum etc. and KafkaJS seems to crash after a while:

       "type": "KafkaJSProtocolError",
       "message": "This server does not host this topic-partition",
       "stack":
           KafkaJSProtocolError: This server does not host this topic-partition
               at createErrorFromCode (/s/node_modules/kafkajs/src/protocol/error.js:581:10)
               at Object.parse (/s/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
               at Connection.send (/s/node_modules/kafkajs/src/network/connection.js:433:35)
               at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
               at async [private:Broker:sendRequest] (/s/node_modules/kafkajs/src/broker/index.js:904:14)
               at async Broker.metadata (/s/node_modules/kafkajs/src/broker/index.js:177:12)
               at async /s/node_modules/kafkajs/src/cluster/brokerPool.js:158:25
               at async /s/node_modules/kafkajs/src/cluster/index.js:111:14
               at async Cluster.refreshMetadata (/s/node_modules/kafkajs/src/cluster/index.js:172:5)
               at async Cluster.addMultipleTargetTopics (/s/node_modules/kafkajs/src/cluster/index.js:230:11)
       "name": "KafkaJSProtocolError"

However, connecting and creating topics on startup seemed to work fine.

@unatarajan

+1 to the question of whether KafkaJS supports KRaft mode without zookeeper.

@florian-besser

florian-besser commented Jul 19, 2023

We've just encountered the same issue; KafkaJS refuses to connect.

Looking at the Kafka logs I can see:

2023-07-19 16:28:39 [2023-07-19 08:28:39,156] INFO Sent auto-creation request for Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,157] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,157] INFO [Controller 1] Created topic int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order with topic ID JoScpE7wShWlxP6HrEBZXA. (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,157] INFO [Controller 1] Created partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 with topic ID JoScpE7wShWlxP6HrEBZXA and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,159] INFO Sent auto-creation request for Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,164] INFO Sent auto-creation request for Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,183] INFO [Broker id=1] Transitioning 1 partition(s) to local leaders. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,183] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0) (kafka.server.ReplicaFetcherManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,183] INFO [Broker id=1] Creating new partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 with topic id JoScpE7wShWlxP6HrEBZXA. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,185] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,185] INFO [Controller 1] Created topic int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer with topic ID 7pMdq3wARYmXVMlb6i-iwA. (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,185] INFO [Controller 1] Created partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 with topic ID 7pMdq3wARYmXVMlb6i-iwA and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,185] INFO [LogLoader partition=int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
2023-07-19 16:28:39 [2023-07-19 08:28:39,186] INFO Created log for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 in /var/lib/kafka/data/int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 with properties {} (kafka.log.LogManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,186] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 broker=1] No checkpointed highwatermark is found for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,186] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 broker=1] Log loaded for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 with initial high watermark 0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,186] INFO [Broker id=1] Leader int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Order-0 with topic id Some(JoScpE7wShWlxP6HrEBZXA) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [1], adding replicas [] and removing replicas []. Previous leader epoch was -1. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,211] INFO [Broker id=1] Transitioning 1 partition(s) to local leaders. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,211] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0) (kafka.server.ReplicaFetcherManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,211] INFO [Broker id=1] Creating new partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 with topic id 7pMdq3wARYmXVMlb6i-iwA. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,212] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,212] INFO [Controller 1] Created topic int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement with topic ID 6g_i8LVdQbazeoHFdC58kw. (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,212] INFO [Controller 1] Created partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 with topic ID 6g_i8LVdQbazeoHFdC58kw and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,212] INFO [LogLoader partition=int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
2023-07-19 16:28:39 [2023-07-19 08:28:39,213] INFO Created log for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 in /var/lib/kafka/data/int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 with properties {} (kafka.log.LogManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,213] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 broker=1] No checkpointed highwatermark is found for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,213] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 broker=1] Log loaded for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 with initial high watermark 0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,213] INFO [Broker id=1] Leader int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Transfer-0 with topic id Some(7pMdq3wARYmXVMlb6i-iwA) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [1], adding replicas [] and removing replicas []. Previous leader epoch was -1. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,238] INFO [Broker id=1] Transitioning 1 partition(s) to local leaders. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,238] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0) (kafka.server.ReplicaFetcherManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,238] INFO [Broker id=1] Creating new partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 with topic id 6g_i8LVdQbazeoHFdC58kw. (state.change.logger)
2023-07-19 16:28:39 [2023-07-19 08:28:39,240] INFO [LogLoader partition=int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
2023-07-19 16:28:39 [2023-07-19 08:28:39,240] INFO Created log for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 in /var/lib/kafka/data/int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 with properties {} (kafka.log.LogManager)
2023-07-19 16:28:39 [2023-07-19 08:28:39,240] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 broker=1] No checkpointed highwatermark is found for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,240] INFO [Partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 broker=1] Log loaded for partition int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 with initial high watermark 0 (kafka.cluster.Partition)
2023-07-19 16:28:39 [2023-07-19 08:28:39,240] INFO [Broker id=1] Leader int-test-3edd36c6-b991-4d6c-8822-62ca04bcf47c.backend.Settlement-0 with topic id Some(6g_i8LVdQbazeoHFdC58kw) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [1], adding replicas [] and removing replicas []. Previous leader epoch was -1. (state.change.logger)

So it looks like the topics are created without issue.

@florian-besser

florian-besser commented Jul 19, 2023

I have a way of reproducing the error in more detail.

I followed https://www.npmjs.com/package/kafkajs for setup, and the proposed code there works as intended. In short:

await producer.connect()
await producer.send({
  topic: 'test-topic',
  messages: [
    { value: 'Hello KafkaJS user!' },
  ],
})

// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

Note that the code in question produces first, and then consumes the produced message. Producing a message to a topic by default auto-creates said topic, which is why this works.
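For reference, both snippets assume a KafkaJS client instance named kafka; a minimal sketch of that setup (the clientId and broker address are placeholders, not from the original report) would be:

const { Kafka } = require('kafkajs')

// Assumed client setup; adjust clientId/brokers to match your environment.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })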

To reproduce the problem noted in this issue, the code is tweaked a bit:

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic' })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      value: message.value.toString(),
    })
  },
})

await producer.connect()
await producer.send({
  topic: 'test-topic',
  messages: [
    { value: 'Hello KafkaJS user!' },
  ],
})

The tweaked code consumes first and only produces later. This matches the behavior of e.g. NestJS, which on application startup subscribes to all annotated topics and only later produces any messages for those topics.

Now to the error itself:
KafkaJS tries to subscribe to the topic as instructed, which means calling refreshMetadata for the test-topic topic.

It seems the newer version of Kafka (I'm using 3.4.0, through the Docker image confluentinc/cp-kafka:7.4.1) no longer immediately creates topics on subscription but instead returns error code 3, which according to https://kafka.apache.org/0101/protocol.html translates to UNKNOWN_TOPIC_OR_PARTITION.

In short: Kafka seems to reject the subscription request at that moment and expects the client to retry until the topic in question has been created. This may be aligned with https://cwiki.apache.org/confluence/display/KAFKA/KIP-487%3A+Client-side+Automatic+Topic+Creation+on+Producer, which argues that auto-creation of topics when e.g. subscribing should no longer be relied on.
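One caller-side way to sidestep the race, if that reading is correct, is to create the topic explicitly with the KafkaJS admin client before subscribing; a rough sketch (topic name, partition count, and replication factor are assumptions for illustration):

const admin = kafka.admin()
await admin.connect()
// createTopics resolves to false if the topic already exists, so this is safe to run on every startup.
await admin.createTopics({
  topics: [{ topic: 'test-topic', numPartitions: 1, replicationFactor: 1 }],
  waitForLeaders: true,
})
await admin.disconnect()

// The subscribe call no longer races against broker-side auto-creation.
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic' })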

Back to KafkaJS: upon receiving this error, KafkaJS bubbles it up, e.g. with this stack trace:

This server does not host this topic-partition
KafkaJSProtocolError: This server does not host this topic-partition
    at createErrorFromCode (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/protocol/error.js:581:10)
    at Object.parse (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
    at Connection.send (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/network/connection.js:433:35)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Broker.[private:Broker:sendRequest] (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/broker/index.js:904:14)
    at async Broker.metadata (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/broker/index.js:177:12)
    at async /Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/cluster/brokerPool.js:158:25
    at async /Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/cluster/index.js:111:14
    at async Cluster.refreshMetadata (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/cluster/index.js:172:5)
    at async Cluster.addMultipleTargetTopics (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/cluster/index.js:230:11)
    at async Object.subscribe (/Users/florianbesser/IdeaProjects/gsg-api/node_modules/kafkajs/src/consumer/index.js:185:5)

What especially stings about this is that Kafka actually creates the topic in the background after throwing the error, so if you re-run your app it'll find the topic on the second try.

Basically: KafkaJS should retry on this error rather than throw it; it would most likely succeed after a few tries.
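Until such a fix lands in the library, one workaround is to retry the subscribe call yourself; a minimal sketch (the retry count, delay, and the err.type check are my assumptions about how the error surfaces, not an official recipe):

// Hypothetical helper: retry subscribe while the broker is still auto-creating the topic.
async function subscribeWithRetry(consumer, topic, retries = 5, delayMs = 1000) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      await consumer.subscribe({ topic })
      return
    } catch (err) {
      // KafkaJSProtocolError exposes the protocol error name on err.type
      if (err.type !== 'UNKNOWN_TOPIC_OR_PARTITION' || attempt === retries) throw err
      await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
}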

@florian-besser

And I think this would be the fix:
#1558

@asyne

asyne commented Jan 6, 2024

I can confirm it works with Kafka 3.4 and KRaft.

If you get This server does not host this topic-partition constantly while running a multi-node Kafka setup, pay attention to the EXTERNAL config inside advertised.listeners / KAFKA_ADVERTISED_LISTENERS, as you might have a node that is misconfigured to advertise another broker's address/port, effectively pointing KafkaJS to the wrong place.

To verify this:

  1. Enable DEBUG level logging on KafkaJS and produce a message (a sketch follows below).
  2. Check the KafkaJS metadata response in the stdout.
  3. Ensure that data.brokers lists unique and correct host/port pairs for each broker.
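
For step 1, enabling debug logging is just a client option; a minimal sketch (clientId and broker addresses are placeholders):

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'diagnostics',
  brokers: ['broker-1:9092', 'broker-2:9092'], // placeholders; use your bootstrap addresses
  logLevel: logLevel.DEBUG, // metadata responses are printed at this level
})

In that debug output, each entry in data.brokers should show a host/port that is actually reachable from the client; duplicate or unreachable entries usually point to a misconfigured advertised listener.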

@leppaott leppaott closed this as completed Jan 8, 2024