On kotlin application, Spring kafka 3.2.0 doesn't take the message conversion logic, because of the wrong coroutine detection on MessagingMessageListenerAdapter #3277

Closed
huisam opened this issue May 24, 2024 · 2 comments


Contributor

huisam commented May 24, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.2.0

The behavior changed between 3.1.4 and 3.2.0.

Describe the bug

There is a bug in the determineInferredType method of the MessagingMessageListenerAdapter class:

boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
isNotConvertible |= isCoroutines;

To detect a coroutine (suspending) function, the code should use

KotlinDetector.isSuspendingFunction(Method method)

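rather than the isKotlinType method,

because in a Kotlin application the parameter type of a @KafkaListener method is usually itself a Kotlin type (for example, a data class). For such a parameter the check succeeds even though the method is not a coroutine, so the conversionNeeded flag is set to false.
As a result, the message converters are not applied.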
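
To illustrate the difference between the two checks, here is a minimal sketch that does not use Spring at all. The helpers `looksLikeKotlinType` and `looksLikeSuspendFunction` are hypothetical stand-ins written for this example; they only approximate what I understand `KotlinDetector.isKotlinType` and `KotlinDetector.isSuspendingFunction` to do (a `kotlin.Metadata` annotation check and a trailing `Continuation` parameter check, respectively), and the `Listeners` class is made up for the demonstration.

```kotlin
import java.lang.reflect.Method

data class TestMessage(val key: String)

class Listeners {
    // Ordinary listener: compiles to onMessage(TestMessage)
    fun onMessage(message: TestMessage) {}

    // Suspending listener: the compiler appends a Continuation parameter
    suspend fun onSuspend(message: TestMessage) {}
}

// Hypothetical stand-in: Kotlin-compiled classes carry the @kotlin.Metadata annotation.
fun looksLikeKotlinType(type: Class<*>): Boolean =
    type.isAnnotationPresent(Metadata::class.java)

// Hypothetical stand-in: a suspend function has kotlin.coroutines.Continuation as its last parameter.
fun looksLikeSuspendFunction(method: Method): Boolean {
    val last = method.parameterTypes.lastOrNull() ?: return false
    return last.name == "kotlin.coroutines.Continuation"
}

fun main() {
    val methods = Listeners::class.java.declaredMethods
    val plain = methods.first { it.name == "onMessage" }
    val suspending = methods.first { it.name == "onSuspend" }

    // The parameter is a Kotlin data class, so a type-based check says "Kotlin"...
    println(looksLikeKotlinType(plain.parameterTypes[0]))  // true
    // ...but the listener itself is not a coroutine.
    println(looksLikeSuspendFunction(plain))               // false
    println(looksLikeSuspendFunction(suspending))          // true
}
```

In this sketch, only the method-level check separates the ordinary listener from the suspending one, which is why the parameter-type check is not a reliable coroutine indicator.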

To Reproduce

@KafkaListener(topics = [TEST_KAFKA_TOPIC], groupId = TEST_KAFKA_TOPIC_GROUP)
fun onMessage(message: TestMessage) {
    execute()
}

data class TestMessage(val key: String)

Expected behavior

The message is consumed normally and the execute() method is called. Instead, a MethodArgumentNotValidException is thrown:

org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.company.test.spring.kafka.TestKafkaService.onMessage(com.company.test.spring.kafka.TestMessage): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] 

Sample

The sample code is included in this issue.

@huisam huisam changed the title Spring kafka 3.2.0 doesn't take the message conversion logic, because of the wrong coroutine detection on MessagingMessageListenerAdapter On kotlin application, Spring kafka 3.2.0 doesn't take the message conversion logic, because of the wrong coroutine detection on MessagingMessageListenerAdapter May 24, 2024
@artembilan
Member

Looks like we overlooked this scenario when we tested Kotlin support for @KafkaListener:

@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
fun listen(value: String) {
	this.received = value
	this.latch1.countDown()
}

That String is indeed not a Kotlin type 😄 .
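
A quick way to see why the existing test did not catch this, as a sketch only: `hasKotlinMetadata` is a hypothetical helper that assumes a "Kotlin type" is one compiled with the `kotlin.Metadata` annotation. Kotlin's `String` maps to `java.lang.String` on the JVM, which has no such annotation, while a data class does.

```kotlin
data class TestMessage(val key: String)

// Hypothetical helper: true when the class was compiled by the Kotlin compiler.
fun hasKotlinMetadata(type: Class<*>): Boolean =
    type.isAnnotationPresent(Metadata::class.java)

fun main() {
    // kotlin.String is java.lang.String at runtime, so no Kotlin metadata is present.
    println(hasKotlinMetadata(String::class.java))       // false
    // A data class is compiled by Kotlin and carries the metadata.
    println(hasKotlinMetadata(TestMessage::class.java))  // true
}
```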

Feel free to contribute the fix as you have just explained.
I guess it should come with a corresponding fix for the EnableKafkaKotlinTests.test listener() test and its configuration with the listen() method mentioned above.

Thanks

Contributor Author

huisam commented May 25, 2024

@artembilan Thank you for your feedback.

Okay, I will submit a pull request to resolve this issue.
I will also check the test code you mentioned and improve it.
