Skip to content

Commit

Permalink
feat(idp-extraction-connector): update connector to work with documents
Browse files: browse the repository at this point in the history
  • Loading branch information
Rei Balla committed Jan 8, 2025
1 parent 3d20ed2 commit 1cdb6a7
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@
},
"type" : "Hidden"
}, {
"id" : "input.documentUrl",
"label" : "Document URL",
"description" : "Specify the URL where the document is hosted",
"id" : "input.document",
"label" : "Document",
"description" : "Specify the document",
"optional" : false,
"value" : "= input.documentUrl",
"value" : "= input.document",
"constraints" : {
"notEmpty" : true
},
"group" : "input",
"binding" : {
"name" : "input.documentUrl",
"name" : "input.document",
"type" : "zeebe:input"
},
"type" : "Hidden"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@
},
"type" : "Hidden"
}, {
"id" : "input.documentUrl",
"label" : "Document URL",
"description" : "Specify the URL where the document is hosted",
"id" : "input.document",
"label" : "Document",
"description" : "Specify the document",
"optional" : false,
"value" : "= input.documentUrl",
"value" : "= input.document",
"constraints" : {
"notEmpty" : true
},
"group" : "input",
"binding" : {
"name" : "input.documentUrl",
"name" : "input.document",
"type" : "zeebe:input"
},
"type" : "Hidden"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import io.camunda.connector.idp.extraction.supplier.BedrockRuntimeClientSupplier;
import io.camunda.connector.idp.extraction.supplier.S3ClientSupplier;
import io.camunda.connector.idp.extraction.supplier.TextractClientSupplier;
import java.net.URI;
import java.net.URL;
import org.apache.pdfbox.Loader;
import org.apache.pdfbox.io.IOUtils;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -97,16 +94,14 @@ public Object execute(OutboundConnectorContext context) {

/**
 * Extracts text by delegating to {@code pollingTextractCaller.call(...)}, passing the request's
 * document input, its S3 bucket name, and the Textract and async S3 clients obtained from the
 * client suppliers for this request.
 *
 * <p>NOTE(review): this block is shown as a rendered diff, so the pre-change and post-change
 * forms of the first argument both appear below; only one of them exists in the real file.
 *
 * @param extractionRequest the request whose {@code input()} supplies the document and bucket name
 * @return the string returned by {@code pollingTextractCaller.call(...)}
 * @throws Exception declared because the delegated call declares it
 */
private String extractTextUsingAwsTextract(ExtractionRequest extractionRequest) throws Exception {
return pollingTextractCaller.call(
// pre-change side of the diff: the document was identified by a URL string
extractionRequest.input().documentUrl(),
// post-change side of the diff: the document is passed as a Document object
extractionRequest.input().document(),
extractionRequest.input().s3BucketName(),
textractClientSupplier.getTextractClient(extractionRequest),
s3ClientSupplier.getAsyncS3Client(extractionRequest));
}

/**
 * Extracts text from a PDF by loading it with PDFBox's {@code Loader.loadPDF} and running a
 * {@code PDFTextStripper} over the loaded document.
 *
 * <p>NOTE(review): this block is shown as a rendered diff, so both the pre-change loading code
 * (URL based) and the post-change loading code (Document based) appear below; only one of them
 * exists in the real file.
 *
 * @param extractionRequest the request whose {@code input()} supplies the document
 * @return the text returned by {@code PDFTextStripper.getText}
 * @throws Exception declared on the method; the visible lines perform PDF loading and parsing
 */
private String extractTextUsingApachePdf(ExtractionRequest extractionRequest) throws Exception {
// pre-change side of the diff: download the PDF bytes from the given URL
String documentUrl = extractionRequest.input().documentUrl();
URL url = URI.create(documentUrl).toURL();
PDDocument document = Loader.loadPDF(IOUtils.toByteArray(url.openStream()));
// post-change side of the diff: take the PDF bytes directly from the Document input
PDDocument document = Loader.loadPDF(extractionRequest.input().document().asByteArray());
PDFTextStripper pdfStripper = new PDFTextStripper();
// NOTE(review): the PDDocument is not closed anywhere in the visible lines; PDDocument is
// Closeable, so a try-with-resources around it looks warranted — confirm and fix in the source.
return pdfStripper.getText(document);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.idp.extraction.model.TextractTask;
import io.camunda.connector.idp.extraction.utils.AwsS3Util;
import io.camunda.document.Document;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
Expand All @@ -31,13 +32,13 @@ public class PollingTextractCaller {
private static final Logger LOGGER = LoggerFactory.getLogger(PollingTextractCaller.class);

public String call(
String documentUrl,
Document document,
String bucketName,
TextractClient textractClient,
S3AsyncClient s3AsyncClient)
throws Exception {

S3Object s3Object = AwsS3Util.buildS3ObjectFromUrl(documentUrl, bucketName, s3AsyncClient);
S3Object s3Object = AwsS3Util.buildS3ObjectFromDocument(document, bucketName, s3AsyncClient);

LOGGER.debug("Starting polling task for document analysis with document: {}", s3Object.name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.camunda.connector.generator.java.annotation.TemplateProperty;
import io.camunda.connector.generator.java.annotation.TemplateProperty.PropertyBinding;
import io.camunda.connector.generator.java.annotation.TemplateProperty.PropertyConstraints;
import io.camunda.document.Document;
import jakarta.validation.constraints.NotNull;
import java.util.List;

Expand All @@ -27,17 +28,17 @@ public record ExtractionRequestData(
@NotNull
TextExtractionEngineType extractionEngineType,
@TemplateProperty(
id = "documentUrl",
label = "Document URL",
id = "document",
label = "Document",
group = "input",
type = TemplateProperty.PropertyType.Hidden,
description = "Specify the URL where the document is hosted",
defaultValue = "= input.documentUrl",
binding = @PropertyBinding(name = "documentUrl"),
description = "Specify the document",
defaultValue = "= input.document",
binding = @PropertyBinding(name = "document"),
feel = Property.FeelMode.disabled,
constraints = @PropertyConstraints(notEmpty = true))
@NotNull
String documentUrl,
Document document,
@TemplateProperty(
id = "s3BucketName",
label = "AWS S3 Bucket name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
*/
package io.camunda.connector.idp.extraction.utils;

import io.camunda.document.Document;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -28,22 +26,16 @@
public class AwsS3Util {
private static final Logger LOGGER = LoggerFactory.getLogger(AwsS3Util.class);

private static String uploadNewFileFromUrl(
final String documentUrl, final String bucketName, final S3AsyncClient s3AsyncClient)
private static String uploadNewFileFromDocument(
final Document document, final String bucketName, final S3AsyncClient s3AsyncClient)
throws IOException {
String documentKey = UUID.randomUUID().toString();

LOGGER.debug("Starting document upload to AWS S3 with key {}", documentKey);

URL url = URI.create(documentUrl).toURL();
URLConnection urlConnection = url.openConnection();
long contentLength = urlConnection.getContentLength();
long contentLength = document.asByteArray().length;

if (contentLength == -1) {
throw new IOException("Unable to determine file size for URL: " + documentUrl);
}

try (InputStream inputStream = urlConnection.getInputStream()) {
try (InputStream inputStream = document.asInputStream()) {
PutObjectRequest putObjectRequest =
PutObjectRequest.builder().bucket(bucketName).key(documentKey).build();

Expand Down Expand Up @@ -82,12 +74,12 @@ public static void deleteS3ObjectFromBucketAsync(
response.thenApply(r -> null);
}

/**
 * Builds an {@code S3Object} whose bucket is {@code bucketName} and whose name is the string
 * returned by the upload helper for the given document.
 *
 * <p>NOTE(review): this block is shown as a rendered diff, so the pre-change signature and
 * upload call (URL based) and the post-change ones (Document based) both appear below; only one
 * pair exists in the real file.
 *
 * @param bucketName name of the S3 bucket set on the returned object and passed to the upload
 * @param s3AsyncClient async S3 client passed through to the upload helper
 * @return the built {@code S3Object}
 * @throws IOException declared on the method; the upload helper it calls declares it as well
 */
// pre-change side of the diff: signature taking the document URL as a string
public static S3Object buildS3ObjectFromUrl(
final String documentUrl, final String bucketName, final S3AsyncClient s3AsyncClient)
// post-change side of the diff: signature taking a Document
public static S3Object buildS3ObjectFromDocument(
final Document document, final String bucketName, final S3AsyncClient s3AsyncClient)
throws IOException {
return S3Object.builder()
.bucket(bucketName)
// pre-change side of the diff
.name(uploadNewFileFromUrl(documentUrl, bucketName, s3AsyncClient))
// post-change side of the diff
.name(uploadNewFileFromDocument(document, bucketName, s3AsyncClient))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ void executeSuccessfulExtraction() {

String bedrockResponse = bedrockCaller.call(extractionRequest, "", bedrockRuntimeClient);

assertEquals(bedrockResponse, expectedResponse);
assertEquals(expectedResponse, bedrockResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.idp.extraction.utils.AwsS3Util;
import io.camunda.document.Document;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -34,6 +35,7 @@ class PollingTextractCallerTest {
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
S3Object s3Object = Mockito.mock(S3Object.class);
MockedStatic<AwsS3Util> awsS3UtilMockedStatic;
Document mockedDocument = Mockito.mock(Document.class);

@BeforeEach
void beforeEach() {
Expand All @@ -43,7 +45,7 @@ void beforeEach() {
awsS3UtilMockedStatic
.when(
() ->
AwsS3Util.buildS3ObjectFromUrl(any(), any(String.class), any(S3AsyncClient.class)))
AwsS3Util.buildS3ObjectFromDocument(any(), any(String.class), any(S3AsyncClient.class)))
.thenReturn(s3Object);
awsS3UtilMockedStatic
.when(
Expand Down Expand Up @@ -85,7 +87,7 @@ void callTextractDocumentAnalysisWithSuccess() throws Exception {
String expectedExtractedText = "AAA\nBBB";
String extractedText =
new PollingTextractCaller()
.call("test Url", "test-aws-s3-bucket-name", textractClient, s3AsyncClient);
.call(mockedDocument, "test-aws-s3-bucket-name", textractClient, s3AsyncClient);

assertThat(extractedText).isEqualTo(expectedExtractedText);
}
Expand Down Expand Up @@ -114,7 +116,7 @@ void callTextractDocumentAnalysisWithEmptyResult() throws Exception {
String expectedExtractedText = "";
String extractedText =
new PollingTextractCaller()
.call("test Url", "test-aws-s3-bucket-name", textractClient, s3AsyncClient);
.call(mockedDocument, "test-aws-s3-bucket-name", textractClient, s3AsyncClient);

assertThat(extractedText).isEqualTo(expectedExtractedText);
}
Expand Down Expand Up @@ -147,7 +149,7 @@ void callTextractDocumentAnalysisWithFailure() {
ConnectorException.class,
() ->
pollingTextractCaller.call(
"test Url", "test-aws-s3-bucket-name", textractClient, s3AsyncClient));
mockedDocument, "test-aws-s3-bucket-name", textractClient, s3AsyncClient));

assertEquals("Test exception message", exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@
*/
package io.camunda.connector.idp.extraction.util;

import static org.apache.hc.core5.http.ContentType.APPLICATION_PDF;

import io.camunda.connector.idp.extraction.model.ConverseData;
import io.camunda.connector.idp.extraction.model.ExtractionRequestData;
import io.camunda.connector.idp.extraction.model.TaxonomyItem;
import io.camunda.connector.idp.extraction.model.TextExtractionEngineType;
import io.camunda.document.Document;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.List;

public class ExtractionTestUtils {
Expand All @@ -21,7 +30,12 @@ public class ExtractionTestUtils {
{
"input": {
"extractionEngineType": "AWS_TEXTRACT",
"documentUrl": "https://some-url-containing-your-document/documemt.pdf",
"document": {
"camunda.document.type": "camunda",
"storeId": "test",
"documentId": "test",
"metadata": {}
},
"s3BucketName": "test-aws-s3-bucket-name",
"converseData": {
"modelId": "anthropic.claude-3-5-sonnet-20240620-v1:0"
Expand Down Expand Up @@ -53,10 +67,25 @@ public class ExtractionTestUtils {
/**
 * Shared test fixture: extraction request data using the {@code AWS_TEXTRACT} engine, the bucket
 * name {@code "test-aws-s3-bucket-name"}, two taxonomy items ({@code sum} and {@code supplier})
 * and a {@code ConverseData} for the Claude 3.5 Sonnet model id.
 *
 * <p>NOTE(review): this initializer is shown as a rendered diff, so the pre-change document
 * argument (a URL string) and the post-change one ({@code loadTestFile()}) both appear below;
 * only one of them exists in the real file.
 */
public static final ExtractionRequestData TEXTRACT_EXTRACTION_REQUEST_DATA =
new ExtractionRequestData(
TextExtractionEngineType.AWS_TEXTRACT,
// pre-change side of the diff: document given as a URL string
"https://some-url-containing-your-document/documemt.pdf",
// post-change side of the diff: document loaded from the test resources
loadTestFile(),
"test-aws-s3-bucket-name",
List.of(
new TaxonomyItem("sum", "the total amount that was paid for this invoice"),
new TaxonomyItem("supplier", "who provided the goods or services")),
new ConverseData("anthropic.claude-3-5-sonnet-20240620-v1:0", 512, 0.5f, 0.9f));

/**
 * Loads {@code src/test/resources/sample-invoice.pdf} and creates a {@link Document} from it via
 * a {@link DocumentFactoryImpl} backed by {@link InMemoryDocumentStore#INSTANCE}, with the PDF
 * content type and the file name {@code "sample-invoice"}.
 *
 * @return the document created by the factory
 * @throws RuntimeException wrapping the underlying I/O failure if the sample file cannot be
 *     opened or its stream cannot be closed
 */
private static Document loadTestFile() {
  DocumentFactory documentFactory = new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE);
  // try-with-resources releases the file handle on every path, including a failing create().
  // NOTE(review): assumes the in-memory store reads the stream fully inside create() rather
  // than keeping it for later — confirm against InMemoryDocumentStore.
  try (FileInputStream fileInputStream =
      new FileInputStream("src/test/resources/sample-invoice.pdf")) {
    return documentFactory.create(
        DocumentCreationRequest.from(fileInputStream)
            .contentType(APPLICATION_PDF.getMimeType())
            .fileName("sample-invoice")
            .build());
  } catch (java.io.IOException e) {
    // Fully qualified so no new import is needed. IOException also covers
    // FileNotFoundException, so the missing-file case is wrapped exactly as before.
    throw new RuntimeException(e);
  }
}
}
Binary file not shown.

0 comments on commit 1cdb6a7

Please sign in to comment.