Skip to content

Commit

Permalink
Support offloading the request data to disk with VertxHttpClientHTTPC…
Browse files Browse the repository at this point in the history
…onduit fix #1628
  • Loading branch information
ppalaga committed Jan 18, 2025
1 parent f99c7a9 commit 9aca68c
Show file tree
Hide file tree
Showing 23 changed files with 1,992 additions and 111 deletions.
10 changes: 10 additions & 0 deletions docs/modules/ROOT/examples/client-server/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,15 @@ quarkus.cxf.client.helloWithoutWsdlWithBlocking.client-endpoint-url = http://loc
quarkus.cxf.client.helloWithoutWsdlWithBlocking.service-interface = io.quarkiverse.cxf.deployment.test.HelloService


quarkus.cxf.retransmit-cache.threshold = 500K
quarkus.cxf.retransmit-cache.directory = ${qcxf.retransmitCacheDir}
quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.BodyRecorder".level = DEBUG
quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.TempStore".level = DEBUG

quarkus.cxf.client.retransmitCache.client-endpoint-url = http://localhost:${quarkus.http.test-port}/RedirectRest/retransmitCacheRedirect
quarkus.cxf.client.retransmitCache.service-interface = io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheService
quarkus.cxf.client.retransmitCache.auto-redirect = true
#quarkus.cxf.client.retransmitCache.logging.enabled = true

quarkus.default-locale = en_US
quarkus.log.category."io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit".level=DEBUG
94 changes: 94 additions & 0 deletions docs/modules/ROOT/pages/reference/extensions/quarkus-cxf.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,73 @@ respectively.
*Environment variable*: `+++QUARKUS_CXF_LOGGING_SENSITIVE_PROTOCOL_HEADER_NAMES+++` +
*Since Quarkus CXF*: 2.6.0

.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-threshold]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]`
.<| `MemorySize` link:#memory-size-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the MemorySize format]]
.<| `128K`

3+a|If the request retransmission is active for the given client and if request body is larger than this threshold,
then the body is cached on disk instead of keeping it in memory.

See also:

* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]`

*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_THRESHOLD+++` +
*Since Quarkus CXF*: 3.18.0

.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-max-size]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-max-size[quarkus.cxf.retransmit-cache.max-size]`
.<| `MemorySize` link:#memory-size-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the MemorySize format]]
.<|

3+a|The maximum size of a request body allowed to be cached on disk when retransmitting.
If not set, no limit will be enforced.
If set and the limit is exceeded, an exception will be thrown and therefore the request will neither be sent nor redirected.

See also:

* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]`
* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]`

*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_MAX_SIZE+++` +
*Since Quarkus CXF*: 3.18.0

.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-directory]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-directory[quarkus.cxf.retransmit-cache.directory]`
.<| `string`
.<|

3+a|A directory where request bodies exceeding
`xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-retransmit-cache-threshold[quarkus.cxf.client."client-name".retransmit-cache.threshold]`
will be be stored for the sake of retransmission.
If specified, the directory must exist on application startup.
If not specified the system temporary directory will be used.

See also:

* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]`
* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-threshold[quarkus.cxf.retransmit-cache.threshold]`

*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_DIRECTORY+++` +
*Since Quarkus CXF*: 3.18.0

.<| [[quarkus-cxf_quarkus-cxf-retransmit-cache-gc-delay]]`link:#quarkus-cxf_quarkus-cxf-retransmit-cache-gc-delay[quarkus.cxf.retransmit-cache.gc-delay]`
.<| link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html[`Duration`] link:#duration-note-anchor-quarkus-cxf[icon:question-circle[title=More information about the Duration format]]
.<| `30S`

3+a|The delay in seconds (or suffixed with `h`, `m`, `s` or `ms` for hours, minutes, seconds or milliseconds respectively)
for cleaning up stale temporary files in the
xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-retransmit-cache-directory[retransmit cache directory].
Those temporary files are normally removed upon receiving a non-redirect response.
The periodic garbage collection is a fallback mechanism for exceptional conditions.

The minimum value is 2 seconds. If the value of the delay is set to 0, the garbage collection of stale temporary files will be deactivated.

See also:

