Skip to content

Commit

Permalink
[MODORDSTOR-357] - Implement the capability to send the modified stat…
Browse files Browse the repository at this point in the history
…e using a transactional outbox for Piece (#360)

[MODORDSTOR-357] - Implement the capability to send the modified state using a transactional outbox for Piece
  • Loading branch information
imerabishvili authored Nov 14, 2023
1 parent fedec80 commit 1d4407d
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 86 deletions.
1 change: 1 addition & 0 deletions ramls/audit_outbox.raml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ documentation:
types:
order-audit-event: !include acq-models/mod-orders-storage/schemas/order_audit_event.json
order-line-audit-event: !include acq-models/mod-orders-storage/schemas/order_line_audit_event.json
piece-audit-event: !include acq-models/mod-orders-storage/schemas/piece_audit_event.json
outbox-event-log: !include acq-models/mod-orders-storage/schemas/outbox_event_log.json

/orders-storage/audit-outbox:
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/folio/event/AuditEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
@Getter
public enum AuditEventType {
ACQ_ORDER_CHANGED("ACQ_ORDER_CHANGED"),
ACQ_ORDER_LINE_CHANGED("ACQ_ORDER_LINE_CHANGED");
ACQ_ORDER_LINE_CHANGED("ACQ_ORDER_LINE_CHANGED"),
ACQ_PIECE_CHANGED("ACQ_PIECE_CHANGED");

private final String topicName;
}
37 changes: 35 additions & 2 deletions src/main/java/org/folio/event/service/AuditEventProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.folio.rest.jaxrs.model.OrderAuditEvent;
import org.folio.rest.jaxrs.model.OrderLineAuditEvent;
import org.folio.rest.jaxrs.model.OutboxEventLog.EntityType;
import org.folio.rest.jaxrs.model.Piece;
import org.folio.rest.jaxrs.model.PieceAuditEvent;
import org.folio.rest.jaxrs.model.PoLine;
import org.folio.rest.jaxrs.model.PurchaseOrder;
import org.folio.rest.tools.utils.TenantTool;
Expand Down Expand Up @@ -54,20 +56,39 @@ public Future<Boolean> sendOrderEvent(PurchaseOrder order,
* OrderLineId is used as partition key to send all events for particular order to the same partition.
*
* @param poLine the event payload
* @param eventAction the event action
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future otherwise
*/
public Future<Boolean> sendOrderLineEvent(PoLine poLine,
OrderLineAuditEvent.Action eventAction,
Map<String, String> okapiHeaders) {
OrderLineAuditEvent event = getOrderLineEvent(poLine, eventAction);
log.info("Starting to send event wit id: {} for Order Line to Kafka for orderLineId: {}", event.getId(),
log.info("Starting to send event with id: {} for Order Line to Kafka for orderLineId: {}", event.getId(),
poLine.getId());
return sendToKafka(AuditEventType.ACQ_ORDER_LINE_CHANGED, event, okapiHeaders, event.getOrderLineId(), EntityType.ORDER_LINE)
.onFailure(t -> log.warn("sendOrderLineEvent failed, poLine id={}", poLine.getId(), t));
}

/**
* Sends change event for piece to kafka.
* PieceId is used as partition key to send all events for particular piece to the same partition.
*
* @param piece the event payload
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future otherwise
*/
public Future<Boolean> sendPieceEvent(Piece piece,
PieceAuditEvent.Action eventAction,
Map<String, String> okapiHeaders) {
PieceAuditEvent event = getPieceEvent(piece, eventAction);
log.info("Starting to send event with id: {} for Piece to Kafka for pieceId: {}", event.getId(),
piece.getId());
return sendToKafka(AuditEventType.ACQ_PIECE_CHANGED, event, okapiHeaders, event.getPieceId(), EntityType.PIECE)
.onFailure(t -> log.warn("sendPieceEvent failed, piece id={}", piece.getId(), t));
}

private OrderAuditEvent getOrderEvent(PurchaseOrder order, OrderAuditEvent.Action eventAction) {
Metadata metadata = order.getMetadata();
return new OrderAuditEvent()
Expand All @@ -93,6 +114,18 @@ private OrderLineAuditEvent getOrderLineEvent(PoLine poLine, OrderLineAuditEvent
.withOrderLineSnapshot(poLine.withMetadata(null)); // not populate metadata to not include it in snapshot's comparation in UI
}

private PieceAuditEvent getPieceEvent(Piece piece, PieceAuditEvent.Action eventAction) {
Metadata metadata = piece.getMetadata();
return new PieceAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withPieceId(piece.getId())
.withEventDate(new Date())
.withActionDate(metadata.getUpdatedDate())
.withUserId(metadata.getUpdatedByUserId())
.withPieceSnapshot(piece.withMetadata(null)); // not populate metadata to not include it in snapshot's comparation in UI
}

private Future<Boolean> sendToKafka(AuditEventType eventType,
Object eventPayload,
Map<String, String> okapiHeaders,
Expand Down
94 changes: 57 additions & 37 deletions src/main/java/org/folio/event/service/AuditOutboxService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.folio.event.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -17,6 +16,8 @@
import org.folio.rest.jaxrs.model.OrderLineAuditEvent;
import org.folio.rest.jaxrs.model.OutboxEventLog;
import org.folio.rest.jaxrs.model.OutboxEventLog.EntityType;
import org.folio.rest.jaxrs.model.Piece;
import org.folio.rest.jaxrs.model.PieceAuditEvent;
import org.folio.rest.jaxrs.model.PoLine;
import org.folio.rest.jaxrs.model.PurchaseOrder;
import org.folio.rest.persist.Conn;
Expand Down Expand Up @@ -81,77 +82,96 @@ public Future<Integer> processOutboxEventLogs(Map<String, String> okapiHeaders)
);
}

private List<Future<Boolean>> getKafkaFutures(List<OutboxEventLog> eventLogs, Map<String, String> okapiHeaders) {
return eventLogs.stream().map(eventLog -> {
switch (eventLog.getEntityType()) {
case ORDER -> {
PurchaseOrder entity = Json.decodeValue(eventLog.getPayload(), PurchaseOrder.class);
OrderAuditEvent.Action action = OrderAuditEvent.Action.fromValue(eventLog.getAction());
return producer.sendOrderEvent(entity, action, okapiHeaders);
}
case ORDER_LINE -> {
PoLine entity = Json.decodeValue(eventLog.getPayload(), PoLine.class);
OrderLineAuditEvent.Action action = OrderLineAuditEvent.Action.fromValue(eventLog.getAction());
return producer.sendOrderLineEvent(entity, action, okapiHeaders);
}
case PIECE -> {
Piece entity = Json.decodeValue(eventLog.getPayload(), Piece.class);
PieceAuditEvent.Action action = PieceAuditEvent.Action.fromValue(eventLog.getAction());
return producer.sendPieceEvent(entity, action, okapiHeaders);
}
default -> throw new IllegalStateException("Missing handler for events with entityType: " + eventLog.getEntityType());
}
}).collect(Collectors.toList());
}

/**
* Saves order outbox log.
*
* @param conn connection in transaction
* @param entity the purchase order
* @param action the event action
* @param conn connection in transaction
* @param entity the purchase order
* @param action the event action
* @param okapiHeaders okapi headers
* @return future with saved outbox log in the same transaction
*/
public Future<Boolean> saveOrderOutboxLog(Conn conn, PurchaseOrder entity, OrderAuditEvent.Action action, Map<String, String> okapiHeaders) {
log.trace("saveOrderOutboxLog, order id={}", entity.getId());
String order = Json.encode(entity);
return saveOutboxLog(conn, action.value(), EntityType.ORDER, order, okapiHeaders)
.onSuccess(reply -> log.info("Outbox log has been saved for order id: {}", entity.getId()))
.onFailure(e -> log.warn("Could not save outbox audit log for order with id {}", entity.getId(), e));
return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.ORDER, entity.getId(), entity);
}

/**
* Saves order lines outbox logs.
*
* @param conn connection in transaction
* @param poLines the poLine
* @param action action for order line
* @param conn connection in transaction
* @param poLines the poLine
* @param action action for order line
* @param okapiHeaders the okapi headers
* @return future with saved outbox log in the same transaction
*/
public Future<Boolean> saveOrderLinesOutboxLogs(Conn conn, List<PoLine> poLines, OrderLineAuditEvent.Action action, Map<String, String> okapiHeaders) {
var futures = poLines.stream()
.map(poLine -> {
log.trace("saveOrderLineOutboxLog, po line id={}", poLine.getId());
String orderLine = Json.encode(poLine);
return saveOutboxLog(conn, action.value(), EntityType.ORDER_LINE, orderLine, okapiHeaders)
.onSuccess(reply -> log.info("Outbox log has been saved for order line id: {}", poLine.getId()))
.onFailure(e -> log.warn("Could not save outbox audit log for order line with id {}", poLine.getId(), e));
})
.map(poLine -> saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.ORDER_LINE, poLine.getId(), poLine))
.toList();

return GenericCompositeFuture.join(futures)
.map(res -> true)
.otherwise(t -> false);
}

private List<Future<Boolean>> getKafkaFutures(List<OutboxEventLog> eventLogs, Map<String, String> okapiHeaders) {
List<Future<Boolean>> futures = new ArrayList<>();
for (OutboxEventLog eventLog : eventLogs) {
if (EntityType.ORDER == eventLog.getEntityType()) {
PurchaseOrder purchaseOrder = Json.decodeValue(eventLog.getPayload(), PurchaseOrder.class);
OrderAuditEvent.Action orderAction = OrderAuditEvent.Action.fromValue(eventLog.getAction());
futures.add(producer.sendOrderEvent(purchaseOrder, orderAction, okapiHeaders));
} else if (EntityType.ORDER_LINE == eventLog.getEntityType()) {
PoLine poLine = Json.decodeValue(eventLog.getPayload(), PoLine.class);
OrderLineAuditEvent.Action orderLineAction = OrderLineAuditEvent.Action.fromValue(eventLog.getAction());
futures.add(producer.sendOrderLineEvent(poLine, orderLineAction, okapiHeaders));
}
}
return futures;
/**
* Saves piece outbox log.
*
* @param conn connection in transaction
* @param piece the audited piece
* @param action action for piece
* @param okapiHeaders the okapi headers
* @return future with saved outbox log in the same transaction
*/
public Future<Boolean> savePieceOutboxLog(Conn conn,
Piece piece,
PieceAuditEvent.Action action,
Map<String, String> okapiHeaders) {
return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.PIECE, piece.getId(), piece);
}

private Future<Boolean> saveOutboxLog(Conn conn,
Map<String, String> okapiHeaders,
String action,
EntityType entityType,
String entity,
Map<String, String> okapiHeaders) {
String entityId,
Object entity) {
log.debug("saveOutboxLog:: for {} with id: {}", entityType, entityId);

String tenantId = TenantTool.tenantId(okapiHeaders);

OutboxEventLog eventLog = new OutboxEventLog()
.withEventId(UUID.randomUUID().toString())
.withAction(action)
.withEntityType(entityType)
.withPayload(entity);
.withPayload(Json.encode(entity));

return outboxRepository.saveEventLog(conn, eventLog, tenantId);
return outboxRepository.saveEventLog(conn, eventLog, tenantId)
.onSuccess(reply -> log.info("Outbox log has been saved for {} with id: {}", entityType, entityId))
.onFailure(e -> log.warn("Could not save outbox audit log for {} with id: {}", entityType, entityId, e));
}

}
37 changes: 18 additions & 19 deletions src/main/java/org/folio/rest/core/BaseApi.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.folio.rest.core;

