Skip to content

Commit

Permalink
MSEARCH-617 Ignore hard-delete domain authority events
Browse files Browse the repository at this point in the history
  • Loading branch information
GeloPakDev1 committed Nov 6, 2023
1 parent 65c8eec commit f911c3a
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.integration.ResourceChangeFilterStrategy;
import org.folio.search.model.event.ConsortiumInstanceEvent;
import org.folio.spring.config.properties.FolioEnvironment;
import org.folio.spring.tools.kafka.FolioKafkaTopic;
Expand Down Expand Up @@ -51,6 +52,7 @@ public ConcurrentKafkaListenerContainerFactory<String, ResourceEvent> standardLi
var factory = new ConcurrentKafkaListenerContainerFactory<String, ResourceEvent>();
factory.setBatchListener(true);
factory.setConsumerFactory(resourceEventConsumerFactory());
factory.setRecordFilterStrategy(new ResourceChangeFilterStrategy());
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.folio.search.integration;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.folio.search.domain.dto.ResourceEvent;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import static org.folio.search.utils.SearchUtils.AUTHORITY_RESOURCE;

@Log4j2
public class ResourceChangeFilterStrategy implements RecordFilterStrategy<String, ResourceEvent> {

@Override
public boolean filter(ConsumerRecord<String, ResourceEvent> consumerRecord) {
log.info("Processing resource event [id: {}]", consumerRecord.value().getId());
var resourceEvent = consumerRecord.value();
var resourceName = resourceEvent.getResourceName();

var result = false;
if (resourceName.equals(AUTHORITY_RESOURCE)) {
log.info("Processing authority resource event [id: {}]", resourceEvent.getId());
result = checkAuthorityEventType(resourceEvent);
}
return result;
}

private boolean checkAuthorityEventType(ResourceEvent resourceEvent) {
return switch (resourceEvent.getType()) {
case UPDATE -> {
if (resourceHasChanges(resourceEvent)) {
yield false;
} else {
log.info("Skip message. No need to process update event for authority resource");
yield true;
}
}
case CREATE, REINDEX -> true;
case DELETE, DELETE_ALL -> {
log.info("Skip message. No need to process delete event");
yield false;
}
};
}

private boolean resourceHasChanges(ResourceEvent resourceEvent) {
return resourceEvent.getOld() != null && resourceEvent.getNew() != null
&& !resourceEvent.getOld().equals(resourceEvent.getNew());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.folio.search.integration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.folio.search.domain.dto.Authority;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.spring.test.type.UnitTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.folio.search.utils.SearchUtils.AUTHORITY_RESOURCE;
import static org.folio.search.utils.SearchUtils.CONTRIBUTOR_RESOURCE;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@UnitTest
@ExtendWith(MockitoExtension.class)
public class ResourceChangeFilterStrategyTest {

private final ResourceChangeFilterStrategy filterStrategy = new ResourceChangeFilterStrategy();

@Mock
private ConsumerRecord<String, ResourceEvent> consumerRecord;

@Test
void shouldNotFilterHardDeleteAuthResourceEvent() {
var event = createResourceEvent(ResourceEventType.DELETE_ALL, AUTHORITY_RESOURCE);
mockConsumerRecord(event);

var actual = filterStrategy.filter(consumerRecord);

assertFalse(actual);
}

@Test
void shouldNotFilterNonAuthResourceEvent() {
var event = createResourceEvent(ResourceEventType.DELETE_ALL, CONTRIBUTOR_RESOURCE);
mockConsumerRecord(event);

var actual = filterStrategy.filter(consumerRecord);

assertFalse(actual);
}

@Test
void shouldFilterUpdateAuthEventEvent_whenNewAndOldAreEqual() {
var event = createResourceEvent(ResourceEventType.UPDATE, AUTHORITY_RESOURCE);
event.setNew(new Authority().id("1"));
event.setOld(new Authority().id("1"));
mockConsumerRecord(event);

var actual = filterStrategy.filter(consumerRecord);

assertTrue(actual);
}

@Test
void shouldNotFilterUpdateAuthEventEvent_whenNewAndOldAreNotEqual() {
var event = createResourceEvent(ResourceEventType.UPDATE, AUTHORITY_RESOURCE);
event.setNew(new Authority().id("1"));
event.setOld(new Authority().id("2"));
mockConsumerRecord(event);

var actual = filterStrategy.filter(consumerRecord);

assertFalse(actual);
}

@ValueSource(strings = {"REINDEX", "CREATE"})
@ParameterizedTest
void shouldFilterAuthEvent_whenTypeIsNotSupported(String type) {
var event = createResourceEvent(ResourceEventType.valueOf(type), AUTHORITY_RESOURCE);
mockConsumerRecord(event);

var actual = filterStrategy.filter(consumerRecord);

assertTrue(actual);
}

private void mockConsumerRecord(ResourceEvent event) {
when(consumerRecord.value()).thenReturn(event);
}

private ResourceEvent createResourceEvent(ResourceEventType type, String resourceName) {
var event = new ResourceEvent();
event.setId("1");
event.setType(type);
event.setResourceName(resourceName);
return event;
}
}

0 comments on commit f911c3a

Please sign in to comment.