Skip to content

Commit

Permalink
feat: document handling (#3361)
Browse files Browse the repository at this point in the history
* feat(sdk): first iteration of document support

* apply review suggestions

* feat(document): document deserialization & serialization (#3066)

* wip

(cherry picked from commit a57e940)

* wip

(cherry picked from commit 5634aea)

* prepare end-to-end draft

(cherry picked from commit 92ec79f)

* prepare end-to-end draft

(cherry picked from commit 5a808bd)

* rework deserializers

(cherry picked from commit 0b0a558)

* incorporate feedback, move jackson code into a separate module

* clean up accidental change

* naming adjustments

* lint

* remove extra dollar sign

* remove extra dollar sign

* feat(document): restructure the project and move document stuff to a different package (#3149)

* refactor: move document stuff to a different module

---------

Co-authored-by: Jonathan Roques <[email protected]>

* fix(document): document serializer doesn't handle sdk document model (#3205)

* feat(doc-handling): Some investigations to use documents in the REST … (#2976)

* feat(doc-handling): Some investigations to use documents in the REST connector

---------

Co-authored-by: Pavel Kotelevsky <[email protected]>
Co-authored-by: Pavel Kotelevsky <[email protected]>
  • Loading branch information
3 people authored Oct 2, 2024
1 parent c9a7180 commit 9e427e9
Show file tree
Hide file tree
Showing 53 changed files with 2,173 additions and 41 deletions.
11 changes: 11 additions & 0 deletions connector-runtime/connector-runtime-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
<artifactId>connector-core</artifactId>
</dependency>

<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-document</artifactId>
</dependency>

<!-- Jackson dependencies defined as provided in the Connector SDK Core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -95,5 +100,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.document.factory.DocumentFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;
Expand All @@ -35,18 +36,21 @@ public class DefaultInboundConnectorContextFactory implements InboundConnectorCo
private final SecretProviderAggregator secretProviderAggregator;
private final ValidationProvider validationProvider;
private final OperateClientAdapter operateClientAdapter;
private final DocumentFactory documentFactory;

public DefaultInboundConnectorContextFactory(
final ObjectMapper mapper,
final InboundCorrelationHandler correlationHandler,
final SecretProviderAggregator secretProviderAggregator,
final ValidationProvider validationProvider,
final OperateClientAdapter operateClientAdapter) {
final OperateClientAdapter operateClientAdapter,
final DocumentFactory documentFactory) {
this.objectMapper = mapper;
this.correlationHandler = correlationHandler;
this.secretProviderAggregator = secretProviderAggregator;
this.validationProvider = validationProvider;
this.operateClientAdapter = operateClientAdapter;
this.documentFactory = documentFactory;
}

@Override
Expand All @@ -60,6 +64,7 @@ public <T extends InboundConnectorExecutable<?>> InboundConnectorContext createC
new InboundConnectorContextImpl(
secretProviderAggregator,
validationProvider,
documentFactory,
connectorDetails,
correlationHandler,
cancellationCallback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.document.Document;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -51,20 +56,22 @@ public class InboundConnectorContextImpl extends AbstractConnectorContext
private final ObjectMapper objectMapper;

private final Consumer<Throwable> cancellationCallback;

private Health health = Health.unknown();

private final EvictingQueue<Activity> logs;
private final DocumentFactory documentFactory;
private Health health = Health.unknown();
private Map<String, Object> propertiesWithSecrets;

public InboundConnectorContextImpl(
SecretProvider secretProvider,
ValidationProvider validationProvider,
DocumentFactory documentFactory,
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.correlationHandler = correlationHandler;
this.connectorDetails = connectorDetails;
this.properties =
Expand All @@ -75,6 +82,25 @@ public InboundConnectorContextImpl(
this.logs = logs;
}

public InboundConnectorContextImpl(
SecretProvider secretProvider,
ValidationProvider validationProvider,
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
this(
secretProvider,
validationProvider,
new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE),
connectorDetails,
correlationHandler,
cancellationCallback,
objectMapper,
logs);
}

@Override
public CorrelationResult correlateWithResult(Object variables) {
try {
Expand Down Expand Up @@ -150,7 +176,10 @@ public List<InboundConnectorElement> connectorElements() {
return connectorDetails.connectorElements();
}

private Map<String, Object> propertiesWithSecrets;
@Override
public Document createDocument(DocumentCreationRequest request) {
return documentFactory.create(request);
}

private Map<String, Object> getPropertiesWithSecrets(Map<String, Object> properties) {
if (propertiesWithSecrets == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.correlation.MessageCorrelationPoint.BoundaryEventCorrelationPoint;
import io.camunda.document.Document;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.operate.model.FlowNodeInstance;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,6 +133,11 @@ public Queue<Activity> getLogs() {
return inboundContext.getLogs();
}

@Override
public Document createDocument(DocumentCreationRequest request) {
return inboundContext.createDocument(request);
}

@Override
public List<InboundConnectorElement> connectorElements() {
return inboundContext.connectorElements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.connector.runtime.core.outbound.ErrorExpressionJobContext.ErrorExpressionJob;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.core.secret.SecretProviderDiscovery;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.CompleteJobResponse;
Expand All @@ -58,6 +59,8 @@ public class ConnectorJobHandler implements JobHandler {

protected ValidationProvider validationProvider;

protected DocumentFactory documentFactory;

protected ObjectMapper objectMapper;

/**
Expand All @@ -80,10 +83,12 @@ public ConnectorJobHandler(
final OutboundConnectorFunction call,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final DocumentFactory documentFactory,
final ObjectMapper objectMapper) {
this.call = call;
this.secretProvider = secretProvider;
this.validationProvider = validationProvider;
this.documentFactory = documentFactory;
this.objectMapper = objectMapper;
}

Expand Down Expand Up @@ -180,7 +185,8 @@ public void handle(final JobClient client, final ActivatedJob job) {

try {
var context =
new JobHandlerContext(job, getSecretProvider(), validationProvider, objectMapper);
new JobHandlerContext(
job, getSecretProvider(), validationProvider, documentFactory, objectMapper);
var response = call.execute(context);
var responseVariables =
ConnectorHelper.createOutputVariables(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import io.camunda.connector.api.secret.SecretProvider;
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.runtime.core.AbstractConnectorContext;
import io.camunda.document.Document;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import java.util.Objects;
import org.slf4j.Logger;
Expand All @@ -45,19 +50,35 @@ public class JobHandlerContext extends AbstractConnectorContext

private final ObjectMapper objectMapper;
private final JobContext jobContext;
private final DocumentFactory documentFactory;
private String jsonWithSecrets = null;

public JobHandlerContext(
final ActivatedJob job,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final DocumentFactory documentFactory,
final ObjectMapper objectMapper) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.job = job;
this.objectMapper = objectMapper;
this.jobContext = new ActivatedJobContext(job, this::getJsonReplacedWithSecrets);
}

public JobHandlerContext(
final ActivatedJob job,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final ObjectMapper objectMapper) {
this(
job,
secretProvider,
validationProvider,
new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE),
objectMapper);
}

@Override
public <T> T bindVariables(Class<T> cls) {
var mappedObject = mapJson(cls);
Expand Down Expand Up @@ -110,6 +131,11 @@ public JobContext getJobContext() {
return jobContext;
}

@Override
public Document createDocument(DocumentCreationRequest document) {
return documentFactory.create(document);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.document.factory.DocumentFactory;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -44,6 +45,7 @@ class DefaultInboundConnectorContextFactoryTest {
@Mock private OperateClientAdapter operateClientAdapter;
@Mock private Consumer<Throwable> cancellationCallback;
@Mock private ValidInboundConnectorDetails newConnector;
@Mock private DocumentFactory documentFactory;
private DefaultInboundConnectorContextFactory factory;

@BeforeEach
Expand All @@ -54,7 +56,8 @@ void setUp() {
correlationHandler,
secretProviderAggregator,
validationProvider,
operateClientAdapter);
operateClientAdapter,
documentFactory);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.camunda.connector.runtime.inbound.state.ProcessStateStore;
import io.camunda.connector.runtime.inbound.state.TenantAwareProcessStateStoreImpl;
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
Expand All @@ -58,6 +59,11 @@ public class InboundConnectorRuntimeConfiguration {
@Value("${camunda.connector.inbound.message.ttl:PT1H}")
private Duration messageTtl;

@Bean
public static InboundConnectorBeanDefinitionProcessor inboundConnectorBeanDefinitionProcessor() {
return new InboundConnectorBeanDefinitionProcessor();
}

@Bean
public ProcessElementContextFactory processElementContextFactory(
ObjectMapper objectMapper,
Expand All @@ -77,24 +83,21 @@ public InboundCorrelationHandler inboundCorrelationHandler(
zeebeClient, feelEngine, metricsRecorder, elementContextFactory, messageTtl);
}

@Bean
public static InboundConnectorBeanDefinitionProcessor inboundConnectorBeanDefinitionProcessor() {
return new InboundConnectorBeanDefinitionProcessor();
}

@Bean
public InboundConnectorContextFactory springInboundConnectorContextFactory(
ObjectMapper mapper,
InboundCorrelationHandler correlationHandler,
SecretProviderAggregator secretProviderAggregator,
@Autowired(required = false) ValidationProvider validationProvider,
OperateClientAdapter operateClientAdapter) {
OperateClientAdapter operateClientAdapter,
DocumentFactory documentFactory) {
return new DefaultInboundConnectorContextFactory(
mapper,
correlationHandler,
secretProviderAggregator,
validationProvider,
operateClientAdapter);
operateClientAdapter,
documentFactory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.outbound.lifecycle.OutboundConnectorAnnotationProcessor;
import io.camunda.connector.runtime.outbound.lifecycle.OutboundConnectorManager;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.InMemoryDocumentStore;
import io.camunda.zeebe.spring.client.jobhandling.CommandExceptionHandlingStrategy;
import io.camunda.zeebe.spring.client.jobhandling.JobWorkerManager;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
Expand All @@ -40,13 +43,19 @@ public OutboundConnectorFactory outboundConnectorFactory() {
OutboundConnectorDiscovery.loadConnectorConfigurations());
}

@Bean
public DocumentFactory documentFactory() {
return new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE);
}

@Bean
public OutboundConnectorManager outboundConnectorManager(
JobWorkerManager jobWorkerManager,
OutboundConnectorFactory connectorFactory,
CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
SecretProviderAggregator secretProviderAggregator,
@Autowired(required = false) ValidationProvider validationProvider,
DocumentFactory documentFactory,
ObjectMapper objectMapper,
MetricsRecorder metricsRecorder) {
return new OutboundConnectorManager(
Expand All @@ -55,6 +64,7 @@ public OutboundConnectorManager outboundConnectorManager(
commandExceptionHandlingStrategy,
secretProviderAggregator,
validationProvider,
documentFactory,
objectMapper,
metricsRecorder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.metrics.ConnectorMetrics;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Outbound;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
Expand All @@ -52,10 +53,16 @@ public SpringConnectorJobHandler(
CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
SecretProviderAggregator secretProviderAggregator,
ValidationProvider validationProvider,
DocumentFactory documentFactory,
ObjectMapper objectMapper,
OutboundConnectorFunction connectorFunction,
OutboundConnectorConfiguration connectorConfiguration) {
super(connectorFunction, secretProviderAggregator, validationProvider, objectMapper);
super(
connectorFunction,
secretProviderAggregator,
validationProvider,
documentFactory,
objectMapper);
this.metricsRecorder = metricsRecorder;
this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy;
this.connectorConfiguration = connectorConfiguration;
Expand Down
Loading

0 comments on commit 9e427e9

Please sign in to comment.