import io.vertx.core.Future;
import io.vertx.ext.web.handler.HttpException;

import javax.ws.rs.core.Response;
import java.net.URI;

import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;

import java.net.URI;

import javax.ws.rs.core.Response;

import org.apache.commons.lang3.StringUtils;
import org.folio.rest.persist.PgExceptionUtil;

import io.vertx.core.Future;
import io.vertx.ext.web.handler.HttpException;

public abstract class BaseApi {
public Future<Response> buildResponseWithLocation(Object body, String endpoint) {
return Future.succeededFuture(Response.created(URI.create(endpoint))
Expand All @@ -26,25 +30,20 @@ public Future<Response> buildOkResponse(Object body) {
}

public Future<Response> buildErrorResponse(Throwable throwable) {
final String message;
final int code;

if (throwable instanceof HttpException httpException) {
code = httpException.getStatusCode();
message = httpException.getPayload();
} else {
code = INTERNAL_SERVER_ERROR.getStatusCode();
message = throwable.getMessage();
return buildErrorResponse(httpException.getStatusCode(), httpException.getPayload());
}

return Future.succeededFuture(buildErrorResponse(code, message));
if (StringUtils.isNotBlank(PgExceptionUtil.badRequestMessage(throwable))) {
return buildErrorResponse(Response.Status.BAD_REQUEST.getStatusCode(), PgExceptionUtil.badRequestMessage(throwable));
}
return buildErrorResponse(INTERNAL_SERVER_ERROR.getStatusCode(), throwable.getMessage());
}

private Response buildErrorResponse(int code, String message) {
return Response.status(code)
private Future<Response> buildErrorResponse(int code, String message) {
return Future.succeededFuture(Response.status(code)
.header(CONTENT_TYPE, TEXT_PLAIN)
.entity(message)
.build();
.build());
}

protected abstract String getEndpoint(Object entity);
Expand Down
Loading

0 comments on commit 1d4407d

Please sign in to comment.