In the previous article, we learned what a message broker is. In this
article, we will look at the messaging patterns of message brokers. RabbitMQ
supports a wide range of messaging patterns, one of which is the
Publisher-Subscriber (Pub-Sub) pattern.
Pub/Sub Pattern
Assume I've published an article on Medium. A Medium user liked my article
and gave it a clap 👏. Medium would now do the following: increase the clap
count for that article by one, notify me by email that a user clapped for
my article, and pay me 10 cents(!).
So, a single event triggers several different services. In the same
way, in the Pub-Sub pattern, a producer sends a message to an
exchange, which then delivers a copy of that message to multiple consumers.
The reason for doing so is that different services in a
microservice architecture may be interested in processing the same
message. As in the example above, each service processes the clap
information differently.
Consider another example. When a user creates a new account in an application,
the new user information is stored in one microservice. This data may be of
interest to the logging or auditing microservice. And the promotional service
would use these user details to send promotional emails.
The exchange plays a central role in the Pub-Sub pattern. In the
previous article, the producer sent messages to a specific queue via the default
exchange. In the Pub-Sub pattern, however, the producer sends messages only
to the exchange and does not name a queue. The exchange must
know what to do with each message that arrives, and the exchange type
determines that. RabbitMQ provides the following exchange types:
- Direct
- Topic
- Headers
- Fanout
A Fanout exchange routes a copy of every message to all the queues bound to
it and ignores the routing key. We will use a Fanout exchange to implement
the Pub-Sub pattern.
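To make the difference between exchange types more concrete, here is a minimal sketch in plain Java that models how a Direct and a Fanout exchange pick their target queues. It is a toy model, not RabbitMQ's API: the class ExchangeRoutingSketch, the route method, and the queue and key names are all made up for illustration, and the Topic and Headers types are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExchangeRoutingSketch {

    // Only the two types needed for the comparison (hypothetical enum, not the client library's).
    public enum ExchangeType { DIRECT, FANOUT }

    // Returns the names of the queues that would receive the message.
    // "bindings" maps queue name -> binding key, in declaration order.
    public static List<String> route(ExchangeType type, Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            boolean matches = switch (type) {
                case FANOUT -> true;                                  // routing key is ignored
                case DIRECT -> binding.getValue().equals(routingKey); // exact key match only
            };
            if (matches) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("article_queue", "clap");
        bindings.put("payment_queue", "payout");

        // prints "DIRECT: [article_queue]"
        System.out.println("DIRECT: " + route(ExchangeType.DIRECT, bindings, "clap"));
        // prints "FANOUT: [article_queue, payment_queue]"
        System.out.println("FANOUT: " + route(ExchangeType.FANOUT, bindings, "clap"));
    }
}
```

With the same bindings and the same routing key, the Direct model reaches one queue while the Fanout model reaches both, which is the behaviour Pub-Sub relies on.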
Random Queue Allocation
In our previous article, the producer communicated with the consumer
through a named queue. Whenever the producer wanted to send a message, it
had to use that particular queue.
However, in this pattern, we are only interested in current messages, not old
ones. To accomplish this, each consumer needs a fresh, empty queue whenever
it connects. We could create a queue with a random name ourselves or let the
server choose a random queue name for us. Such a temporary queue is deleted
automatically once the consumer that declared it disconnects from the
broker.
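The effect of temporary queues can be sketched with a small in-memory model. This is only an illustration under stated assumptions: TemporaryQueueSketch and its methods are hypothetical names, the "tmp-" prefix is invented (RabbitMQ generates its own names), and the real broker does all of this on the server side.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class TemporaryQueueSketch {

    // Queue name -> messages currently waiting in that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Creates an empty queue with a generated name, as if a consumer had just connected.
    public String connectConsumer() {
        String queueName = "tmp-" + UUID.randomUUID();
        queues.put(queueName, new ArrayList<>());
        return queueName;
    }

    // Drops the queue and anything left in it, as if the consumer had disconnected.
    public void disconnectConsumer(String queueName) {
        queues.remove(queueName);
    }

    // Copies the message into every queue that exists at this moment.
    public void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public int queueCount() {
        return queues.size();
    }

    public static void main(String[] args) {
        TemporaryQueueSketch broker = new TemporaryQueueSketch();
        broker.publish("clap #0");               // no queue exists yet, so this message is lost
        String early = broker.connectConsumer();
        broker.publish("clap #1");
        String late = broker.connectConsumer();
        broker.publish("clap #2");

        System.out.println("early: " + broker.messagesIn(early)); // prints "early: [clap #1, clap #2]"
        System.out.println("late: " + broker.messagesIn(late));   // prints "late: [clap #2]"

        broker.disconnectConsumer(early);
        System.out.println("queues left: " + broker.queueCount()); // prints "queues left: 1"
    }
}
```

A consumer only sees messages published after its queue was created, and its queue disappears when it leaves, so no old messages pile up for it.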
Implementation using Java
Shared Component Module
The Shared Component Module contains the MessageBody class, which wraps
two records: UserDetails and ArticleDetails.
package com.raven.components.model;

public record UserDetails(
        String name,
        String email
) {
}

package com.raven.components.model;

public record ArticleDetails(
        String length,
        String publicationDate,
        Boolean isClapped,
        String clappedBy,
        Integer readTime,
        String timeUnit
) {
}

package com.raven.components.model;

public class MessageBody {

    private UserDetails userDetails;
    private ArticleDetails articleDetails;

    public UserDetails getUserDetails() {
        return userDetails;
    }

    public MessageBody setUserDetails(UserDetails userDetails) {
        this.userDetails = userDetails;
        return this;
    }

    public ArticleDetails getArticleDetails() {
        return articleDetails;
    }

    public MessageBody setArticleDetails(ArticleDetails articleDetails) {
        this.articleDetails = articleDetails;
        return this;
    }
}
We will use MessageBody to transport data between producers and
consumers.
This module also includes the UtilityService class, which contains a
variety of conversion methods.
package com.raven.components.utility;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.raven.components.model.ArticleDetails;
import com.raven.components.model.MessageBody;
import com.raven.components.model.UserDetails;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class UtilityService {

    // Serializes the built-in sample message to UTF-8 encoded JSON bytes.
    public static byte[] convertObjectToByte() {
        return convertObjectToByte(getMessage());
    }

    // Serializes the given message to UTF-8 encoded JSON bytes (empty array on failure).
    public static byte[] convertObjectToByte(MessageBody messageBody) {
        var mapper = new ObjectMapper();
        byte[] bytes = new byte[0];
        try {
            String message = mapper.writeValueAsString(messageBody);
            bytes = message.getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
        return bytes;
    }

    // Serializes the built-in sample message to a JSON string.
    public static String convertObjectToString() {
        return convertObjectToString(getMessage());
    }

    // Serializes the given message to a JSON string (empty string on failure).
    public static String convertObjectToString(MessageBody messageBody) {
        var mapper = new ObjectMapper();
        String message = "";
        try {
            message = mapper.writeValueAsString(messageBody);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
        return message;
    }

    // Parses a JSON string into a MessageBody (empty MessageBody on failure).
    public static MessageBody convertStringToObject(String message) {
        var mapper = new ObjectMapper();
        MessageBody messageBody = new MessageBody();
        try {
            messageBody = mapper.readValue(message, new TypeReference<MessageBody>() {
            });
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
        return messageBody;
    }

    // Decodes UTF-8 bytes and parses the resulting JSON into a MessageBody.
    public static MessageBody convertStringToObject(byte[] bytes) {
        return convertStringToObject(new String(bytes, StandardCharsets.UTF_8));
    }

    // Builds the sample message used by the no-argument conversion methods.
    private static MessageBody getMessage() {
        return new MessageBody()
                .setUserDetails(new UserDetails(
                        "Paula Small",
                        "paula.small@bilearner.com")
                )
                .setArticleDetails(new ArticleDetails(
                        "1289 words",
                        "03/12/2024",
                        true,
                        "john.dow@yahoomail.com",
                        80,
                        "second")
                );
    }
}
We will use these conversion methods to send and receive messages between
producers and consumers.
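RabbitMQ carries message bodies as raw bytes, which is why the conversion methods always name UTF-8 explicitly on both sides. The sketch below shows just that byte round trip with the standard library. The class Utf8RoundTripSketch and the hand-written JSON string are assumptions made for illustration; the JSON stands in for what Jackson would produce.

```java
import java.nio.charset.StandardCharsets;

public class Utf8RoundTripSketch {

    // Encodes text to bytes the same way the producer side does.
    public static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes bytes back to text the same way the consumer side does.
    public static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-written stand-in for serialized JSON; \u00eb is the letter e with a diaeresis.
        String json = "{\"name\":\"Zo\u00eb\"}";

        byte[] body = toBytes(json);
        String decoded = fromBytes(body);

        // The non-ASCII letter takes two bytes in UTF-8, so the byte count exceeds the character count.
        System.out.println("characters: " + json.length() + ", bytes: " + body.length);
        System.out.println("round trip intact: " + json.equals(decoded));
    }
}
```

Naming the charset on both ends keeps producer and consumer in agreement even when they run on machines with different default encodings.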
Producer
To implement the Pub-Sub pattern, the producer declares an exchange of type
FANOUT and publishes the MessageBody object, serialized as JSON, to that
exchange. As you can see, the producer never declares or names a
queue. It publishes to the exchange with an empty routing key, and the
exchange takes care of delivery.
package com.raven.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.raven.components.model.ArticleDetails;
import com.raven.components.model.MessageBody;
import com.raven.components.model.UserDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MyProducer {

    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        // CREATE A CONNECTION FROM FACTORY (CLOSED AUTOMATICALLY AT THE END OF THE TRY BLOCK)
        try (Connection connection = connectionFactory.newConnection()) {
            // GET CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();
            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            // MESSAGE DETAILS
            var message = UtilityService.convertObjectToString(
                    new MessageBody()
                            .setUserDetails(new UserDetails(
                                    "Paula Small",
                                    "paula.small@bilearner.com")
                            )
                            .setArticleDetails(new ArticleDetails(
                                    "1289 words",
                                    "03/12/2024",
                                    true,
                                    "john.dow@yahoomail.com",
                                    80,
                                    "second")
                            )
            );
            // PUBLISH A MESSAGE TO THE EXCHANGE (EMPTY ROUTING KEY, NO QUEUE NAMED)
            channel.basicPublish(EXCHANGE, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" Message sent : '" + message + "'");
            System.out.println();
        } catch (Exception e) {
            System.out.println("Error in sending message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}
Consumer #1
Here is our first consumer, the Article service, which processes the
article's clap information. The no-argument queueDeclare() method
declares a server-named, exclusive, auto-delete, non-durable queue, and
calling getQueue() on its result gives us the generated queue name. The
queue is then bound to the exchange with the queueBind() method.
package com.raven.article_cosumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.raven.components.model.ArticleDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ArticleConsumer {

    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        try {
            // CREATE A CONNECTION (LEFT OPEN SO THE CONSUMER KEEPS LISTENING)
            Connection connection = connectionFactory.newConnection();
            // CREATE A CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();
            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            // DECLARE A TEMPORARY QUEUE AND GET ITS SERVER-GENERATED NAME
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("Consumer: Article Service: Queue name : " + queueName);
            // BIND THE QUEUE TO THE EXCHANGE
            channel.queueBind(queueName, EXCHANGE, "");
            System.out.println("Waiting for message...");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer: Article Service :: Message received : '" + message + "'");
                System.out.println();
                var messageBody = UtilityService.convertStringToObject(message);
                ArticleDetails articleDetails = messageBody.getArticleDetails();
                System.out.println("-> Article details: " + articleDetails);
                if (articleDetails.isClapped()) {
                    System.out.println("-> Article is clapped by " + articleDetails.clappedBy());
                    System.out.println("-> And article has 67 claps now!");
                }
            };
            // CONSUME MESSAGES FROM THE QUEUE (AUTO-ACK ENABLED)
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            System.out.println("Article Consumer: Error in consuming message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}
Consumer #2
The Payment service is our second consumer. It handles the payment
information. As before, the queue is created at runtime and bound to
the exchange.
package com.raven.payment_cosumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.raven.components.model.ArticleDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PaymentConsumer {

    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        try {
            // CREATE A CONNECTION (LEFT OPEN SO THE CONSUMER KEEPS LISTENING)
            Connection connection = connectionFactory.newConnection();
            // CREATE A CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();
            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            // DECLARE A TEMPORARY QUEUE AND GET ITS SERVER-GENERATED NAME
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("Consumer: Payment Service: Queue name : " + queueName);
            // BIND THE QUEUE TO THE EXCHANGE
            channel.queueBind(queueName, EXCHANGE, "");
            System.out.println("Waiting for message...");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer: Payment Service :: Message received : '" + message + "'");
                System.out.println();
                var messageBody = UtilityService.convertStringToObject(message);
                ArticleDetails articleDetails = messageBody.getArticleDetails();
                System.out.println("-> Article details: " + articleDetails);
                if (articleDetails.isClapped() && articleDetails.readTime() > 50) {
                    System.out.println("-> Article read time is " + articleDetails.readTime() + " " + articleDetails.timeUnit());
                    System.out.println("-> You have received 30 cents.");
                    System.out.println("-> Your total earning is $10.45.");
                }
            };
            // CONSUME MESSAGES FROM THE QUEUE (AUTO-ACK ENABLED)
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            System.out.println("Payment Consumer: Error in consuming message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}
Testing
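We need to install RabbitMQ on our local machine to communicate with the Java
application. You can find an installation guide
here. We'll start our two consumer applications one at a time, then the
producer program. The order matters: a message published to a fanout exchange
that has no queue bound to it is dropped, so a consumer started after the
producer would miss the message.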
The producer sends the message.
The article service consumer receives and processes the message.
The payment service consumer also receives and processes the message.
The RabbitMQ management console is accessible via
http://localhost:15672/#/. To log in to the console, use guest as both the
username and the password.
Click the 'Exchanges' tab to view the exchanges declared on the
message broker. Find our exchange, 'pub_sub', and click on it for more information.
To view the queues, select the 'Queues and Streams' tab.
Source code:
PubSubDemo.
Happy coding!!! 😊