From fb13ed9c75380c29a67ad7a3501bdc7141ddcdea Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Sat, 18 Jan 2025 22:06:36 +0100 Subject: [PATCH] Support offloading the request data to disk with VertxHttpClientHTTPConduit fix #1628 --- docs/.asciidoctorconfig.adoc | 12 + .../client-server/application.properties | 10 + .../reference/extensions/quarkus-cxf.adoc | 93 +++++ .../cxf/vertx/http/client/TempStoreTest.java | 240 +++++++++++++ .../io/quarkiverse/cxf/CXFClientInfo.java | 9 + .../java/io/quarkiverse/cxf/CxfConfig.java | 96 +++++ .../cxf/vertx/http/client/BodyRecorder.java | 328 ++++++++++++++++++ .../cxf/vertx/http/client/HttpClientPool.java | 4 + .../cxf/vertx/http/client/TempStore.java | 303 ++++++++++++++++ .../client/VertxHttpClientHTTPConduit.java | 210 ++++++----- .../cxf/it/redirect/RedirectRest.java | 104 ++++++ .../retransmitcache/ObjectFactory.java | 97 ++++++ .../retransmitcache/RetransmitCache.java | 84 +++++ .../RetransmitCacheOutput.java | 75 ++++ .../RetransmitCacheResponse.java | 67 ++++ .../RetransmitCacheService.java | 47 +++ .../RetransmitCacheServiceImpl.java | 89 +++++ .../RetransmitCacheService_Service.java | 57 +++ .../retransmitcache/package-info.java | 2 + .../src/main/resources/application.properties | 10 + .../cxf/it/redirect/RedirectTest.java | 108 ++++++ .../cxf/it/redirect/RedirectTestResource.java | 38 ++ pom.xml | 2 +- test-util-parent/test-util/pom.xml | 9 + .../quarkiverse/cxf/test/VertxTestUtil.java | 21 ++ 25 files changed, 2003 insertions(+), 112 deletions(-) create mode 100644 docs/.asciidoctorconfig.adoc create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/vertx/http/client/TempStoreTest.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/BodyRecorder.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/TempStore.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/ObjectFactory.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCache.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheOutput.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheResponse.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService_Service.java create mode 100644 integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/package-info.java create mode 100644 integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTestResource.java create mode 100644 test-util-parent/test-util/src/main/java/io/quarkiverse/cxf/test/VertxTestUtil.java diff --git a/docs/.asciidoctorconfig.adoc b/docs/.asciidoctorconfig.adoc new file mode 100644 index 000000000..d3cddb773 --- /dev/null +++ b/docs/.asciidoctorconfig.adoc @@ -0,0 +1,12 @@ +// +++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// + Initial AsciiDoc editor configuration file - V1.0 + +// ++++++++++++++++++++++++++++++++++++++++++++++++++++++ +// +// Did not find any configuration files, so creating this at project root level. +// If you do not like those files to be generated - you can turn it off inside Asciidoctor Editor preferences. +// +// You can define editor specific parts here. +// For example: with next line you could set imagesdir attribute to subfolder "images" relative to the folder where this config file is located. +// :imagesdir: {asciidoctorconfigdir}/images +// +// For more information please take a look at https://github.com/de-jcup/eclipse-asciidoctor-editor/wiki/Asciidoctor-configfiles diff --git a/docs/modules/ROOT/examples/client-server/application.properties b/docs/modules/ROOT/examples/client-server/application.properties index b93f6fb63..1c8db869b 100644 --- a/docs/modules/ROOT/examples/client-server/application.properties +++ b/docs/modules/ROOT/examples/client-server/application.properties @@ -191,5 +191,15 @@ quarkus.cxf.client.helloWithoutWsdlWithBlocking.client-endpoint-url = http://loc quarkus.cxf.client.helloWithoutWsdlWithBlocking.service-interface = io.quarkiverse.cxf.deployment.test.HelloService +quarkus.cxf.retransmit-cache.threshold = 500K +quarkus.cxf.retransmit-cache.directory = ${qcxf.retransmitCacheDir} +quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.BodyRecorder".level = DEBUG +quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.TempStore".level = DEBUG + +quarkus.cxf.client.retransmitCache.client-endpoint-url = http://localhost:${quarkus.http.test-port}/RedirectRest/retransmitCacheRedirect +quarkus.cxf.client.retransmitCache.service-interface = io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheService +quarkus.cxf.client.retransmitCache.auto-redirect = true +#quarkus.cxf.client.retransmitCache.logging.enabled = true + quarkus.default-locale = en_US quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit".level=DEBUG diff --git a/docs/modules/ROOT/pages/reference/extensions/quarkus-cxf.adoc b/docs/modules/ROOT/pages/reference/extensions/quarkus-cxf.adoc index f26458fc7..4bfb4617f 100644 --- a/docs/modules/ROOT/pages/reference/extensions/quarkus-cxf.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/quarkus-cxf.adoc @@ -733,6 +733,72 @@ respectively. *Environment variable*: `+++QUARKUS_CXF_LOGGING_SENSITIVE_PROTOCOL_HEADER_NAMES+++` + *Since Quarkus CXF*: 2.6.0 +.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-threshold]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` +.<| `MemorySize` link:#memory-size-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the MemorySize format]] +.<| `128K` + +3+a|If the request retransmission is active for the given client and if request body is larger than this threshold, +then the body is cached on disk instead of keeping it in memory. + +See also: + +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + +*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_THRESHOLD+++` + +*Since Quarkus CXF*: 3.18.0 + +.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-max-size]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-max-size[quarkus.cxf.retransmit-cache.max-size]` +.<| `MemorySize` link:#memory-size-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the MemorySize format]] +.<| + +3+a|The maximum size of a request body allowed to be cached on disk when retransmitting. +If not set, no limit will be enforced. +If set and the limit is exceeded, an exception will be thrown and therefore the request will neither be sent nor redirected. + +See also: + +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` + +*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_MAX_SIZE+++` + +*Since Quarkus CXF*: 3.18.0 + +.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-directory]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-directory[quarkus.cxf.retransmit-cache.directory]` +.<| `string` +.<| + +3+a|A directory where request bodies exceeding +`xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` +will be be stored for retransmission. +If specified, the directory must exist on application startup. +If not specified, the system temporary directory will be used. + +See also: + +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` + +*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_DIRECTORY+++` + +*Since Quarkus CXF*: 3.18.0 + +.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-gc-delay]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-gc-delay[quarkus.cxf.retransmit-cache.gc-delay]` +.<| link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html[`Duration`] link:#duration-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the Duration format]] +.<| `30S` + +3+a|A delay for cleaning up stale temporary files in the +xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-directory[retransmit cache directory]. +Those temporary files are normally removed upon receiving a non-redirect response. +The periodic garbage collection is a fallback mechanism for exceptional conditions. + +The minimum value is 2 seconds. If the value of the delay is set to 0, the garbage collection of stale temporary files will be deactivated. + +See also: + +* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + +*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_GC_DELAY+++` + +*Since Quarkus CXF*: 3.18.0 + .<|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus-cxf-codegen-wsdl2java-named-parameter-sets-includes]]`link:#quarkus-cxf_quarkus-cxf-codegen-wsdl2java-named-parameter-sets-includes[quarkus.cxf.codegen.wsdl2java."named-parameter-sets".includes]` .<| List of ``string`` .<| @@ -2090,3 +2156,30 @@ enforced unless it is enabled by other means, such as `@org.apache.cxf.annotatio *Since Quarkus CXF*: 2.7.0 |=== +[NOTE] +[id=duration-note-anchor-quarkus-cxf] +.About the Duration format +==== +To write duration values, use the standard `java.time.Duration` format. +See the link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html#parse(java.lang.CharSequence)[Duration#parse() Java API documentation] for more information. + +You can also use a simplified format, starting with a number: + +* If the value is only a number, it represents time in seconds. +* If the value is a number followed by `ms`, it represents time in milliseconds. + +In other cases, the simplified format is translated to the `java.time.Duration` format for parsing: + +* If the value is a number followed by `h`, `m`, or `s`, it is prefixed with `PT`. +* If the value is a number followed by `d`, it is prefixed with `P`. +==== + +[NOTE] +[id=memory-size-note-anchor-quarkus-cxf] +.About the MemorySize format +==== +A size configuration option recognizes strings in this format (shown as a regular expression): `[0-9]+[KkMmGgTtPpEeZzYy]?`. + +If no suffix is given, assume bytes. +==== + diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/vertx/http/client/TempStoreTest.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/vertx/http/client/TempStoreTest.java new file mode 100644 index 000000000..df3f59276 --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/vertx/http/client/TempStoreTest.java @@ -0,0 +1,240 @@ +package io.quarkiverse.cxf.vertx.http.client; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import jakarta.inject.Inject; + +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkiverse.cxf.test.VertxTestUtil; +import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore; +import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore.TempPath; +import io.quarkus.logging.Log; +import io.quarkus.test.QuarkusUnitTest; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +public class TempStoreTest { + private static final int FILE_DELAY = 10; + + private static final int GC_DELAY = 360000; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .overrideConfigKey("quarkus.log.category.\"" + TempStore.class.getName() + "\".level", "DEBUG"); + + @Inject + Vertx vertx; + + @Test + void initializeExistingTempDir() throws IOException, InterruptedException, ExecutionException, TimeoutException { + Log.info("TempPathTest.initializeExistingTempDir()"); + + final Path tempDir = Path.of("target/" + TempStoreTest.class.getSimpleName() + "-" + UUID.randomUUID() + "/temp"); + Files.createDirectories(tempDir); + Assertions.assertThat(tempDir).exists(); + + final CompletableFuture> initializedCF = new CompletableFuture<>(); + final CompletableFuture ctxCF = new CompletableFuture<>(); + final CompletableFuture> tempPathCF = new CompletableFuture<>(); + + vertx.runOnContext(v -> { + final ContextInternal ct = (ContextInternal) vertx.getOrCreateContext(); + ctxCF.complete(ct); + final Future initialized = TempStore + .fromContext(vertx.getOrCreateContext(), Optional.of(tempDir.toString()), GC_DELAY, FILE_DELAY) + .andThen(initializedCF::complete); + + initialized.compose(i -> i.newTempPath()) + .andThen(tempPathCF::complete); + + }); + final ContextInternal ctx = ctxCF.get(5, TimeUnit.SECONDS); + + final AsyncResult initialized = initializedCF.get(5, TimeUnit.SECONDS); + VertxTestUtil.assertSuccess(initialized); + Assertions.assertThat(initialized.result().getDirectory()).isEqualTo(tempDir); + + /* Get a tempFile path */ + final AsyncResult tempPath = tempPathCF.get(5, TimeUnit.SECONDS); + VertxTestUtil.assertSuccess(tempPath); + final Path p = tempPath.result().getPath(); + Assertions.assertThat(p.toString()).startsWith(tempDir.toString()); + + Promise close = ctx.promise(); + /* Check InitializedTempStore.close() works */ + ctx.runOnContext(v -> initialized.result().close(close)); + close.future().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS); + + final Promise tmpStoreAfterCloseP = ctx.promise(); + ctx.runOnContext(v -> { + tmpStoreAfterCloseP.complete(ctx.contextData().get(TempStore.CONTEXT_KEY)); + }); + + final Object tmpStoreAfterClose = tmpStoreAfterCloseP.future().toCompletionStage().toCompletableFuture().get(5, + TimeUnit.SECONDS); + Assertions.assertThat(tmpStoreAfterClose).isNull(); + + Log.info("Finished testing TempStore.fromContext(Context, Path, long, long)"); + } + + @Test + void initializeNewTempDir() throws InterruptedException, ExecutionException, TimeoutException, IOException { + Log.info("TempPathTest.initializeNewTempDir()"); + final Path tempDir = Path.of("target/" + TempStoreTest.class.getSimpleName() + "-" + UUID.randomUUID() + "/temp"); + Assertions.assertThat(tempDir).doesNotExist(); + + final CompletableFuture> initializedCF = new CompletableFuture<>(); + final CompletableFuture ctxCF = new CompletableFuture<>(); + final CompletableFuture> tempPathCF = new CompletableFuture<>(); + + vertx.runOnContext(v -> { + final ContextInternal ct = (ContextInternal) vertx.getOrCreateContext(); + ctxCF.complete(ct); + final Future initialized = TempStore + .fromContext(ct, Optional.of(tempDir.toString()), GC_DELAY, FILE_DELAY) + .andThen(initializedCF::complete); + + initialized.compose(i -> i.newTempPath()) + .andThen(tempPathCF::complete); + + }); + + final AsyncResult initialized = initializedCF.get(5, TimeUnit.SECONDS); + VertxTestUtil.assertSuccess(initialized); + Assertions.assertThat(initialized.result().getDirectory()).isDirectory(); + + final ContextInternal ctx = ctxCF.get(5, TimeUnit.SECONDS); + //Assertions.assertThat((Object) ctx.get(TempStore.CONTEXT_KEY)).isInstanceOf(TempStore.class); + final Future initialized2 = TempStore.fromContext(ctx, Optional.of(tempDir.toString()), GC_DELAY, + FILE_DELAY); + Assertions.assertThat(initialized.result()).isSameAs(initialized2.result()); + + { + /* Test TempPath.delete() */ + Log.info("Testing TempPath.delete()"); + + /* Get a tempFile path */ + final TempPath tempPath = VertxTestUtil.assertSuccess(tempPathCF.get(5, TimeUnit.SECONDS)); + final Path p = tempPath.getPath(); + Assertions.assertThat(p).doesNotExist(); + + /* Use the path to create the file */ + Files.write(p, new byte[] { 42 }); + Assertions.assertThat(p).isRegularFile(); + + /* Check the direct deletion works */ + tempPath.discard().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS); + Assertions.assertThat(p).doesNotExist(); + } + + { + /* Test InitializedTempStore.gc() */ + Log.info("Testing InitializedTempStore.gc()"); + + /* Get a tempFile path */ + final CompletableFuture> tempPathCF1 = new CompletableFuture<>(); + vertx.runOnContext(v -> { + initialized.result().newTempPath() + .andThen(tempPathCF1::complete); + }); + final TempPath tp1 = VertxTestUtil.assertSuccess(tempPathCF1.get(5, TimeUnit.SECONDS)); + final Path p1 = tp1.getPath(); + Assertions.assertThat(p1).doesNotExist(); + + /* Use the path to create the file */ + Files.write(p1, new byte[] { 42 }); + Assertions.assertThat(p1).isRegularFile(); + + /* + * Get a path but do not create the file - the InitializedTempStore.gc() should not get confused if the leased path + * does + * not exist + */ + final CompletableFuture> tempPathCF2 = new CompletableFuture<>(); + vertx.runOnContext(v -> { + initialized.result().newTempPath() + .andThen(tempPathCF2::complete); + }); + final TempPath tp2 = VertxTestUtil.assertSuccess(tempPathCF2.get(5, TimeUnit.SECONDS)); + + /* Invoke gc() manually, so that we do not need to mess with waiting for the timer */ + initialized.result().gc(); + + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + boolean f1Exists = Files.exists(p1); + boolean f2Exists = Files.exists(tp2.getPath()); + Log.infof("%s exists: %s, %s exists: %s", p1, f1Exists, tp2.getPath(), f2Exists); + return !f1Exists && !f2Exists; + }); + + } + + { + /* Test InitializedTempStore.close(Promise) */ + Log.info("Testing InitializedTempStore.close(Promise)"); + + /* Get a tempFile path */ + final CompletableFuture> tempPathCF1 = new CompletableFuture<>(); + vertx.runOnContext(v -> { + initialized.result().newTempPath() + .andThen(tempPathCF1::complete); + }); + final TempPath tp1 = VertxTestUtil.assertSuccess(tempPathCF1.get(5, TimeUnit.SECONDS)); + final Path p1 = tp1.getPath(); + Assertions.assertThat(p1).doesNotExist(); + + /* Use the path to create the file */ + Files.write(p1, new byte[] { 42 }); + Assertions.assertThat(p1).isRegularFile(); + + /* + * Get a path but do not create the file - the TempStore.shutdown() should not get confused if the leased path does + * not exist + */ + final CompletableFuture> tempPathCF2 = new CompletableFuture<>(); + vertx.runOnContext(v -> { + initialized.result().newTempPath() + .andThen(tempPathCF2::complete); + }); + final TempPath tp2 = VertxTestUtil.assertSuccess(tempPathCF2.get(5, TimeUnit.SECONDS)); + + Promise close = ctx.promise(); + /* Check TempStore.shutdown() works */ + ctx.runOnContext(v -> initialized.result().close(close)); + close.future().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS); + + final Promise tmpStoreAfterCloseP = ctx.promise(); + ctx.runOnContext(v -> { + tmpStoreAfterCloseP.complete(ctx.contextData().get(TempStore.CONTEXT_KEY)); + }); + + final Object tmpStoreAfterClose = tmpStoreAfterCloseP.future().toCompletionStage().toCompletableFuture().get(5, + TimeUnit.SECONDS); + Assertions.assertThat(tmpStoreAfterClose).isNull(); + Assertions.assertThat(p1).doesNotExist(); + Assertions.assertThat(tp2.getPath()).doesNotExist(); + + Log.info("Finished testing InitializedTempStore.close(Promise)"); + + } + + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java index 22d4c9446..75530d7e8 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java @@ -11,6 +11,7 @@ import org.apache.cxf.transports.http.configuration.ConnectionType; import org.apache.cxf.transports.http.configuration.ProxyServerType; +import io.quarkiverse.cxf.CxfConfig.RetransmitCacheConfig; import io.quarkus.arc.Arc; import io.quarkus.arc.Unremovable; import io.quarkus.tls.TlsConfiguration; @@ -87,6 +88,9 @@ public class CXFClientInfo { * (name is not part of standard) */ private final int maxRetransmits; + + private final RetransmitCacheConfig retransmitCache; + /** * Specifies the maximum amount of retransmits to the same uri that are allowed for redirects. * Retransmits for authorization is included in the retransmit count. Each redirect may cause another @@ -238,6 +242,7 @@ public CXFClientInfo(CXFClientData other, CxfConfig cxfConfig, CxfClientConfig c this.redirectRelativeUri = config.redirectRelativeUri(); this.maxRetransmits = config.maxRetransmits(); this.maxSameUri = config.maxSameUri(); + this.retransmitCache = cxfConfig.retransmitCache(); this.allowChunking = config.allowChunking(); this.chunkingThreshold = config.chunkingThreshold(); this.chunkLength = config.chunkLength(); @@ -495,6 +500,10 @@ public boolean isRedirectRelativeUri() { return redirectRelativeUri; } + public RetransmitCacheConfig getRetransmitCache() { + return retransmitCache; + } + public int getMaxRetransmits() { return maxRetransmits; } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfConfig.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfConfig.java index e8c9de796..5dd27aa1c 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfConfig.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfConfig.java @@ -1,5 +1,6 @@ package io.quarkiverse.cxf; +import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -8,7 +9,12 @@ import io.quarkus.runtime.annotations.ConfigDocMapKey; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; +import io.quarkus.runtime.configuration.DurationConverter; +import io.quarkus.runtime.configuration.MemorySize; +import io.quarkus.runtime.configuration.MemorySizeConverter; import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithConverter; +import io.smallrye.config.WithDefault; import io.smallrye.config.WithDefaults; import io.smallrye.config.WithName; @@ -117,6 +123,13 @@ public interface CxfConfig { */ GlobalLoggingConfig logging(); + /** + * Retransmission cache configuration + * + * @asciidoclet + */ + RetransmitCacheConfig retransmitCache(); + default boolean isClientPresent(String key) { return Optional.ofNullable(clients()).map(m -> m.containsKey(key)).orElse(false); } @@ -130,4 +143,87 @@ public interface InternalConfig { @ConfigDocIgnore public CxfClientConfig client(); } + + public interface RetransmitCacheConfig { + // The formatter breaks the list with long items + // @formatter:off + /** + * If the request retransmission is active for the given client and if request body is larger than this threshold, + * then the body is cached on disk instead of keeping it in memory. + * + * See also: + * + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + * + * @since 3.18.0 + * @asciidoclet + */ + // @formatter:on + @WithDefault("128K") + @WithConverter(MemorySizeConverter.class) + public MemorySize threshold(); + + // The formatter breaks the list with long items + // @formatter:off + /** + * The maximum size of a request body allowed to be cached on disk when retransmitting. + * If not set, no limit will be enforced. + * If set and the limit is exceeded, an exception will be thrown and therefore the request will neither be sent nor redirected. + * + * See also: + * + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` + * + * @since 3.18.0 + * @asciidoclet + */ + // @formatter:on + @WithConverter(MemorySizeConverter.class) + public Optional maxSize(); + + // The formatter breaks the list with long items + // @formatter:off + /** + * A directory where request bodies exceeding + * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` + * will be be stored for retransmission. + * If specified, the directory must exist on application startup. + * If not specified, the system temporary directory will be used. + * + * See also: + * + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]` + * + * @since 3.18.0 + * @asciidoclet + */ + // @formatter:on + public Optional directory(); + + // The formatter breaks the list with long items + // @formatter:off + /** + * A delay for cleaning up stale temporary files in the + * xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-directory[retransmit cache directory]. + * Those temporary files are normally removed upon receiving a non-redirect response. + * The periodic garbage collection is a fallback mechanism for exceptional conditions. + * + * The minimum value is 2 seconds. If the value of the delay is set to 0, the garbage collection of stale temporary files will be deactivated. + * + * See also: + * + * * `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]` + * + * @since 3.18.0 + * @asciidoclet + */ + // @formatter:on + @WithDefault("30s") + @WithConverter(DurationConverter.class) + public Duration gcDelay(); + + } + } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/BodyRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/BodyRecorder.java new file mode 100644 index 000000000..5bbbcc0d9 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/BodyRecorder.java @@ -0,0 +1,328 @@ +package io.quarkiverse.cxf.vertx.http.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.jboss.logging.Logger; + +import io.quarkiverse.cxf.CxfConfig.RetransmitCacheConfig; +import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore; +import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore.TempPath; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.OpenOptions; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.impl.ContextInternal; + +/** + * A non-blocking facility for storing request bodies in memory or on disk for the sake of retransmission. + */ +class BodyRecorder { + private static final Logger log = Logger.getLogger(VertxHttpClientHTTPConduit.class); + + /** + * Schedules an opening of a non-blocking {@link BodyWriter} on a Vert.x event loop thread. + * + * @param ctx + * @param retransmitCacheConfig + * @return a new {@link BodyWriter} {@link Future} + */ + public static Future openWriter(ContextInternal ctx, RetransmitCacheConfig retransmitCacheConfig) { + final Promise result = ctx.promise(); + ctx.runOnContext(v -> { + final long maxSize = retransmitCacheConfig.maxSize().isPresent() + ? retransmitCacheConfig.maxSize().get().asLongValue() + : -1; + result.complete( + new MemoryBodyWriter( + ctx, + retransmitCacheConfig.threshold().asLongValue(), + maxSize, + retransmitCacheConfig.directory(), + retransmitCacheConfig.gcDelay().toMillis())); + }); + return result.future(); + } + + /** + * A non-blocking writer. There are implementations for storing in memory or on disk. + */ + interface BodyWriter { + public Future write(Buffer buffer); + + public Future close(); + } + + /** + * A request body readable multiple times stored in memory or on disk. + */ + interface StoredBody { + /** + * @return the length of the stored body in bytes + */ + public long length(); + + /** + * Pipe this {@link StoredBody} to the given {@code request}. + * + * @param request + * @return a {@link Future} holding the operation outcome + */ + public Future pipeTo(HttpClientRequest request); + + /** + * @return release any system resources held by this {@link StoredBody}, such as in memory buffers or temporary files. + */ + public Future discard(); + } + + /** + * A {@link BodyWriter} storing in memory. + */ + static class MemoryBodyWriter implements BodyWriter { + + private final ContextInternal ctx; + private final long threshold; + private final long maxSize; + private final Optional tempDir; + private final long fileDelayMs; + private final String threadName; + + /* Read and written only on one specific event loop thread */ + private List buffers; + private long length = 0; + + public MemoryBodyWriter(ContextInternal ctx, long threshold, long maxSize, Optional tempDir, long fileDelayMs) { + this.ctx = ctx; + this.threshold = threshold; + this.maxSize = maxSize; + this.tempDir = tempDir; + this.fileDelayMs = fileDelayMs; + boolean asserting = false; + assert asserting = true; + this.threadName = asserting ? Thread.currentThread().getName() : null; + } + + @Override + public Future write(Buffer buffer) { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + List buffs = buffers; + length += buffer.length(); + if (length > threshold) { + if (maxSize >= 0 && length > maxSize) { + return Future.failedFuture(new IOException( + "Request body size " + length + " bytes exceeded the max-size limit " + maxSize + " bytes")); + } + buffers = null; // avoid damaging the list by subsequent writes + Future diskWriter = DiskBodyWriter.open(ctx, threshold, maxSize, tempDir, fileDelayMs, buffs, + threadName); + return diskWriter.compose(bw -> bw.write(buffer)); + } + /* Not big enough for the file system -> keep it in memory */ + if (buffs == null) { + buffs = buffers = new ArrayList<>(); + } + buffs.add(buffer); + return Future.succeededFuture(this); + } + + @Override + public Future close() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + final List buffs = buffers; + buffers = null; // avoid damaging the list by subsequent writes + return Future.succeededFuture(new MemoryStoredBody(buffs, length, threadName)); + } + } + + /** + * A {@link StoredBody} in memory. + */ + static class MemoryStoredBody implements StoredBody { + + private List buffers; + private final long length; + private final String threadName; + + MemoryStoredBody(List buffers, long length, String threadName) { + this.buffers = buffers; + this.length = length; + this.threadName = threadName; + } + + @Override + public long length() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + return length; + } + + @Override + public Future pipeTo(HttpClientRequest req) { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + final List buffs = buffers; + if (buffs != null) { + final int last = buffs.size() - 1; + if (last == 0) { + /* Single buffer recorded */ + return req.end(buffs.get(0).slice()); + } else if (last == -1) { + /* Empty body */ + return req.end(); + } else { + /* Multiple buffers recorded */ + //req.setChunked(true); + // TODO: consider letting MemoryStoredBody implement ReadStream so that we can pipe to HttpClientRequest with back pressure + Future result = Future.succeededFuture(); + for (int i = 0; i < last; i++) { + final int fi = i; + result = result.compose(v -> req.write(buffs.get(fi).slice())); + } + return result.compose(v -> req.end(buffs.get(last).slice())); + } + } + /* Empty body */ + return req.end(); + } + + @Override + public Future discard() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + buffers = null; + return Future.succeededFuture(); + } + + } + + static class DiskBodyWriter implements BodyWriter { + + private final ContextInternal ctx; + private final long maxSize; + private final AsyncFile tempFile; + private final TempPath tempPath; + private final String threadName; + + /* Read and written only on one specific event loop thread */ + private long length = 0; + + private DiskBodyWriter(ContextInternal ctx, long maxSize, TempPath tempPath, AsyncFile tempFile, String threadName) { + this.ctx = ctx; + this.maxSize = maxSize; + this.tempPath = tempPath; + this.tempFile = tempFile; + this.threadName = threadName; + } + + public static Future open(ContextInternal ctx, long threshold, long maxSize, Optional tempDir, + long fileDelayMs, List buffs, String threadName) { + final Future tempStore = TempStore.fromContext(ctx, tempDir, fileDelayMs); + Future result = tempStore + .compose(ts -> ts.newTempPath()) + .compose(tempPath -> { + log.debugf("Offloading request body exceeding %s bytes to disk: %s", threshold, tempPath.getPath()); + + final Future fileFuture = ctx.owner().fileSystem() + .open( + tempPath.getPath().toString(), + new OpenOptions().setWrite(true).setCreate(true)); + return fileFuture + .compose(file -> Future + .succeededFuture(new DiskBodyWriter(ctx, maxSize, tempPath, file, threadName))); + }); + if (buffs != null) { + for (Buffer b : buffs) { + result = result.compose(bw -> bw.write(b)); + } + } + return result; + } + + @Override + public Future write(Buffer buffer) { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + length += buffer.length(); + if (maxSize >= 0 && length > maxSize) { + return Future.failedFuture(new IOException( + "Request body size " + length + " bytes exceeded the max-size limit " + maxSize + " bytes")); + } + + return tempFile + .write(buffer) + .compose(v -> Future.succeededFuture(this)); + } + + @Override + public Future close() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + return tempFile + .close() + .compose(v -> Future.succeededFuture(new DiskStoredBody(ctx, tempPath, length, threadName))); + } + } + + /** + * A {@link StoredBody} on disk. + */ + static class DiskStoredBody implements StoredBody { + private final ContextInternal ctx; + private final TempPath tempPath; + private final String threadName; + private final long length; + + public DiskStoredBody(ContextInternal ctx, TempPath tempPath, long length, String threadName) { + super(); + this.ctx = ctx; + this.tempPath = tempPath; + this.length = length; + this.threadName = threadName; + } + + @Override + public Future discard() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + return tempPath.discard(); + } + + @Override + public Future pipeTo(HttpClientRequest req) { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + return ctx.owner() + .fileSystem() + .open(tempPath.getPath().toString(), new OpenOptions().setRead(true)) + .compose(f -> { + return f.pipeTo(req); + }); + } + + @Override + public long length() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + return length; + } + + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/HttpClientPool.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/HttpClientPool.java index 1d5458862..8717fb35c 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/HttpClientPool.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/HttpClientPool.java @@ -128,4 +128,8 @@ public int hashCode() { } + public Vertx getVertx() { + return vertx; + } + } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/TempStore.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/TempStore.java new file mode 100644 index 000000000..893fe0f40 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/TempStore.java @@ -0,0 +1,303 @@ +package io.quarkiverse.cxf.vertx.http.client; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.jboss.logging.Logger; + +import io.vertx.core.Closeable; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.file.FileSystem; +import io.vertx.core.impl.ContextInternal; + +/** + * A disk store for temporary files. + */ +public class TempStore { + private static final Logger log = Logger.getLogger(TempStore.class); + private static final long MIN_DELAY = 2000; /* 2 seconds */ + public static final String CONTEXT_KEY = TempStore.class.getName(); + + /** + * + * @param ctx the {@link Context} to bind the returned {@link InitializedTempStore} to + * @param tempDir the temporary directory where to create temporary files + * @param fileDelayMs number of milliseconds since the creation of the given file after which the file can be deleted + * @return a {@link Future} holding an {@link InitializedTempStore} - either a new one or one retrieved from the given + * {@link Context} + */ + public static Future fromContext(Context ctx, Optional tempDir, long fileDelayMs) { + validateDelayMs(fileDelayMs, "quarkus.cxf.retransmit-cache.gc-delay"); + return fromContext(ctx, tempDir, fileDelayMs >> 1, fileDelayMs); + } + + /** + * @param ctx the {@link Context} to bind the returned {@link InitializedTempStore} to + * @param tempDir the temporary directory where to create temporary files + * @param gcDelayMs delay in milliseconds for periodic cleaning of the temporary files + * @param fileDelayMs number of milliseconds since the creation of the given file after which the file can be deleted + * @return a {@link Future} holding an {@link InitializedTempStore} - either a new one or one retrieved from the given + * {@link Context} + */ + public static Future fromContext(Context ctx, Optional tempDir, long gcDelayMs, + long fileDelayMs) { + ContextInternal ctxi = (ContextInternal) ctx; + return ((TempStore) ctxi + .contextData() + .computeIfAbsent(CONTEXT_KEY, + k -> new TempStore( + ctxi, + Path.of(tempDir.orElse(System.getProperty("java.io.tmpdir"))), + gcDelayMs, + fileDelayMs))) + .initializeIfNeeded(); + } + + private final ContextInternal ctx; + private final Path directory; + private final long gcDelayMs; + private final long fileDelayMs; + + /* + * No need for volatile as TempStore instances are per Vet.x Context and this this field is both read and written from the + * same thread + */ + private Future initializedTempStore; + + TempStore(ContextInternal ctx, Path directory, long gcDelayMs, long fileDelayMs) { + super(); + this.ctx = ctx; + this.directory = directory; + this.gcDelayMs = gcDelayMs; + this.fileDelayMs = fileDelayMs; + } + + public static long validateDelayMs(long delayMs, String label) { + if (delayMs < MIN_DELAY && delayMs != 0) { + throw new IllegalArgumentException("The value of " + label + + " must be >= " + MIN_DELAY + " or 0 to disable regular deletion of stale temporary files"); + } + return delayMs; + } + + Future initializeIfNeeded() { + + if (initializedTempStore != null) { + /* A shortcut to avoid checking the tempDir existence if we checked already */ + return initializedTempStore; + } + final FileSystem fs = ctx.owner().fileSystem(); + return fs + .exists(directory.toString()) + .compose(exists -> { + if (initializedTempStore != null) { + return initializedTempStore; + } + if (!exists) { + return initializedTempStore = fs + .mkdirs(directory.toString()) + .compose(dir -> Future + .succeededFuture(new InitializedTempStore(ctx, directory, gcDelayMs, fileDelayMs))); + } else { + return initializedTempStore = Future + .succeededFuture(new InitializedTempStore(ctx, directory, gcDelayMs, fileDelayMs)); + } + }); + } + + public static class InitializedTempStore implements Closeable { + private final ContextInternal ctx; + private final Path directory; + private final long gcDelayMs; + private final long fileDelayMs; + private final String prefix; + private final String threadName; + + /* Read/written from a single specific thread */ + private int counter = 0; + + /* + * Typically read/written from a single specific thread + * but the cleaning ops on close, can be done from another thread, such as main. + * We are deliberately not making it volatile for that case, + * because we hold race conditions after close for highly unlikely + */ + private long timerId = -1; + private List tempFiles = new CopyOnWriteArrayList<>(); + + InitializedTempStore(ContextInternal ctx, Path directory, long gcDelayMs, long fileDelayMs) { + super(); + ctx.addCloseHook(this); + this.ctx = ctx; + this.directory = directory; + this.gcDelayMs = gcDelayMs; + this.fileDelayMs = fileDelayMs; + final String threadName = Thread.currentThread().getName(); + final String shortThreadName = threadName.replace("vert.x-eventloop-thread-", "evtloop-"); + this.prefix = "qcxf-TempStore-" + ProcessHandle.current().pid() + "-" + shortThreadName + "-"; + + boolean asserting = false; + assert asserting = true; + this.threadName = asserting ? threadName : null; + + log.debugf("Initialized a new TempStore %s/%s*", directory, prefix); + + } + + public Path getDirectory() { + return directory; + } + + public Future newTempPath() { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + if (tempFiles != null) { + final TempPath newPath = new TempPath( + directory.resolve(prefix + (counter++)), + System.currentTimeMillis() + fileDelayMs); + tempFiles.add(newPath); + if (fileDelayMs >= MIN_DELAY && timerId < 0) { + timerId = ctx.owner().setPeriodic(gcDelayMs, tid -> gc()); + } + log.debugf("Created new temporary path %s", newPath); + return Future.succeededFuture(newPath); + } + return Future.failedFuture("Cannot get new TempPath: TempStore closed already"); + } + + public void gc() { + if (tempFiles != null) { + ctx.runOnContext(v -> { + assert Thread.currentThread().getName().equals(threadName) + : "Expected " + threadName + "; found " + Thread.currentThread().getName(); + + if (tempFiles != null) { + delete(new ArrayList<>(tempFiles), System.currentTimeMillis()) + .onSuccess(dels -> log.debugf("Gc'd %d temporary files in TempStore %s/%s*", dels.size(), + directory, prefix)) + .onFailure(e -> log.errorf(e, "Could not gc some temporary files in TempStore %s/%s*", + directory, prefix)); + } + }); + } + } + + @Override + public void close(Promise completion) { + if (timerId >= 0) { + ctx.owner().cancelTimer(timerId); + timerId = -1; + } + ctx.contextData().remove(CONTEXT_KEY); + if (tempFiles != null) { + final List tempFiles = this.tempFiles; + this.tempFiles = null; // disallow adding new files + delete(tempFiles, 0 /* 0 to delete all files immediately */) + .onSuccess(dels -> { + log.debugf("Deleted %d files on close in TempStore %s/%s*", dels.size(), directory, + prefix); + completion.complete(); + }) + .onFailure(e -> { + log.errorf(e, "Could not delete some temporary files on close in TempStore %s/%s*", directory, + prefix); + completion.fail(e); + }); + } else { + /* If tempFiles == null there is nothing to cleanup */ + log.debugf("Nothing to cleanup on close in TempStore %s/%s*", directory, prefix); + completion.complete(); + } + } + + CompositeFuture delete(final List tempFiles, long currentTimeMillis) { + final List> deletions = new ArrayList<>(tempFiles.size()); + for (TempPath path : tempFiles) { + if (path.gcTime >= currentTimeMillis) { + deletions.add(ctx.owner().fileSystem() + .exists(path.path.toString()) + .compose( + exists -> { + if (exists) { + return ctx.owner().fileSystem() + .delete(path.path.toString()) + .andThen(ar -> { + if (ar.succeeded()) { + log.debugf("Deleted temporary file %s", path.path); + } else { + log.warnf(ar.cause(), "Could not delete temporary file %s", + path.path); + } + }); + } else { + log.debugf("Temporary file %s did not exist when attempting to delete it", + path.path); + return Future.succeededFuture(); + } + })); + } + } + return Future.all(deletions); + } + + public class TempPath { + private final Path path; + /** Unix era time at or after which this {@link TempPath} can be deleted */ + private final long gcTime; + + public TempPath(Path path, long gcTime) { + super(); + this.path = path; + this.gcTime = gcTime; + } + + public Future discard() { + final String p = path.toString(); + final FileSystem fs = ctx.owner().fileSystem(); + return fs + .exists(p) + .compose(exists -> { + if (exists) { + return fs + .delete(p) + .andThen(ar -> { + if (ar.succeeded()) { + log.debugf("Deleted temporary file %s", p); + } else { + log.warnf(ar.cause(), "Could not delete temporary file %s", p); + } + }); + } else { + log.debugf("Temporary file %s did not exist when attempting to delete it", p); + return Future.succeededFuture(); + } + }) + .andThen(f -> { + final List tfs = tempFiles; + /* tempFiles can be null when TempStore.close() was called */ + if (tfs != null) { + tfs.remove(this); + } + }); + } + + public Path getPath() { + return path; + } + + @Override + public String toString() { + return path + "@" + gcTime; + } + } + + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java index 0be1e52d3..f1096e70c 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java @@ -75,6 +75,8 @@ import io.quarkiverse.cxf.CXFClientInfo; import io.quarkiverse.cxf.QuarkusCxfUtils; import io.quarkiverse.cxf.QuarkusTLSClientParameters; +import io.quarkiverse.cxf.vertx.http.client.BodyRecorder.BodyWriter; +import io.quarkiverse.cxf.vertx.http.client.BodyRecorder.StoredBody; import io.quarkiverse.cxf.vertx.http.client.HttpClientPool.ClientSpec; import io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit.RequestBodyEvent.RequestBodyEventType; import io.quarkus.arc.Arc; @@ -94,6 +96,7 @@ import io.vertx.core.http.HttpVersion; import io.vertx.core.http.RequestOptions; import io.vertx.core.http.impl.HttpUtils; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.ProxyOptions; import io.vertx.core.net.ProxyType; import io.vertx.core.streams.WriteStream; @@ -360,9 +363,19 @@ static record RequestContext( static record RequestBodyEvent(Buffer buffer, RequestBodyEventType eventType) { public enum RequestBodyEventType { - NON_FINAL_CHUNK, - FINAL_CHUNK, - COMPLETE_BODY + NON_FINAL_CHUNK(false), + FINAL_CHUNK(true), + COMPLETE_BODY(true); + + private final boolean finalChunk; + + private RequestBodyEventType(boolean finalChunk) { + this.finalChunk = finalChunk; + } + + public boolean isFinalChunk() { + return finalChunk; + } }; } @@ -443,9 +456,12 @@ static class RequestBodyHandler implements IOEHandler { private final RequestOptions requestOptions; private final ClientSpec clientSpec; - /** Read an written only from the producer thread */ + /* Read an written only from the producer thread */ private boolean firstEvent = true; - /** + private Future bodyWriter; + private Future body; + + /* * Read from the producer thread, written from the event loop. Protected by {@link #lock} {@link #requestReady} * {@link Condition} */ @@ -453,7 +469,6 @@ static class RequestBodyHandler implements IOEHandler { /* Retransmit settings, read/written from the event loop */ private final boolean possibleRetransmit; - private List bodyRecorder; private List redirects; private final int maxRetransmits; private final CXFClientInfo clientInfo; @@ -505,11 +520,19 @@ public RequestBodyHandler( public void handle(RequestBodyEvent event) throws IOException { final Buffer buffer = event.buffer(); + final boolean finalChunk = event.eventType().isFinalChunk(); if (firstEvent) { firstEvent = false; if (possibleRetransmit) { - final List recorder = bodyRecorder = new ArrayList<>(); - recorder.add(buffer.slice()); + Future bw = BodyRecorder.openWriter( + (ContextInternal) clientPool.getVertx().getOrCreateContext(), + clientInfo.getRetransmitCache()); + bw = bw.compose(w -> w.write(buffer.slice())); + if (finalChunk) { + body = bw.compose(w -> w.close()); + } else { + bodyWriter = bw; + } final List redirs = redirects = new ArrayList<>(); redirs.add(url); } @@ -523,32 +546,21 @@ public void handle(RequestBodyEvent event) throws IOException { client.request(requestOptions) .onSuccess(req -> { - switch (event.eventType()) { - case NON_FINAL_CHUNK: { - req - .setChunked(true) - .write(buffer) - .onFailure(t -> mode.responseFailed(t, true)); - - lock.lock(); - try { - this.request = new Result<>(req, null); - requestReady.signal(); - } finally { - lock.unlock(); - } + if (!finalChunk) { + req + .setChunked(true) + .write(buffer) + .onFailure(t -> mode.responseFailed(t, true)); - break; - } - case FINAL_CHUNK: - case COMPLETE_BODY: { - finishRequest(req, buffer); - break; + lock.lock(); + try { + this.request = new Result<>(req, null); + requestReady.signal(); + } finally { + lock.unlock(); } - default: - throw new IllegalArgumentException( - "Unexpected " + RequestBodyEventType.class.getName() + ": " + event.eventType()); - + } else { + finishRequest(req, buffer); } }) .onFailure(t -> { @@ -564,51 +576,43 @@ public void handle(RequestBodyEvent event) throws IOException { } }); - switch (event.eventType()) { - case NON_FINAL_CHUNK: - /* Nothing to do */ - break; - case FINAL_CHUNK: - case COMPLETE_BODY: { - mode.awaitResponse(); - break; - } - default: - throw new IllegalArgumentException( - "Unexpected " + RequestBodyEventType.class.getName() + ": " + event.eventType()); - + if (finalChunk) { + mode.awaitResponse(); } - } else { /* Non-first event */ - if (bodyRecorder != null) { - bodyRecorder.add(buffer.slice()); + Future bw = bodyWriter; + if (bw != null) { + bw = bw.compose(w -> w.write(buffer.slice())); + if (finalChunk) { + body = bw.compose(w -> w.close()); + bodyWriter = null; + } else { + bodyWriter = bw; + } } final HttpClientRequest req = awaitRequest(); - switch (event.eventType()) { - case NON_FINAL_CHUNK: { - req - .write(buffer) - .onFailure(RequestBodyHandler.this::failResponse); - break; - } - case FINAL_CHUNK: - case COMPLETE_BODY: { - finishRequest(req, buffer); - mode.awaitResponse(); - break; - } - default: - throw new IllegalArgumentException( - "Unexpected " + RequestBodyEventType.class.getName() + ": " + event.eventType()); - + if (!finalChunk) { + req + .write(buffer) + .onFailure(RequestBodyHandler.this::failResponse); + } else { + finishRequest(req, buffer); + mode.awaitResponse(); } } } @SuppressWarnings("resource") void finishRequest(HttpClientRequest req, Buffer buffer) { + prepareResponse(req); + req + .end(buffer) + .onFailure(t -> mode.responseFailed(t, true)); + } + + private void prepareResponse(HttpClientRequest req) { req.response() .onComplete(ar -> { final InputStreamWriteStream sink = new InputStreamWriteStream(2); @@ -642,16 +646,16 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { redirectRetransmit(newUri); } catch (IOException e) { sink.setException((IOException) e); - mode.responseReady(new Result<>(new ResponseEvent(response, sink), e)); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), e)); } catch (URISyntaxException e) { final IOException ioe = new IOException( "Could not resolve redirect Location " + loc + " relative to " + url, e); sink.setException(ioe); - mode.responseReady(new Result<>(new ResponseEvent(response, sink), ioe)); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), ioe)); } catch (Exception e) { final IOException ioe = new IOException(e); sink.setException(ioe); - mode.responseReady(new Result<>(new ResponseEvent(response, sink), ioe)); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), ioe)); } return; } else { @@ -664,7 +668,7 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { + " You may want to set quarkus.cxf.client." + qKey + ".auto-redirect = true"); sink.setException(ioe); - mode.responseReady(new Result<>(new ResponseEvent(response, sink), ioe)); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), ioe)); return; } if (possibleRetransmit && isRedirect && maxRetransmits >= 0 @@ -678,7 +682,7 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { + " increase quarkus.cxf.client." + qKey + ".max-retransmits. Visited URIs: " + redirects.stream().map(URI::toString).collect(Collectors.joining(" -> "))); sink.setException(ioe); - mode.responseReady(new Result<>(new ResponseEvent(response, sink), ioe)); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), ioe)); return; } /* No retransmit */ @@ -692,12 +696,8 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { sink.setException(new IOException(ar.cause())); } } - mode.responseReady(new Result<>(new ResponseEvent(response, sink), ar.cause())); + mode.responseReady(new Result<>(ResponseEvent.prepare(body, response, sink), ar.cause())); }); - req - .end(buffer) - .onFailure(t -> mode.responseFailed(t, true)); - } private static int performedRetransmits(List retransmits) { @@ -741,45 +741,26 @@ void redirectRetransmit(URI newURL) throws IOException { options.setSsl(ssl); options.setURI(requestURI); - final List body = bodyRecorder; - final int last = body.size() - 1; - if (last == 0 && requestHasBody(options.getMethod())) { - /* Only one buffer recorded */ - requestOptions.putHeader(CONTENT_LENGTH, String.valueOf(body.get(0).length())); - } else if (last == -1 && requestHasBody(options.getMethod())) { - /* No buffer recorded */ - requestOptions.putHeader(CONTENT_LENGTH, "0"); - } else { - options.removeHeader(CONTENT_LENGTH); - } + this.body.compose(storedBody -> { + final long contentLength = storedBody.length(); + if (contentLength >= 0 && requestHasBody(options.getMethod())) { + /* Only one buffer recorded */ + options.putHeader(CONTENT_LENGTH, String.valueOf(contentLength)); + } else { + options.removeHeader(CONTENT_LENGTH); + } - final HttpClient client = clientPool.getClient(clientSpec); + final HttpClient client = clientPool.getClient(clientSpec); - // Should not be necessary, because we copy from the original requestOptions - // setProtocolHeaders(outMessage, options, userAgent); + // Should not be necessary, because we copy from the original requestOptions + // setProtocolHeaders(outMessage, options, userAgent); - client.request(options) - .onSuccess(req -> { - if (last == 0) { - /* Single buffer recorded */ - finishRequest(req, body.get(0).slice()); - } else if (last == -1) { - /* Empty body */ - finishRequest(req, Buffer.buffer()); - } else { - /* Multiple buffers recorded */ - req.setChunked(true); - for (int i = 0; i <= last; i++) { - if (i == last) { - finishRequest(req, body.get(i).slice()); - } else { - req - .write(body.get(i).slice()) - .onFailure(t -> mode.responseFailed(t, true)); - } - } - } - }) + return client.request(options) + .compose(req -> { + prepareResponse(req); + return storedBody.pipeTo(req).compose(v -> Future.succeededFuture(req)); + }); + }) .onFailure(t -> { lock.lock(); try { @@ -1172,6 +1153,13 @@ protected void awaitResponse() throws IOException { } static record ResponseEvent(HttpClientResponse response, InputStream responseBodyInputStream) { + public static ResponseEvent prepare(Future body, HttpClientResponse response, + InputStream responseBodyInputStream) { + if (body != null) { + body.compose(b -> b.discard()); + } + return new ResponseEvent(response, responseBodyInputStream); + } } /** diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/RedirectRest.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/RedirectRest.java index fc5dc7d30..876b0e891 100644 --- a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/RedirectRest.java +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/RedirectRest.java @@ -1,10 +1,15 @@ package io.quarkiverse.cxf.it.redirect; import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import jakarta.ws.rs.DELETE; import jakarta.ws.rs.GET; +import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; @@ -12,10 +17,21 @@ import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import jakarta.xml.ws.BindingProvider; +import jakarta.xml.ws.handler.MessageContext; +import jakarta.xml.ws.soap.SOAPFaultException; + +import org.apache.cxf.message.Message; +import org.eclipse.microprofile.config.inject.ConfigProperty; import io.quarkiverse.cxf.annotation.CXFClient; import io.quarkiverse.cxf.it.large.slow.generated.LargeSlowService; +import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheOutput; +import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheResponse; +import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheService; +import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheServiceImpl; import io.quarkus.logging.Log; +import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; @Path("/RedirectRest") @@ -56,6 +72,12 @@ public class RedirectRest { @CXFClient("loop") LargeSlowService loop; + @CXFClient("retransmitCache") + RetransmitCacheService retransmitCache; + + @ConfigProperty(name = "qcxf.retransmitCacheDir") + String retransmitCacheDir; + LargeSlowService getClient(String clientName) { switch (clientName) { case "singleRedirect": { @@ -173,4 +195,86 @@ public String largeHelloSync(@PathParam("client") String client, @QueryParam("si return getClient(client).largeSlow(sizeBytes, delayMs).getPayload(); } + @Path("/retransmitCacheSync") + @POST + @Produces(MediaType.TEXT_PLAIN) + public Response retransmitCacheSync( + String body, + @HeaderParam(REQUEST_ID_HEADER) String requestId, + @HeaderParam(STATUS_CODE_HEADER) String statusCode) { + RetransmitCacheOutput result = null; + Map reqContext = ((BindingProvider) retransmitCache).getRequestContext(); + reqContext.put( + MessageContext.HTTP_REQUEST_HEADERS, + statusCode != null && requestId != null + ? Map.of(REQUEST_ID_HEADER, List.of(requestId), STATUS_CODE_HEADER, List.of(statusCode)) + : Map.of()); + try { + result = retransmitCache.retransmitCache(retransmitCacheDir, body); + return Response.ok(result.getPayload()).build(); + } catch (SOAPFaultException e) { + Map responseContext = ((BindingProvider) retransmitCache).getResponseContext(); + final int sc = (Integer) responseContext.get(Message.RESPONSE_CODE); + if (sc != 200) { + return Response.status(sc).build(); + } + return Response.status(500, "Unexpected").build(); + } + } + + @Path("/retransmitCacheAsyncBlocking") + @POST + @Produces(MediaType.TEXT_PLAIN) + @Blocking + public Uni retransmitCacheAsyncBlocking( + String body, + @HeaderParam(REQUEST_ID_HEADER) String requestId, + @HeaderParam(STATUS_CODE_HEADER) String statusCode) { + Map reqContext = ((BindingProvider) retransmitCache).getRequestContext(); + reqContext.put( + MessageContext.HTTP_REQUEST_HEADERS, + statusCode != null && requestId != null + ? Map.of(REQUEST_ID_HEADER, List.of(requestId), STATUS_CODE_HEADER, List.of(statusCode)) + : Map.of()); + + final jakarta.xml.ws.Response resp = retransmitCache.retransmitCacheAsync(retransmitCacheDir, + body); + return Uni.createFrom() + .future(resp) + .map(retransmitCacheResponse -> retransmitCacheResponse.getReturn().getPayload()) + .map(payload -> Response.ok(payload).build()) + .onFailure().recoverWithItem(e -> { + final int sc = (Integer) resp.getContext().get(Message.RESPONSE_CODE); + return Response.status(sc).build(); + }); + } + + public static final String REQUEST_ID_HEADER = "X-Request-ID"; + public static final String STATUS_CODE_HEADER = "X-Status-Code"; + private final Map tempFiles = new ConcurrentHashMap<>(); + + @Path("/retransmitCacheRedirect") + @POST + public Response retransmitCacheRedirect( + @HeaderParam(REQUEST_ID_HEADER) String requestId, + @HeaderParam(STATUS_CODE_HEADER) Integer statusCode) { + + if (statusCode != null && requestId != null) { + Log.infof("Enforcing status %d", statusCode); + Properties props = RetransmitCacheServiceImpl.listTempFiles(retransmitCacheDir); + final String propsString = RetransmitCacheServiceImpl.toString(props); + tempFiles.put(requestId, propsString); + Log.infof("Paths %s", props.keySet()); + return Response.status(statusCode).build(); + } + + return Response.temporaryRedirect(URI.create("/soap/retransmitCache")).build(); + } + + @Path("/retransmitCache-tempFiles/{requestId}") + @GET + public String retransmitCacheTempFiles(@PathParam("requestId") String requestId) { + return tempFiles.get(requestId); + } + } diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/ObjectFactory.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/ObjectFactory.java new file mode 100644 index 000000000..03f9261e2 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/ObjectFactory.java @@ -0,0 +1,97 @@ + +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import javax.xml.namespace.QName; + +import jakarta.xml.bind.JAXBElement; +import jakarta.xml.bind.annotation.XmlElementDecl; +import jakarta.xml.bind.annotation.XmlRegistry; + +/** + * This object contains factory methods for each + * Java content interface and Java element interface + * generated in the io.quarkiverse.cxf.it.redirect.retransmitcache package. + *

