Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Spring Pulsar container property customizers #37559

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,12 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
TopicResolver topicResolver, ObjectProvider<PulsarContainerPropertiesCustomizer> customizersProvider) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
this.propertiesMapper.customizeContainerProperties(containerProperties);
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(containerProperties));
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}

Expand All @@ -178,10 +179,12 @@ private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> cust
@Bean
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
SchemaResolver schemaResolver) {
SchemaResolver schemaResolver,
ObjectProvider<PulsarReaderContainerPropertiesCustomizer> customizersProvider) {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(readerContainerProperties));
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

import org.springframework.pulsar.listener.PulsarContainerProperties;

/**
* The interface to customize a {@link PulsarContainerProperties}.
*
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface PulsarContainerPropertiesCustomizer {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was originally going to put these container props customizers in the framework. However, the container factories do not need to accept a customizer as they already accept the container props. Non-boot users will already have to create the container props in order to create the container factory. They will therefore just adjust the props accordingly - no need to also have a customizer.

IOW - the framework does not need the concept of container props customizers.


/**
* Customizes a {@link PulsarContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(PulsarContainerProperties containerProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,25 @@ private void applyMessageConsumerBuilderCustomizers(List<ReactiveMessageConsumer
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
TopicResolver topicResolver,
ObjectProvider<ReactivePulsarContainerPropertiesCustomizer<?>> customizersProvider) {
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
this.propertiesMapper.customizeContainerProperties(containerProperties);
List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeContainerProperties);
customizers.addAll(customizersProvider.orderedStream().toList());
applyContainerPropertiesCustomizers(customizers, containerProperties);
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
}

@SuppressWarnings("unchecked")
private void applyContainerPropertiesCustomizers(List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers,
ReactivePulsarContainerProperties<?> containerProperties) {
LambdaSafe.callbacks(ReactivePulsarContainerPropertiesCustomizer.class, customizers, containerProperties)
.invoke((customizer) -> customizer.customize(containerProperties));
}

@Bean
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

import org.springframework.pulsar.reader.PulsarReaderContainerProperties;

/**
* The interface to customize a {@link PulsarReaderContainerProperties}.
*
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface PulsarReaderContainerPropertiesCustomizer {

/**
* Customizes a {@link PulsarReaderContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(PulsarReaderContainerProperties containerProperties);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;

/**
* The interface to customize a {@link ReactivePulsarContainerProperties}.
*
* @param <T> the message payload type
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface ReactivePulsarContainerPropertiesCustomizer<T> {

/**
* Customizes a {@link ReactivePulsarContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(ReactivePulsarContainerProperties<T> containerProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.common.schema.SchemaType;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -47,6 +49,7 @@
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.pulsar.config.PulsarReaderContainerFactory;
import org.springframework.pulsar.config.PulsarReaderEndpointRegistry;
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
Expand All @@ -65,6 +68,7 @@
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -390,7 +394,7 @@ ConsumerBuilderCustomizer<?> customizerBar() {
}

@Nested
class ListenerTests {
class ListenerContainerTests {

private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;

Expand Down Expand Up @@ -464,6 +468,53 @@ void whenObservationsDisabledDoesNotEnableObservation() {
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false));
}

@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared")
.withUserConfiguration(ContainerPropertiesCustomizerConfig.class)
.run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> containerFactory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
// We use subscriptionType to prove user customizers come after base
// props customizer.
// We use subscriptionName to prove user customizers are applied in
// their order.
assertThat(containerFactory)
.extracting(ConcurrentPulsarListenerContainerFactory::getContainerProperties)
.satisfies((containerProps) -> {
assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover);
assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2");
});
});
}

@TestConfiguration(proxyBeanMethods = false)
static class ContainerPropertiesCustomizerConfig {

@Bean
@Order(200)
PulsarContainerPropertiesCustomizer customizerFoo() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer2"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}

@Bean
@Order(100)
PulsarContainerPropertiesCustomizer customizerBar() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer1"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}

}

}

@Nested
Expand Down Expand Up @@ -517,4 +568,88 @@ ReaderBuilderCustomizer<?> customizerBar() {

}

@Nested
class ReaderContainerTests {

private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;

@Test
void whenHasUserDefinedReaderContainerFactoryBeanDoesNotAutoConfigureBean() {
PulsarReaderContainerFactory readerContainerFactory = mock(PulsarReaderContainerFactory.class);
this.contextRunner
.withBean("pulsarReaderContainerFactory", PulsarReaderContainerFactory.class,
() -> readerContainerFactory)
.run((context) -> assertThat(context).getBean(PulsarReaderContainerFactory.class)
.isSameAs(readerContainerFactory));
}

@Test
@SuppressWarnings("rawtypes")
void injectsExpectedBeans() {
PulsarReaderFactory<?> readerFactory = mock(PulsarReaderFactory.class);
SchemaResolver schemaResolver = mock(SchemaResolver.class);
this.contextRunner.withBean("pulsarReaderFactory", PulsarReaderFactory.class, () -> readerFactory)
.withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver)
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
.hasFieldOrPropertyWithValue("readerFactory", readerFactory)
.extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
.hasFieldOrPropertyWithValue("schemaResolver", schemaResolver));
}

@Test
@SuppressWarnings("unchecked")
void whenHasUserDefinedReaderAnnotationBeanPostProcessorBeanDoesNotAutoConfigureBean() {
PulsarReaderAnnotationBeanPostProcessor<String> readerAnnotationBeanPostProcessor = mock(
PulsarReaderAnnotationBeanPostProcessor.class);
this.contextRunner
.withBean("org.springframework.pulsar.config.internalPulsarReaderAnnotationProcessor",
PulsarReaderAnnotationBeanPostProcessor.class, () -> readerAnnotationBeanPostProcessor)
.run((context) -> assertThat(context).getBean(PulsarReaderAnnotationBeanPostProcessor.class)
.isSameAs(readerAnnotationBeanPostProcessor));
}

@Test
void whenHasCustomProperties() {
List<String> properties = new ArrayList<>();
properties.add("spring.pulsar.reader.topics=fromPropsCustomizer");
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)).run((context) -> {
DefaultPulsarReaderContainerFactory<?> factory = context
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getTopics()).containsExactly("fromPropsCustomizer");
});
}

@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.reader.topics=fromPropsCustomizer")
.withUserConfiguration(ReaderContainerPropertiesCustomizerConfig.class)
.run((context) -> {
DefaultPulsarReaderContainerFactory<?> containerFactory = context
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(containerFactory).extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
.extracting(PulsarReaderContainerProperties::getTopics,
InstanceOfAssertFactories.list(String.class))
.containsExactly("fromPropsCustomizer", "customizer1", "customizer2");
});
}

@TestConfiguration(proxyBeanMethods = false)
static class ReaderContainerPropertiesCustomizerConfig {

@Bean
@Order(200)
PulsarReaderContainerPropertiesCustomizer customizerFoo() {
return (props) -> props.getTopics().add("customizer2");
}

@Bean
@Order(100)
PulsarReaderContainerPropertiesCustomizer customizerBar() {
return (props) -> props.getTopics().add("customizer1");
}

}

}

}