Showing posts with label Software Engineering. Show all posts

Monday, December 30, 2024

RabbitMQ - Pub-Sub Pattern: Implementation Of Fanout Exchange

java,rabbitmq,message broker,programming,software development,technology,software engineering
In this article, we will learn about the messaging patterns of message brokers. RabbitMQ supports a wide range of messaging patterns, one of which is the Publisher-Subscriber (Pub-Sub) pattern. In the previous article, we learned what a message broker is.

Pub/Sub Pattern

Assume I've published an article on Medium. A Medium user liked it and gave me a clap 👏. Medium would then do the following: increase the clap count for that article by one, notify me via email that a user had clapped for my article, and pay me 10 cents(!).
So, based on a single event, different services are triggered. In the same way, in the Pub-Sub pattern, a producer sends a message to an exchange, which then sends the same message to multiple consumers. The reason for doing so is that different services in the microservice architecture may be interested in processing the same message. Similar to the preceding example, the various services process the clapping information differently.
Consider another example. When a user creates a new account in an application, the new user information is stored in one microservice. This data may be of interest to the logging or auditing microservice. And the promotional service would use these user details to send promotional emails.

The exchange is extremely important in the Pub-Sub pattern. In the previous article, the producer reached a specific queue via the default exchange. In the Pub-Sub pattern, however, the producer only sends messages to the exchange and knows nothing about the queues behind it. The exchange must know what to do with each message that arrives, and RabbitMQ provides several exchange types that determine this. The available exchange types are listed below.
  • Direct
  • Topic
  • Headers
  • Fanout
The Fanout exchange publishes the same message to multiple queues. We will use a Fanout exchange to implement the Pub-Sub pattern.
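
To make the fanout behaviour concrete before we touch RabbitMQ, here is a minimal in-memory sketch. It is a toy model, not the RabbitMQ client API: the class FanoutSketch and its methods bindNewQueue() and publish() are names assumed purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of a fanout exchange (NOT the RabbitMQ client API).
class FanoutSketch {
    // Every queue bound to the exchange; a fanout exchange ignores routing keys.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Create a new queue, bind it to the exchange, and hand it to the caller.
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    // Publishing copies the same message into every bound queue.
    void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Queue<String> articleQueue = exchange.bindNewQueue();
        Queue<String> paymentQueue = exchange.bindNewQueue();

        exchange.publish("clap");

        System.out.println("article service got: " + articleQueue.poll()); // article service got: clap
        System.out.println("payment service got: " + paymentQueue.poll()); // payment service got: clap
    }
}
```

Each service reads from its own queue, so one service consuming the message does not take it away from the other.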

Random Queue Allocation

In our previous article, we described how the producer communicates with the consumer via a specified queue. This means that whenever the producer wants to send messages to consumers, it must use that particular queue.

However, in this pattern, we are only interested in current messages, not old ones. To accomplish this, we can create a queue with a random name ourselves or let the broker generate one. When declared through the channel's no-argument queueDeclare() method, such a queue is also exclusive and auto-delete, so it is removed once the consumer disconnects.
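
The effect of temporary queues can be sketched with a small in-memory model. This is only an illustration under assumptions: the class TemporaryQueueSketch, its method names, and the "tmp-" name prefix are hypothetical and do not come from RabbitMQ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model of temporary, randomly named queues (NOT the RabbitMQ client API).
class TemporaryQueueSketch {
    // Queue name -> messages waiting in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    // A consumer connects: create a queue with a random name and bind it.
    String connectConsumer() {
        String queueName = "tmp-" + UUID.randomUUID(); // hypothetical naming scheme
        queues.put(queueName, new ArrayList<>());
        return queueName;
    }

    // The consumer disconnects: its queue is deleted together with its messages.
    void disconnectConsumer(String queueName) {
        queues.remove(queueName);
    }

    // Only queues that exist at publish time receive the message.
    void publish(String message) {
        queues.values().forEach(queue -> queue.add(message));
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    boolean queueExists(String queueName) {
        return queues.containsKey(queueName);
    }

    public static void main(String[] args) {
        TemporaryQueueSketch broker = new TemporaryQueueSketch();
        broker.publish("old clap");                    // nobody is listening yet, so it is dropped
        String queue = broker.connectConsumer();
        broker.publish("new clap");
        System.out.println(broker.messagesIn(queue));  // [new clap]
        broker.disconnectConsumer(queue);
        System.out.println(broker.queueExists(queue)); // false
    }
}
```

The consumer sees only what was published while it was connected, which is exactly the "current messages only" behaviour we want here.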

Implementation using Java

Shared Component Module

The Shared Component Module contains the MessageBody class, which wraps two records: UserDetails and ArticleDetails.
package com.raven.components.model;

public record UserDetails(
        String name,
        String email
) {
}

package com.raven.components.model;

public record ArticleDetails(
        String length,
        String publicationDate,
        Boolean isClapped,
        String clappedBy,
        Integer readTime,
        String timeUnit
) {
}

package com.raven.components.model;

public class MessageBody {
    private UserDetails userDetails;
    private ArticleDetails articleDetails;

    public UserDetails getUserDetails() {
        return userDetails;
    }

    public MessageBody setUserDetails(UserDetails userDetails) {
        this.userDetails = userDetails;
        return this;
    }

    public ArticleDetails getArticleDetails() {
        return articleDetails;
    }

    public MessageBody setArticleDetails(ArticleDetails articleDetails) {
        this.articleDetails = articleDetails;
        return this;
    }
}
We will use MessageBody to transport data between producers and consumers.

This module also includes the UtilityService class, which contains a variety of conversion methods.
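package com.raven.components.utility;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.raven.components.model.ArticleDetails;
import com.raven.components.model.MessageBody;
import com.raven.components.model.UserDetails;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
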
public class UtilityService {

    public static byte[] convertObjectToByte() {
        var messageBody = getMessage();
        var mapper = new ObjectMapper();
        byte[] _byte = new byte[0];

        try {
            String message = mapper.writeValueAsString(messageBody);
            _byte = message.getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return _byte;
    }

    public static byte[] convertObjectToByte(MessageBody messageBody) {
        var mapper = new ObjectMapper();
        byte[] _byte = new byte[0];

        try {
            String message = mapper.writeValueAsString(messageBody);
            _byte = message.getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return _byte;
    }

    public static String convertObjectToString() {
        var messageBody = getMessage();
        var mapper = new ObjectMapper();
        String message = "";

        try {
            message = mapper.writeValueAsString(messageBody);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return message;
    }

    public static String convertObjectToString(MessageBody messageBody) {
        var mapper = new ObjectMapper();
        String message = "";

        try {
            message = mapper.writeValueAsString(messageBody);
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return message;
    }

    public static MessageBody convertStringToObject(String message) {
        var mapper = new ObjectMapper();
        MessageBody messageBody = new MessageBody();

        try {
            messageBody = mapper.readValue(message, new TypeReference<MessageBody>() {
            });
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return messageBody;
    }

    public static MessageBody convertStringToObject(byte[] bytes) {
        var mapper = new ObjectMapper();
        MessageBody messageBody = new MessageBody();

        try {
            String message = new String(bytes, StandardCharsets.UTF_8);
            messageBody = mapper.readValue(message, new TypeReference<MessageBody>() {
            });
        } catch (JsonProcessingException e) {
            System.out.println("Error in processing the JSON : " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }

        return messageBody;
    }

    private static MessageBody getMessage() {
        return new MessageBody()
                .setUserDetails(new UserDetails(
                        "Paula Small",
                        "paula.small@bilearner.com")
                )
                .setArticleDetails(new ArticleDetails(
                        "1289 words",
                        "03/12/2024",
                        true,
                        "john.dow@yahoomail.com",
                        80,
                        "second")
                );
    }
}
We will use these conversion methods to send and receive messages between producers and consumers.

Producer

To implement the Pub-Sub pattern, we declare an exchange of type FANOUT and publish the MessageBody object to it. As you can see, the producer does not declare a queue. It publishes to the exchange with an empty routing key, which a fanout exchange ignores.
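package com.raven.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.raven.components.model.ArticleDetails;
import com.raven.components.model.MessageBody;
import com.raven.components.model.UserDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
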
public class MyProducer {
    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);

        // CREATE A CONNECTION FROM FACTORY
        try (Connection connection = connectionFactory.newConnection()) {
            // GET CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();

            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);

            // MESSAGE DETAILS
            var message = UtilityService.convertObjectToString(
                    new MessageBody()
                            .setUserDetails(new UserDetails(
                                    "Paula Small",
                                    "paula.small@bilearner.com")
                            )
                            .setArticleDetails(new ArticleDetails(
                                    "1289 words",
                                    "03/12/2024",
                                    true,
                                    "john.dow@yahoomail.com",
                                    80,
                                    "second")
                            )
            );

            // PUBLISH A MESSAGE TO CHANNEL
            channel.basicPublish(EXCHANGE, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" Message sent : '" + message + "'");
            System.out.println();

        } catch (Exception e) {
            System.out.println("Error in sending message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}

Consumer #1

Here is our first consumer, the Article service, which processes the article's clap information. The no-argument queueDeclare() method declares a server-named, exclusive, auto-delete, non-durable queue, and getQueue() on its result returns the generated queue name. The queue is then bound to the exchange with the queueBind() method.
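package com.raven.article_cosumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.raven.components.model.ArticleDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
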
public class ArticleConsumer {
    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);

        try {
            // CREATE A CONNECTION
            Connection connection = connectionFactory.newConnection();

            // CREATE A CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();

            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);

            // DECLARE A QUEUE IN THE CHANNEL AND GET ITS NAME
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("Consumer: Article Service: Queue name : " + queueName);

            // BIND THE QUEUE WITH THE EXCHANGE
            channel.queueBind(queueName, EXCHANGE, "");

            System.out.println("Waiting for message...");

            DeliverCallback deliverCallback = (consumeMsg, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer: Article Service :: Message received : '" + message + "'");

                System.out.println();
                var messageBody = UtilityService.convertStringToObject(message);

                ArticleDetails articleDetails = messageBody.getArticleDetails();
                System.out.println("-> Article details: " + articleDetails);

                if (articleDetails.isClapped()) {
                    System.out.println("-> Article is clapped by " + articleDetails.clappedBy());
                    System.out.println("-> And article has 67 claps now!");
                }
            };

            // GET MESSAGE FROM EXCHANGE
            channel.basicConsume(queueName, true, deliverCallback, consumeMsg -> {
            });
        } catch (Exception e) {
            System.out.println("Article Consumer: Error in consuming message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}

Consumer #2

Payment service is our second consumer. This consumer handles the payment information. Here, we create the queue at runtime and associate it with the exchange.
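package com.raven.payment_cosumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.raven.components.model.ArticleDetails;
import com.raven.components.utility.UtilityService;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
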
public class PaymentConsumer {
    private static final String HOST = "localhost";
    private static final String EXCHANGE = "pub_sub";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);

        try {
            // CREATE A CONNECTION
            Connection connection = connectionFactory.newConnection();

            // CREATE A CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();

            // DECLARE AN EXCHANGE
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);

            // DECLARE A QUEUE IN THE CHANNEL AND GET ITS NAME
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("Consumer: Payment Service: Queue name : " + queueName);

            // BIND THE QUEUE WITH THE EXCHANGE
            channel.queueBind(queueName, EXCHANGE, "");

            System.out.println("Waiting for message...");

            DeliverCallback deliverCallback = (consumeMsg, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Consumer: Payment Service :: Message received : '" + message + "'");

                System.out.println();
                var messageBody = UtilityService.convertStringToObject(message);

                ArticleDetails articleDetails = messageBody.getArticleDetails();
                System.out.println("-> Article details: " + articleDetails);

                if (articleDetails.isClapped() && articleDetails.readTime() > 50) {
                    System.out.println("-> Article read time is " + articleDetails.readTime() + " " + articleDetails.timeUnit());
                    System.out.println("-> You have received 30 cents.");
                    System.out.println("-> Your total earning is $10.45.");
                }
            };

            // GET MESSAGE FROM EXCHANGE
            channel.basicConsume(queueName, true, deliverCallback, consumeMsg -> {
            });
        } catch (Exception e) {
            System.out.println("Payment Consumer: Error in consuming message: " + e.getMessage() + ", " + Arrays.toString(e.getStackTrace()));
        }
    }
}

Testing

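We need to install RabbitMQ on our local machine to communicate with the Java application. You can find an installation guide here. We start the two consumer applications first, one at a time, and then the producer program. The order matters: each consumer's temporary queue exists only while that consumer is connected, so a message published before the consumers start would not reach them.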

The producer sends the message.

The article service consumer receives and processes the message.

The payment service consumer also receives and processes the message.

The RabbitMQ management console is accessible via http://localhost:15672/#/. To log in to the console, use guest as the username and password. Click the 'Exchanges' tab to view the exchanges that are linked to the message broker. Find our exchange, 'pub_sub', and click on it for more information.

To view the queues, select the 'Queues and Streams' tab.

Source code: PubSubDemo.
Happy coding!!! 😊

Saturday, November 23, 2024

RabbitMQ: Message Broker - A Brief Introduction

We are all familiar with Hogwarts' magical postal network, the Owl Post Office. This magical delivery system relies on owls to deliver packages and letters. They can even pick up an item from any address and deliver it to another using their magical tracking abilities. Similarly, we can compare RabbitMQ, a message broker, to the Owl Post Office. It allows applications to interact with one another and exchange messages.

But why should we use a message broker?

An e-commerce application ships thousands of items per day and sends an email notification for each one. This is a synchronous operation: the item's shipping status is saved in the database, and then the email is sent as part of the same operation. Now, on bad days, the email server goes down or crashes due to overload, so none of us receive the email notification.

Assume we add a service layer between the shipping service and the email service. The shipping service sends the shipment notification to the service layer, which routes the message to the email service. If the email service goes down, the service layer stores the messages. When the email service comes back up, the service layer pushes those messages to it.
And this service layer is nothing but a message broker.
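
The store-and-forward idea described above can be sketched in a few lines of plain Java. This is a toy model under assumptions, not how RabbitMQ is implemented: the class StoreAndForwardSketch and its methods are hypothetical names, and a list stands in for the email service.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy store-and-forward layer between a shipping service and an email service.
class StoreAndForwardSketch {
    private final Queue<String> pending = new ArrayDeque<>();       // messages waiting for delivery
    private final List<String> deliveredEmails = new ArrayList<>(); // stands in for the email service
    private boolean emailServiceUp = false;

    // Called by the shipping service; it never talks to the email service directly.
    void send(String notification) {
        pending.add(notification);
        flush();
    }

    // Called when the email service goes down or comes back.
    void setEmailServiceUp(boolean up) {
        emailServiceUp = up;
        flush();
    }

    // Deliver everything that is waiting, but only while the email service is reachable.
    private void flush() {
        while (emailServiceUp && !pending.isEmpty()) {
            deliveredEmails.add(pending.poll());
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> deliveredEmails() {
        return deliveredEmails;
    }

    public static void main(String[] args) {
        StoreAndForwardSketch layer = new StoreAndForwardSketch();
        layer.send("order 1 shipped");                // email service is down: message is stored
        layer.send("order 2 shipped");
        System.out.println(layer.pendingCount());     // 2
        layer.setEmailServiceUp(true);                // email service recovers: backlog is pushed
        System.out.println(layer.deliveredEmails());  // [order 1 shipped, order 2 shipped]
    }
}
```

The shipping service returns immediately after send(), whether or not the email service is available.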

RabbitMQ is a distributed message and stream broker. A message broker is software that sits among applications, allowing them to exchange messages. RabbitMQ is useful for decoupling services, remote procedure calls (RPC), streaming services, and IoT.

RabbitMQ is most commonly used in microservice-based architectures. It operates asynchronously: producers and consumers do not follow a simple request-response pattern, so the producer does not have to wait for a reply. RabbitMQ, like the postal service, delivers messages from producer to consumer.

Producer

A Producer is an application that generates messages, usually in response to an action or event. Credit card transactions, a drop or rise in stock price, or an order dispatch are all examples of events that Producers publish.

Consumer

On the other hand, Consumers are the entities that listen to messages. Humans are the perfect example of a consumer. We consume everything, from news to alcohol. Jokes aside! Because a message broker might be connected to multiple producers and consumers, communication between them is asynchronous.

Exchange

But wait a second! How does the message broker get a message from the producer to the consumer? The answer is the exchange. It functions as RabbitMQ's brain: the exchange helps the message broker route messages from producer to consumer.

Queue & Binding

A message broker may have multiple exchanges. Exchanges always receive messages from producers. Consumers are not connected directly to exchanges. Queues connect exchanges and consumers. Binding connects queues to exchanges. You can think of queues like our letterbox. Exchange pushes messages into queues, from which interested consumers can consume them.
An exchange can be bound to multiple queues, and a queue can be linked to numerous exchanges. The consumer might also listen to messages from multiple queues.
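
The many-to-many relationship between exchanges and queues can be pictured as a small binding table. The sketch below is a simplification under assumptions: BindingSketch and its methods are hypothetical names, and every bound queue receives every message, whereas a real exchange also applies its type and routing key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy binding table: exchanges and queues in a many-to-many relationship.
class BindingSketch {
    // Exchange name -> names of the queues bound to it.
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // Queue name -> messages waiting in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    // Bind a queue to an exchange, creating the queue on first use.
    void bind(String queue, String exchange) {
        queues.computeIfAbsent(queue, name -> new ArrayList<>());
        bindings.computeIfAbsent(exchange, name -> new LinkedHashSet<>()).add(queue);
    }

    // The exchange pushes the message into every queue bound to it.
    void publish(String exchange, String message) {
        for (String queue : bindings.getOrDefault(exchange, Set.of())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        BindingSketch broker = new BindingSketch();
        broker.bind("audit", "orders");    // one exchange, two queues
        broker.bind("email", "orders");
        broker.bind("audit", "payments");  // one queue, two exchanges

        broker.publish("orders", "order created");
        broker.publish("payments", "payment received");

        System.out.println(broker.messagesIn("audit")); // [order created, payment received]
        System.out.println(broker.messagesIn("email")); // [order created]
    }
}
```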

Connection & Channel

RabbitMQ implements AMQP (Advanced Message Queuing Protocol). AMQP is an open messaging protocol that defines the rules for message exchange, queueing, and routing in a messaging system.

To communicate with RabbitMQ, a client or application must first establish a connection. A client can be either a producer or a consumer. The connection is established over TCP, optionally secured with TLS. The main purpose of a connection is to provide a communication path between the client and RabbitMQ.

A connection can have several channels. But why do we need channels? We could open multiple connections between the client and the broker to exchange messages. However, keeping many TCP connections open at the same time is undesirable because it consumes system resources and makes firewall configuration more difficult. So, according to the AMQP protocol, channels are "lightweight connections that share a single TCP connection".
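
One way to picture channels is as tagged frames travelling over one shared pipe. The following sketch is a conceptual illustration only: ChannelMultiplexSketch, Frame, write() and demultiplex() are hypothetical names, and the real wire format carries far more than a channel number and a text payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of several channels sharing one connection.
class ChannelMultiplexSketch {
    // One frame on the shared connection: the channel it belongs to plus a payload.
    record Frame(int channelId, String payload) {}

    // The single shared "connection": frames from all channels, in arrival order.
    private final List<Frame> wire = new ArrayList<>();

    // A channel writes by tagging its payload with its own id.
    void write(int channelId, String payload) {
        wire.add(new Frame(channelId, payload));
    }

    // The receiving side splits the interleaved frames back into per-channel streams.
    Map<Integer, List<String>> demultiplex() {
        Map<Integer, List<String>> perChannel = new TreeMap<>();
        for (Frame frame : wire) {
            perChannel.computeIfAbsent(frame.channelId(), id -> new ArrayList<>())
                    .add(frame.payload());
        }
        return perChannel;
    }

    public static void main(String[] args) {
        ChannelMultiplexSketch connection = new ChannelMultiplexSketch();
        connection.write(1, "publish A");
        connection.write(2, "consume X");
        connection.write(1, "publish B");
        System.out.println(connection.demultiplex()); // {1=[publish A, publish B], 2=[consume X]}
    }
}
```

Both channels use the same underlying connection, yet each one sees only its own frames, in order.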
We can use the amqp-client Java library to communicate with RabbitMQ in a Java application. This library is available through the Maven repository. We need to install RabbitMQ on our local machine to communicate with the Java application. You can find an installation guide here.

Producer & Consumer using Java

Now, let's look at how to create a message and how to consume it using RabbitMQ and Java. First, we write a program that will connect and publish a message to RabbitMQ.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Publisher {
    private static final String HOST = "localhost";
    private static final String QUEUE = "OWL-POST";
    private static final String MESSAGE = "Happy birthday Hermione!";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);

        // CREATE A CONNECTION FROM FACTORY
        try (Connection connection = connectionFactory.newConnection()) {
            // GET CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();

            // ASSIGN A QUEUE TO CHANNEL
            channel.queueDeclare(QUEUE, false, false, false, null);

            // PUBLISH A MESSAGE TO CHANNEL
            channel.basicPublish("", QUEUE, null, MESSAGE.getBytes(StandardCharsets.UTF_8));
            System.out.println(" Message sent : '" + MESSAGE + "'");
        } catch (Exception e) {
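            // Concatenating e.getStackTrace() would only print an array reference, so print the trace separately
            System.out.println("Error in sending message: " + e.getMessage());
            e.printStackTrace();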
        }
    }
}
We use the amqp-client Java library to communicate with RabbitMQ.

RabbitMQ is running on our local machine, so the host is localhost. The queue name is 'OWL-POST'. The consumer should connect to this queue to consume messages from the producer.

ConnectionFactory is a factory class that allows you to open a connection to RabbitMQ.

First, we get a connection from ConnectionFactory, and then we create a channel. We create a queue with the queueDeclare() method by passing the queue name. Declaring a queue is idempotent: if a queue with this name already exists with the same properties, it is not created again.

Next, we use the basicPublish() method to send our encoded message to the queue. The first argument of basicPublish() is the exchange name. Because we passed an empty string, the message goes to RabbitMQ's default exchange, which routes it to the queue whose name matches the routing key (the second argument).
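
The two behaviours just described, idempotent declaration and routing by queue name, can be modelled in memory as follows. This is a sketch under assumptions, not the amqp-client API: DefaultExchangeSketch and its method names are hypothetical, and the model only mirrors the idea that the default exchange delivers to the queue named by the routing key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of queue declaration and the default exchange.
class DefaultExchangeSketch {
    // Queue name -> messages waiting in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    // Declaring is idempotent: an existing queue is reused, not recreated.
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, n -> new ArrayList<>());
    }

    // Default exchange: the routing key is treated as the target queue name.
    boolean publishToDefaultExchange(String routingKey, String message) {
        List<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // no queue with that name, so the message is not routed
        }
        queue.add(message);
        return true;
    }

    List<String> messagesIn(String name) {
        return queues.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("OWL-POST");
        broker.publishToDefaultExchange("OWL-POST", "Happy birthday Hermione!");
        broker.queueDeclare("OWL-POST"); // second declaration keeps the existing queue and its message
        System.out.println(broker.messagesIn("OWL-POST")); // [Happy birthday Hermione!]
    }
}
```

This is why both the publisher and the consumer can safely declare the same queue: whichever starts first creates it, and the other simply finds it there.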

Here is our consumer.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer {
    private static final String HOST = "localhost";
    private static final String QUEUE = "OWL-POST";

    public static void main(String[] args) {
        // CREATE A CONNECTION FACTORY
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);

        try {
            // CREATE A CONNECTION
            Connection connection = connectionFactory.newConnection();

            // CREATE A CHANNEL FROM CONNECTION
            Channel channel = connection.createChannel();

            // ASSIGN A QUEUE TO CHANNEL
            channel.queueDeclare(QUEUE, false, false, false, null);
            System.out.println(" Waiting for message...");

            DeliverCallback deliverCallback = (consumeMsg, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" Message received : '" + message + "'");
            };

            channel.basicConsume(QUEUE, true, deliverCallback, consumeMsg -> {});
        } catch (Exception e) {
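            // Concatenating e.getStackTrace() would only print an array reference, so print the trace separately
            System.out.println("Error in consuming message: " + e.getMessage());
            e.printStackTrace();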
        }
    }
}
So the consumer is connected to the 'OWL-POST' queue. And it receives the message via the basicConsume() method.

DeliverCallback is a callback interface. It is notified when the consumer receives the message and is passed as an argument to the basicConsume() method.

Here is the message from Harry: 'Happy birthday Hermione!'.

Source code: Publisher and Consumer.
Happy coding!!! 😊

Saturday, November 16, 2024

Spring Boot: Custom Annotation using BeforeExecutionGenerator

springboot,java,hibernate,jpa,programming,software development,technology,software engineering
Hibernate offers several identifier generation strategies, the most popular being the AUTO strategy. UUIDs (universally unique identifiers) are frequently preferred over auto-incrementing integers because they provide uniqueness across distributed systems without requiring coordination. In this article, we'll write a new annotation type to use a UUID-style identifier as the primary key in a Spring Boot application.

We will use ULID (Universally Unique Lexicographically Sortable Identifier) as the value for primary or foreign keys. The ULID is a 128-bit identifier. The first 48 bits indicate the number of milliseconds since Unix Epoch. The last 80 bits are generated by a secure random number generator. We can store ULID values as a 26-character string.
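
To see how the 48-bit timestamp and the 80 random bits become a 26-character string, here is a minimal encoder written from the layout described above. It is a sketch for illustration, not the ULID Creator library used later in this article: the class UlidSketch and its methods encode() and next() are hypothetical names, and the alphabet is Crockford's Base32.

```java
import java.security.SecureRandom;

// Minimal ULID encoder: 48-bit timestamp + 80 random bits -> 26 Base32 characters.
class UlidSketch {
    // Crockford's Base32 alphabet (no I, L, O, U), in ascending ASCII order.
    private static final char[] ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ".toCharArray();
    private static final SecureRandom RANDOM = new SecureRandom();

    static String encode(long timestampMillis, byte[] randomness) {
        if (timestampMillis < 0 || timestampMillis >= (1L << 48)) {
            throw new IllegalArgumentException("timestamp must fit in 48 bits");
        }
        if (randomness.length != 10) {
            throw new IllegalArgumentException("randomness must be exactly 10 bytes (80 bits)");
        }
        char[] out = new char[26];

        // First 10 characters: the timestamp, most significant 5-bit group first.
        long time = timestampMillis;
        for (int i = 9; i >= 0; i--) {
            out[i] = ALPHABET[(int) (time & 31)];
            time >>>= 5;
        }

        // Last 16 characters: the 80 random bits, handled as two 40-bit halves.
        for (int half = 0; half < 2; half++) {
            long bits = 0;
            for (int i = 0; i < 5; i++) {
                bits = (bits << 8) | (randomness[half * 5 + i] & 0xFF);
            }
            for (int i = 7; i >= 0; i--) {
                out[10 + half * 8 + i] = ALPHABET[(int) (bits & 31)];
                bits >>>= 5;
            }
        }
        return new String(out);
    }

    // Generate a ULID for the current time with fresh secure randomness.
    static String next() {
        byte[] randomness = new byte[10];
        RANDOM.nextBytes(randomness);
        return encode(System.currentTimeMillis(), randomness);
    }

    public static void main(String[] args) {
        String ulid = next();
        System.out.println(ulid + " (" + ulid.length() + " characters)");
    }
}
```

Because the timestamp comes first and the alphabet is in ascending order, ULIDs created in different milliseconds sort by creation time when compared as plain strings.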

Therefore, to use ULIDs, we will develop a new annotation type, which we will use in our entity class.

Custom Annotation Type

We developed a UlidGenerator class that implements the BeforeExecutionGenerator interface provided by Hibernate.
public class UlidGenerator implements BeforeExecutionGenerator {
    @Override
    public Object generate(SharedSessionContractImplementor sharedSessionContractImplementor,
                           Object o, Object o1, EventType eventType) {
        return UlidCreator.getUlid().toString();
    }

    @Override
    public EnumSet<EventType> getEventTypes() {
        return EventTypeSets.INSERT_ONLY;
    }
}
BeforeExecutionGenerator is a generator whose value is produced in Java code just before the SQL statement is executed, as opposed to a value generated by the database itself. The generate() method can run any Java code. In this case, it produces a ULID. We used ULID Creator to generate ULIDs.

EventTypeSets.INSERT_ONLY is generally used to produce identifiers.

Now, it's time to create a new annotation type. Our custom annotation is Ulid, which was created with @interface.
@IdGeneratorType(UlidGenerator.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface Ulid {
}
The annotation type is annotated with @IdGeneratorType(UlidGenerator.class), @Retention(RetentionPolicy.RUNTIME), and @Target({ElementType.FIELD}). Annotations applied to another annotation type like this are called meta-annotations.

@Retention(RetentionPolicy.RUNTIME) indicates that the VM will retain annotations of this type, which can be read reflectively at run-time.

@Target({ElementType.FIELD}) indicates that Ulid can annotate field declarations only.

@IdGeneratorType(UlidGenerator.class) sets up a custom identifier generator. The identifier is generated using the UlidGenerator, which we previously created.
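
The effect of the @Retention and @Target meta-annotations can be tried out without Hibernate. The sketch below uses a hypothetical marker annotation, Marked, and a hypothetical Order class. It only shows that a RUNTIME annotation on a field can be discovered reflectively, which is the kind of lookup a framework performs to find annotated fields.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Shows that a RUNTIME-retained, FIELD-targeted annotation is visible through reflection.
class MetaAnnotationSketch {
    // Hypothetical marker annotation; unlike @Ulid it has no Hibernate generator attached.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD})
    @interface Marked {
    }

    // Hypothetical class with one annotated and one plain field.
    static class Order {
        @Marked
        private String id;
        private String customerId;
    }

    // Collect the names of all fields of the given type that carry @Marked.
    static List<String> markedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            if (field.isAnnotationPresent(Marked.class)) {
                names.add(field.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(markedFields(Order.class)); // [id]
    }
}
```

With RetentionPolicy.SOURCE or CLASS instead of RUNTIME, isAnnotationPresent() would return false and the list would be empty.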

Entity

Here is our first entity, OrderMaster. We have used our custom annotation type, @Ulid, along with @Id to generate primary key values.
@Entity
@Table(name = "ORDER_MASTER")
public class OrderMaster {
    @Id
    @Ulid
    @Column(name = "id", unique = true, nullable = false, length = 26)
    private String id;

    @Column(name = "order_date")
    private LocalDate orderDate;

    @Column(name = "customer_id", length = 40)
    private String customerId;

    @Column(name = "order_posted_date", nullable = false, updatable = false)
    @CreationTimestamp
    private LocalDateTime orderPostedDate;

    @JsonManagedReference
    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "orderMaster")
    private List<OrderDetails> orderDetails;

    public String getId() {
        return id;
    }

    public OrderMaster setId(String id) {
        this.id = id;
        return this;
    }

    public LocalDate getOrderDate() {
        return orderDate;
    }

    public OrderMaster setOrderDate(LocalDate orderDate) {
        this.orderDate = orderDate;
        return this;
    }

    public String getCustomerId() {
        return customerId;
    }

    public OrderMaster setCustomerId(String customerId) {
        this.customerId = customerId;
        return this;
    }

    public LocalDateTime getOrderPostedDate() {
        return orderPostedDate;
    }

    public OrderMaster setOrderPostedDate(LocalDateTime orderPostedDate) {
        this.orderPostedDate = orderPostedDate;
        return this;
    }

    public List<OrderDetails> getOrderDetails() {
        return orderDetails;
    }

    public OrderMaster setOrderDetails(List<OrderDetails> orderDetails) {
        this.orderDetails = orderDetails
                .stream()
                .map(o -> o.setOrderMaster(this))
                .toList();
        return this;
    }
}

Another entity is OrderDetails. To generate primary key values, we used both @Ulid and @Id.
@Entity
@Table(name = "ORDER_DETAILS")
public class OrderDetails {
    @Id
    @Ulid
    @Column(name = "ID", unique = true, nullable = false, length = 26)
    private String id;

    @Column(name = "item_id", length = 40)
    private String itemId;

    @Column(name = "quantity")
    private Integer quantity;

    @Column(name = "unit_price")
    private BigDecimal unitPrice;

    @JsonBackReference
    @ManyToOne(fetch = FetchType.LAZY, cascade = CascadeType.ALL)
    @JoinColumn(name = "order_master_id", referencedColumnName = "ID", nullable = false)
    private OrderMaster orderMaster;

    public String getId() {
        return id;
    }

    public OrderDetails setId(String id) {
        this.id = id;
        return this;
    }

    public String getItemId() {
        return itemId;
    }

    public OrderDetails setItemId(String itemId) {
        this.itemId = itemId;
        return this;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public OrderDetails setQuantity(Integer quantity) {
        this.quantity = quantity;
        return this;
    }

    public BigDecimal getUnitPrice() {
        return unitPrice;
    }

    public OrderDetails setUnitPrice(BigDecimal unitPrice) {
        this.unitPrice = unitPrice;
        return this;
    }

    public OrderMaster getOrderMaster() {
        return orderMaster;
    }

    public OrderDetails setOrderMaster(OrderMaster orderMaster) {
        this.orderMaster = orderMaster;
        return this;
    }
}
We mapped these entities, OrderMaster and OrderDetails, with a @OneToMany relationship (and @ManyToOne on the reverse side). Because the primary keys of these entities are ULID strings, their foreign key values are of the same type.

Repository

We created IOrderMasterRepository, a repository that extends JpaRepository.
@Repository
public interface IOrderMasterRepository extends JpaRepository<OrderMaster, String> {
}

Service

OrderMasterService class contains two methods: one to save a new order and another to fetch order details by Id.
@Service
public class OrderMasterService {
    private final IOrderMasterRepository orderMasterRepository;

    @Autowired
    public OrderMasterService(IOrderMasterRepository orderMasterRepository) {
        this.orderMasterRepository = orderMasterRepository;
    }

    public OrderMaster saveOrder(OrderMasterInput input) {
        List<OrderDetails> orderDetails = new ArrayList<>();

        if (input.detailsInputs().isPresent()) {
            orderDetails = input.detailsInputs().get()
                    .stream()
                    .map(e -> new OrderDetails()
                            .setItemId(e.itemId())
                            .setQuantity(e.quantity())
                            .setUnitPrice(e.unitPrice())
                    ).toList();
        }

        OrderMaster orderMaster = new OrderMaster()
                .setOrderDate(input.orderDate())
                .setCustomerId(input.customerId())
                .setOrderDetails(orderDetails);

        orderMaster = this.orderMasterRepository.save(orderMaster);
        return orderMaster;
    }

    public OrderMaster getDetails(String id) {
        // Returns null when no order exists for the given ID.
        return orderMasterRepository.findById(id).orElse(null);
    }
}

Controller

The OrderController class contains two endpoints: one for saving new orders and one for retrieving order details by Id.
@RestController
@Tag(description = "API related to Order.", name = "Order")
@RequestMapping("/order")
public class OrderController {
    private final OrderMasterService orderMasterService;

    @Autowired
    public OrderController(OrderMasterService orderMasterService) {
        this.orderMasterService = orderMasterService;
    }

    @Operation(summary = "Save a new Order.", description = "Save a new Order details.")
    @PostMapping(value = "/v1")
    public ResponseEntity<AppApiResponse<OrderMaster>> saveOrder(@RequestBody OrderMasterInput input) {
        return new ResponseEntity<>(new AppApiResponse<>("SUCCESS", orderMasterService.saveOrder(input), "Data saved!"),
                HttpStatus.OK);
    }

    @Operation(summary = "Get a Order details.", description = "Get a Order details by ID.")
    @RequestMapping(value = "/v1", method = RequestMethod.GET)
    public ResponseEntity<AppApiResponse<OrderMaster>> getDetails(@RequestParam String id) {
        return new ResponseEntity<>(new AppApiResponse<>("SUCCESS", orderMasterService.getDetails(id), "Data fetched!"),
                HttpStatus.OK);
    }
}

Testing

First, we'll call the endpoint, http://localhost:8090/order/v1, to save order information to the database.
Now, look at our order master database table.
We can see that the Id column holds ULID values.

And here is our order details database table.
The primary key and foreign key columns of this table hold ULID values as well.

Now we'll call another endpoint, http://localhost:8090/order/v1?id=c73b041d-9320-453d-b57a-1ce7b6e5accd, to get order details by ID.

Traditional (random) UUIDs do not follow a natural order, which causes random insertions in database indexes. In a ULID, on the other hand, the first 48 bits hold the creation time in milliseconds since the Unix epoch, so ULID values sort naturally by generation time.
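
To make the ordering claim concrete, here is a minimal sketch that encodes only the 48-bit timestamp part of a ULID as 10 Crockford Base32 characters. The class and method names (UlidTimePrefix, encodeTime) are hypothetical and are not part of this article's project or of any ULID library; the random 80-bit part of a real ULID is left out.

```java
public class UlidTimePrefix {
    // Crockford Base32 alphabet (no I, L, O, U). It is already in ASCII order,
    // so comparing encoded strings gives the same result as comparing the numbers.
    private static final char[] ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ".toCharArray();

    // Encodes a millisecond timestamp as 10 characters, most significant
    // 5-bit group first (assumes the value fits in 48 bits).
    public static String encodeTime(long epochMillis) {
        char[] out = new char[10];
        for (int i = 9; i >= 0; i--) {
            out[i] = ALPHABET[(int) (epochMillis & 31)];
            epochMillis >>>= 5;
        }
        return new String(out);
    }

    public static void main(String[] args) {
        String earlier = encodeTime(1_700_000_000_000L);
        String later = encodeTime(1_700_000_000_001L);
        // An identifier created one millisecond later sorts after the earlier one.
        System.out.println(earlier + " sorts before " + later + ": " + (earlier.compareTo(later) < 0));
    }
}
```

Because the time prefix comes first, two identifiers created at different milliseconds compare in creation order no matter what the random part contains.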

So, in this article, we learned how to set ULID as a database key value and retrieve details using ULIDs.

You can get the source code from here.
Happy coding!!! 😊

Thursday, October 10, 2024

Java Stream API: A Beginner's Crash Course

java,stream api,functional programming,consumer,predicate,programming,software development,technology, software engineering
Java 8 introduced a powerful new feature called the Stream API, which marked a significant evolution in the language. Developers can use the Stream API to perform data processing operations on collections concisely and expressively. In this blog post, we'll look at the Stream API, its core concepts, and how it simplifies data processing in Java applications.

What is the Stream API?

A Stream, introduced in Java 8, is a sequence of elements that can be processed sequentially or in parallel. Modern CPUs have multiple cores that can process data in parallel, and the Stream API was introduced to support parallel data processing in Java without the boilerplate code of defining and synchronizing threads. Unlike collections, streams do not store data; they serve as a pipeline for transforming and aggregating data from a source such as a List, Set, or array. Streams enable developers to perform declarative, functional-style operations such as filtering, mapping, sorting, and reducing.

The Stream interface defines many operations, which can be divided into two categories: intermediate operations and terminal operations. Stream operations that can be connected are referred to as intermediate operations, while those that close a stream are known as terminal operations.


We can divide the operations into two groups:
  • Intermediate operations: methods such as filter() and map() return another stream, so they can be chained to form a pipeline.
  • Terminal operations: methods such as collect() execute the pipeline, return the result, and close the stream.
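
The difference between the two groups shows up in when the work happens. The following is a minimal sketch (the class name LazyPipelineDemo and the log list are made up for illustration) that records each step to show that intermediate operations do nothing until a terminal operation runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    // Returns a log of events so the order of evaluation can be inspected.
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .filter(n -> {
                    log.add("filter " + n);
                    return n % 2 != 0;
                })
                .map(n -> {
                    log.add("map " + n);
                    return n * 10;
                });
        // Only the pipeline has been described so far; no element was touched.
        log.add("pipeline built");
        // collect() is the terminal operation that actually pulls the elements through.
        List<Integer> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first log entry is "pipeline built": the filter and map entries appear only after collect() starts consuming the stream.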

Creating Streams

Before optimizing our code with streams, let's look at how to create them. To create a stream, we need a source. That source can be a collection (a list, a set, or the entry set of a map), an array, or an I/O resource that is used as input.
A stream does not change its source, so multiple stream instances can be created from the same source and used for different purposes.
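
As a small sketch of this point (the class and method names are hypothetical), the same list below is used as the source of two independent streams and is left unchanged by both.

```java
import java.util.List;
import java.util.stream.Collectors;

public class StreamSourceDemo {
    // First stream over the source: produces a new list, the source is untouched.
    public static List<Integer> doubled(List<Integer> source) {
        return source.stream().map(n -> n * 2).collect(Collectors.toList());
    }

    // Second, independent stream over the same source.
    public static long countGreaterThan(List<Integer> source, int threshold) {
        return source.stream().filter(n -> n > threshold).count();
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 3, 9, 27);
        System.out.println(doubled(source));
        System.out.println(countGreaterThan(source, 5));
        System.out.println(source); // still the original values
    }
}
```

Note that a single stream instance can be consumed only once; each call to stream() above creates a fresh one.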


Create Streams from Collections

In the code snippet below, we convert a list of integers into a stream by calling the stream() method. After creating a stream, we use the forEach() method to print the stream's values as well as the name of the execution thread on which this code is running.
public class StreamFromCollection {
    public static void main(String[] args) {
        List<Integer> geometrySeries = List.of(1, 3, 9, 27, 81, 243);
        geometrySeries.stream()
                .forEach(s -> System.out.println(Thread.currentThread().getName() + ": " + s));
    }
}
The code above creates a stream of Integer elements. forEach() is one of the most commonly used stream methods: it performs the given action on each element of the stream, which in this case prints the thread name and the value. Because forEach() does not return a stream, it is a terminal operation.


Create Streams from Arrays

We can also create streams from arrays. Consider the following piece of code.
public class StreamFromArrays {
    public static void main(String[] args) {
        Integer[] squareSeries = {1, 4, 9, 16, 25};
        Arrays.stream(squareSeries)
                .forEach(s -> System.out.println(Thread.currentThread().getName() + ": " + s));
    }
}
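Because squareSeries is an Integer[], this call resolves to the generic Arrays.stream(T[] array) overload and returns a Stream<Integer>. Passing a primitive int[] instead selects the stream(int[] array) overload, which returns an IntStream, a stream of primitives.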
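
The two Arrays.stream() overloads can be compared side by side. This is a minimal sketch with made-up class and method names: a primitive int[] gives an IntStream, while an Integer[] gives a Stream<Integer>.

```java
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PrimitiveVsBoxedStream {
    public static int sumPrimitive(int[] values) {
        // stream(int[]) overload: a stream of primitives with sum() built in.
        IntStream stream = Arrays.stream(values);
        return stream.sum();
    }

    public static int sumBoxed(Integer[] values) {
        // Generic stream(T[]) overload: a stream of Integer objects.
        Stream<Integer> stream = Arrays.stream(values);
        return stream.mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumPrimitive(new int[]{1, 4, 9, 16, 25}));
        System.out.println(sumBoxed(new Integer[]{1, 4, 9, 16, 25}));
    }
}
```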

By using the parallel() method, we can also parallelise the stream that is created from arrays.
public class StreamFromArrays {
    public static void main(String[] args) {
        Integer[] squareSeries = {1, 4, 9, 16, 25};
        Arrays.stream(squareSeries).parallel()
                .forEach(i -> System.out.println(Thread.currentThread().getName() + ": " + i));
    }
}

In the previous example, we passed the whole array to the stream() method. The stream() method also accepts a start index (inclusive) and an end index (exclusive) along with the array. The resulting stream covers only that range of the original array, so the code below prints the values 16, 25, and 36.
public class StreamFromArrays {
    public static void main(String[] args) {
        Integer[] squareSeries = {1, 4, 9, 16, 25, 36, 49};
        Arrays.stream(squareSeries, 3, squareSeries.length - 1)
                .forEach(i -> System.out.println(Thread.currentThread().getName() + ": " + i));
    }
}
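
To check which elements a range covers, the ranged overload can be wrapped in a small helper. This is a sketch only; the class name ArrayRangeStream and the method slice are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ArrayRangeStream {
    // Streams values[startInclusive] up to, but not including, values[endExclusive].
    public static List<Integer> slice(Integer[] values, int startInclusive, int endExclusive) {
        return Arrays.stream(values, startInclusive, endExclusive)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Integer[] squareSeries = {1, 4, 9, 16, 25, 36, 49};
        // Indexes 3, 4 and 5 are included; index 6 (the value 49) is not.
        System.out.println(slice(squareSeries, 3, squareSeries.length - 1));
    }
}
```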

Create Finite Stream

You can also create streams without a source, such as collections or arrays, using Stream.generate() and Stream.builder(). The code below shows how the builder() method can be used to generate a finite stream of Integer values. 
public class StreamFinite {
    public static void main(String[] args) {
        Stream<Integer> integerStream = Stream.<Integer>builder()
                .add(1).add(2).add(9).add(16).add(25)
                .build();
        integerStream.takeWhile(s -> s > 0)
                .forEach(s -> System.out.println(s + " "));
    }
}
Because builder() is a generic method, we have to specify the element type explicitly, as in Stream.<Integer>builder(); without it, the compiler would infer a builder of Object.

The generate() method creates an infinite, unordered stream. It takes an instance of Supplier<T> as a parameter and calls it each time a new element is needed.
public class StreamFinite {
    public static void main(String[] args) {
        Stream<BigDecimal> randomDecimal = Stream.generate(
                new Supplier<BigDecimal>() {
                    @Override
                    public BigDecimal get() {
                        Random random = new Random();
                        return BigDecimal.valueOf(random.nextDouble() * 100)
                                .setScale(2, RoundingMode.UP);
                    }
                }
        ).limit(10);
        randomDecimal.forEach(s -> System.out.println(s + " "));
    }
}
To keep the stream finite, we used the limit() method to control the number of elements generated.
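
Since Supplier<T> is a functional interface, the anonymous class can also be written as a lambda. The following sketch assumes a hypothetical helper, randomInts, and uses a seeded Random (created once and reused) so that repeated runs give the same values.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateWithLambda {
    public static List<Integer> randomInts(long seed, int count) {
        Random random = new Random(seed); // one generator shared by all supplier calls
        return Stream.generate(() -> random.nextInt(100)) // infinite stream of values 0..99
                .limit(count)                             // cut it down to a finite stream
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(randomInts(42L, 5));
    }
}
```

Without limit() (or another short-circuiting operation) the collect() call would never finish, because generate() never runs out of elements.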

Product class

Here's a simple Product class with a few properties. We will be using this class throughout the tutorial.

Product List

We've written a static method that returns a list of Products. This product list is used later in the tutorial.
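
The listings for these two classes are not reproduced here. As a stand-in, here is a minimal sketch that assumes only the getters the later examples call (getTitle(), getDescription(), getBrand(), getPrice(), getRating(), getDiscountPercentage()). The field types and the sample rows are hypothetical and are not the author's data.

```java
import java.util.List;

// Hypothetical stand-in for the Product class used in this tutorial.
class Product {
    private final String title;
    private final String description;
    private final String brand;
    private final double price;
    private final double rating;
    private final double discountPercentage;

    Product(String title, String description, String brand,
            double price, double rating, double discountPercentage) {
        this.title = title;
        this.description = description;
        this.brand = brand;
        this.price = price;
        this.rating = rating;
        this.discountPercentage = discountPercentage;
    }

    public String getTitle() { return title; }
    public String getDescription() { return description; }
    public String getBrand() { return brand; }
    public double getPrice() { return price; }
    public double getRating() { return rating; }
    public double getDiscountPercentage() { return discountPercentage; }

    @Override
    public String toString() {
        return "Product [title=" + title + ", brand=" + brand + ", price=" + price
                + ", rating=" + rating + ", discountPercentage=" + discountPercentage + "]";
    }
}

public class ProductList {
    // Made-up sample rows, chosen only so that the later filters match something.
    public static List<Product> getProductList() {
        return List.of(
                new Product("Phone A", "Sample phone", "Apple", 999.0, 4.7, 12.5),
                new Product("Earbuds B", "Sample earbuds", "Boat", 49.0, 4.2, 15.0),
                new Product("Laptop C", "Sample laptop", "Apple", 1499.0, 4.6, 8.0));
    }

    public static void main(String[] args) {
        getProductList().forEach(System.out::println);
    }
}
```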

Intermediate Operation: map() And Terminal Operation: collect()

The map() method is an intermediate operation. Intermediate operations process the elements in the stream and pass the result, as a stream, to the next operation in the pipeline. map() takes an argument of type Function<T, R>, applies that function to each element of the stream, and returns a stream of the results.

A stream of objects can be transformed in a variety of ways using the map() method. For instance, we can extract a particular attribute or field from each item in a stream or change an object stream from one type to another.

👉 Consider this example. Here, we will find the square of each element in the stream and print the resulting list.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamMapExample {
    public static void main(String args[]) {
      List<Integer> result = Arrays.asList(2, 3, 4, 5, 6, 7, 8)
				.stream()
				.map(n-> n*n)
				.collect(Collectors.toList());
      System.out.println(result);
    }
}
The map() method in the preceding program takes the stream values one by one and executes the logic that we specified. The collector then gathers the resultant elements in a list and returns it to the caller method.


Convert lowercase letters to uppercase

Let's look at another example of the Java stream's map() method, this time converting all lowercase letters to uppercase letters. The code for this is shown below.
import java.util.stream.Stream;
import java.util.function.Function;

public class StreamMapExample {
    public static void main(String args[]) {
        Function<String, String> uppercaseFunction = String::toUpperCase;
        Stream.of("Seoul", "Istanbul", "Chicago", "Barcelona", "Sydney")
                .map(uppercaseFunction)
                .forEach(System.out::println);
    }
}


Number of characters in each word

In this example, we want to know how many characters are in each word. We can accomplish this task by calling the map() method. This method will take each word and return its length.
public class StreamMapCollect {
    public static void main(String[] args) {
        List<String> countries = List.of("Barbados", "Japan", "New Zealand");
        List<Integer> nameLength = countries
                .stream()
                .map(String::length)
                .collect(Collectors.toList());
        System.out.println(nameLength);
    }
}

Fetch values from a list of objects

In the next example, we will extract a list of brand names from the Product list.
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamMapCollect {
	public static void main(String[] args) {
		List<String> brandList = ProductList.getProductList()
				.stream().map(Product::getBrand)
				.collect(Collectors.toList());
		System.out.println(brandList);

		Function<String, String> uppercaseFunction = String::toUpperCase;
		brandList = ProductList.getProductList()
				.stream().map(obj -> uppercaseFunction.apply(obj.getBrand()))
				.collect(Collectors.toList());
		System.out.println(brandList);
	}
}
The map() method returns a String stream. Using the collect() method, all of these elements are added to a List<String>. This method adds elements to a Collection instance as they are processed.

Fetch distinct values from a list of objects

We can find distinct brand names in this manner:
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamMapCollect {
	public static void main(String[] args) {
		List<String> distinctBrands = ProductList.getProductList()
			.stream().map(Product::getBrand).distinct()
			.collect(Collectors.toList());
		System.out.println(distinctBrands);
	}
}
The distinct() method returns a stream containing only the distinct elements of the original stream, compared using equals().

👉 We can find distinct brand names in another way.
public class StreamMapCollect {
	public static void main(String[] args) {
		Set<String> brandSet = ProductList.getProductList()
				.stream()
                .map(Product::getBrand).collect(Collectors.toSet());
		System.out.println(brandSet);
	}
}
So we're collecting the processed elements in a Set<String> with the collect() method. We will get a collection of distinct elements because Set does not permit duplicate elements.


Create a list of Object using Stream

👉 Using the map() method, we can extract values from the stream's elements and create a new collection of objects of another type.
public class StreamMapCollect {
	public static void main(String[] args) {
		Set<ProductDetails> productSet = ProductList.getProductList()
			.stream()
			.map(obj -> new ProductDetails(obj.getTitle(), obj.getDescription(), obj.getPrice()))
			.collect(Collectors.toSet());
		System.out.println(productSet);
	}
}

class ProductDetails {
	private String name;
	private String description;
	private double price;

	public ProductDetails() {
	}

	public ProductDetails(String name, String description, double price) {
		this.name = name;
		this.description = description;
		this.price = price;
	}

	public String getName() {
		return name;
	}

	public String getDescription() {
		return description;
	}

	public double getPrice() {
		return price;
	}

	public void setName(String name) {
		this.name = name;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	@Override
	public String toString() {
		return "ProductDetails [name=" + name + ", description=" + description + ", price=" + price + "]";
	}
}
We created a ProductDetails class with a few properties. We read a few properties from each Product and use them to create ProductDetails objects in the map() method.


Intermediate Operation: filter()

The Stream API's filter() method chooses elements from a collection based on a condition. It takes a predicate (a functional interface that returns a boolean) as input and returns a new stream containing only elements that satisfy the condition. Here's the syntax:
Stream<T> filter(Predicate<? super T> predicate)
Let's look at a basic example. Assume we have a list of integers and want to exclude the even numbers.
public class StreamFilterCollect {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(19, 24, 32, 47, 52, 67, 71, 85);

        // Predicate to filter out even numbers
        Predicate<Integer> predicate = (x) -> x % 2 != 0;

        List<Integer> oddNumbers = numbers
                .stream()
                .filter(predicate) 
                .collect(Collectors.toList());

        System.out.println("Odd Numbers: " + oddNumbers);
    }
}


Filtering Objects

You can also filter a list of objects based on specific attributes. Our data source will be ProductList.
public class StreamFilterExample {
    public static void main(String[] args) {
        List<Product> products = ProductList.getProductList()
                .stream()
                .filter(e -> e.getRating() > 4.5)
                .collect(Collectors.toList());
        System.out.println("Products :: " + products);
    }
}
We've filtered the products based on their ratings. In this case, the predicate is passed to the filter() method inline as a lambda expression.

We can use the && or || operators in the filter() method to apply multiple conditions.
public class StreamFilterExample {
    public static void main(String[] args) {
        List<Product> products = ProductList.getProductList()
                .stream()
                .filter(e -> e.getRating() > 4.5
                        && e.getDiscountPercentage() > 10)
                .collect(Collectors.toList());
        System.out.println("Products :: " + products);
    }
}
Products are filtered by their ratings and discounts.
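
As an alternative to writing && or || inside one lambda, separate predicates can be combined with the default methods and(), or(), and negate() of the Predicate interface. The sketch below uses plain integers and made-up names (PredicateComposition, isOdd, isLarge) to stay self-contained.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PredicateComposition {
    private static final Predicate<Integer> isOdd = n -> n % 2 != 0;
    private static final Predicate<Integer> isLarge = n -> n > 50;

    // Same effect as filter(n -> n % 2 != 0 && n > 50).
    public static List<Integer> oddAndLarge(List<Integer> numbers) {
        return numbers.stream()
                .filter(isOdd.and(isLarge))
                .collect(Collectors.toList());
    }

    // Same effect as filter(n -> n % 2 == 0 || n > 50).
    public static List<Integer> evenOrLarge(List<Integer> numbers) {
        return numbers.stream()
                .filter(isOdd.negate().or(isLarge))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(19, 24, 32, 47, 52, 67, 71, 85);
        System.out.println(oddAndLarge(numbers));
        System.out.println(evenOrLarge(numbers));
    }
}
```

Named predicates like these can be reused across pipelines, which helps when the same condition appears in several filters.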


Combining filter() with map()

To extract Product titles based on ratings and discounts, we first filter the stream and then use map() to extract the title.
public class StreamFilterExample {
    public static void main(String[] args) {
        List<String> productTitles = ProductList.getProductList()
                .stream()
                .filter(e -> e.getRating() > 4.5
                        && e.getDiscountPercentage() > 10)
                // Equivalent lambda form: .map(e -> e.getTitle())
                .map(Product::getTitle)
                .collect(Collectors.toList());
        System.out.println("Product titles :: " + productTitles);
    }
}


Terminal Operation: findAny()

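Use findAny() to get some element of the current stream. It returns an Optional<T>, which is empty if the stream has no elements. On a sequential stream it usually returns the first element, but this is not guaranteed; use findFirst() when the first element is required. Here's a basic example.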
public class StreamFindAnyDemo {
    public static void main(String[] args) {
        Optional<Integer> anyNumber = Arrays.asList(8, 57, 19, 14, 51, 33, 38)
                .stream()
                .filter(x -> x % 2 != 0)
                .findAny();
        anyNumber.ifPresent(n -> System.out.println(n));
    }
}

👉 We can also select a product from our Product List.
public class StreamExample {
    public static void main(String[] args) {
        Optional<Product> productAny = ProductList.getProductList()
                .stream()
                .findAny();
        productAny.ifPresent(c -> System.out.println(c));
    }
}

👉 We can use findAny() together with intermediate operations. In the example below, it is combined with filter().
public class StreamExample {
    public static void main(String[] args) {        
        Optional<Product> productFilterAny = ProductList.getProductList()
                .stream()
                .filter(e -> e.getPrice() > 890)
                .findAny();
        productFilterAny.ifPresent(c -> System.out.println(c));
    }
}
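
When the filter matches nothing, findAny() returns an empty Optional and ifPresent() silently does nothing. If a fallback value is wanted instead, orElse() can supply one. This is a minimal sketch; the class name and the helper anyOddOrDefault are hypothetical.

```java
import java.util.List;

public class FindAnyFallback {
    // Returns some odd number from the list, or the fallback when there is none.
    public static int anyOddOrDefault(List<Integer> numbers, int fallback) {
        return numbers.stream()
                .filter(n -> n % 2 != 0)
                .findAny()
                .orElse(fallback);
    }

    public static void main(String[] args) {
        System.out.println(anyOddOrDefault(List.of(2, 4, 7), -1)); // one odd value exists
        System.out.println(anyOddOrDefault(List.of(2, 4, 6), -1)); // no odd value, fallback is used
    }
}
```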


Terminal Operation: anyMatch()

If we want to know whether any element of the stream satisfies a condition, we can use anyMatch(). It accepts a Predicate<T> argument and returns a boolean.

Here's a simple code snippet for checking whether or not a specific word is in the list.
public class StreamAnyMatchExample {
    public static void main(String[] args) {
        Boolean isAnimalPresent = List.of("Fern", "Zebra", "Peace Lily", "Snake Plant")
                .stream()
                .anyMatch(e -> e.equals("Zebra"));
        if (isAnimalPresent) {
            System.out.println("List contains animal name.");
        } else {
            System.out.println("We have a list of plant names.");
        }
    }
}
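
anyMatch() is a short-circuiting operation: it can stop as soon as one element satisfies the predicate. The sketch below counts, on a sequential stream, how many elements were inspected before it stopped. The class name and the counter are made up for illustration, and peek() is used here only for observation.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AnyMatchShortCircuit {
    // Returns how many elements passed through the pipeline before anyMatch() finished.
    public static int inspectedUntilMatch(List<String> words, String target) {
        AtomicInteger inspected = new AtomicInteger();
        words.stream()
                .peek(w -> inspected.incrementAndGet())
                .anyMatch(w -> w.equals(target));
        return inspected.get();
    }

    public static void main(String[] args) {
        List<String> words = List.of("Fern", "Zebra", "Peace Lily", "Snake Plant");
        System.out.println(inspectedUntilMatch(words, "Zebra"));  // stops at the match
        System.out.println(inspectedUntilMatch(words, "Cactus")); // no match, whole list is checked
    }
}
```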

👉 The code snippet below checks whether the stream contains a Product with a price greater than a specified amount.
public class StreamAnyMatchExample {
    public static void main(String[] args) {
        // Is there any Product priced above 1000/- in the Product list?
        Predicate<Product> predicate = (p) -> p.getPrice() > 1000;
        Boolean anyMatch = ProductList.getProductList()
                .stream()
                .anyMatch(predicate);
        System.out.println("Product of price greater than 1000/- is present? " + anyMatch);
    }
}

👉 We can combine the anyMatch() and filter() methods. We have a list of brand names, and we want to keep only the products in our existing stream whose brand is in that list.
public class StreamAnyMatchExample {
    public static void main(String[] args) {
        List<String> brandNames = List.of("Apple", "Boat");

        List<Product> products = ProductList.getProductList()
                .stream()
                .filter(e -> brandNames.stream().anyMatch(a -> a.equals(e.getBrand())))
                .collect(Collectors.toList());
        System.out.println("Products :: " + products);
    }
}


Conclusion

The Stream API is an effective tool for modern Java development. Understanding its key concepts and operations will help you write more concise, readable, and efficient code. Whether you're working with collections, arrays, or custom data sources, the Stream API gives you a flexible and declarative way to process data.

Happy coding!!! 😊