KafkaJS claims a broker does not host a topic-partition, even though it does #815

Open
mgirard772 opened this issue Jul 15, 2020 · 42 comments · May be fixed by #1558

Comments

@mgirard772

mgirard772 commented Jul 15, 2020

Describe the bug
I receive lots of error messages like the following:

{
    "level": "ERROR",
    "timestamp": "2020-07-15T16:48:34.740Z",
    "logger": "kafkajs",
    "message": "[Connection] Response Metadata(key: 3, version: 5)",
    "broker": "aws_msk_host_1:9092",
    "clientId": "ti-qa",
    "error": "This server does not host this topic-partition",
    "correlationId": 16,
    "size": 2120
}

However, a check of the topic metadata (using this topic as an example) says otherwise:

  "learningpaths" with 6 partition(s)
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr: 
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr: 
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr: 
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr: 
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr: 
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr: 

This happens across multiple topics.

Code

const awslog = require('lib/awslog');
const config = require('config');

const { Kafka } = require('kafkajs');

const BROKERS =
  config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;

const USE_KAFKA = config.env !== 'test' && BROKERS !== null;

const kafka = USE_KAFKA
  ? new Kafka({
      clientId: `ti-${config.env}`,
      brokers: BROKERS,
      retry: {
        initialRetryTime: 1000,
        retries: 9
      }
    })
  : null;

const producer = USE_KAFKA
  ? kafka.producer({
      metadataMaxAge: 60000
    })
  : null;

function push(name, records) {
  if (USE_KAFKA && records && records.length) {
    Promise.all(
      records.map(record =>
        // `Promise.resolve` here prevents invalid messages from throwing,
        // just in case others in the same batch are valid.
        Promise.resolve(keysToLowerCase(record)).then(
          value => ({ value, key: record.id || record.requestId }),
          err => {
            config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
              message: err.message,
              paths: err.paths,
              record,
              topic: name.toLowerCase(),
              brokers: BROKERS
            });

            return null;
          }
        )
      )
    )
      .then(encodedMessages => {
        const validMessages = encodedMessages.filter(message => message);
        if (validMessages.length) {
          return producer.send({
            topic: name.toLowerCase(),
            messages: validMessages,
            acks: 1
          });
        }
      })
      .catch(e => {
        awslog.error(null, new Error('Failed to send record to Kafka'), {
          message: e.message,
          topic: name.toLowerCase(),
          messages: records,
          brokers: BROKERS,
          paths: e.paths
        });
      });
  }
}

function flush() {
  if (USE_KAFKA) {
    return producer.disconnect();
  } else {
    return Promise.resolve(true);
  }
}

module.exports = {
  push,
  flush
};

function keysToLowerCase(obj) {
  const newObj = {};
  const keys = Object.keys(obj);
  for (const key of keys) {
    newObj[key.toLowerCase()] = obj[key];
  }

  return JSON.stringify(newObj);
}

Expected behavior
Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.

Observed behavior
The KafkaJS producer throws the error above, claiming that this server does not host this topic-partition, when it obviously does. It's possible there's another underlying issue and the library reports this error instead.

Environment:

  • OS: Ubuntu 14.04.6 LTS
  • KafkaJS version 1.12.0
  • Kafka version 2.2.1 (Amazon MSK)
  • NodeJS version 10.20.1

Additional context
Any pointers on what might be wrong with my code, or wrong with the library would be helpful.

@ankon
Contributor

ankon commented Jul 15, 2020

Kafka version 2.2.1 (Amazon MSK)

vs

"broker": "localhost:9092",

This doesn't add up I think, are you sure you're looking at the right broker?

@mgirard772
Author

mgirard772 commented Jul 15, 2020

Kafka version 2.2.1 (Amazon MSK)

vs

"broker": "localhost:9092",

This doesn't add up I think, are you sure you're looking at the right broker?

I omitted the actual host for security

@Nevon
Collaborator

Nevon commented Jul 16, 2020

Are you consistently seeing this error or intermittently? If it's constant, that would point to you not having configured ACLs correctly, so that your producer doesn't have the right to access the topic, and as such wouldn't be able to produce to it.

Ubuntu 14.04.6 LTS

As an aside, you should really upgrade that. 14.04 has been EOL for over a year now. 😅

@mgirard772
Author

mgirard772 commented Jul 16, 2020

Everything seems to work fine up to a point, then an onslaught of these errors starts (and doesn't stop) even though the metadata says everything is fine. The kicker is that the errors are for topics we've confirmed exist and are writable by the producer. Something changes somewhere down the line.

Things I've tried so far, but to no avail:

  1. Ensure no whitespace makes it into the topic name
  2. Upgrade to the latest Kafka version available on MSK (2.4.1)
  3. Reducing metadata age
  4. Only requiring acks from the leader

Things I'm still looking into:

  1. Potential issues with the provided key somehow resulting in an incorrect partition number. The key is a UUID, so it shouldn't be an issue, but who knows.
  2. Using the JavaCompatiblePartitioner
  3. Ditching MSK and using another Kafka implementation

Thanks for bringing up the Ubuntu version, I'll have to mention it to our SRE team.

Any thoughts on what I might be missing? I'd really prefer to stick with our current implementation, but I've been trying to debug this for 2 weeks now and starting to get pretty frustrated.

@Nevon
Collaborator

Nevon commented Jul 16, 2020

The key and the partitioner should be innocent in this. Regardless of which partition a message is assigned to, the mismatch is between the broker address and the resolved IP, or the metadata response and reality.

Under the hood, KafkaJS uses net/tls.connect in order to connect to the brokers. DNS resolution is done via dns.lookup by default (this is all happening in Node, it's not something that we explicitly configure), which in turn just delegates to the OS. If your OS was caching DNS entries, that could mean that we connect to the wrong broker if the IPs change. This would explain why this suddenly happens at some point and doesn't stop. What's unlikely about it though is that clearly a broker is responding, just not the right one. You could verify this theory by implementing a custom socket factory (search in the docs for this. There's an example in there) which uses dns.resolve instead of dns.lookup - this is an option you can pass into net/tls.connect. dns.resolve does not use the same mechanism as lookup, and should bypass whatever caching the OS may have in place. At the very least you could log the hostname and the resolved IP to see if this is changing over time.
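
The DNS check suggested above could be sketched roughly as follows. This is only a sketch: `createLoggingLookup` and `loggingSocketFactory` are hypothetical names, the injectable `resolve4` and `log` parameters exist only to make the helper easy to exercise, and the factory signature follows the custom socket factory example in the KafkaJS docs. Note that `dns.resolve4` does not read the hosts file, so names such as localhost may not resolve this way.

```javascript
const dns = require('dns')
const net = require('net')
const tls = require('tls')

// Build a `lookup` function for net/tls.connect that resolves A records via
// dns.resolve4 instead of dns.lookup and logs every result.
// `resolve4` and `log` are injectable (hypothetical parameters for this sketch).
function createLoggingLookup(
  resolve4 = (hostname, cb) => dns.resolve4(hostname, cb),
  log = console.log
) {
  return (hostname, options, callback) => {
    resolve4(hostname, (err, addresses) => {
      if (err) return callback(err)
      log(`[dns] ${hostname} -> ${addresses.join(', ')}`)
      // Some Node versions ask for all addresses at once (options.all).
      if (options && options.all) {
        return callback(null, addresses.map(address => ({ address, family: 4 })))
      }
      callback(null, addresses[0], 4)
    })
  }
}

// Custom socket factory with the extra `lookup` option.
const loggingSocketFactory = ({ host, port, ssl, onConnect }) => {
  const lookup = createLoggingLookup()
  if (!ssl) return net.connect({ host, port, lookup }, onConnect)
  // Set the SNI server name explicitly when connecting by hostname.
  const sni = net.isIP(host) ? {} : { servername: host }
  return tls.connect(Object.assign({ host, port, lookup }, sni, ssl), onConnect)
}

// Usage (not executed here):
//   new Kafka({ clientId, brokers, socketFactory: loggingSocketFactory })
```

If the logged IP for a broker hostname changes around the time the errors start, that would support the stale-DNS theory; if it never changes, the metadata response is the more likely suspect.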

If that's not the issue, then it would have to be something with the metadata response being incorrect. We have seen some odd stuff from MSK in the past, but I can't remember if it was related.

Another thing you could do, just to rule it out, is to try using the beta version of KafkaJS instead of 1.12.0. It's possible there was some related bug fixed in there, but I would have to go through the commits to be sure.

@mgirard772
Author

mgirard772 commented Jul 16, 2020

Does the broker list have to be in a particular order? For example, does broker 1 have to be the first in the list, or are host names/IPs used to match appropriately based on what's in the metadata? The metadata responses I've seen simply give numbers.

I've noticed that the AWS Boto3 API in Python gives the broker hostnames out of order, 3,2,1 or 3,1,2, while the AWS console gives them as 1,2,3.

@Nevon
Collaborator

Nevon commented Jul 16, 2020

No, order is irrelevant. It's just a mapping between node ids and hostnames.
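
To see that mapping for a given cluster, something like the following sketch may help. It assumes `admin` is a connected KafkaJS admin client and relies on `admin.describeCluster()`, which returns the brokers with their node ids; `brokerAddressesByNodeId` is a hypothetical helper name.

```javascript
// Return an object mapping node id -> "host:port", as reported by the cluster.
// `admin` is assumed to be a connected KafkaJS admin client (kafka.admin()).
async function brokerAddressesByNodeId(admin) {
  const { brokers } = await admin.describeCluster()
  const byId = {}
  for (const { nodeId, host, port } of brokers) {
    byId[nodeId] = `${host}:${port}`
  }
  return byId
}
```

The node ids in the result are the same numbers that show up as leader and replicas in the topic metadata, regardless of the order of the bootstrap broker list.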

@ankon
Contributor

ankon commented Jul 16, 2020

Ditching MSK and using another Kafka implementation

We have seen some odd stuff from MSK in the past, but I can't remember if it was related.

Mostly as a data point: We're running on MSK (right now Kafka 2.4.1), and have been since the preview. Sometimes there are glitches/oddnesses, but in general MSK does work and we haven't seen this particular problem.

@oemergenc

Maybe this is also related to this issue: #803

I experienced the same problems and errors as described by the author here.

@mgirard772
Author

Interesting.

We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way kafkajs communicates with the brokers that's causing it to throw these errors unnecessarily.

@titobundy

titobundy commented Jun 18, 2021

I have a similar issue using NestJS with Kafka on Confluent Cloud: when the topic doesn't exist, it fails even though I have allowAutoTopicCreation set to true in both the Consumer and the Producer:

[NestWinston] Error 2021-6-18 16:06:17 [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2021-06-18T20:06:17.495Z","logger":"kafkajs","broker":"pkc-4nym6.us-east-1.aws.confluent.cloud:9092","clientId":"nestjs-consumer-client","error":"This server does not host this topic-partition","correlationId":24,"size":2569} - {"stack":[""]}

Client Config

{
	transport: Transport.KAFKA,
	options: {
	  client: {
	    brokers,
	    sasl,
	    ssl,
	  },
	  consumer: {
	    groupId: 'consumer',
	    allowAutoTopicCreation: true,
	  }
	},
 }

@momirov

momirov commented Jun 24, 2021

@titobundy check if auto.create.topics.enable config is set to true, I've got that issue on AWS MSK

@titobundy

I use the free version of Kafka on Confluent Cloud. It seems that in this version the Confluent Control Center doesn't give access to all broker configuration options, but the docs indicate that auto.create.topics.enable is set to true by default.

image

@vquanglqd

vquanglqd commented Jul 24, 2021

When you use the createTopics method, try adding { topic: 'topic_name', numPartitions: 1, replicationFactor: 3 }
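
A minimal sketch of that suggestion, assuming `admin` is a connected KafkaJS admin client. `ensureTopic` is a hypothetical helper name and the default values simply mirror the numbers in the comment above; a replication factor of 3 needs at least three brokers.

```javascript
// Create a topic with an explicit partition count and replication factor.
// Resolves to true if the topic was created, false if it already existed.
async function ensureTopic(admin, topic, { numPartitions = 1, replicationFactor = 3 } = {}) {
  return admin.createTopics({
    waitForLeaders: true,
    topics: [{ topic, numPartitions, replicationFactor }]
  })
}
```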

@joerg-walter-de

I get the error message when I try to run

    const [createError, createResult] = await to( admin.createTopics({
        validateOnly: true,
        waitForLeaders: true,
        timeout: 5000,
        topics: [
            {
                topic: '123',
            }
        ],
    }) );

@helio-frota
Contributor

@joerg-walter-de thanks, waitForLeaders: true solved the error type: 'NOT_CONTROLLER' in some cases for me 👍

@helio-frota
Contributor

helio-frota commented Nov 8, 2021

That is so strange. I'm trying to run the KafkaJS integration tests against a different Kafka (not the one configured by the KafkaJS integration tests). With waitForLeaders: true I can get the src/admin/__tests__/createTopics.spec.js tests working, but I get a NOT_CONTROLLER error in the src/admin/__tests__/deleteTopics.spec.js tests. When I change to waitForLeaders: false, I get the same error in the createTopics.spec.js tests.

@alexmreis

Running 2.0.1 here and still seeing this bug running against Confluent Cloud. It doesn't matter how many partitions the topic has, tried with 1, 3 and 6. Is there a workaround for this yet?

With the following code:

const consumeMessage = async ({ topic, partition, message, heartbeat }) => {
  console.log('Received message', topic, partition, message, heartbeat)
  return processEntry(message.key.toString(), JSON.parse(message.value.toString()))
}

const startConsumer = async () => {
  const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID })
  await consumer.connect()
  await consumer.subscribe({ topics: [process.env.KAFKA_TOPIC] })
  return await consumer.run({ eachMessage: consumeMessage })
}

I never see my console.log. I only see the following logged, every time a message is published on that topic:

Got request 5e39bdb0-c9ec-11ec-8a73-cd9f4c2cfe13 { ts: 1653574734217 }
{"level":"ERROR","timestamp":"2022-06-01T07:54:50.826Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":3,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.109Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":4,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:51.582Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":5,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:52.364Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":6,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:54.193Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":7,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:54:57.138Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"pkc-4r297.europe-west1.gcp.confluent.cloud:9092","clientId":"gtm-adapter","error":"This server does not host this topic-partition","correlationId":8,"size":2345}
{"level":"ERROR","timestamp":"2022-06-01T07:55:00.023Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: This server does not host this topic-partition","groupId":"alex-consumer","stack":"KafkaJSNonRetriableError: This server does not host this topic-partition\n    at /Users/alex/Projects/revend/gtmadapter-node/node_modules/kafkajs/src/retry/index.js:55:18\n    at runMicrotasks (<anonymous>)\n    at processTicksAndRejections (node:internal/process/task_queues:96:5)"}
{"level":"INFO","timestamp":"2022-06-01T07:55:00.091Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"alex-consumer"}

Are there any settings on the topic or broker that I need to change in Confluent Cloud in order to make it compatible with this library?

Thanks in advance!

@adamatti

+1 facing the same issue

@vadiml

vadiml commented Jul 16, 2022

I had the same issue.
There is a bug in refreshMetadata. It is not able to handle external (outside of kafkajs) removal of the topic properly.

Problem is related to refreshMetadata code which is trying to refresh metadata for each topic that is in this.targetTopics Set (from admin/cluster/index.js)

Easy way to reproduce this bug:

  1. create topic1 and topic2
  2. get offsets for topic1 and topic2 with kafkajs
  3. manually delete topic1 without use of kafkajs
  4. try to get offset for topic2 with kafkajs. It will fail with "This server does not host this topic-partition" exception even if topic2 exists and we only deleted topic1.

So if one of the topics from this.targetTopics was externally removed from kafka (without use of kafkajs) - you will not be able to perform metadata refresh for any other topic and it will not be able to recover from this error.
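
The failure mode described above can be illustrated with a toy model. This is not KafkaJS source code: `ToyCluster` and its members are hypothetical and only mimic the behaviour as reported in this comment, namely that every refresh asks for all remembered target topics and a single missing one fails the whole request.

```javascript
// Toy model of the reported behaviour (NOT the actual KafkaJS implementation).
class ToyCluster {
  constructor(topicsOnBroker) {
    this.topicsOnBroker = new Set(topicsOnBroker) // what the broker really hosts
    this.targetTopics = new Set() // every topic this client has ever asked about
  }

  // Refreshes metadata for ALL remembered topics; one missing topic fails the lot.
  refreshMetadata() {
    for (const topic of this.targetTopics) {
      if (!this.topicsOnBroker.has(topic)) {
        throw new Error('This server does not host this topic-partition')
      }
    }
  }

  fetchTopicOffsets(topic) {
    this.targetTopics.add(topic)
    this.refreshMetadata()
    return `offsets for ${topic}`
  }
}

const cluster = new ToyCluster(['topic1', 'topic2'])
cluster.fetchTopicOffsets('topic1')
cluster.fetchTopicOffsets('topic2')
cluster.topicsOnBroker.delete('topic1') // topic1 removed outside the client

try {
  cluster.fetchTopicOffsets('topic2') // topic2 still exists, yet this throws
} catch (err) {
  console.log(err.message)
}
```

In the model, the client never forgets topic1, so every later refresh keeps failing, which matches the "does not recover" observation.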

@natenrb9

"error":"This server does not host this topic-partition"

I am experiencing this issue with NestJS with the subscribeToResponseOf() function even though I know the topic exists... I have searched everywhere for a resolution and tried many code changes.

Any guidance from anyone on how to fix this issue? Is it simply a Kafka BUG??

@adamatti

Update from my side: my issue was that we had the same broker for prod and dev, and the app instances were using the same group id (e.g. app dev instance connecting to dev_ topics, app prod instance connecting to prod_ topics).
It all started working fine when we added the env to the group id.
Hope it helps someone.
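
A small sketch of that fix, assuming the environment name is available as a string. `groupIdFor` and the naming scheme are hypothetical; any scheme that keeps dev and prod consumers in different groups would do.

```javascript
// Derive an environment-specific consumer group id so that dev and prod
// instances sharing one broker never join the same group.
function groupIdFor(env, baseGroupId) {
  if (!env) throw new Error('env is required to build the group id')
  return `${env}-${baseGroupId}`
}

// Usage (not executed here):
//   kafka.consumer({ groupId: groupIdFor(process.env.NODE_ENV, 'app') })
```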

@shivakumara-rapido

We have also faced a similar issue. Steps to reproduce:

  1. Try to publish a message to a topic that is NOT EXISTING
  2. Try to publish a message to a topic that is EXISTING
    Both publishes will fail with the error below:
    "This server does not host this topic-partition" "originalError":{"name":"KafkaJSProtocolError","retriable":true,"type":"UNKNOWN_TOPIC_OR_PARTITION","code":3}

The library should not fail to publish data to an EXISTING topic just because some other part of the code attempted to publish data to a NON EXISTING topic.

@trevorr

trevorr commented Aug 11, 2022

I'm seeing the same issue as @vadiml. We use Kafka topics as webhook queues, along with a permanent topic for create/update/delete of the webhooks. When a webhook is deleted, if we delete the webhook topic, there's a race between the queue readers unsubscribing due to the webhook delete message and hitting this issue. To be clear, we do use kafkajs to delete the topic, but it's happening in another process. It would also be a big help if the error had more context, like which topic-partition is not found.

@prescindivel

+1 I'm having the same experience when trying to delete topics

@olivierlacan

Oddly I'm having the same issue with confluentinc/cp-kafka:7.0.4 but if I try (and fail) to delete the topic before disconnecting, when I reconnect and list topics the topic... was somehow created (and accepts messages).

There seems to be some sort of latency issue or the promise returns before the actual creation process is complete because any subsequent logic that expects the topics to be there fails, even with long (multi-second) sleeps beforehand.

If I let the process end the connection and restart however, the topic is there successfully created the next time I run admin.listTopics().

@mtkopone

mtkopone commented Oct 31, 2022

I'm having the same issue. I'm running MSK Serverless Kafka in AWS. When destroying our AWS CloudFormation stack, we have a lambda that gets called to delete all known topics. These deletion requests are run in parallel. Quite often one out of 5 will fail with this exception. Rerunning the deletion makes the topic get deleted properly. I suspect that, at least in my case, the problem is related to the parallel deletion.
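
If parallel deletion is indeed the trigger, which is only a suspicion here, one way to test it is to delete the topics one at a time and rerun a failed deletion once. This sketch assumes `admin` is a connected KafkaJS admin client; `deleteTopicsSequentially` and its `retries` option are hypothetical.

```javascript
// Delete topics one by one instead of in parallel, retrying each failed
// deletion up to `retries` times. Returns the topics that still failed.
async function deleteTopicsSequentially(admin, topics, { retries = 1 } = {}) {
  const failed = []
  for (const topic of topics) {
    for (let attempt = 0; ; attempt++) {
      try {
        await admin.deleteTopics({ topics: [topic] })
        break
      } catch (err) {
        if (attempt >= retries) {
          failed.push({ topic, error: err })
          break
        }
      }
    }
  }
  return failed
}
```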

@tekpriest

having the same issue, running NestJS with confluent Kafka, it works if I run it in a sample file but has issues when running on NestJS.

@Jurajzovinec

@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.

@tekpriest

tekpriest commented Nov 22, 2022

@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.

Yeah running locally doesn't cause errors, but I have been having issues running kafka on docker, anyone got a reference?

@kfirzi

kfirzi commented Dec 7, 2022

I had the same issue, and I hope this helps to fix it or at least makes the error message clearer.
When working in a cluster environment with a proxy gate: trying to subscribe to topics that have not been created in the cluster produces this error (This server does not host this topic-partition).
*Side note: setting auto.create.topics.enable to true still does not solve this.

Two options solved this error:

  1. Define the brokers property directly with the IP of one of the brokers when creating the new Kafka instance (a single one is enough).
    After that, subscribing to non-existing topics causes no problem.

  2. Create the Kafka instance with the proxy gate and do not subscribe to non-existing topics.

Hope this will help someone.
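
The second option could be sketched like this, assuming `admin` and `consumer` are connected KafkaJS clients. `subscribeToExisting` is a hypothetical helper; it uses `admin.listTopics()` to skip topics that are not there yet. Other comments in this thread report that a topic can be listed and still fail shortly after creation, so this reduces the problem rather than ruling it out.

```javascript
// Subscribe only to the topics the cluster currently reports, and return
// which of the wanted topics were present and which were missing.
async function subscribeToExisting(admin, consumer, wantedTopics) {
  const existing = new Set(await admin.listTopics())
  const present = wantedTopics.filter(topic => existing.has(topic))
  const missing = wantedTopics.filter(topic => !existing.has(topic))
  for (const topic of present) {
    await consumer.subscribe({ topic })
  }
  return { present, missing }
}
```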

@yunnysunny

have the same issue:

    KafkaJSNonRetriableError
      Caused by: KafkaJSProtocolError: This server does not host this topic-partition

      at createErrorFromCode (node_modules/kafkajs/src/protocol/error.js:581:10)
      at Object.parse (node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:56:11)
      at Connection.send (node_modules/kafkajs/src/network/connection.js:333:35)
      at Broker.[private:Broker:sendRequest] (node_modules/kafkajs/src/broker/index.js:947:14)
      at Broker.metadata (node_modules/kafkajs/src/broker/index.js:225:12)  
      at node_modules/kafkajs/src/cluster/brokerPool.js:161:25
      at Cluster.refreshMetadata (node_modules/kafkajs/src/cluster/index.js:135:5)
      at Cluster.addMultipleTargetTopics (node_modules/kafkajs/src/cluster/index.js:193:11)
      at node_modules/kafkajs/src/admin/index.js:277:9

kafkajs : 1.16.0
kafka: 3.3.1

@eoc-ross

I had the same issue. There is a bug in refreshMetadata. It is not able to handle external (outside of kafkajs) removal of the topic properly.

Problem is related to refreshMetadata code which is trying to refresh metadata for each topic that is in this.targetTopics Set (from admin/cluster/index.js)

Easy way to reproduce this bug:

1. create topic1 and topic2

2. get offsets for topic1 and topic2 with kafkajs

3. manually delete topic1 without use of kafkajs

4. try to get offset for topic2 with kafkajs. It will fail with "This server does not host this topic-partition" exception even if topic2 exists and we only deleted topic1.

So if one of the topics from this.targetTopics was externally removed from kafka (without use of kafkajs) - you will not be able to perform metadata refresh for any other topic and it will not be able to recover from this error.

Same issue here. Topics that were created are removed by a different kafkajs instance. After that, the only workaround I came up with is to restart the creator service.

Installation:

  • kafkajs 2.2.2
  • kafka 3.3.1
  • zookeeper 3.6.3

@piotrgregor

If you create a new topic and start a consumer without waiting for the producer to finish, then

'KafkaJSProtocolError: This server does not host this topic-partition'

will be produced and the consumer won't subscribe (say, on Node.js instance n1).
The topic is created, so running the same again from a different Node.js instance (say, n2) will work (the producer in n2 will queue a message, the consumer in n2 will read it).

To fix it, make sure to await the producer. Change:

producer.send({
    topic: domainId, messages: [
        {value: JSON.stringify(msgBody)},
    ],
});

await consumer.connect();
await consumer.subscribe({topic: topic, fromBeginning: true});
await consumer.run({
    eachMessage: consumerCb
});

to:

await producer.send({
    topic: domainId, messages: [
        {value: JSON.stringify(msgBody)},
    ],
});

await consumer.connect();
await consumer.subscribe({topic: topic, fromBeginning: true});
await consumer.run({
    eachMessage: consumerCb
});

@rohitkrishna094

Was anyone able to figure out a temporary fix until the above PR is merged?

@vinimoret

The PR above didn't fix the issue for me. I added it manually into the lib and still face the issue This server is not the leader for that topic-partition

@noygafni

noygafni commented Aug 13, 2023

I sort of solved this problem for my case:
My scenario is this: I use NestJS with Kafka to consume messages from topics that do not exist, and I see that the service fails to run because of KafkaJSProtocolError: This server does not host this topic-partition, although the service is able to create the topics anyway. This happens when I run Kafka using the Confluent Platform images, using the KRaft version without ZooKeeper:

  broker1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker1
    container_name: broker1
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
      # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
      # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

My nest config:

{
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'nest-consumer-group',
        },
      },
}

The solution for now was to use the Kafka version with ZooKeeper:

 zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

This is not a complete solution, but it is some progress at least

@youth7

youth7 commented Sep 14, 2023

For me, setting waitForLeaders = false solved the problem.
The error message is:
key: 3, version: 5
According to the Kafka documentation, key 3 means the Metadata API (Key: 3). So probably KafkaJS sends some update operation (such as create topic) to Kafka node A and then gets the metadata from Kafka node B, but the data has not been synced to B at that time, so the error is raised.
That may be related to Kafka's sync mechanism, which may be configurable. It seems that this only happens with Kafka using KRaft. I hit this problem with Kafka 3.5 with KRaft, but it is OK with Kafka 2.8.
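
For reading such log lines, a small parser like the one below may help. It is only a sketch: `parseResponseLine` is a hypothetical helper, the regular expression is based solely on the message format seen in this thread, and the table lists just a handful of Kafka protocol API keys.

```javascript
// A few Kafka protocol API keys (not the full list).
const API_KEYS = {
  0: 'Produce',
  1: 'Fetch',
  2: 'ListOffsets',
  3: 'Metadata',
  19: 'CreateTopics',
  20: 'DeleteTopics'
}

// Extract the API name, key and version from a KafkaJS log message such as
// "[Connection] Response Metadata(key: 3, version: 5)".
function parseResponseLine(message) {
  const match = /Response (\w+)\(key: (\d+), version: (\d+)\)/.exec(message)
  if (!match) return null
  const key = Number(match[2])
  return {
    api: match[1],
    key,
    version: Number(match[3]),
    knownApi: API_KEYS[key] || null
  }
}
```

For the errors in this issue, the failing request is always the Metadata request, not the Produce or Fetch request itself.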

@mithiridi

Any fix for this? We are facing the same issue. Please let me know if someone fixed this @youth7

@youth7

youth7 commented Feb 1, 2024

@mithiridi
No solution, just an ugly workaround. Currently I have to retry in our code whenever the This server does not host this topic-partition error happens, which means adding lots of try/catch code to handle it. I expected Kafka to be linearizable, but it seems it is not. It is ridiculous: I use createTopic() to create topic A, then use listTopic() to check whether A was created, and even though listTopic() says that A exists, I can still get the error when I later use subscribe() to read messages from A.
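
The retry workaround can be collected into one helper instead of repeated try/catch blocks. This is a sketch under assumptions: `retryOnUnknownTopic` and `isUnknownTopicError` are hypothetical names, the `type` and `originalError` fields are taken from the error output quoted earlier in this thread, and checking a `cause` field is a guess about how wrapped errors are exposed.

```javascript
const UNKNOWN_TOPIC = 'UNKNOWN_TOPIC_OR_PARTITION'

// True if the error, or an error it wraps, is the unknown-topic error.
function isUnknownTopicError(err) {
  let current = err
  for (let depth = 0; current && depth < 5; depth++) {
    if (current.type === UNKNOWN_TOPIC) return true
    current = current.originalError || current.cause // `cause` is an assumption
  }
  return /does not host this topic-partition/.test(String(err && err.message))
}

// Run `operation`, retrying with a growing delay while it fails with the
// unknown-topic error. Any other error is rethrown immediately.
async function retryOnUnknownTopic(operation, { retries = 5, delayMs = 500 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation()
    } catch (err) {
      if (!isUnknownTopicError(err) || attempt >= retries) throw err
      await new Promise(resolve => setTimeout(resolve, delayMs * (attempt + 1)))
    }
  }
}

// Usage (not executed here):
//   await retryOnUnknownTopic(() => consumer.subscribe({ topic: 'A' }))
```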

@Pyakz

Pyakz commented Mar 21, 2024

Screenshot 2024-03-21 at 10 56 23 PM

Any update? I am using Upstash Kafka and NestJS.

@Pyakz

Pyakz commented Mar 21, 2024

having the same issue, running NestJS with confluent Kafka, it works if I run it in a sample file but has issues when running on NestJS.

Were you able to solve it?
I have the same problem: it works in a sample file but not in NestJS.