* `xref:reference/extensions/quarkus-cxf.adoc#quarkus-cxf_quarkus-cxf-client-client-name-auto-redirect[quarkus.cxf.client."client-name".auto-redirect]`

*Environment variable*: `+++QUARKUS_CXF_RETRANSMIT_CACHE_GC_DELAY+++` +
*Since Quarkus CXF*: 3.18.0

.<|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus-cxf-codegen-wsdl2java-named-parameter-sets-includes]]`link:#quarkus-cxf_quarkus-cxf-codegen-wsdl2java-named-parameter-sets-includes[quarkus.cxf.codegen.wsdl2java."named-parameter-sets".includes]`
.<| List of ``string``
.<|
Expand Down Expand Up @@ -2090,3 +2157,30 @@ enforced unless it is enabled by other means, such as `@org.apache.cxf.annotatio
*Since Quarkus CXF*: 2.7.0
|===

[NOTE]
[id=duration-note-anchor-quarkus-cxf]
.About the Duration format
====
To write duration values, use the standard `java.time.Duration` format.
See the link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html#parse(java.lang.CharSequence)[Duration#parse() Java API documentation] for more information.
You can also use a simplified format, starting with a number:
* If the value is only a number, it represents time in seconds.
* If the value is a number followed by `ms`, it represents time in milliseconds.
In other cases, the simplified format is translated to the `java.time.Duration` format for parsing:
* If the value is a number followed by `h`, `m`, or `s`, it is prefixed with `PT`.
* If the value is a number followed by `d`, it is prefixed with `P`.
====

[NOTE]
[id=memory-size-note-anchor-quarkus-cxf]
.About the MemorySize format
====
A size configuration option recognizes strings in this format (shown as a regular expression): `[0-9]+[KkMmGgTtPpEeZzYy]?`.
If no suffix is given, assume bytes.
====

Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package io.quarkiverse.cxf.vertx.http.client;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.inject.Inject;

import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.cxf.test.VertxTestUtil;
import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore;
import io.quarkiverse.cxf.vertx.http.client.TempStore.InitializedTempStore.TempPath;
import io.quarkus.logging.Log;
import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

public class TempStoreTest {
private static final int FILE_DELAY = 10;

private static final int GC_DELAY = 360000;

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.overrideConfigKey("quarkus.log.category.\"" + TempStore.class.getName() + "\".level", "DEBUG");

@Inject
Vertx vertx;

@Test
void initializeExistingTempDir() throws IOException, InterruptedException, ExecutionException, TimeoutException {
Log.info("TempPathTest.initializeExistingTempDir()");

final Path tempDir = Path.of("target/" + TempStoreTest.class.getSimpleName() + "-" + UUID.randomUUID() + "/temp");
Files.createDirectories(tempDir);
Assertions.assertThat(tempDir).exists();

final CompletableFuture<AsyncResult<InitializedTempStore>> initializedCF = new CompletableFuture<>();
final CompletableFuture<ContextInternal> ctxCF = new CompletableFuture<>();
final CompletableFuture<AsyncResult<TempPath>> tempPathCF = new CompletableFuture<>();

vertx.runOnContext(v -> {
final ContextInternal ct = (ContextInternal) vertx.getOrCreateContext();
ctxCF.complete(ct);
final Future<InitializedTempStore> initialized = TempStore
.fromContext(vertx.getOrCreateContext(), Optional.of(tempDir.toString()), GC_DELAY, FILE_DELAY)
.andThen(initializedCF::complete);

initialized.compose(i -> i.newTempPath())
.andThen(tempPathCF::complete);

});
final ContextInternal ctx = ctxCF.get(5, TimeUnit.SECONDS);

final AsyncResult<InitializedTempStore> initialized = initializedCF.get(5, TimeUnit.SECONDS);
VertxTestUtil.assertSuccess(initialized);
Assertions.assertThat(initialized.result().getDirectory()).isEqualTo(tempDir);

/* Get a tempFile path */
final AsyncResult<TempPath> tempPath = tempPathCF.get(5, TimeUnit.SECONDS);
VertxTestUtil.assertSuccess(tempPath);
final Path p = tempPath.result().getPath();
Assertions.assertThat(p.toString()).startsWith(tempDir.toString());

Promise<Void> close = ctx.promise();
/* Check InitializedTempStore.close() works */
ctx.runOnContext(v -> initialized.result().close(close));
close.future().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);

final Promise<Object> tmpStoreAfterCloseP = ctx.promise();
ctx.runOnContext(v -> {
tmpStoreAfterCloseP.complete(ctx.contextData().get(TempStore.CONTEXT_KEY));
});

final Object tmpStoreAfterClose = tmpStoreAfterCloseP.future().toCompletionStage().toCompletableFuture().get(5,
TimeUnit.SECONDS);
Assertions.assertThat(tmpStoreAfterClose).isNull();

Log.info("Finished testing TempStore.fromContext(Context, Path, long, long)");
}

@Test
void initializeNewTempDir() throws InterruptedException, ExecutionException, TimeoutException, IOException {
Log.info("TempPathTest.initializeNewTempDir()");
final Path tempDir = Path.of("target/" + TempStoreTest.class.getSimpleName() + "-" + UUID.randomUUID() + "/temp");
Assertions.assertThat(tempDir).doesNotExist();

final CompletableFuture<AsyncResult<InitializedTempStore>> initializedCF = new CompletableFuture<>();
final CompletableFuture<ContextInternal> ctxCF = new CompletableFuture<>();
final CompletableFuture<AsyncResult<TempPath>> tempPathCF = new CompletableFuture<>();

vertx.runOnContext(v -> {
final ContextInternal ct = (ContextInternal) vertx.getOrCreateContext();
ctxCF.complete(ct);
final Future<InitializedTempStore> initialized = TempStore
.fromContext(ct, Optional.of(tempDir.toString()), GC_DELAY, FILE_DELAY)
.andThen(initializedCF::complete);

initialized.compose(i -> i.newTempPath())
.andThen(tempPathCF::complete);

});

final AsyncResult<InitializedTempStore> initialized = initializedCF.get(5, TimeUnit.SECONDS);
VertxTestUtil.assertSuccess(initialized);
Assertions.assertThat(initialized.result().getDirectory()).isDirectory();

final ContextInternal ctx = ctxCF.get(5, TimeUnit.SECONDS);
//Assertions.assertThat((Object) ctx.get(TempStore.CONTEXT_KEY)).isInstanceOf(TempStore.class);
final Future<InitializedTempStore> initialized2 = TempStore.fromContext(ctx, Optional.of(tempDir.toString()), GC_DELAY,
FILE_DELAY);
Assertions.assertThat(initialized.result()).isSameAs(initialized2.result());

{
/* Test TempPath.delete() */
Log.info("Testing TempPath.delete()");

/* Get a tempFile path */
final TempPath tempPath = VertxTestUtil.assertSuccess(tempPathCF.get(5, TimeUnit.SECONDS));
final Path p = tempPath.getPath();
Assertions.assertThat(p).doesNotExist();

/* Use the path to create the file */
Files.write(p, new byte[] { 42 });
Assertions.assertThat(p).isRegularFile();

/* Check the direct deletion works */
tempPath.discard().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);
Assertions.assertThat(p).doesNotExist();
}

{
/* Test InitializedTempStore.gc() */
Log.info("Testing InitializedTempStore.gc()");

/* Get a tempFile path */
final CompletableFuture<AsyncResult<TempPath>> tempPathCF1 = new CompletableFuture<>();
vertx.runOnContext(v -> {
initialized.result().newTempPath()
.andThen(tempPathCF1::complete);
});
final TempPath tp1 = VertxTestUtil.assertSuccess(tempPathCF1.get(5, TimeUnit.SECONDS));
final Path p1 = tp1.getPath();
Assertions.assertThat(p1).doesNotExist();

/* Use the path to create the file */
Files.write(p1, new byte[] { 42 });
Assertions.assertThat(p1).isRegularFile();

/*
* Get a path but do not create the file - the InitializedTempStore.gc() should not get confused if the leased path
* does
* not exist
*/
final CompletableFuture<AsyncResult<TempPath>> tempPathCF2 = new CompletableFuture<>();
vertx.runOnContext(v -> {
initialized.result().newTempPath()
.andThen(tempPathCF2::complete);
});
final TempPath tp2 = VertxTestUtil.assertSuccess(tempPathCF2.get(5, TimeUnit.SECONDS));

/* Invoke gc() manually, so that we do not need to mess with waiting for the timer */
initialized.result().gc();

Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
boolean f1Exists = Files.exists(p1);
boolean f2Exists = Files.exists(tp2.getPath());
Log.infof("%s exists: %s, %s exists: %s", p1, f1Exists, tp2.getPath(), f2Exists);
return !f1Exists && !f2Exists;
});

}

{
/* Test InitializedTempStore.close(Promise<Void>) */
Log.info("Testing InitializedTempStore.close(Promise<Void>)");

/* Get a tempFile path */
final CompletableFuture<AsyncResult<TempPath>> tempPathCF1 = new CompletableFuture<>();
vertx.runOnContext(v -> {
initialized.result().newTempPath()
.andThen(tempPathCF1::complete);
});
final TempPath tp1 = VertxTestUtil.assertSuccess(tempPathCF1.get(5, TimeUnit.SECONDS));
final Path p1 = tp1.getPath();
Assertions.assertThat(p1).doesNotExist();

/* Use the path to create the file */
Files.write(p1, new byte[] { 42 });
Assertions.assertThat(p1).isRegularFile();

/*
* Get a path but do not create the file - the TempStore.shutdown() should not get confused if the leased path does
* not exist
*/
final CompletableFuture<AsyncResult<TempPath>> tempPathCF2 = new CompletableFuture<>();
vertx.runOnContext(v -> {
initialized.result().newTempPath()
.andThen(tempPathCF2::complete);
});
final TempPath tp2 = VertxTestUtil.assertSuccess(tempPathCF2.get(5, TimeUnit.SECONDS));

Promise<Void> close = ctx.promise();
/* Check TempStore.shutdown() works */
ctx.runOnContext(v -> initialized.result().close(close));
close.future().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);

final Promise<Object> tmpStoreAfterCloseP = ctx.promise();
ctx.runOnContext(v -> {
tmpStoreAfterCloseP.complete(ctx.contextData().get(TempStore.CONTEXT_KEY));
});

final Object tmpStoreAfterClose = tmpStoreAfterCloseP.future().toCompletionStage().toCompletableFuture().get(5,
TimeUnit.SECONDS);
Assertions.assertThat(tmpStoreAfterClose).isNull();
Assertions.assertThat(p1).doesNotExist();
Assertions.assertThat(tp2.getPath()).doesNotExist();

Log.info("Finished testing InitializedTempStore.close(Promise<Void>)");

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.cxf.transports.http.configuration.ConnectionType;
import org.apache.cxf.transports.http.configuration.ProxyServerType;

import io.quarkiverse.cxf.CxfConfig.RetransmitCacheConfig;
import io.quarkus.arc.Arc;
import io.quarkus.arc.Unremovable;
import io.quarkus.tls.TlsConfiguration;
Expand Down Expand Up @@ -87,6 +88,9 @@ public class CXFClientInfo {
* (name is not part of standard)
*/
private final int maxRetransmits;

private final RetransmitCacheConfig retransmitCache;

/**
* Specifies the maximum amount of retransmits to the same uri that are allowed for redirects.
* Retransmits for authorization is included in the retransmit count. Each redirect may cause another
Expand Down Expand Up @@ -238,6 +242,7 @@ public CXFClientInfo(CXFClientData other, CxfConfig cxfConfig, CxfClientConfig c
this.redirectRelativeUri = config.redirectRelativeUri();
this.maxRetransmits = config.maxRetransmits();
this.maxSameUri = config.maxSameUri();
this.retransmitCache = cxfConfig.retransmitCache();
this.allowChunking = config.allowChunking();
this.chunkingThreshold = config.chunkingThreshold();
this.chunkLength = config.chunkLength();
Expand Down Expand Up @@ -495,6 +500,10 @@ public boolean isRedirectRelativeUri() {
return redirectRelativeUri;
}

public RetransmitCacheConfig getRetransmitCache() {
return retransmitCache;
}

public int getMaxRetransmits() {
return maxRetransmits;
}
Expand Down
Loading

0 comments on commit 9aca68c

Please sign in to comment.