Event-driven Microservices with Compensating Transactions

Rapidly develop & deploy resilient, scalable microservices with compensating transactions on Grainite
Gautam Mulchandani

An important goal of a microservice architecture is loose coupling, so that each service can evolve, deploy, and scale independently of other services. The database-per-service pattern is a key enabler of this architecture. It ensures that each microservice encapsulates its own data and makes it available only via its API.

However, a major challenge that emerges from this pattern is coordinating writes as distributed transactions. A monolithic application can use ACID transactions to roll back to a consistent state. In a distributed transaction, by contrast, a failure in any of the participating microservices requires each microservice that has already written to its private database to undo those writes.
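
As a minimal sketch of the general pattern, independent of Grainite, the plain-Java example below runs a sequence of steps and, when one of them fails, undoes the already completed steps in reverse order. The Step class, the run method, and the in-memory stock and balance values are hypothetical names chosen for illustration; they are not part of the Grainite API or of the sample project.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

class CompensationSketch {
    /** One step of a distributed transaction: a forward action plus its undo. */
    static final class Step {
        final BooleanSupplier action;   // returns false on a business failure
        final Runnable compensation;    // undoes a successful action

        Step(BooleanSupplier action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs steps in order; on the first failure, undoes completed steps in reverse. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            if (step.action.getAsBoolean()) {
                completed.push(step);
            } else {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] stock = {5};       // hypothetical item count
        int[] balance = {100};   // hypothetical account balance
        int quantity = 2;
        int amount = 150;        // exceeds the balance, so the payment step fails

        Step reserveItem = new Step(
            () -> { if (stock[0] < quantity) return false; stock[0] -= quantity; return true; },
            () -> stock[0] += quantity);
        Step chargeAccount = new Step(
            () -> { if (balance[0] < amount) return false; balance[0] -= amount; return true; },
            () -> balance[0] += amount);

        boolean committed = run(Arrays.asList(reserveItem, chargeAccount));
        // Prints: committed=false stock=5 balance=100
        System.out.println("committed=" + committed + " stock=" + stock[0] + " balance=" + balance[0]);
    }
}
```

In this sketch a single coordinator calls every step synchronously. In the event-driven design described in this post, each service instead registers its own compensation as it succeeds, and a failure anywhere triggers the registered compensations asynchronously.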

In this blog post, we will walk through building microservices on Grainite that communicate with each other and coordinate writes. In case of failure, we use compensating transactions to undo the effects of the writes. 

The Compensating Transaction pattern is well covered here, here, and here.

Grainite

Grainite is a platform that converges the capabilities required to build event-driven microservice applications. These capabilities include an event bus, containers for the execution of user-provided business logic, a NoSQL database, a built-in adaptive cache and more, all in a single executable that runs on a scale-out cluster. Critically, Grainite also provides exactly-once guarantees as data moves between the event bus, compute, and database.

When building microservices on Grainite, developers focus on their application logic and leave platform-level concerns of guaranteed event delivery, database storage, caching, scaling, and resiliency to Grainite. Grainite runs on public cloud and private infrastructure. Apart from Kubernetes for cluster management, Grainite has no other dependencies. The Grainite conceptual model explains how to think about Grainite. The rest of this blog assumes knowledge of the Grainite conceptual model.  

To follow along, you can request a free Grainite trial (or optionally download a Docker image) here and download the Grainite client. The Grainite client includes the command line tool gx and a few samples used to interact with the Grainite server that runs in the trial instance or the Docker container.

Example application - Order system in Grainite

We will build an order system to demonstrate compensating transactions between microservices in Grainite. The complete project, along with instructions and prerequisites, is in this GitLab repository.

The application flow is shown in the figure below. Order events stream into the Order Service. After some validations, the Order Service sends an event to the Inventory Service. The Inventory Service also performs some validations and, if they pass, sends an event to the Account Service for payment. If there is a failure at any service, the compensating transactions roll back the changes.

In Grainite, the central artifact is the app.yaml. The app.yaml file contains the definition of the app including its tables, topics, action handlers, and their subscriptions to topics. Here is a snippet of the Order Service app.yaml. 


app_name: orderservice
package_id: org.samples
jars:
 - target/order_service-jar-with-dependencies.jar

topics:
 - topic_name: order_events_topic
   key_name: order_id
   key_type: string
  
tables:
 - table_name: order_table
   key_type: string
   value_type: string

   action_classes:
     - type: java
       name: OrderHandler
       class_name: org.samples.orderservice.actions.OrderHandler
       actions:
         - action_name: handleIncomingOrderEvent
           method_name: handleIncomingOrderEvent
           subscriptions:
             - subscription_name: orderevents
               topic_key: order_id
               topic_name: order_events_topic
         - action_name: compensateProcessOrder
           method_name: compensateProcessOrder
         - action_name: updateOrderStatus
           method_name: updateOrderStatus

We will create 3 apps: Order Service, Inventory Service, and Account Service. The app.yaml files for each of these services are in their respective directories here.

Here is the high-level picture:

Each of the 3 apps can be deployed and updated independently of each other. They do not need to be stopped before updating. Here is the flow along with the relevant code snippets:

Step 1: The client first creates 10 inventory items and 10 accounts.


System.out.println("Creating 10 items");
Table itemTable = client.getTable(Constants.INVENTORY_SERVICE, Constants.ITEM_TABLE);
for (int i = 0; i < 10; i++) {
     Grain item = itemTable.getGrain(Value.of("item-" + i));
     item.setValue(Value.of(JsonStream.serialize(new ItemValue(random.nextInt(5) + 5))));
}

System.out.println("Creating 10 accounts");
Table accountTable = client.getTable(Constants.ACCOUNT_SERVICE, Constants.ACCOUNT_TABLE);
for (int i = 0; i < 10; i++) {
     Grain account = accountTable.getGrain(Value.of("account-" + i));
     account.setValue(Value.of(JsonStream.serialize(new AccountValue(random.nextInt(300) + 200))));
}

Step 2: The client then sends 10 order events to the order events topic.


System.out.println("sending 10 orders");
Topic topic = client.getTopic(Constants.APP_NAME, Constants.ORDER_EVENTS_TOPIC);
for (int i = 0; i < 10; i++) {
     String key = "order-" + ts + "-" + i;
     OrderDTO order = new OrderDTO(key, "account-" + random.nextInt(10),
         "item-" + random.nextInt(10), random.nextInt(200) + 50, random.nextInt(5) + 2);

     Event topicEvent = new Event(Value.of(key), Value.of(JsonStream.serialize(order)));
     // Append event to topic.
     topic.append(topicEvent);
}

Step 3: Grainite stores these events durably and, as specified in the app.yaml, invokes the handleIncomingOrderEvent action in OrderHandler, passing it the current state of the order along with the event. Since this example only sends new orders, the state is initially empty.

Step 4: The handleIncomingOrderEvent action validates the order. If the validation succeeds, it stores the order in the order table, sends an event to the specific item by invoking the executeItemRequest action that belongs to ItemHandler in Inventory Service, and registers a compensation transaction with the CompensationHandler.

Note: The action executes transactionally. If it succeeds, its state will get updated and the events to InventoryService and CompensationHandler will all be committed. If it fails, none of these side effects will be executed. In the absence of such guarantees, application developers have to resort to complex and inconvenient mechanisms to achieve this outcome.

public ActionResult handleIncomingOrderEvent(Action action, GrainContext context) {
   // Parse event payload.
   Value payload = ((TopicEvent) action).getPayload();
   OrderDTO orderEvent = JsonIterator.deserialize(payload.asString()).as(OrderDTO.class);

   boolean isValid = isOrderValid(orderEvent);

   if (isValid) {
     // Store order into grain value with status of PENDING.
     if (context.getValue().isNull()) {
       // Store order value as a JSON string.
       context.setValue(Value.of(JsonStream.serialize(new OrderValue(PENDING))));
     }

     // Send order to inventory service.
     context.sendToGrain(INVENTORY_SERVICE, ITEM_TABLE, Value.of(orderEvent.getItemId()),
         new GrainContext.GrainOp.Invoke(EXECUTE_ITEM_REQUEST_ACTION, payload), null);

     // Send the compensation info for this order to the compensation table.
     // If a later step fails, this DTO identifies the action that undoes this step.
     CompensationDTO compDTO = new CompensationDTO(APP_NAME, HANDLE_INCOMING_ORDER_EVENT_ACTION,
         COMPENSATE_PROCESS_ORDER_ACTION, ORDER_TABLE, context.getKey().asString(), !isValid,
         orderEvent.getOrderId());

     context.sendToGrain(COMPENSATION_TABLE, context.getKey(),
         new Invoke(REGISTER_COMPENSATION_ACTION, Value.of(JsonStream.serialize(compDTO))), null);

     return ActionResult.success(action);
   } else {
     context.setValue(Value.of(JsonStream.serialize(new OrderValue(FAILED))));
     return ActionResult.failure(
         action, "handleIncomingOrderEvent Failed for order: " + context.getKey().asString());
   }
 }
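
The all-or-nothing behavior described in the note above can be pictured with a small plain-Java model: an action returns the effects it wants (a new state and outgoing messages), and the runtime applies them only if the action reports success. This is only a conceptual sketch of that guarantee, not how Grainite implements it; the Effects and Entity classes and the message names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class AtomicActionSketch {
    /** What an action asks for: a success flag, a new state, and outgoing messages. */
    static final class Effects {
        final boolean success;
        final String newState;
        final List<String> messages;

        Effects(boolean success, String newState, List<String> messages) {
            this.success = success;
            this.newState = newState;
            this.messages = messages;
        }
    }

    /** Holds the committed state and committed outgoing messages of one entity. */
    static final class Entity {
        String state;                                  // null until the first commit
        final List<String> sent = new ArrayList<>();

        /** Applies the action's state change and messages together, or not at all. */
        boolean invoke(Function<String, Effects> action) {
            Effects effects = action.apply(state);
            if (!effects.success) {
                return false;                          // discard every requested effect
            }
            state = effects.newState;
            sent.addAll(effects.messages);
            return true;
        }
    }

    public static void main(String[] args) {
        Entity order = new Entity();

        // A failing action: neither its state change nor its message is applied.
        order.invoke(s -> new Effects(false, "PENDING", Arrays.asList("to-inventory")));
        System.out.println(order.state + " " + order.sent);   // prints: null []

        // A succeeding action: state and both messages are applied together.
        order.invoke(s -> new Effects(true, "PENDING",
            Arrays.asList("to-inventory", "register-compensation")));
        System.out.println(order.state + " " + order.sent);
        // prints: PENDING [to-inventory, register-compensation]
    }
}
```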
 

Step 5: The executeItemRequest action in turn tries to reduce the quantity of the item in the inventory. If that succeeds, it registers the compensation transaction and sends an event to the deductAmount action in Account Service. If there is insufficient inventory, it triggers compensation.


public ActionResult executeItemRequest(Action action, GrainContext context) {
   // context.counter("executeItemRequest").inc();
   context.getLogger().info("check item request for item: " + context.getKey().asString());
   Value payload = ((GrainRequest) action).getPayload();
   OrderDTO orderEvent = JsonIterator.deserialize(payload.asString()).as(OrderDTO.class);

   ItemValue item = context.getValue().isNull()
       ? new ItemValue(5)
       : JsonIterator.deserialize(context.getValue().asString()).as(ItemValue.class);

   // Lower item count. If no more items are left, fail order.
   boolean isSuccess = item.reduceCount(orderEvent.getQuantity());

   if (isSuccess) {
     context.setValue(Value.of(JsonStream.serialize(item)));
     // There is enough quantity of this item for the order, check with account service to ensure
     // account has enough balance.
     context.sendToGrain(ACCOUNT_SERVICE, ACCOUNT_TABLE, Value.of(orderEvent.getAccountId()),
         new Invoke(DEDUCT_AMOUNT_ACTION, payload), null);

     // Update map state to record order.
     context.mapPut(
         ORDERS_MAP, Value.of(orderEvent.getOrderId()), Value.of(orderEvent.getQuantity()));

     // Register compensation since this action is a success

     CompensationDTO compDTO = new CompensationDTO(APP_NAME, EXECUTE_ITEM_REQUEST_ACTION,
         COMPENSATE_EXECUTE_ITEM_REQUEST_ACTION, ITEM_TABLE, context.getKey().asString(),
         !isSuccess, orderEvent.getOrderId());
     context.sendToGrain(ORDER_SERVICE, COMPENSATION_TABLE, Value.of(orderEvent.getOrderId()),
         new Invoke(REGISTER_COMPENSATION_ACTION, Value.of(JsonStream.serialize(compDTO))), null);
     context.getLogger().info(
         "executeItemRequest Succeeded for order: " + orderEvent.getOrderId());
     return ActionResult.success(action);
   } else {
     // trigger compensations since this action is a failure
     context.sendToGrain(ORDER_SERVICE, COMPENSATION_TABLE, Value.of(orderEvent.getOrderId()),
         new Invoke(TRIGGER_COMPENSATION_ACTIONS, Value.of(orderEvent.getOrderId())), null);
     context.getLogger().info("executeItemRequest Failed for order: " + orderEvent.getOrderId());
     return ActionResult.failure(
         action, "executeItemRequest Failed for order: " + orderEvent.getOrderId());
   }
 }
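
The handler above calls reduceCount on ItemValue and records each order's quantity in ORDERS_MAP, but this post does not show ItemValue or the compensating action for the item. The plain-Java sketch below is one guess at how that state could behave, assuming the per-order map exists so that a compensation can look up how much to give back. The class name ItemStateSketch and the methods reserve and compensate are hypothetical and are not taken from the sample project.

```java
import java.util.HashMap;
import java.util.Map;

class ItemStateSketch {
    private int count;
    // Quantity reserved per order ID, standing in for the ORDERS_MAP entries.
    private final Map<String, Integer> reservedByOrder = new HashMap<>();

    ItemStateSketch(int count) {
        this.count = count;
    }

    int getCount() {
        return count;
    }

    /** Lowers the count; returns false and changes nothing if stock is insufficient. */
    boolean reduceCount(int quantity) {
        if (quantity <= 0 || quantity > count) {
            return false;
        }
        count -= quantity;
        return true;
    }

    /** Forward step: reduce stock and remember what this order took. */
    boolean reserve(String orderId, int quantity) {
        if (!reduceCount(quantity)) {
            return false;
        }
        reservedByOrder.put(orderId, quantity);
        return true;
    }

    /** Compensation: return the order's quantity to stock. Repeated calls are no-ops. */
    void compensate(String orderId) {
        Integer reserved = reservedByOrder.remove(orderId);
        if (reserved != null) {
            count += reserved;
        }
    }

    public static void main(String[] args) {
        ItemStateSketch item = new ItemStateSketch(5);
        System.out.println(item.reserve("order-1", 3) + " " + item.getCount()); // prints: true 2
        System.out.println(item.reserve("order-2", 4) + " " + item.getCount()); // prints: false 2
        item.compensate("order-1");
        item.compensate("order-1");             // second call finds nothing to undo
        System.out.println(item.getCount());    // prints: 5
    }
}
```

Removing the map entry during compensation makes the undo safe to run more than once, which is a useful property when compensations are delivered as asynchronous messages.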
 

Step 6: The deductAmount action reduces the account balance by the amount in the order. If successful, it sends an event to the order to change its status from PENDING to SUCCESS. If the account has insufficient funds, it triggers compensation.


public ActionResult deductAmount(Action action, GrainContext context) {
   context.getLogger().info("check account request for account: " + context.getKey().asString());

   // Parse order payload.
   Value payload = ((GrainRequest) action).getPayload();
   OrderDTO orderEvent = JsonIterator.deserialize(payload.asString()).as(OrderDTO.class);

   // Parse account info from grain's state.
   AccountValue account = context.getValue().isNull()
       ? new AccountValue(100)
       : JsonIterator.deserialize(context.getValue().asString()).as(AccountValue.class);

   // Reduce account balance based on order and save it back in the grain's value.
   boolean isSuccess = account.reduceAmount(orderEvent.getAmount());

   if (isSuccess) {
     // this is the last part of the transaction so no need to register compensation
     context.setValue(Value.of(JsonStream.serialize(account)));
     // There is enough balance to complete this order, update the map state to record order.
     context.mapPut(
         ORDERS_MAP, Value.of(orderEvent.getOrderId()), Value.of(orderEvent.getAmount()));
     context.sendToGrain(ORDER_SERVICE, ORDER_TABLE, Value.of(orderEvent.getOrderId()),
         new Invoke(UPDATE_ORDER_STATUS_ACTION, Value.of(SUCCESS)), null);
     context.getLogger().info("deductAmount Succeeded for order: " + orderEvent.getOrderId());
     return ActionResult.success(action);
   } else {
     // Trigger compensations if failure.
     context.sendToGrain(ORDER_SERVICE, COMPENSATION_TABLE, Value.of(orderEvent.getOrderId()),
         new Invoke(TRIGGER_COMPENSATION_ACTIONS, Value.of(orderEvent.getOrderId())), null);
     context.getLogger().info("deductAmount Failed for order: " + orderEvent.getOrderId());
     return ActionResult.failure(
         action, "deductAmount Failed for order: " + orderEvent.getOrderId());
   }
 }
 

Step 7: Compensation transactions are registered through the registerCompensation action of CompensationHandler, an action handler that is part of Order Service. If any action fails, the triggerCompensations action invokes the compensation for each registered transaction. It is then up to each compensation transaction to undo the effects of its successful transaction.


public ActionResult registerCompensation(Action action, GrainContext context) {
   CompensationValue registeredCompensations = context.getValue().isNull()
       ? new CompensationValue()
       : JsonIterator.deserialize(context.getValue().asString()).as(CompensationValue.class);

   Value payload = ((GrainRequest) action).getPayload();
   CompensationDTO compensationMsg =
       JsonIterator.deserialize(payload.asString()).as(CompensationDTO.class);

   registeredCompensations.addCompensation(compensationMsg);

   // if this order has already failed, trigger this compensation
   if (registeredCompensations.isFailed()) {
     for (CompensationDTO compensation : registeredCompensations.getCompensations()) {
       context.sendToGrain(compensation.getApp(), compensation.getCompensationTable(),
           Value.of(compensation.getCompensationKey()),
           new Invoke(compensation.getCompensationAction().toString(),
               Value.of(compensation.getOrderId())),
           null);
     }
     /*
      * After running the compensations, delete the list. New compensations can get added,
      * and if any subsequent compensation comes in, it will be immediately triggered.
      */
     registeredCompensations.emptyCompensations();
   }
   context.setValue(Value.of(JsonStream.serialize(registeredCompensations)));
   return ActionResult.success(action);
}

public ActionResult triggerCompensations(Action action, GrainContext context) {
   CompensationValue registeredCompensations = context.getValue().isNull()
       ? new CompensationValue()
       : JsonIterator.deserialize(context.getValue().asString()).as(CompensationValue.class);

   Value payload = ((GrainRequest) action).getPayload();
   String orderId = payload.asString();

   context.getLogger().info("Transaction failed, running compensations for order: " + orderId);
   for (CompensationDTO compensation : registeredCompensations.getCompensations()) {
     context.sendToGrain(compensation.getApp(), compensation.getCompensationTable(),
         Value.of(compensation.getCompensationKey()),
         new Invoke(
             compensation.getCompensationAction().toString(), Value.of(compensation.getOrderId())),
         null);
   }
   /*
    * After running the compensations, delete the list. New compensations can get added,
    * and if any subsequent compensation comes in, it will be immediately triggered.
    */
   registeredCompensations.emptyCompensations();
   registeredCompensations.setFailed(true);
   context.setValue(Value.of(JsonStream.serialize(registeredCompensations)));
   return ActionResult.success(action);
 }
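
The two actions above depend on a CompensationValue class that this post does not show. The plain-Java sketch below models the behavior they rely on: compensations accumulate until a failure is reported, and any compensation registered after the failure is released immediately. The class name CompensationLedgerSketch, its methods, and the string compensation names are hypothetical stand-ins, not the sample project's code.

```java
import java.util.ArrayList;
import java.util.List;

class CompensationLedgerSketch {
    private final List<String> compensations = new ArrayList<>();
    private boolean failed;

    /** Registers a compensation and returns the ones to run now (none unless already failed). */
    List<String> register(String compensation) {
        compensations.add(compensation);
        return failed ? drain() : new ArrayList<>();
    }

    /** Marks the order as failed and returns every compensation registered so far. */
    List<String> trigger() {
        failed = true;
        return drain();
    }

    /** Hands out the pending compensations and clears the list so each runs once. */
    private List<String> drain() {
        List<String> toRun = new ArrayList<>(compensations);
        compensations.clear();
        return toRun;
    }

    public static void main(String[] args) {
        CompensationLedgerSketch ledger = new CompensationLedgerSketch();
        System.out.println(ledger.register("undo-order"));  // prints: []
        System.out.println(ledger.trigger());               // prints: [undo-order]
        // A registration that arrives after the failure is released right away.
        System.out.println(ledger.register("undo-item"));   // prints: [undo-item]
    }
}
```

Keeping the failed flag alongside the list matters because registration and failure messages travel asynchronously, so a registration may arrive after the failure that should trigger it.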
 

Step 8: The client reads the compensation history and the final status of each order from Grainite.


System.out.println("Checking status of order history");

Table orderHistoryTable = client.getTable(Constants.APP_NAME, Constants.COMPENSATION_TABLE);
for (int i = 0; i < 10; i++) {
     Grain orderHistory = orderHistoryTable.getGrain(Value.of("order-" + ts + "-" + i));
     System.out.println("Order: K> " + orderHistory.getKey().asString());
     System.out.println("Order: V> " + orderHistory.getValue().asString());
     System.out.println("----------------------------");
}

System.out.println("----------------------------");
System.out.println("Printing status of each order");
Table orderTable = client.getTable(Constants.APP_NAME, Constants.ORDER_TABLE);
for (int i = 0; i < 10; i++) {
     Grain order = orderTable.getGrain(Value.of("order-" + ts + "-" + i));
     System.out.println("Order: K> " + order.getKey().asString());
     System.out.println("Order: V> " + order.getValue().asString());
     System.out.println("----------------------------");
}

Summary

Applications built using event-driven microservices require a host of capabilities from the underlying platform: an event bus, a database per microservice, transactional execution of handlers, exactly-once messaging, observability, and the ability to scale compute and storage independently and automatically, to name just a few.

Grainite provides all these capabilities in a developer-friendly Kubernetes environment. As the example above showed, Grainite can host complete applications that are developed and deployed independently and that communicate with each other using asynchronous message passing. Application developers focus on the logic that processes the streaming events while rapidly building and deploying these apps to production.

To summarize: In this simple but complete example, you saw how Grainite can 

  • Ingest events from external clients
  • Execute developer-provided processing logic on those incoming events statefully 
  • Generate events to be sent to other applications asynchronously
  • Update the state of entities
  • Run compensation transactions across applications in case of failure
  • Serve client queries from its database

This convergence of capabilities is what makes Grainite extremely powerful, simple to use, and an excellent platform for building event-driven microservices.
