Managing Distributed Architecture using Azure Event Hubs
Overview
A distributed application is a program that runs as more than one workload, with the workloads communicating over a network.
For example, imagine an e-commerce application in which the Catalog capability runs in one or more workloads and the Cart capability runs in another (this can be a microservice or an SOA architecture). When an end user wants to add an article to their cart, the two capabilities need to communicate with each other.
This communication can be made using one of the two following patterns:
1. Synchronous Communication
The Catalog capability sends the product information to the Cart capability using an HTTP request.
2. Asynchronous Communication
The Catalog capability publishes the product information to the middleware, and the Cart capability subscribes to the middleware to consume the message.
I am not going to compare the two solutions here; that subject deserves an article of its own. In this article I explain how to implement asynchronous communication using the publish/subscribe (pub/sub) design pattern.
Just keep in mind that a middleware like Azure Event Hubs lets us build an architecture with less coupling and more cohesion.
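To make the decoupling idea concrete before we touch Azure, here is a minimal sketch of the pub/sub pattern in plain Java. The InMemoryBroker class and the "product-added" topic name are hypothetical and exist only for illustration; the sketch also delivers messages immediately in the caller's thread, whereas a real middleware stores them and delivers them later.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory broker, used only to illustrate the pattern.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // A subscriber registers a handler for a topic; it never references the publisher.
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // The publisher hands the message to the broker and knows nothing about subscribers.
    // Returns the number of handlers that received the message.
    int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        broker.subscribe("product-added", msg -> System.out.println("Cart received: " + msg));
        broker.subscribe("product-added", msg -> System.out.println("Inventory received: " + msg));
        broker.publish("product-added", "{\"productId\":\"42\"}");
    }
}
```

Notice that the Catalog side would only call publish and the Cart side would only call subscribe: neither needs the address, or even the existence, of the other.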
Provisioning of Azure Event Hub
Event Hubs is a messaging service available in Azure. It is a fully managed, real-time data ingestion service.
As usual, we first create a resource group, then an Event Hubs namespace, and finally the event hub itself.
In the Standard tier we can have up to 10 event hubs per namespace and up to 32 partitions per event hub.
Partitions are used to scale out the throughput of the system.
You can check the limits for the other tiers in the Azure Event Hubs limits documentation.
resource_group_name="distributed-application"
event_Hubs_namespace="ns-lab-event-ehn-da"
event_hub_name="ns-lab-event-eh"
sendauthorule="sendauthorule"
readauthorule="readauthorule"
location="eastus"
az group create --name $resource_group_name --location $location
az eventhubs namespace create --name $event_Hubs_namespace \
--resource-group $resource_group_name \
--location $location \
--enable-auto-inflate true \
--maximum-throughput-units 20 \
--zone-redundant false \
--sku Standard
az eventhubs eventhub create --name $event_hub_name \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--partition-count 4 \
--message-retention 1
Once the event hub is created, we need to create authorization rules that allow the producer and the consumer to access it.
The producer needs an authorization rule with the Send (publish) right, and the consumer needs one with the Listen (subscribe) right to consume messages from the event hub.
az eventhubs eventhub authorization-rule create \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name $sendauthorule \
--rights Send
az eventhubs eventhub authorization-rule create \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name $readauthorule \
--rights Listen
sendercnxstring=$(az eventhubs eventhub authorization-rule keys list \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name $sendauthorule \
--query "primaryConnectionString" \
--out tsv)
consumercnxstring=$(az eventhubs eventhub authorization-rule keys list \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name $readauthorule \
--query "primaryConnectionString" \
--out tsv)
echo "PRODUCER_CONNECTION_STRING : ${sendercnxstring}"
echo "CONSUMER_CONNECTION_STRING : ${consumercnxstring}"
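Before wiring these connection strings into the Java applications, it can help to check that they point at the right event hub. The sketch below assumes the usual semicolon-separated Key=Value layout of an Event Hubs connection string (Endpoint, SharedAccessKeyName, SharedAccessKey, EntityPath); the ConnectionStringCheck class and the PRODUCER_CONNECTION_STRING environment variable name are my own choices for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits a connection string into its Key=Value parts.
class ConnectionStringCheck {
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isBlank()) {
                continue;
            }
            // Split on the first '=' only: the key value is base64 and may itself end with '='.
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed connection string segment");
            }
            parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
        }
        return parts;
    }

    // True when the EntityPath part names the expected event hub.
    static boolean targetsEventHub(String connectionString, String eventHubName) {
        return eventHubName.equals(parse(connectionString).get("EntityPath"));
    }

    public static void main(String[] args) {
        String cs = System.getenv("PRODUCER_CONNECTION_STRING");
        if (cs == null) {
            System.err.println("PRODUCER_CONNECTION_STRING is not set");
            return;
        }
        // Print only the non-secret part of the connection string.
        System.out.println("EntityPath = " + parse(cs).get("EntityPath"));
    }
}
```

Reading the value from an environment variable, rather than hard-coding it, keeps the shared access key out of the source code.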
Azure Event Hubs allows multiple consumers to read the same message. For example, in our e-commerce application we need to check the stock when the client adds an article to the cart.
The Cart and Inventory capabilities consume the same message produced by the Catalog capability. To distinguish between consumers, Azure Event Hubs uses consumer groups: each consumer reads from the event hub through a specific consumer group.
The script below creates two consumer groups:
az eventhubs eventhub consumer-group create \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name consumer-group-cart
az eventhubs eventhub consumer-group create \
--resource-group $resource_group_name \
--namespace-name $event_Hubs_namespace \
--eventhub-name $event_hub_name \
--name consumer-group-inventory
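To see why two consumer groups do not steal messages from each other, here is a small model in plain Java: one shared, append-only log and one independent read position per group. The ConsumerGroupSketch class is hypothetical and models a single partition only; in a real application the reader is responsible for remembering its own position, and the map below simply stands in for that bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-partition model: one shared log, one read position per consumer group.
class ConsumerGroupSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> positions = new HashMap<>();

    // Publishing appends to the log; nothing is removed when a group reads.
    void publish(String event) {
        log.add(event);
    }

    // Returns the events this group has not read yet and advances only that group's position.
    List<String> poll(String consumerGroup) {
        int from = positions.getOrDefault(consumerGroup, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        positions.put(consumerGroup, log.size());
        return unread;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch hub = new ConsumerGroupSketch();
        hub.publish("product-42-added");
        // Both groups see the same event because each keeps its own position.
        System.out.println("cart:      " + hub.poll("consumer-group-cart"));
        System.out.println("inventory: " + hub.poll("consumer-group-inventory"));
    }
}
```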
Let's have some fun and create two Java Maven applications: the first one is the producer and the second one is the consumer.
Producer Application
In pom.xml, add the Azure Event Hubs dependency:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.1.1</version>
</dependency>
Then create a producer instance using the producer connection string. After that, create an event batch and add a message to it; a message can be a plain string or a JSON-serialized object.
The send method pushes the batch to the event hub.
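// All classes used here come from the com.azure.messaging.eventhubs package.
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString(PRODUCER_CONNECTION_STRING)
    .buildProducerClient();
EventDataBatch batch = producer.createBatch();
// tryAdd returns false when the event does not fit in the batch.
if (!batch.tryAdd(new EventData(message))) {
    throw new IllegalArgumentException("Message is too large for an empty batch");
}
producer.send(batch);
// Release the underlying connection once publishing is done.
producer.close();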
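The snippet above sends a single message, but tryAdd returns a boolean for a reason: a batch has a size limit, and a common pattern is to send the batch when it is full and start a new one. The sketch below shows that loop in plain Java so it runs without the SDK. SketchBatch is a stand-in I made up that mirrors the tryAdd and getCount methods of EventDataBatch, and the byte budget is an arbitrary illustration value, not the service limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {
    // Stand-in for EventDataBatch: accepts events until a byte budget is used up.
    static class SketchBatch {
        private final int maxBytes;
        private int usedBytes = 0;
        final List<String> events = new ArrayList<>();

        SketchBatch(int maxBytes) {
            this.maxBytes = maxBytes;
        }

        boolean tryAdd(String event) {
            int size = event.getBytes(StandardCharsets.UTF_8).length;
            if (usedBytes + size > maxBytes) {
                return false;
            }
            usedBytes += size;
            events.add(event);
            return true;
        }

        int getCount() {
            return events.size();
        }
    }

    // Packs messages into batches and returns them in the order they would be sent.
    static List<List<String>> pack(List<String> messages, int maxBytes) {
        List<List<String>> sent = new ArrayList<>();
        SketchBatch batch = new SketchBatch(maxBytes);
        for (String message : messages) {
            if (batch.tryAdd(message)) {
                continue;
            }
            // The batch is full: "send" it, then retry the message on a fresh batch.
            if (batch.getCount() > 0) {
                sent.add(batch.events);
            }
            batch = new SketchBatch(maxBytes);
            if (!batch.tryAdd(message)) {
                throw new IllegalArgumentException("Message is larger than an empty batch");
            }
        }
        // Do not forget the last, partially filled batch.
        if (batch.getCount() > 0) {
            sent.add(batch.events);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(pack(List.of("aaaa", "bbbb", "cc", "dddd"), 8));
    }
}
```

With the real client, the "send" step would be producer.send(batch) followed by producer.createBatch() to start the next batch.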
Event Hubs distributes the published messages across the four partitions; remember that we configured the event hub with four partitions.
When no partition key or partition ID is specified, Event Hubs uses round-robin assignment by default to balance the load between the partitions of the event hub.
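As a rough picture of what round-robin means here, the sketch below rotates through partition indexes in plain Java. The RoundRobinSketch class is hypothetical and only models the idea of taking turns; the actual assignment is done inside the service, and I am not claiming it is implemented this way.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of round-robin assignment over a fixed number of partitions.
class RoundRobinSketch {
    private final int partitionCount;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    // Returns 0, 1, ..., partitionCount - 1 and then wraps around.
    // floorMod keeps the result non-negative even if the counter overflows.
    int nextPartition() {
        return Math.floorMod(counter.getAndIncrement(), partitionCount);
    }

    public static void main(String[] args) {
        RoundRobinSketch assigner = new RoundRobinSketch(4); // four partitions, as in our event hub
        for (int i = 1; i <= 6; i++) {
            System.out.println("send #" + i + " goes to partition " + assigner.nextPartition());
        }
    }
}
```

Round-robin spreads the load evenly, but it gives no guarantee that related messages land in the same partition.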
Consumer Application
In pom.xml, add the same Azure Event Hubs dependency:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.1.1</version>
</dependency>
Then create a consumer instance using the consumer connection string and one of the consumer groups created earlier. After that, we subscribe to each partition of the event hub.
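// Build an asynchronous consumer bound to the cart consumer group.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString(CONSUMER_CONNECTION_STRING)
    .consumerGroup("consumer-group-cart")
    .buildAsyncConsumerClient();
Flux<String> partitionIds = consumer.getPartitionIds();
// toIterable() blocks while the partition ids are fetched.
for (String id : partitionIds.toIterable()) {
    // Start at the end of the partition: only events published from now on are received.
    consumer.receiveFromPartition(id, EventPosition.latest())
        .subscribe(PARTITION_PROCESSOR, ERROR_HANDLER, COMPLETE_CONSUMER);
}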
Then add the handlers that process received events, errors, and the end of the stream:
// Requires: import static java.nio.charset.StandardCharsets.UTF_8;
private Consumer<? super PartitionEvent> PARTITION_PROCESSOR = eventProcessor -> {
    EventData eventData = eventProcessor.getData();
    String partitionId = eventProcessor.getPartitionContext().getPartitionId();
    long sequenceNumber = eventData.getSequenceNumber();
    String contents = new String(eventData.getBody(), UTF_8);
    System.out.printf("Partition[%s] - Sequence: %s - contents: %s%n",
        partitionId, sequenceNumber, contents);
};
// Print the error directly; passing it to printf would treat it as a format string.
private Consumer<? super Throwable> ERROR_HANDLER = eventError -> {
    System.err.println(eventError);
};
private Runnable COMPLETE_CONSUMER = () -> {
    System.out.println("End processing events");
};
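One practical detail: subscribe on a reactive stream returns immediately, and the handlers run later on other threads, so a main method that ends right after subscribing may exit before any event is handled. The sketch below shows one way to keep the caller waiting, using a CountDownLatch in plain Java. The AsyncConsumerLifecycleSketch class and its subscribe method are stand-ins I wrote for illustration, not SDK calls. In this demo the stream ends on its own, while a real event hub stream normally keeps running until you dispose the subscription and close the consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncConsumerLifecycleSketch {
    // Stand-in for an asynchronous subscription: the handlers run on the worker
    // thread and this method returns immediately.
    static void subscribe(ExecutorService worker, List<String> events,
                          Consumer<String> onEvent, Runnable onComplete) {
        worker.execute(() -> {
            events.forEach(onEvent);
            onComplete.run();
        });
    }

    // Subscribes, then blocks the caller until the stream completes or the timeout expires.
    static List<String> consumeAll(List<String> events, long timeoutSeconds) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            subscribe(worker, events, received::add, done::countDown);
            // Without this wait, the caller could return before any event is handled.
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for events");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for events", e);
        } finally {
            // Release the worker thread, as we would close the consumer in a real application.
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(List.of("event-1", "event-2"), 5));
    }
}
```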