Cloud Streams
=============
Source code: https://github.com/spring-boot-tutorials/cloud-streams

Create Initial Code Base
------------------------
- Go to https://start.spring.io/
- Add the following dependencies:

  - spring-boot-starter-web
  - spring-cloud-starter-stream-rabbit
  - lombok

- Click ``Generate``
Dependencies
------------
Dependencies used in ``pom.xml``:
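.. code-block:: xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-binder</artifactId>
        <scope>test</scope>
    </dependency>
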
Properties
----------
Add the following properties into ``src/main/resources/application.yaml``:
.. code-block:: yaml

    spring:
      cloud:
        function:
          definition: enrichLogMessage;processLogs
        stream:
          function.routing.enabled: true
          bindings:
            enrichLogMessage-in-0:
              destination: my.input.queue.log.messages
            enrichLogMessage-out-0:
              destination: my.output.queue.log.messages

Configuration
-------------
Create new file ``src/main/java/com/example/spring_cloud_streams/DefaultConfiguration.java``:
.. code-block:: java
@Configuration
public class DefaultConfiguration {
@Bean
public Function enrichLogMessage() {
return value -> "[%s] - %s".formatted("Marcus", value);
}
@Bean
public Function> processLogs() {
return log -> {
boolean shouldBeEnriched = log.length() > 10;
String destination = shouldBeEnriched ? "enrichLogMessage-in-0" : "my.output.queue.log.messages";
return MessageBuilder.withPayload(log)
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
};
}
}
Create Tests
------------
Create a new file ``src/test/java/com/example/spring_cloud_streams/SpringCloudStreamsApplicationTests.java``:
.. code-block:: java
/**
 * Tests for the {@code enrichLogMessage} and {@code processLogs} functions,
 * using the injected {@link InputDestination} and {@link OutputDestination}
 * to send and receive messages.
 */
@EnableTestBinder
@SpringBootTest
class SpringCloudStreamsApplicationTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    /** A message sent to the input destination comes out with the prefix added. */
    @Test
    void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
        Message<String> messageIn = MessageBuilder.withPayload("hello world").build();
        input.send(messageIn, "my.input.queue.log.messages");

        Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");

        assertThat(messageOut.getPayload())
            .asString()
            .isEqualTo("[Marcus] - hello world");
    }

    /** "hello world" is 11 characters, so it is expected to be enriched. */
    @Test
    void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
        Message<String> messageIn = MessageBuilder.withPayload("hello world").build();
        input.send(messageIn, "processLogs-in-0");

        Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");

        assertThat(messageOut.getPayload())
            .asString()
            .isEqualTo("[Marcus] - hello world");
    }

    /** "hello" is 5 characters, so it is expected to arrive unchanged. */
    @Test
    void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
        Message<String> messageIn = MessageBuilder.withPayload("hello").build();
        input.send(messageIn, "processLogs-in-0");

        Message<byte[]> messageOut = output.receive(1000L, "my.output.queue.log.messages");

        assertThat(messageOut.getPayload())
            .asString()
            .isEqualTo("hello");
    }
}
Run Tests
---------
Build the project with Maven; the tests run as part of the build:

.. code-block:: sh

    mvn clean package