consumer group specific offset seeking for AbstractConsumerSeekAware #2302

Open
akemalsaglam opened this issue Jun 9, 2022 · 25 comments

Comments

@akemalsaglam

akemalsaglam commented Jun 9, 2022

Expected Behavior

We want to be able to seek offsets for a specific consumer group by using AbstractConsumerSeekAware.

Current Behavior

With the implementation below, a seek is applied to all assigned partitions of a topic, regardless of the consumer group ID.

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
    }

    @Override
    public void seekToTimestamp(long time) {
        getSeekCallbacks().forEach((tp, callback) -> {
            callback.seekToTimestamp(tp.topic(), tp.partition(), time);
        });
    }

Context
In our use case, more than one consumer group may be assigned the same partition of a topic. The following example describes our case:

  1. We have one topic, product.feed.fullexport, with 12 partitions.
  2. 10 different microservices, each with its own group ID, listen to the same topic, and each has 12 concurrent consumers.
  3. When we seek offsets for one of the microservices by using the ConsumerSeekCallback implementation above, it affects all assigned partitions and all listening consumer instances, regardless of the intended group ID.

Is there any way to seek offsets in a partition for a specific group ID only?

@artembilan
Member

artembilan commented Jun 9, 2022

I don't see the behavior you are describing:

@SpringBootApplication
public class KafkaGh2302Application {

	public static void main(String[] args) {
		SpringApplication.run(KafkaGh2302Application.class, args);
	}


	@Bean
	public NewTopic topic() {
		return new NewTopic("seekExample", 3, (short) 1);
	}

	@Component
	public static class Listener extends AbstractConsumerSeekAware {

		@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
		public void listen(String payload) {
			System.out.println("Listener received: " + payload);
		}

		public void seekToStart() {
			getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
		}

	}

	@Component
	public static class Listener2 extends AbstractConsumerSeekAware {

		@KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
		public void listen(String payload) {
			System.out.println("Listener2 received: " + payload);
		}

		public void seekToStart() {
			getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
		}

	}

}

As you can see, I have two @KafkaListener classes with different ids, which essentially point to different consumer groups.

In the unit test I do:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTests {

	@Autowired
	KafkaGh2302Application.Listener listener;

	@Autowired
	KafkaTemplate<String, String> template;

	@Test
	void contextLoads() throws InterruptedException {

		for (int i = 0; i < 10; i++) {
			this.template.send("seekExample", i % 3, "some_key", "test#" + i);
		}

		Thread.sleep(1000);

		this.listener.seekToStart();

		Thread.sleep(10000);
	}

}

So, after sending some data, I just call seekToStart() on one of the listener services.
The output is like this:

Listener2 received: test#2
Listener received: test#1
Listener2 received: test#0
Listener2 received: test#5
Listener received: test#2
Listener received: test#4
Listener received: test#5
Listener received: test#7
Listener2 received: test#1
Listener2 received: test#8
Listener received: test#0
Listener2 received: test#3
Listener2 received: test#4
Listener received: test#8
Listener2 received: test#6
Listener received: test#3
Listener2 received: test#9
Listener2 received: test#7
Listener received: test#6
Listener received: test#9
Listener received: test#0
Listener received: test#3
Listener received: test#6
Listener received: test#9
Listener received: test#2
Listener received: test#5
Listener received: test#1
Listener received: test#4
Listener received: test#7
Listener received: test#8

This confirms that seeking really happens in only one consumer group and does not affect other groups on the same topic.

@garyrussell
Contributor

Correct; seeks only affect the current group.

@bky373
Contributor

bky373 commented May 6, 2024

@artembilan @garyrussell
Hello, I've found that the current behavior can occur when using multiple listeners with different group IDs in the same class.

Current Behavior - described above

it is clear that we can seek offset for all assigned partitions in a topic regardless of different consumer group ids.

Test

I have modified the code provided above slightly.

  • KafkaGh2302Application
...
  // Existing code
  @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
  public void listen(String payload) {
      ...
  }

  // Added code in the same class Listener
  @KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
  public void listen3(String payload) {
      System.out.println("Listener3 received: " + payload);
  }
...
  • KafkaGh2302ApplicationTests
...

  // Existing code
  for (int i = 0; i < 30; i++) { // Changed 10 -> 30 so that both listeners (seekExample and seekExample3) have offsets to seek
    this.template.send("seekExample", i % 3, "some_key", "test#" + i);
  }

...
  • Result
    • Since callbacks are registered per thread in a single listener class that extends AbstractConsumerSeekAware, seeking is performed regardless of the consumer group ID (in this case, for both Listener (id: seekExample) and Listener3 (id: seekExample3)).
...
Listener received: test#0
Listener3 received: test#3
Listener2 received: test#16
Listener2 received: test#19
Listener2 received: test#22
Listener received: test#29
Listener2 received: test#25
Listener2 received: test#28
Listener received: test#28
Listener received: test#21
Listener received: test#24
Listener received: test#27
Listener2 received: test#23
Listener2 received: test#26
Listener3 received: test#16
========
Listener received: test#0
Listener received: test#3
Listener received: test#24
Listener received: test#27
Listener received: test#2
Listener3 received: test#1
Listener received: test#5
Listener3 received: test#4
Listener received: test#8
Listener3 received: test#7
Listener received: test#11
Listener3 received: test#10
...

IMHO, if we want to seek offsets for a specific consumer group only, we can use the following methods:

  • Use separate classes for consumers with different group IDs.
  • Add filtering conditions based on the consumer group ID included in the thread name to ensure callbacks are registered or executed selectively (in registerSeekCallback() or onPartitionsAssigned()).
    • However, I don't want to recommend this as thread names can be changed arbitrarily and may not always include the consumer group ID.

If you have any other solutions for seeking offsets based on a specific consumer group ID, please let me know. I would appreciate hearing them. Thank you!

@garyrussell
Contributor

I am no longer involved with the project, but what you are suggesting is incorrect.

Each listener method is invoked by a different listener container and, therefore, on different threads.

So, if there is a problem, it is not related to any thread-based state.

@sobychacko
Contributor

sobychacko commented May 6, 2024

@bky373 Thanks for reporting. As @garyrussell pointed out, this looks like it is non-thread-state related, but it looks like some bug (Thanks, Gary, for chiming in!! :) ). We will look at this today. Do you have any sample application for us to reproduce? (that would be easier). Otherwise, we can look into creating one since you provided some snippets.

@bky373
Contributor

bky373 commented May 6, 2024

@garyrussell

I am no longer involved with the project

Oh I didn't know that! Thanks for your comment!! 🙇

Each listener method is invoked by a different listener container and, therefore, on different threads.

You are totally right. Threads are different. I didn't mean to say that it's an issue with thread state.
If my suggestion seemed like it was due to a thread state issue, I'm afraid I wrote it wrong.

I just wanted to report that in a class that has listeners with different consumer group IDs and extends AbstractConsumerSeekAware, it's difficult to seek offsets for a specific consumer group ID. (This might not be a problem.)

@bky373
Contributor

bky373 commented May 6, 2024

@bky373 Thanks for reporting. As @garyrussell pointed out, this looks like it is non-thread-state related, but it looks like some bug (Thanks, Gary, for chiming in!! :) ). We will look at this today. Do you have any sample application for us to reproduce? (that would be easier). Otherwise, we can look into creating one since you provided some snippets.

@sobychacko

Sure! The code is simple, so I'll leave it in the comments here.
Thank you for your time and reply!!

@SpringBootApplication
public class KafkaGh2302Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaGh2302Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

    @Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        @KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
        public void listen3(String payload) {
            System.out.println("Listener3 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }

    @Component
    public static class Listener2 extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener2 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }
}

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTest {

    @Autowired
    KafkaGh2302Application.Listener listener;

    @Autowired
    KafkaTemplate<String, String> template;

    @Test
    void contextLoads() throws InterruptedException {

        for (int i = 0; i < 50; i++) {
            this.template.send("seekExample", i % 3, "some_key", "test#" + i);
        }

        Thread.sleep(1000);
        System.out.println("====================================");
        this.listener.seekToStart();
        Thread.sleep(10000);
    }
}

@sobychacko
Contributor

I think the best course of action is to have a single consumer (KafkaListener) per class that extends AbstractConsumerSeekAware. If there are multiple listeners, the callbacks are applied against all the listeners in that particular class. We will see if we can come up with a solution, such as making an API level change to better accommodate group id's.
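
To illustrate why the seek reaches every listener in the class, here is a minimal plain-Java model with no Spring Kafka dependency. `SeekAwareModel` and `registerContainer` are hypothetical stand-ins, not framework classes; the sketch only assumes that one seek-aware instance keeps a single callback table that every listener container invoking that instance registers into.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a bean extending AbstractConsumerSeekAware:
// it keeps ONE callback table for the whole instance.
class SeekAwareModel {

    // Key: "groupId:partition". Value: the seek action for that group's consumer.
    private final Map<String, Runnable> seekCallbacks = new TreeMap<>();

    // Records which group/partition pairs were actually asked to seek.
    private final List<String> seekLog;

    SeekAwareModel(List<String> seekLog) {
        this.seekLog = seekLog;
    }

    // Models one listener container (one per listener method) registering its callbacks.
    void registerContainer(String groupId, int partitions) {
        for (int p = 0; p < partitions; p++) {
            String entry = groupId + ":" + p;
            seekCallbacks.put(entry, () -> seekLog.add(entry));
        }
    }

    // Like seekToStart() in the sample above: runs every callback in the table.
    void seekToStart() {
        seekCallbacks.values().forEach(Runnable::run);
    }
}
```

Registering both "seekExample" and "seekExample3" on one `SeekAwareModel` and calling `seekToStart()` runs six seek actions, three per group. With one instance per group, `seekToStart()` on the first instance leaves the second untouched, which is the effect of splitting the listeners into separate classes.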

@garyrussell
Contributor

@sobychacko FYI, the thread's associated group is available in KafkaUtils, if that helps.

/**
 * Get the group id for the consumer bound to this thread.
 * @return the group id.
 * @since 2.3
 */
public static String getConsumerGroupId() {
    return KafkaUtils.GROUP_IDS.get(Thread.currentThread());
}

@bky373
Contributor

bky373 commented May 7, 2024

@sobychacko @garyrussell
Thanks for your comments!

As you mentioned, the method listeners within the class will apply the callback identically regardless of consumer group ID. So it seems necessary to execute callbacks differently for each consumer group since the intended behavior may vary between consumer groups. (Of course, we can work around this for now by keeping our classes separate.)

I'll also keep looking for ways to do it.

I'm so grateful for your help!

@sobychacko
Contributor

We will try to make some changes to accommodate this before the GA.

@sobychacko
Contributor

@bky373 After looking at this further, we realized this is a bit more involved from the framework perspective since we need to introduce some breaking changes at the API level. Therefore, we recommend your workaround in this and prior versions of Spring Kafka (since we are so close to the 3.2.0 GA release), i.e., stick with a single class per listener for this use case. We will table the proper fixes for this issue for now and consider this for the next version of the framework, 3.3.0.

@bky373
Contributor

bky373 commented May 9, 2024

@sobychacko

Thank you for taking the time to research and respond!

I'm curious to know what you think of the solution.

  • Are you considering adding a new parameter (e.g., consumerGroupId) to the existing methods of ConsumerSeekAware?
  • Or are you planning to keep ConsumerSeekAware and have another interface that extends it (such as ConsumerGroupSeekAware)?
  • Or is there another way you have in mind?

In any case, I'm hesitant to say, as it would be a big change, but I'd like to hear your thoughts and see if there's anything I can contribute.

@sobychacko
Contributor

@bky373 We had an internal discussion on this with @artembilan yesterday. We need to make some changes similar to your line of thinking. Some API methods in ConsumerSeekAware need to be modified to take some new information about the consumer group. We believe we may be able to get the group ID information through the ConsumerSeekCallback, but we need to look further. We can make these changes when we switch the main branch to 3.3.0-SNAPSHOT after the GA release. I have marked this issue for the 3.3.0-M1 milestone. If you want to work on a PR for this, you are certainly welcome to do so.

@bky373
Contributor

bky373 commented May 10, 2024

@sobychacko

Yes, thank you!

Personally, I'd like to take this on and work on it a bit more.
However, before I start working on the code, it would be great to have a discussion and get some feedback on the direction of the work.
If that's okay with you, I'll do a little more research and get back to you after I've organized things!

Off the top of my head, as you said, if we can get the consumerGroupId from the ConsumerSeekCallback properly using KafkaUtils.getConsumerGroupId(), we can define the behavior per groupId when creating the callback. But that needs to be tested.

@sobychacko
Contributor

@bky373 Feel free to work on it. Before you start coding, if you want us to confirm the design, please continue discussing it here, and we can review it. Thanks!

bky373 added a commit to bky373/spring-kafka that referenced this issue Jun 1, 2024
@bky373
Contributor

bky373 commented Jun 1, 2024

@sobychacko

Hi, I apologize for reaching out after such a long time.
I've revisited the issue and thought about potential solutions.

Before diving into the details, let me briefly summarize the problem since it has been a while.

  • Problem: When using AbstractConsumerSeekAware, different listeners within the same class use the same callback. And when seek is performed in one listener, it is executed in all listeners, even if it is not desired.

Here are the approaches I've considered:

1. Passing consumerGroupId as a parameter from the outside.

  • 1-1. Adding groupId as a parameter
    • Current: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)
    • Proposed: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback, String groupId)
    • Method description
      • Register the callback only if the passed groupId matches the consumer groupId of the current thread.
    • Issues
      • The process of passing the groupId externally is complex and would require changes in many places that use the registerSeekCallback() method.
      • Currently, it's just groupId, but there might be future requests to register callbacks based on other types of values.
      • The current ConsumerSeekAware API is structurally sound and already provides sufficient functionality. Modifying the API might be unnecessary.
  • 1-2. Changing the parameter
    • Current: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)
    • Proposed: ConsumerSeekAware.registerSeekCallback(Map<String, ConsumerSeekCallback> callbackForGroupId)
    • Method description: Allow different callbacks to be registered for different groupIds.
    • Issues: This has the same problems as 1-1.

2. Setting seek allow flag per @KafkaListener (e.g. Boolean consumerSeekAllowed)

  • Current: With ConsumerSeekAware or AbstractConsumerSeekAware, listeners implement the seek functions and can use them immediately.
  • Proposed: With ConsumerSeekAware or AbstractConsumerSeekAware, listeners implement the seek functions, but their usage is determined by the consumerSeekAllowed value.
    • Explanation: ConsumerSeekAware or AbstractConsumerSeekAware is at the class level, and the listeners belong to that class, so it seems natural for the listeners to use the same callback. (If a different callback is needed, the class should be split.) However, it's possible to create different types of listeners within the same class anyway, and for various reasons, users might not want to split the class. To address this issue simply, we can add the consumerSeekAllowed attribute to @KafkaListener. The default value is true, but if seek is not desired, it can be set to false. (A null value, which behaves the same as true, can be used to log a warning.)
    • Issue:
      • Using different callbacks per listener still requires splitting the class. However, this might be the preferred approach.
      • Users need to be aware of the new consumerSeekAllowed attribute in addition to AbstractConsumerSeekAware.
    • Advantages:
      • You can control seeking while maintaining multiple listeners inside the same class.
      • Minimal code changes. No changes at the ConsumerSeekAware API level.
      • Maintains backward compatibility and allows seek to be disabled based on criteria other than consumer group ID.
  • To demonstrate this approach, I've quickly implemented some code. All existing tests pass, and new tests will need to be created if needed.

Thank you for reading through this long message. Feel free to share any thoughts or feedback.
Thanks!

bky373 added a commit to bky373/spring-kafka that referenced this issue Jun 2, 2024
@bky373
Contributor

bky373 commented Jun 12, 2024

@sobychacko
Hi, if you have some time, could I get your feedback on the above?
I would like to know what you have in mind. If a different approach is needed, I would like to consider that as well.
Thanks!

@sobychacko
Contributor

sure @bky373. Sorry for the delay. We will get back to you soon on this.

@sobychacko
Contributor

@bky373 I like your second approach, as this is a minimally invasive set of changes and doesn't require any API changes in ConsumerSeekAware or related classes. On the other hand, with this approach, users need to be mindful that they have to disable seeks on listeners in classes that extend AbstractConsumerSeekAware and contain multiple listeners.

@artembilan do you have any thoughts on changing KafkaListener like this vs making/breaking API level changes in ConsumerSeekAware?

Also, I wonder if there is a valid use case that might benefit others where they need to drive seeking offsets based on the group-id?

@artembilan
Member

Well, this new consumerSeekAllowed feels more like a workaround for what we cannot do right now.
And it is a bit awkward: we do ConsumerSeekAware, but then consumerSeekAllowed = false 🤷
But at the same time we already have a more reasonable workaround via splitting.
I'd say this is a more robust and logical workaround: in the end we develop microservices, so keeping the logic as simple as possible is the best approach.
Without any paradox of choice.

Currently, it's just groupId, but there might be future requests to register callbacks based on other types of values.

So, just don't mix up many listeners in a single class.
Why make your life so complicated, if we can simply just split and reuse logic via delegation to some other common service (if any)?

Sorry for some rude language, but if we go this way, I'd prefer groupId propagation.
Or we can choose to fail fast if the same ConsumerSeekAware is used for many listeners.

@sobychacko
Contributor

Each approach has pros and cons. While it is an easier solution to add this as a new flag, adding a top-level property like this to KafkaListener might not bode well, given that there is a workaround. I guess we have 2 options if we exclude the KafkaListener flag.

  1. API changes - Adding a new method that specifically takes a group ID so that only the listener with that ID is involved in the seek operation.
  2. Fail fast when you have multiple listeners in a class that implements ConsumerSeekAware. The exception thrown can suggest the users split the listeners into multiple classes.
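
As a rough illustration of option 2, the check could look something like the following plain-Java sketch. Everything here is hypothetical: `ListenerMethod` stands in for @KafkaListener, `SeekAware` stands in for ConsumerSeekAware, and `SeekAwareValidator` is an invented helper. It is not how the framework implements or plans to implement the check.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @KafkaListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ListenerMethod {
    String id();
}

// Hypothetical marker interface standing in for ConsumerSeekAware.
interface SeekAware {
}

// Hypothetical startup check: reject seek-aware classes with more than one listener method.
final class SeekAwareValidator {

    private SeekAwareValidator() {
    }

    static void validate(Class<?> beanType) {
        if (!SeekAware.class.isAssignableFrom(beanType)) {
            return; // Only seek-aware classes share a callback table.
        }
        int listeners = 0;
        for (Method method : beanType.getDeclaredMethods()) {
            if (method.isAnnotationPresent(ListenerMethod.class)) {
                listeners++;
            }
        }
        if (listeners > 1) {
            throw new IllegalStateException(beanType.getSimpleName() + " declares " + listeners
                    + " listeners; split them into separate classes to seek per consumer group");
        }
    }
}

// A class with a single listener passes the check.
class SingleListener implements SeekAware {
    @ListenerMethod(id = "seekExample")
    public void listen(String payload) {
    }
}

// A class with two listeners is rejected.
class TwoListeners implements SeekAware {
    @ListenerMethod(id = "seekExample")
    public void listen(String payload) {
    }

    @ListenerMethod(id = "seekExample3")
    public void listen3(String payload) {
    }
}
```

Calling `SeekAwareValidator.validate(TwoListeners.class)` throws an IllegalStateException whose message suggests the split, while `validate(SingleListener.class)` returns normally.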

@garyrussell
Contributor

garyrussell commented Jun 14, 2024

There's another option: add a new (default) method to ConsumerSeekAware.

default boolean seekByGroupId() {
    return false;
}

Then, use KafkaUtils.getConsumerGroupId() when seeking, and when building callback tables.

#2302 (comment)

No breaking API changes, no @KafkaListener changes.

But it would only work when seeking on the listener thread.
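
A minimal plain-Java sketch of the idea of keying the callback table by the thread's group follows. `GroupContext` is a hypothetical stand-in for the thread-to-group lookup that KafkaUtils.getConsumerGroupId() provides, and `GroupAwareSeekTable` is an invented class; none of this is the framework's code.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the thread-bound group id lookup.
final class GroupContext {

    private static final ThreadLocal<String> GROUP_ID = new ThreadLocal<>();

    private GroupContext() {
    }

    static void bind(String groupId) {
        GROUP_ID.set(groupId);
    }

    static String current() {
        return GROUP_ID.get();
    }

    static void clear() {
        GROUP_ID.remove();
    }
}

// Hypothetical callback table keyed by consumer group instead of shared per class.
class GroupAwareSeekTable {

    private final Map<String, List<Runnable>> callbacksByGroup = new ConcurrentHashMap<>();

    // Called on a consumer thread: files the callback under that thread's group.
    void register(Runnable seekCallback) {
        String groupId = GroupContext.current();
        if (groupId == null) {
            throw new IllegalStateException("No consumer group bound to this thread");
        }
        callbacksByGroup.computeIfAbsent(groupId, key -> new CopyOnWriteArrayList<>()).add(seekCallback);
    }

    // Runs only the callbacks of the calling thread's group; returns how many ran.
    int seekCurrentGroup() {
        String groupId = GroupContext.current();
        if (groupId == null) {
            return 0; // Not a listener thread, so there is no group to resolve.
        }
        List<Runnable> callbacks = callbacksByGroup.getOrDefault(groupId, Collections.emptyList());
        callbacks.forEach(Runnable::run);
        return callbacks.size();
    }
}
```

The early return in `seekCurrentGroup()` models the limitation noted above: a thread with no bound group has nothing to look up, so the seek is a no-op there.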

@sobychacko
Contributor

Thanks, @garyrussell, for that great insight.
@bky373 We should look into the idea Gary suggested as the solution for this issue.

@artembilan
Member

Thanks, Gary!

I see the logic in ListenerConsumer.initialize():

			setupSeeks();
			KafkaUtils.setConsumerGroupId(this.consumerGroupId);

Which probably has to be swapped to make that groupId available for registration.
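
The effect of that ordering can be modeled in plain Java. `ThreadGroupHolder` and `ConsumerInitModel` are hypothetical stand-ins for KafkaUtils and ListenerConsumer; the sketch only assumes that callback registration happens inside `setupSeeks()` and reads the thread-bound group id at that moment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the thread-bound group id held by KafkaUtils.
final class ThreadGroupHolder {

    private static final ThreadLocal<String> GROUP_ID = new ThreadLocal<>();

    private ThreadGroupHolder() {
    }

    static void set(String groupId) {
        GROUP_ID.set(groupId);
    }

    static String get() {
        return GROUP_ID.get();
    }

    static void clear() {
        GROUP_ID.remove();
    }
}

// Hypothetical model of the consumer initialization order.
class ConsumerInitModel {

    private final String consumerGroupId;

    // Group id visible to the registration step on each initialization (null if not yet bound).
    final List<String> groupSeenDuringSetup = new ArrayList<>();

    ConsumerInitModel(String consumerGroupId) {
        this.consumerGroupId = consumerGroupId;
    }

    // Models callback registration, which would like to know the group id.
    private void setupSeeks() {
        groupSeenDuringSetup.add(ThreadGroupHolder.get());
    }

    // Order as quoted above: registration runs before the group id is bound.
    void initializeAsQuoted() {
        setupSeeks();
        ThreadGroupHolder.set(consumerGroupId);
    }

    // Swapped order: the group id is bound first, so registration can see it.
    void initializeSwapped() {
        ThreadGroupHolder.set(consumerGroupId);
        setupSeeks();
    }
}
```

Starting from a thread with no bound group, `initializeAsQuoted()` records null for the registration step, while `initializeSwapped()` records the consumer's group id.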
