Skip to content

Spring Integration - Apache Kafka - MongoDB sample application

Notifications You must be signed in to change notification settings

labcabrera/sample-spring-integration-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

68 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Spring Integration - Apache Kafka - MongoDB sample application

La aplicación contiene dos módulos que se comunican a través de Kafka. El primer módulo será un gateway que expondrá una API REST a través de la cual recibirá las peticiones. El segundo módulo contendrá la lógica de negocio escuchando los mensajes de una cola de Kafka utilizando para ello Spring Integration DSL.

[http]       [app-gateway]    [kafka]    [app-core]      [mongo]
  |-------------->|              |           |              |
  |               |------------->| [1]       |              |
  |               |              |           |              |
  |               |              |---------->|              |
  |               |              |           |<------------>|
  |               |              |<----------|              |
  |               |<-------------| [2]       |              |
  |<--------------|              |           |              |

[1]: tf-calculator-in [2]: tf-calculator-out

La información sobre cada cálculo se persistirá en mongodb, que también será utilizado para recuperar aquella información de negocio necesaria para realizar los cálculos.

¿Por qué utilizar Spring Integration?

La principal ventaja es que abstrae a nuestra aplicación de la infraestructura que vamos a tener. Por ejemplo la aplicación se podría cambiar de forma muy sencilla reemplazando Kafka por AMQP (RabbitMQ por ejemplo). De este modo simplemente tendríamos que tocar la configuración de integración cambiando por ejemplo:

IntegrationFlows
  .from(
    Kafka
      .messageDrivenChannelAdapter(consumerFactory, topidName))

por:

IntegrationFlows
  .from(
    Amqp
      .inboundGateway(connectionFactory, amqpTemplate, someQueue)

Manteniendo el resto de la aplicación funcionando del mismo modo.

Gateway app

Este proyecto está construido con maven. Básicamente consta de un @MessagingGateway a partir del cual realizaremos las llamadas a partir del controlador CalculatorController:

@RestController
@RequestMapping("/calculator")
@Slf4j
public class CalculatorController {

	@Autowired
	private CalculatorGateway gateway;

	@PostMapping
	public CalculationResponse calculate(@RequestBody CalculationRequest request) {
		log.debug("Processing calculation request: {}", request);
		CalculationResponse response = gateway.sendMessage(request);
		log.debug("Received response: {}", response);
		return response;
	}

}

Nuestro gateway se integra con los dos canales de entrada y salida que hemos definido en la clase IntegrationConfiguration.

En primer lugar nuestro canal de entrada enviará los mensajes que recibe a Kafka:

@Bean
IntegrationFlow outboundGateFlow(ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
  return IntegrationFlows.from(CHANNEL_NAME_IN)
    .log(Level.DEBUG, getClass().getName(), m -> String.format("Sending calculation request: %s", m))
    .transform(Transformers.toJson(mapper))
    .handle(Kafka.outboundGateway(kafkaTemplate).topic(TOPIC_NAME_IN).messageKey("calculateMessageKey"))
    .log(Level.DEBUG, getClass().getName(), m -> String.format("Received calculation response: %s", m))
    .transform(Transformers.fromJson(CalculationResponse.class, mapper))
    .channel(CHANNEL_NAME_OUT)
    .get();
}

Core app

Este proyecto está construído con grade en lugar de maven. Básicamente escucha un topic de Kafka y cuando recibe los mensajes invoca la lógica de negocio y escribe los resultados en otro topic de salida.

Esto lo hacemos a través de la siguiente configuración:

@Bean
IntegrationFlow flowFromKafkaDummy(
  KafkaTemplate<String, String> kafkaTemplate,
  ConsumerFactory<String, String> consumerFactory,
  JsonObjectMapper<?, ?> mapper) {

  return IntegrationFlows
    .from(Kafka.messageDrivenChannelAdapter(consumerFactory, TOPIC_NAME_IN))
    .log(Level.DEBUG, getClass().getName(), m -> String.format("Received calculation request: %s", m))
    .transform(Transformers.fromJson(mapper))
    .handle(CalculationRequest.class, (request, headers) -> coreCalculator.calculate(request))
    .transform(Transformers.toJson(mapper))
    .log(Level.DEBUG, getClass().getName(), m -> String.format("Returning calculation response: %s", m))
    .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
      .messageKey(MESSAGE_KEY)
      .topic(TOPIC_NAME_OUT))
    .get();
}
}

El proceso de cálculo es muy sencillo. Leerá de mongo una serie de valores preestablecidos en función del valor del campo source recibido, generará un importe al azar combinándolo con el valor base y devolverá el resultado.

Adicionalmente almacenará en mongo una referencia al cálculo solicitado:

> db.calculationHistory.find()
{ "_id" : ObjectId("5b056eb7457e766f43b70d4d"), "request" : { "source" : "test" }, "response" : { "amount" : "72.11", "calculated" : ISODate("2018-05-23T13:37:59.948Z") }, "_class" : "org.lab.tariff.calculator.core.domain.CalculationHistory" }

La comunicación con MongoDB la realizaremos a través del proyecto spring-boot-starter-data-mongodb. Simplemente tendremos que añadir la anotación @EnableMongoRepositories y definir aquellas interfaces que utilizarán nuestras entidades:

public interface CalculationSourceDataRepository extends MongoRepository<CalculationSourceData, String> {

	CalculationSourceData findBySourceName(String name);

}

Montando el proyecto en local

En primer lugar levantaremos la imagen de Kafka a través del docker-compose situado en la carpeta env:

cd env
export DOCKER_HOST_IP=127.0.0.1
docker-compose up -d

Probando el sistema en local

Una vez hemos arrancado los contenedores de Kafka y MongoDB, simplemente tendremos que arrancar tanto el core como el gateway y podemos comprobar el funcionamiento de la aplicación a través de nuestra API REST:

$ curl -d '{"source":"test"}' -H "Content-Type: application/json" http://localhost:8080/calculator
{"reference":"5b056eb7457e766f43b70d4d","amount":72.11,"calculated":"2018-05-23T13:37:59.948+0000"}

También podremos hacer la petición utilizando la integración de Swagger proporcionada por SpringFox: http://localhost:8080/swagger-ui.html

Generación de las imágenes de docker

Dockerfile

La generación de las imágenes de Docker va a ser bastante sencilla dado que utilizando una imagen basada en openjdk no deberemos realizar ninguna configuración adicional:

FROM openjdk:8-jdk-alpine
VOLUME /tmp
ARG JAR_FILE
COPY ${JAR_FILE} /opt/cnpparners/app.jar
ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /opt/cnpparners/app.jar

De modo que simplemente copiamos el jar ejecutable generado y lo lanzamos con la opción java -jar como cualquier aplicación Spring Boot.

Plugin usando gradle

Para crear la imagen docker se utiliza el plugin palantir de gradle.

La configuración del plugin es realmente sencilla:

docker {
  name "labcabrera/${jar.baseName}"
  dockerfile file('Docker/Dockerfile')
  tags 'latest'
  files jar.archivePath
  buildArgs(['JAR_FILE': "${jar.archiveName}"])
}

Para crear la imagen simplemente ejecutaremos:

$ gradle build docker

Aparte de la generación local de la imagen el plugin también nos ofrece otras funcionalidades como la de realizar el push o realizar tags.

About

Spring Integration - Apache Kafka - MongoDB sample application

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published