Skip to content

Commit

Permalink
fix(inbound-state-management): multiple small process state managemen…
Browse files Browse the repository at this point in the history
…t bugfixes (#2629)
  • Loading branch information
chillleader authored May 24, 2024
1 parent 4a26f32 commit b764f77
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -108,7 +109,7 @@ public Map<UUID, RegisteredExecutable> activateBatch(
failed.reason());

// deactivate all previously activated connectors
deactivateBatch(List.of(failed));
deactivateBatch(new ArrayList<>(alreadyActivated.values()));

var failureReasonForOthers =
"Process contains invalid connector(s): "
Expand Down Expand Up @@ -199,6 +200,7 @@ public void deactivateBatch(List<RegisteredExecutable> executables) {
for (var activeExecutable : executables) {
if (activeExecutable instanceof Activated activated) {
try {
LOG.info("Deactivating executable: {}", activated.context().getDefinition().type());
if (activated.executable() instanceof WebhookConnectorExecutable) {
LOG.debug("Unregistering webhook: {}", activated.context().getDefinition().type());
webhookConnectorRegistry.deregister(activated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class InboundExecutableRegistryImpl implements InboundExecutableRegistry
private final BatchExecutableProcessor batchExecutableProcessor;

private final Map<ProcessElement, UUID> executablesByElement = new ConcurrentHashMap<>();
private final Map<UUID, RegisteredExecutable> executables = new HashMap<>();
final Map<UUID, RegisteredExecutable> executables = new HashMap<>();

private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public class ProcessStateStoreImpl implements ProcessStateStore {
private final InboundExecutableRegistry executableRegistry;

private record ProcessState(
int version, long processDefinitionKey, List<InboundConnectorElement> connectorElements) {}
int version,
long processDefinitionKey,
String tenantId,
List<InboundConnectorElement> connectorElements) {}

public ProcessStateStoreImpl(
ProcessDefinitionInspector processDefinitionInspector,
Expand Down Expand Up @@ -95,6 +98,7 @@ private void newlyDeployed(
return new ProcessState(
entry.getValue().version(),
entry.getValue().processDefinitionKey(),
entry.getKey().tenantId(),
connectorElements);
});
} catch (Throwable e) {
Expand All @@ -118,6 +122,7 @@ private void replacedWithDifferentVersion(
return new ProcessState(
entry.getValue().version(),
entry.getValue().processDefinitionKey(),
entry.getKey().tenantId(),
newConnectorElements);
});
} catch (Throwable e) {
Expand All @@ -131,7 +136,7 @@ private void deleted(String processId) {
processStates.computeIfPresent(
processId,
(key1, state) -> {
var tenantId = state.connectorElements.getFirst().element().tenantId();
var tenantId = state.tenantId;
deactivate(tenantId, state.processDefinitionKey);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public boolean isRegistered(RegisteredExecutable.Activated connector) {
public void register(RegisteredExecutable.Activated connector) {
var properties = connector.context().bindProperties(CommonWebhookProperties.class);
var context = properties.getContext();
if (context == null) {
var logMessage = "Webhook path not provided";
LOG.debug(logMessage);
throw new RuntimeException(logMessage);
}

WebhookConnectorValidationUtil.logIfWebhookPathDeprecated(connector, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ final class WebhookConnectorValidationUtil {
private static final String DEPRECATED_WEBHOOK_MESSAGE_PREFIX = "Deprecated webhook path: ";

static void logIfWebhookPathDeprecated(RegisteredExecutable.Activated connector, String webhook) {

if (webhook == null) {
return;
}
if (!CURRENT_WEBHOOK_PATH_PATTERN.matcher(webhook).matches()) {
String message =
DEPRECATED_WEBHOOK_MESSAGE_PREFIX + webhook + DEPRECATED_WEBHOOK_MESSAGE_SUFFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.Health.Status;
import io.camunda.connector.api.inbound.InboundConnectorDefinition;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.ProcessElement;
import io.camunda.connector.runtime.core.Keywords;
Expand All @@ -34,6 +37,7 @@
import io.camunda.connector.runtime.core.inbound.InboundConnectorFactory;
import io.camunda.connector.runtime.core.inbound.correlation.StartEventCorrelationPoint;
import io.camunda.connector.runtime.inbound.executable.InboundExecutableEvent.Activated;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.ConnectorNotRegistered;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -143,6 +147,11 @@ public void activationFailure_shouldYieldFailedToActivate() throws Exception {
when(factory.getInstance(any())).thenReturn(executable);
doThrow(new RuntimeException("failed")).when(executable).activate(any());

var mockContext = mock(InboundConnectorContextImpl.class);
when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext);

doThrow(new RuntimeException("failed")).when(executable).activate(any());

// when
registry.handleEvent(new Activated("tenant", 0, List.of(element)));

Expand All @@ -152,6 +161,97 @@ public void activationFailure_shouldYieldFailedToActivate() throws Exception {
assertThat(result.getFirst().health().getError().message()).isEqualTo("failed");
}

@Test
public void activationFailure_batch_shouldRollbackOtherConnectors() throws Exception {
// given
var processId = "processId";
var element1 =
new InboundConnectorElement(
Map.of(Keywords.INBOUND_TYPE_KEYWORD, "type"),
new StartEventCorrelationPoint(processId, 0, 0),
new ProcessElement(processId, 0, 0, "element1", "tenant"));
var element2 =
new InboundConnectorElement(
Map.of(Keywords.INBOUND_TYPE_KEYWORD, "type"),
new StartEventCorrelationPoint(processId, 0, 0),
new ProcessElement(processId, 0, 0, "element2", "tenant"));

var executable1 = mock(InboundConnectorExecutable.class);
var executable2 = mock(InboundConnectorExecutable.class);

when(factory.getInstance(any())).thenReturn(executable1).thenReturn(executable2);
var mockContext = mock(InboundConnectorContextImpl.class);
when(mockContext.connectorElements()).thenReturn(List.of(element1, element2));
when(mockContext.getDefinition())
.thenReturn(new InboundConnectorDefinition("type", "tenant", "id", null));
when(mockContext.getHealth()).thenReturn(Health.up());
when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext);

doThrow(new RuntimeException("failed")).when(executable2).activate(mockContext);

// when
registry.handleEvent(new Activated("tenant", 0, List.of(element1, element2)));

// then
verify(executable1).activate(mockContext);
verify(executable2).activate(mockContext);
verify(executable1).deactivate();

assertThat(registry.executables.size()).isEqualTo(2);
assertThat(
registry.executables.values().stream()
.allMatch(e -> e instanceof RegisteredExecutable.FailedToActivate))
.isTrue();
}

@Test
public void connectorNotFound_batch_shouldNotRollbackOtherConnectors() throws Exception {
// we want to allow other connectors to be activated even if one is not registered in the
// connector factory - this way we can also support hybrid mode.

// given
var processId = "processId";
var element1 =
new InboundConnectorElement(
Map.of(Keywords.INBOUND_TYPE_KEYWORD, "type"),
new StartEventCorrelationPoint(processId, 0, 0),
new ProcessElement(processId, 0, 0, "element1", "tenant"));
var element2 =
new InboundConnectorElement(
Map.of(Keywords.INBOUND_TYPE_KEYWORD, "type"),
new StartEventCorrelationPoint(processId, 0, 0),
new ProcessElement(processId, 0, 0, "element2", "tenant"));

var executable1 = mock(InboundConnectorExecutable.class);

when(factory.getInstance(any()))
.thenReturn(executable1)
.thenThrow(new NoSuchElementException("not registered"));
var mockContext = mock(InboundConnectorContextImpl.class);
when(mockContext.connectorElements()).thenReturn(List.of(element1, element2));
when(mockContext.getDefinition())
.thenReturn(new InboundConnectorDefinition("type", "tenant", "id", null));
when(mockContext.getHealth()).thenReturn(Health.up());
when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext);

// when
registry.handleEvent(new Activated("tenant", 0, List.of(element1, element2)));

// then
verify(executable1).activate(mockContext);
verifyNoMoreInteractions(executable1);

assertThat(registry.executables.size()).isEqualTo(2);
assertThat(
registry.executables.values().stream()
.anyMatch(e -> e instanceof ConnectorNotRegistered))
.isTrue();
assertThat(
registry.executables.values().stream()
.anyMatch(e -> e instanceof RegisteredExecutable.Activated))
.isTrue();
}

@Test
public void missingConnector_shouldYieldConnectorNotRegistered() {
// given
Expand Down

0 comments on commit b764f77

Please sign in to comment.