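Cloud Streams
=============

https://github.com/spring-boot-tutorials/cloud-streams

Create Initial Code Base
------------------------

- Go to https://start.spring.io/
- Add the following dependencies:

  - spring-boot-starter-web
  - spring-cloud-starter-stream-rabbit
  - lombok

- Click ``Generate``

Dependencies
------------

Dependencies used in ``pom.xml``:

.. code-block:: xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-binder</artifactId>
        <scope>test</scope>
    </dependency>

Properties
----------

Add the following properties to ``src/main/resources/application.yaml``:

.. code-block:: yaml

    spring:
      cloud:
        function:
          definition: enrichLogMessage;processLogs
        stream:
          function.routing.enabled: true
          bindings:
            enrichLogMessage-in-0:
              destination: my.input.queue.log.messages
            enrichLogMessage-out-0:
              destination: my.output.queue.log.messages

Configuration
-------------

Create a new file ``src/main/java/com/example/spring_cloud_streams/DefaultConfiguration.java``:

.. code-block:: java

    package com.example.spring_cloud_streams;

    import java.util.function.Function;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    @Configuration
    public class DefaultConfiguration {

        // Prefixes every incoming log message with "[Marcus] - ".
        @Bean
        public Function<String, String> enrichLogMessage() {
            return value -> "[%s] - %s".formatted("Marcus", value);
        }

        // Routes logs longer than 10 characters to the enrichment binding;
        // shorter logs go straight to the output destination.
        @Bean
        public Function<String, Message<String>> processLogs() {
            return log -> {
                boolean shouldBeEnriched = log.length() > 10;
                String destination = shouldBeEnriched
                        ? "enrichLogMessage-in-0"
                        : "my.output.queue.log.messages";

                return MessageBuilder.withPayload(log)
                        .setHeader("spring.cloud.stream.sendto.destination", destination)
                        .build();
            };
        }
    }

Create Tests
------------

Create a new test file:

.. code-block:: java

    import static org.assertj.core.api.Assertions.assertThat;

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.cloud.stream.binder.test.EnableTestBinder;
    import org.springframework.cloud.stream.binder.test.InputDestination;
    import org.springframework.cloud.stream.binder.test.OutputDestination;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    @EnableTestBinder
    @SpringBootTest
    class SpringCloudStreamsApplicationTests {

        @Autowired
        private InputDestination input;

        @Autowired
        private OutputDestination output;

        @Test
        void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
            Message<String> messageIn = MessageBuilder.withPayload("hello world").build();
            input.send(messageIn, "my.input.queue.log.messages");

            // The test binder delivers payloads as byte arrays.
            Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");
            assertThat(messageOut.getPayload())
                    .asString()
                    .isEqualTo("[Marcus] - hello world");
        }

        @Test
        void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
            // "hello world" has 11 characters, so it is routed through enrichLogMessage.
            Message<String> messageIn = MessageBuilder.withPayload("hello world").build();
            input.send(messageIn, "processLogs-in-0");

            Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");
            assertThat(messageOut.getPayload())
                    .asString()
                    .isEqualTo("[Marcus] - hello world");
        }

        @Test
        void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
            // "hello" has 5 characters, so it bypasses enrichment.
            Message<String> messageIn = MessageBuilder.withPayload("hello").build();
            input.send(messageIn, "processLogs-in-0");

            Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");
            assertThat(messageOut.getPayload())
                    .asString()
                    .isEqualTo("hello");
        }
    }

Run Tests
---------

Build the project, which also runs the tests:

.. code-block:: sh

    mvn clean package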
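
The routing rule in ``processLogs`` can also be reasoned about without starting Spring or a binder. The following is a minimal, Spring-free sketch of that rule, not part of the tutorial repository: the class name ``RoutingSketch`` and the helper ``chooseDestination`` are hypothetical and exist only for illustration, while the binding name, the destination name, the length threshold, and the prefix format are taken from the configuration above.

```java
import java.util.function.Function;

// Hypothetical, Spring-free sketch of the routing rule used by processLogs.
public class RoutingSketch {

    // Same binding and destination names as in application.yaml.
    static final String ENRICH_INPUT = "enrichLogMessage-in-0";
    static final String OUTPUT_QUEUE = "my.output.queue.log.messages";

    // Mirrors the enrichLogMessage bean: prefixes the payload with "[Marcus] - ".
    static final Function<String, String> ENRICH =
            value -> "[%s] - %s".formatted("Marcus", value);

    // Mirrors the decision in processLogs: payloads longer than 10 characters
    // go to the enrichment binding, everything else goes straight to the output.
    static String chooseDestination(String log) {
        return log.length() > 10 ? ENRICH_INPUT : OUTPUT_QUEUE;
    }

    public static void main(String[] args) {
        for (String log : new String[] {"hello world", "hello"}) {
            String destination = chooseDestination(log);
            // Apply the enrichment only when the log was routed to the enrichment binding.
            String result = destination.equals(ENRICH_INPUT) ? ENRICH.apply(log) : log;
            System.out.println(destination + " <- " + result);
        }
    }
}
```

The two sample payloads match the ones used in the tests: ``hello world`` has 11 characters and takes the enrichment path, while ``hello`` has 5 and is sent to the output destination unchanged.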