+ * An ObjectFactory allows you to programmatically + * construct new instances of the Java representation + * for XML content. The Java representation of XML + * content can consist of schema derived interfaces + * and classes representing the binding of schema + * type definitions, element declarations and model + * groups. Factory methods for each of these are + * provided in this class. + * + */ +@XmlRegistry +public class ObjectFactory { + + private static final QName _RetransmitCache_QNAME = new QName( + "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "retransmitCache"); + private static final QName _RetransmitCacheResponse_QNAME = new QName( + "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "retransmitCacheResponse"); + + /** + * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: + * io.quarkiverse.cxf.it.redirect.retransmitcache + * + */ + public ObjectFactory() { + } + + /** + * Create an instance of {@link RetransmitCache } + * + * @return + * the new instance of {@link RetransmitCache } + */ + public RetransmitCache createRetransmitCache() { + return new RetransmitCache(); + } + + /** + * Create an instance of {@link RetransmitCacheResponse } + * + * @return + * the new instance of {@link RetransmitCacheResponse } + */ + public RetransmitCacheResponse createRetransmitCacheResponse() { + return new RetransmitCacheResponse(); + } + + /** + * Create an instance of {@link RetransmitCacheOutput } + * + * @return + * the new instance of {@link RetransmitCacheOutput } + */ + public RetransmitCacheOutput createRetransmitCacheOutput() { + return new RetransmitCacheOutput(); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link RetransmitCache }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link RetransmitCache }{@code >} + */ + @XmlElementDecl(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "retransmitCache") + public JAXBElement createRetransmitCache(RetransmitCache value) { + return new JAXBElement<>(_RetransmitCache_QNAME, RetransmitCache.class, null, value); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link RetransmitCacheResponse }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link RetransmitCacheResponse }{@code >} + */ + @XmlElementDecl(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "retransmitCacheResponse") + public JAXBElement createRetransmitCacheResponse(RetransmitCacheResponse value) { + return new JAXBElement<>(_RetransmitCacheResponse_QNAME, RetransmitCacheResponse.class, null, value); + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCache.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCache.java new file mode 100644 index 000000000..f0e7f1bad --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCache.java @@ -0,0 +1,84 @@ + +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlType; + +/** + *

+ * Java class for retransmitCache complex type + *

+ * . + * + *

+ * The following schema fragment specifies the expected content contained within this class. + *

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "retransmitCache", propOrder = { + "directory", + "payload" +}) +public class RetransmitCache { + + protected String directory; + protected String payload; + + public RetransmitCache() { + + } + + public RetransmitCache(String directory, String payload) { + this.directory = directory; + this.payload = payload; + } + + /** + * Gets the value of the payload property. + * + * @return + * possible object is + * {@link String } + * + */ + public String getPayload() { + return payload; + } + + /** + * Sets the value of the payload property. + * + * @param value + * allowed object is + * {@link String } + * + */ + public void setPayload(String value) { + this.payload = value; + } + + public String getDirectory() { + return directory; + } + + public void setDirectory(String directory) { + this.directory = directory; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheOutput.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheOutput.java new file mode 100644 index 000000000..98355afd3 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheOutput.java @@ -0,0 +1,75 @@ + +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlElement; +import jakarta.xml.bind.annotation.XmlType; + +/** + *

+ * Java class for retransmitCacheOutput complex type + *

+ * . + * + *

+ * The following schema fragment specifies the expected content contained within this class. + *

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.PROPERTY) +@XmlType(name = "retransmitCacheOutput", propOrder = { + "payload" +}) +public class RetransmitCacheOutput { + + protected String payload; + + public RetransmitCacheOutput() { + + } + + public RetransmitCacheOutput(String payload) { + this.payload = payload; + } + + /** + * Gets the value of the payload property. + * + * @return + * possible object is + * {@link String } + * + */ + public String getPayload() { + return payload; + } + + /** + * Sets the value of the payload property. + * + * @param value + * allowed object is + * {@link String } + * + */ + @XmlElement(name = "payload") + public void setPayload(String value) { + this.payload = value; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheResponse.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheResponse.java new file mode 100644 index 000000000..a96e076c0 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheResponse.java @@ -0,0 +1,67 @@ + +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlElement; +import jakarta.xml.bind.annotation.XmlType; + +/** + *

+ * Java class for retransmitCacheResponse complex type + *

+ * . + * + *

+ * The following schema fragment specifies the expected content contained within this class. + *

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "retransmitCacheResponse", propOrder = { + "_return" +}) +public class RetransmitCacheResponse { + + @XmlElement(name = "return") + protected RetransmitCacheOutput _return; + + /** + * Gets the value of the return property. + * + * @return + * possible object is + * {@link RetransmitCacheOutput } + * + */ + public RetransmitCacheOutput getReturn() { + return _return; + } + + /** + * Sets the value of the return property. + * + * @param value + * allowed object is + * {@link RetransmitCacheOutput } + * + */ + public void setReturn(RetransmitCacheOutput value) { + this._return = value; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService.java new file mode 100644 index 000000000..0d4bb5c72 --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService.java @@ -0,0 +1,47 @@ +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import java.util.concurrent.Future; + +import jakarta.jws.WebMethod; +import jakarta.jws.WebParam; +import jakarta.jws.WebResult; +import jakarta.jws.WebService; +import jakarta.xml.bind.annotation.XmlSeeAlso; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.RequestWrapper; +import jakarta.xml.ws.Response; +import jakarta.xml.ws.ResponseWrapper; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-21T16:40:52.615+01:00 + * Generated source version: 4.0.5 + * + */ +@WebService(targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", name = "RetransmitCacheService") +@XmlSeeAlso({ ObjectFactory.class }) +public interface RetransmitCacheService { + + @WebMethod(operationName = "retransmitCache") + @RequestWrapper(localName = "retransmitCache", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCache") + @ResponseWrapper(localName = "retransmitCacheResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheResponse") + public Response retransmitCacheAsync( + @WebParam(name = "directory", targetNamespace = "") String directory, + @WebParam(name = "payload", targetNamespace = "") String payload); + + @WebMethod(operationName = "retransmitCache") + @ResponseWrapper(localName = "retransmitCacheResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheResponse") + @RequestWrapper(localName = "retransmitCache", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCache") + public Future retransmitCacheAsync( + @WebParam(name = "directory", targetNamespace = "") String directory, + @WebParam(name = "prefix", targetNamespace = "") String payload, + @WebParam(name = "asyncHandler", targetNamespace = "") AsyncHandler asyncHandler); + + @WebMethod + @RequestWrapper(localName = "retransmitCache", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCache") + @ResponseWrapper(localName = "retransmitCacheResponse", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", className = "io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheResponse") + @WebResult(name = "return", targetNamespace = "") + public io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheOutput retransmitCache( + @WebParam(name = "directory", targetNamespace = "") String directory, + @WebParam(name = "payload", targetNamespace = "") String payload); +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java new file mode 100644 index 000000000..1615e08cb --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java @@ -0,0 +1,89 @@ +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.stream.Stream; + +import jakarta.jws.WebService; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.Response; + +import io.quarkiverse.cxf.annotation.CXFEndpoint; +import io.quarkus.logging.Log; + +@WebService(serviceName = "RetransmitCacheService", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +@CXFEndpoint("/retransmitCache") +public class RetransmitCacheServiceImpl implements RetransmitCacheService { + + @Override + public Response retransmitCacheAsync(String directory, String payload) { + throw new UnsupportedOperationException(); + } + + @Override + public Future retransmitCacheAsync(String directory, String payload, + AsyncHandler asyncHandler) { + throw new UnsupportedOperationException(); + } + + @Override + public RetransmitCacheOutput retransmitCache(String directory, String payload) { + final Properties props = listTempFiles(directory); + props.put("payload.length", String.valueOf(payload.length())); + return new RetransmitCacheOutput(toString(props)); + } + + public static String toString(final Properties props) { + final StringWriter sw = new StringWriter(); + try { + props.store(sw, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sw.toString(); + } + + public static Properties listTempFiles(String directory) { + final String prefix = "qcxf-TempStore-" + ProcessHandle.current().pid() + "-"; + final Properties props = new Properties(); + final Path dir = Path.of(directory); + Log.infof("Listing %s/%s", directory, prefix); + if (Files.isDirectory(dir)) { + try (Stream dirFiles = Files.list(dir)) { + dirFiles + .filter(p -> p.getFileName().toString().startsWith(prefix)) + .forEach(path -> { + Log.infof("Found temp file %s", path); + String content; + + /* We have to wait a bit till the event loop finishes writing to the file */ + while (true) { + try { + content = Files.readString(path, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Could not read " + path, e); + } + if (content.endsWith("")) { + break; + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + props.setProperty(path.toString(), content); + }); + } catch (IOException e) { + throw new RuntimeException("Could not list " + directory, e); + } + } + return props; + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService_Service.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService_Service.java new file mode 100644 index 000000000..d708f814e --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheService_Service.java @@ -0,0 +1,57 @@ +package io.quarkiverse.cxf.it.redirect.retransmitcache; + +import java.net.URL; + +import javax.xml.namespace.QName; + +import jakarta.xml.ws.Service; +import jakarta.xml.ws.WebEndpoint; +import jakarta.xml.ws.WebServiceClient; +import jakarta.xml.ws.WebServiceFeature; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-21T16:40:52.628+01:00 + * Generated source version: 4.0.5 + * + */ +@WebServiceClient(name = "RetransmitCacheService", wsdlLocation = "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/client-server/src/main/resources/wsdl/RetransmitCache.wsdl", targetNamespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +public class RetransmitCacheService_Service extends Service { + + public static final QName SERVICE = new QName("https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", + "RetransmitCacheService"); + public static final QName RetransmitCacheServiceImplPort = new QName( + "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test", "RetransmitCacheServiceImplPort"); + + public RetransmitCacheService_Service(URL wsdlLocation) { + super(wsdlLocation, SERVICE); + } + + public RetransmitCacheService_Service(URL wsdlLocation, QName serviceName) { + super(wsdlLocation, serviceName); + } + + /** + * + * @return + * returns RetransmitCacheService + */ + @WebEndpoint(name = "RetransmitCacheServiceImplPort") + public RetransmitCacheService getRetransmitCacheServiceImplPort() { + return super.getPort(RetransmitCacheServiceImplPort, RetransmitCacheService.class); + } + + /** + * + * @param features + * A list of {@link jakarta.xml.ws.WebServiceFeature} to configure on the proxy. Supported features not in the + * features parameter will have their default values. + * @return + * returns RetransmitCacheService + */ + @WebEndpoint(name = "RetransmitCacheServiceImplPort") + public RetransmitCacheService getRetransmitCacheServiceImplPort(WebServiceFeature... features) { + return super.getPort(RetransmitCacheServiceImplPort, RetransmitCacheService.class, features); + } + +} diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/package-info.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/package-info.java new file mode 100644 index 000000000..71979604a --- /dev/null +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/package-info.java @@ -0,0 +1,2 @@ +@jakarta.xml.bind.annotation.XmlSchema(namespace = "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test") +package io.quarkiverse.cxf.it.redirect.retransmitcache; diff --git a/integration-tests/client-server/src/main/resources/application.properties b/integration-tests/client-server/src/main/resources/application.properties index b93f6fb63..1c8db869b 100644 --- a/integration-tests/client-server/src/main/resources/application.properties +++ b/integration-tests/client-server/src/main/resources/application.properties @@ -191,5 +191,15 @@ quarkus.cxf.client.helloWithoutWsdlWithBlocking.client-endpoint-url = http://loc quarkus.cxf.client.helloWithoutWsdlWithBlocking.service-interface = io.quarkiverse.cxf.deployment.test.HelloService +quarkus.cxf.retransmit-cache.threshold = 500K +quarkus.cxf.retransmit-cache.directory = ${qcxf.retransmitCacheDir} +quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.BodyRecorder".level = DEBUG +quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.TempStore".level = DEBUG + +quarkus.cxf.client.retransmitCache.client-endpoint-url = http://localhost:${quarkus.http.test-port}/RedirectRest/retransmitCacheRedirect +quarkus.cxf.client.retransmitCache.service-interface = io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheService +quarkus.cxf.client.retransmitCache.auto-redirect = true +#quarkus.cxf.client.retransmitCache.logging.enabled = true + quarkus.default-locale = en_US quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit".level=DEBUG diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java index ded49e00d..c83c0056c 100644 --- a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java @@ -1,7 +1,14 @@ package io.quarkiverse.cxf.it.redirect; +import java.io.IOException; +import java.io.StringReader; +import java.nio.file.Path; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; @@ -12,11 +19,14 @@ import io.quarkiverse.cxf.HTTPConduitImpl; import io.quarkiverse.cxf.it.large.slow.LargeSlowServiceImpl; +import io.quarkus.runtime.configuration.MemorySizeConverter; +import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; import io.restassured.response.ValidatableResponse; @QuarkusTest +@QuarkusTestResource(RedirectTestResource.class) class RedirectTest { private static final Logger log = Logger.getLogger(RedirectTest.class); @@ -178,4 +188,102 @@ static ValidatableResponse getResponse(String endpoint, int sizeBytes) { .then(); } + @ParameterizedTest + @ValueSource(strings = { // + "retransmitCacheSync", // + "retransmitCacheAsyncBlocking" // + }) + void retransmitCache(String endpoint) throws IOException { + final MemorySizeConverter converter = new MemorySizeConverter(); + { + /* + * 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold + * Hence the file should not be cached on disk + */ + final int payloadLen = (int) converter.convert("1K").asLongValue(); + final Properties props = retransmitCache(payloadLen, endpoint); + Assertions.assertThat(props.size()).isEqualTo(1); + } + + { + /* + * 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold + * Hence the file should not be cached on disk + */ + final int payloadLen = (int) converter.convert("9M").asLongValue(); + final Properties props = retransmitCache(payloadLen, endpoint); + Assertions.assertThat(props.size()).isEqualTo(2); + + for (Entry en : props.entrySet()) { + String path = (String) en.getKey(); + if (path.contains("qcxf-TempStore-")) { + Assertions.assertThat(Path.of(path)).doesNotExist(); + Assertions.assertThat((String) en.getValue()) + .startsWith("" + + "" + + "target/RedirectTestResource"); + Assertions.assertThat((String) en.getValue()) + .endsWith(""); + Assertions.assertThat((String) en.getValue()) + .contains("" + LargeSlowServiceImpl.largeString(payloadLen) + ""); + } + } + + } + { + /* + * Let server return 500 + */ + final int payloadLen = (int) converter.convert("501K").asLongValue(); + final String reqId = UUID.randomUUID().toString(); + RestAssured.given() + .header(RedirectRest.REQUEST_ID_HEADER, reqId) + .header(RedirectRest.STATUS_CODE_HEADER, "500") + .body(LargeSlowServiceImpl.largeString(payloadLen)) + .post("/RedirectRest/" + endpoint) + .then() + .statusCode(500); + + final String propString = RestAssured.given() + .get("/RedirectRest/retransmitCache-tempFiles/" + reqId) + .then() + .statusCode(200) + .extract().body().asString(); + + Properties props = new Properties(); + props.load(new StringReader(propString)); + + Assertions.assertThat(props.size()).isEqualTo(1); + for (Entry en : props.entrySet()) { + String path = (String) en.getKey(); + if (path.contains("qcxf-TempStore-")) { + Assertions.assertThat(Path.of(path)).doesNotExist(); + Assertions.assertThat((String) en.getValue()) + .startsWith("" + + "" + + "target/RedirectTestResource"); + Assertions.assertThat((String) en.getValue()) + .endsWith(""); + Assertions.assertThat((String) en.getValue()) + .contains("" + LargeSlowServiceImpl.largeString(payloadLen) + ""); + } + } + + } + } + + private Properties retransmitCache(final int payloadLen, String syncAsync) throws IOException { + String body = RestAssured.given() + .body(LargeSlowServiceImpl.largeString(payloadLen)) + .post("/RedirectRest/" + syncAsync) + .then() + .statusCode(200) + .extract().body().asString(); + + final Properties props = new Properties(); + props.load(new StringReader(body)); + Assertions.assertThat(props.get("payload.length")).isEqualTo(String.valueOf(payloadLen)); + return props; + } + } diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTestResource.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTestResource.java new file mode 100644 index 000000000..ee440ffee --- /dev/null +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTestResource.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.quarkiverse.cxf.it.redirect; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class RedirectTestResource implements QuarkusTestResourceLifecycleManager { + + @Override + public Map start() { + final Map props = new LinkedHashMap<>(); + props.put("qcxf.retransmitCacheDir", "target/RedirectTestResource-" + UUID.randomUUID().toString()); + return props; + } + + @Override + public void stop() { + } +} diff --git a/pom.xml b/pom.xml index ec2f8c6d0..ab0bdcee0 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ 3.17.7 4.1.0 ${quarkus.version} - 1.0.1 + 1.0.2 diff --git a/test-util-parent/test-util/pom.xml b/test-util-parent/test-util/pom.xml index 9415e60ed..eb60afbe8 100644 --- a/test-util-parent/test-util/pom.xml +++ b/test-util-parent/test-util/pom.xml @@ -43,10 +43,19 @@ io.quarkus quarkus-core + + io.vertx + vertx-core + org.apache.cxf cxf-rt-frontend-jaxws + + org.assertj + assertj-core + ${assertj.version} + org.eclipse.microprofile.config microprofile-config-api diff --git a/test-util-parent/test-util/src/main/java/io/quarkiverse/cxf/test/VertxTestUtil.java b/test-util-parent/test-util/src/main/java/io/quarkiverse/cxf/test/VertxTestUtil.java new file mode 100644 index 000000000..8525fdde2 --- /dev/null +++ b/test-util-parent/test-util/src/main/java/io/quarkiverse/cxf/test/VertxTestUtil.java @@ -0,0 +1,21 @@ +package io.quarkiverse.cxf.test; + +import org.assertj.core.api.Assertions; + +import io.vertx.core.AsyncResult; + +public class VertxTestUtil { + + private VertxTestUtil() { + } + + public static T assertSuccess(AsyncResult result) { + Assertions.assertThat(result).satisfiesAnyOf( + i -> Assertions.assertThat(i.succeeded()).isTrue(), + i -> { + throw i.cause(); + }); + return result.result(); + } + +}