4.4 Event Driven with Kafka
In this stage we will have a look at the setup of a microservice and we will implement the order microservice.
Stock Microservice
Theorder and stock microservice have a similar setup.
Therefore, we will only work on the order microservice. If you have to make changes to the stock microservice
it is clearly stated.The tasks we need to complete are the following:
- Update dependencies
- Create and consume
shop-order-requestevents - Create and consume
shop-order-confirmationevents - Create and consume
shop-order-compensationevents
Task 4.4.1 - Update dependencies
First we are going to declare the dependencies we are going to use in this chapter. Add the following dependencies to your pom.xml:
| GroupId | ArtifactId | Description | Detailed information |
|---|---|---|---|
io.quarkus | quarkus-smallrye-reactive-messaging-kafka | Reactive Messaging with Apache Kafka | Using Apache Kafka with Reactive Messaging |
io.quarkus | quarkus-smallrye-context-propagation | Context Propagation in Quarkus | Context Propagation in Quarkus |
io.opentracing.contrib | opentracing-jdbc | OpenTracing JDBC Instrumentation | OpenTracing JDBC Instrumentation |
io.opentracing.contrib | opentracing-kafka-client | OpenTracing Kafka Client | OpenTracing Kafka Client |
Dependencies Task Hint
The following dependencies have to be added.
| |
Task 4.4.2 - Emit the first event
We are now going event driven. Let’s create our first message and emit an event whenever a new order comes in. We create a new @ApplicationScoped class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderRequestProducer in the order microservice for emitting events to our shop-order-request topic. In order to to emit messages to a topic manually we have to define a channel. To send things (payload or Message) from imperative code to a specific channel you need to use: @Channel annotation on an Emitter type field. The @Channel lets you indicate to which channel you are going to send your payloads or messages. The Emitter is the object to use to send these payloads or messages.
In the ShopOrderRequestProducer class we define a @Channel("shop-order-request") on an Emitter<ShopOrderDTO> emitter. We then create a method which creates the requests and takes a ShopOrderDTO as a parameter. The method simply emits an event to the shop-order-request channel with emitter.send(Message.of(shopOrderDTO)).
Emit Event Task Hint
The class should look like this:
| |
We need to update configurations to establish the communication to our message broker and define the connectors from the channels to the topics in our message broker:
Add the following properties to the application.properties file:
| |
In order to have our OpenTracing traces injected into the message headers we will need to add some code around the message emitter to make it work properly. Update your class to the following:
| |
Now we can simply replace the REST call we made with help of the rest-client in the previous chapter with the emitting of the event by injecting the ShopOrderRequestProducer into our ShopOrderService and using its createRequest method to emit an event to the kafka topic. Please remove the update of the status flag as well from the ShopOrderService::createShopOrder. The updating of the status flag in the order should be done when we received the confirmation event from the stock microservice.
Emit event
The createOrder function could look like this:
| |
Task 4.4.3 - Consume the first event
When we create a valid shop-order-request to the topic, the stock microservice will handle the event and confirm the order in case of succession or compensate the order in case of failure. It then emits events to either the shop-order-confirmation topic or the shop-order-compensation topic. Let’s create our consumer for these two cases in the order microservice.
Create a class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderConfirmationConsumer which will handle all incoming events in the shop-order-confirmation topic. Define a function CompletionStage<Void> consumeOrders(Message<ShopOrderDTO> message). We annotate the consumeOrders function with @Incoming("shop-order-confirmation") to indicate that incoming events in the shop-order-confirmation channel will be handled by this function. Inside the function we want our ShopOrderService to confirm the order, which updates the status to COMPLETED. Be careful though, the changing of the status will follow in a database commit which is a blocking operation. With quarkus reactive messaging blocking operations are only allowed in reactive context when executed from a worker thread. Spawning worker threads can be achieved by using the quarkus-smallrye-context-propagation class SmallRyeManagedExecutor which propagates the context properly into the new threads.
The consumer could look something like this:
| |
To connect the defined channel in the @Inbound annotation to the kafka broker we need to add a few lines to the application.properties:
| |
We add some boilerplate code around the consumer to make the propagation of the tracing headers work:
| |
Extend the ShopOrderService with the confirmOrder method:
| |
Task 4.4.4 - Compensation
Now it is up to you to do the same for the shop-order-compensation events. Create a consumer listening on the “shop-order-compensation” topic to compensate your failed orders. Create a class ShopOrderCompensationConsumer in the same package as the ShopOrderConfirmationConsumer and update your application.properties to connect your defined connectors to the kafka broker.
Create a class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderCompensationConsumer which will confirm and compensate orders.
Compensation consumer
The class should look like this:
| |
Do not forget to update your application.properties as well to connector your defined connector @Incoming("shop-order-compensation") to your kafka broker and topic.
| |
Task 4.4.5 - Add business to your service
In the business parts of the ShopOrderService you will add logic to confirm and compensate your orders! Add the two following functions to your service:
ShopOrderService::compensateOrder: Find order with idid, set status toShopOrderStatus.INCOMPLETEand save and flush.ShopOrderService::confirmOrder: Find order with idid, set status toShopOrderStatus.COMPLETEDand save and flush.
Business hint
These two functions could look like this:
| |