diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java index 0b09d00b2..502e8267e 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java @@ -11,6 +11,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Disposes; @@ -35,6 +36,7 @@ import io.quarkiverse.cxf.CXFClientData; import io.quarkiverse.cxf.CXFClientInfo; import io.quarkiverse.cxf.CXFRecorder; +import io.quarkiverse.cxf.ClientInjectionPoint; import io.quarkiverse.cxf.CxfClientConfig.HTTPConduitImpl; import io.quarkiverse.cxf.CxfClientProducer; import io.quarkiverse.cxf.CxfFixedConfig; @@ -178,34 +180,53 @@ private static AnnotationInstance findWebServiceClientAnnotation(IndexView index return null; } + public static Stream findClientInjectionPoints(IndexView index) { + return index.getAnnotations(CxfDotNames.CXFCLIENT_ANNOTATION).stream() + .map(annotationInstance -> { + final AnnotationTarget target = annotationInstance.target(); + Type type; + switch (target.kind()) { + case FIELD: + type = target.asField().type(); + break; + case METHOD_PARAMETER: + MethodParameterInfo paramInfo = target.asMethodParameter(); + MethodInfo method = paramInfo.method(); + type = method.parameterTypes().get(paramInfo.position()); + break; + default: + type = null; + break; + } + if (type != null) { + type = type.name().equals(CxfDotNames.INJECT_INSTANCE) ? type.asParameterizedType().arguments().get(0) + : type; + final String typeName = type.name().toString(); + try { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + final Class sei = Class.forName(typeName, true, cl); + final AnnotationValue value = annotationInstance.value(); + return new ClientInjectionPoint(value != null ? value.asString() : "", sei); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load Service Endpoint Interface " + typeName); + } + } else { + return null; + } + }) + .filter(ip -> ip != null) + .distinct(); + } + private static Map findClientSEIsInUse(IndexView index, CxfFixedConfig config) { final Map seiToClientConfig = new TreeMap<>(); - index.getAnnotations(CxfDotNames.CXFCLIENT_ANNOTATION).forEach(annotationInstance -> { - final AnnotationTarget target = annotationInstance.target(); - Type type; - switch (target.kind()) { - case FIELD: - type = target.asField().type(); - break; - case METHOD_PARAMETER: - MethodParameterInfo paramInfo = target.asMethodParameter(); - MethodInfo method = paramInfo.method(); - type = method.parameterTypes().get(paramInfo.position()); - break; - default: - type = null; - break; - } - if (type != null) { - type = type.name().equals(CxfDotNames.INJECT_INSTANCE) ? type.asParameterizedType().arguments().get(0) - : type; - final String typeName = type.name().toString(); - final ClientFixedConfig clientConfig = findClientConfig( - config, - Optional.ofNullable(annotationInstance.value()).map(AnnotationValue::asString).orElse(null), - typeName); - seiToClientConfig.put(typeName, clientConfig); - } + findClientInjectionPoints(index).forEach(clientInjectionPoint -> { + String sei = clientInjectionPoint.getSei().getName(); + final ClientFixedConfig clientConfig = findClientConfig( + config, + clientInjectionPoint.getConfigKey(), + sei); + seiToClientConfig.put(sei, clientConfig); }); return seiToClientConfig; } diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java index 94f9e85f4..451942286 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java @@ -1,12 +1,17 @@ package io.quarkiverse.cxf.deployment.devui; import java.util.List; +import java.util.stream.Collectors; -import io.quarkiverse.cxf.deployment.CxfClientBuildItem; -import io.quarkiverse.cxf.deployment.CxfEndpointImplementationBuildItem; +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkiverse.cxf.deployment.CxfClientProcessor; import io.quarkiverse.cxf.devui.CxfJsonRPCService; +import io.quarkiverse.cxf.devui.DevUiRecorder; import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.devui.spi.JsonRPCProvidersBuildItem; import io.quarkus.devui.spi.page.CardPageBuildItem; import io.quarkus.devui.spi.page.Page; @@ -14,18 +19,21 @@ public class DevUIProcessor { @BuildStep(onlyIf = IsDevelopment.class) - public CardPageBuildItem pages(List services, - List clients) { + public CardPageBuildItem pages() { CardPageBuildItem cardPageBuildItem = new CardPageBuildItem(); - int total = services.size() + clients.size(); + cardPageBuildItem.addPage(Page.webComponentPageBuilder() + .title("Clients") + .icon("font-awesome-solid:message") + .componentLink("qwc-cxf-clients.js") + .dynamicLabelJsonRPCMethodName("getClientCount")); cardPageBuildItem.addPage(Page.webComponentPageBuilder() - .title("List of SOAP WS") - .icon("font-awesome-solid:cubes") + .title("Service Endpoints") + .icon("font-awesome-solid:gears") .componentLink("qwc-cxf-services.js") - .staticLabel(String.valueOf(total))); + .dynamicLabelJsonRPCMethodName("getServiceCount")); return cardPageBuildItem; } @@ -34,4 +42,14 @@ public CardPageBuildItem pages(List services JsonRPCProvidersBuildItem createJsonRPCServiceForCache() { return new JsonRPCProvidersBuildItem(CxfJsonRPCService.class); } + + @BuildStep(onlyIf = IsDevelopment.class) + @Record(ExecutionTime.RUNTIME_INIT) + void collectClients(CombinedIndexBuildItem combinedIndexBuildItem, DevUiRecorder devuiRecorder) { + final List injectionPoints = CxfClientProcessor.findClientInjectionPoints( + combinedIndexBuildItem.getIndex()) + .collect(Collectors.toList()); + devuiRecorder.addClientInjectionPoints(injectionPoints); + } + } diff --git a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js new file mode 100644 index 000000000..3c3958366 --- /dev/null +++ b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js @@ -0,0 +1,114 @@ +import { LitElement, html, css} from 'lit'; +import 'qui-card'; +import '@vaadin/progress-bar'; +import '@vaadin/grid'; +import { columnBodyRenderer } from '@vaadin/grid/lit.js'; +import { JsonRpc } from 'jsonrpc'; +import 'qui-ide-link'; + +/** + * This component shows the list of clients + */ +export class QwcCxfClients extends LitElement { + jsonRpc = new JsonRpc(this); + + static styles = css` + .cxf-table { + height: 100%; + padding-bottom: 10px; + } + + code { + font-size: 85%; + } + + .annotation { + color: var(--lumo-contrast-50pct); + } + + :host { + display: flex; + flex-direction:column; + gap: 20px; + padding-left: 10px; + padding-right: 10px; + } + .nothing-found { + padding: 5px; + }`; + + static properties = { + _clients: {state: true} + }; + + constructor() { + super(); + this._clients = null; + } + + connectedCallback() { + super.connectedCallback(); + this.jsonRpc.getClients().then(jsonRpcResponse => { + this._clients = jsonRpcResponse.result; + }); + } + + render() { + if (this._clients) { + if (this._clients.length > 0) { + return this._renderClientList(); + } else { + return html`
No clients found
`; + } + } else { + return html``; + } + } + + _renderClientList(){ + return html` + + + + + + + + + `; + } + + _classNameRenderer(client){ + return html` + @CXFClient("${client.configKey}") + ${client.sei} + `; + } + + _addressRenderer(client) { + return html` +   + ${client.address} + `; + } + + _wsdlRenderer(client) { + return html` +   + ${client.wsdl} + `; + } + +} +customElements.define('qwc-cxf-clients', QwcCxfClients); \ No newline at end of file diff --git a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js index 5bd586877..c5a0e3565 100644 --- a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js +++ b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js @@ -1,18 +1,31 @@ -import { LitElement, html, css} from 'lit'; +import { LitElement, html, css} from 'lit'; import 'qui-card'; import '@vaadin/progress-bar'; -import '@vaadin/grid'; +import '@vaadin/grid'; import { columnBodyRenderer } from '@vaadin/grid/lit.js'; import { JsonRpc } from 'jsonrpc'; import 'qui-ide-link'; /** - * This component shows the List SOAP Web Services and clients + * This component shows the list of Service endpoints */ -export class QwcCxfServices extends LitElement { +export class QwcCxfServices extends LitElement { jsonRpc = new JsonRpc(this); - - static styles = css` + + static styles = css` + .cxf-table { + height: 100%; + padding-bottom: 10px; + } + + code { + font-size: 85%; + } + + .service-sei { + color: var(--lumo-contrast-50pct); + } + :host { display: flex; flex-direction:column; @@ -23,125 +36,67 @@ export class QwcCxfServices extends LitElement { .nothing-found { padding: 5px; }`; - + static properties = { - _services: {state: true}, - _clients: {state: true} + _services: {state: true} }; - - constructor() { + + constructor() { super(); this._services = null; - this._clients = null; } connectedCallback() { super.connectedCallback(); - this.jsonRpc.getServices().then(jsonRpcResponse => { + this.jsonRpc.getServices().then(jsonRpcResponse => { this._services = jsonRpcResponse.result; }); - this.jsonRpc.getClients().then(jsonRpcResponse => { - this._clients = jsonRpcResponse.result; - }); } - render() { - return html`${this._renderSoapServiceCard()} - ${this._renderSoapClientsCard()}`; - } - - _renderSoapServiceCard(){ - return html` -
- ${this._renderSoapServices()} -
-
`; - } - - _renderSoapServices(){ - if(this._services){ - if(this._services.length>0) { - return html` - - - - - - `; + render() { + if (this._services) { + if (this._services.length > 0) { + return this._renderServiceList(); }else { - return html`
No SOAP Services found
`; + return html`
No service endpoints found
`; } - }else{ + } else { return html``; } } - - _classNameRenderer(service){ - return html`${service.className}`; - } - - _pathRenderer(service) { - return html`${service.path}${service.relativePath}`; - } - - _renderSoapClientsCard(){ - return html` -
- ${this._renderSoapClients()} -
-
`; - } - - _renderSoapClients(){ - if(this._clients){ - if(this._clients.length>0) { - - return html` - - - - - `; - }else { - return html`
No SOAP Clients found
`; - } - }else{ - return html``; - } + _renderServiceList(){ + return html` + + + + + + `; } - - _seiRenderer(client){ - return html`${client.sei}`; + + _classNameRenderer(service){ + /* service.sei always the same as service.className + ${service.sei} + */ + return html` + ${service.className} + `; } - - _endpointAddressRenderer(client){ - if(client.wsdlUrl){ - return html`${client.wsdlUrl}`; - }else if(client.endpointAddress){ - return html`${client.endpointAddress}`; - }else { - return html`N/A`; - } + + _wsdlRenderer(service) { + return html`${service.path}${service.relativePath}?wsdl`; } - + } -customElements.define('qwc-cxf-services', QwcCxfServices); \ No newline at end of file +customElements.define('qwc-cxf-services', QwcCxfServices); diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index 2aa4ef59f..ef9ecdc75 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -142,6 +142,7 @@ public Handler initServer( public void resetDestinationRegistry(ShutdownContext context) { context.addShutdownTask(VertxDestinationFactory::resetRegistry); + context.addShutdownTask(CxfJsonRPCService::shutdown); } public void addRuntimeBusCustomizer(RuntimeValue> customizer) { diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java new file mode 100644 index 000000000..79f60833b --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java @@ -0,0 +1,39 @@ +package io.quarkiverse.cxf; + +import java.util.Objects; + +public class ClientInjectionPoint { + private final String configKey; + private final Class sei; + + public ClientInjectionPoint(String configKey, Class sei) { + super(); + this.configKey = configKey; + this.sei = sei; + } + + public String getConfigKey() { + return configKey; + } + + public Class getSei() { + return sei; + } + + @Override + public int hashCode() { + return Objects.hash(configKey, sei); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClientInjectionPoint other = (ClientInjectionPoint) obj; + return Objects.equals(configKey, other.configKey) && Objects.equals(sei, other.sei); + } +} \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java index c323b0ebc..0812f7130 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java @@ -5,6 +5,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.lang.annotation.Annotation; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; @@ -22,6 +23,7 @@ import jakarta.annotation.PostConstruct; import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.spi.Annotated; import jakarta.enterprise.inject.spi.InjectionPoint; import jakarta.inject.Inject; import jakarta.xml.ws.BindingProvider; @@ -146,6 +148,7 @@ private Object produceCxfClient(CXFClientInfo cxfClientInfo) { QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, interfaces); final Map props = new LinkedHashMap<>(); factory.setProperties(props); + props.put(CXFClientInfo.class.getName(), cxfClientInfo); factory.setServiceClass(seiClass); LOGGER.debugf("using servicename {%s}%s", cxfClientInfo.getWsNamespace(), cxfClientInfo.getWsName()); factory.setServiceName(new QName(cxfClientInfo.getWsNamespace(), cxfClientInfo.getWsName())); @@ -375,22 +378,32 @@ protected static CXFClientInfo selectorCXFClientInfo( // If injection point is annotated with @CXFClient then determine a // configuration by looking up annotated config value: - - if (ip.getAnnotated().isAnnotationPresent(CXFClient.class)) { - CXFClient anno = ip.getAnnotated().getAnnotation(CXFClient.class); - String configKey = anno.value(); - - if (config.isClientPresent(configKey)) { - return new CXFClientInfo(meta, config.getClient(configKey), configKey); - } - - // If config-key is present and not default: This is an error: - if (configKey != null && !configKey.isEmpty()) { + final CXFClient annot; + final Annotated annotated = ip.getAnnotated(); + if (annotated != null && annotated.isAnnotationPresent(CXFClient.class)) { + annot = ip.getAnnotated().getAnnotation(CXFClient.class); + } else { + Optional cxfClientAnnotOptional = ip.getQualifiers().stream() + .filter(an -> an.annotationType().equals(CXFClient.class)).findFirst(); + if (cxfClientAnnotOptional.isPresent()) { + annot = (CXFClient) cxfClientAnnotOptional.get(); + } else { throw new IllegalStateException( - "quarkus.cxf.\"" + configKey + "\" is referenced in " + ip.getMember() - + " but no such build time configuration entry exists"); + "@io.quarkiverse.cxf.annotation.CXFClient not found on an imjection point " + ip); } } + final String configKey = annot.value(); + if (config.isClientPresent(configKey)) { + return new CXFClientInfo(meta, config.getClient(configKey), configKey); + } + + // If config-key is present and not default: This is an error: + if (configKey != null && !configKey.isEmpty()) { + throw new IllegalStateException( + "quarkus.cxf.client.\"" + configKey + "\" is referenced in " + ip.getMember() + + " but no such build time configuration entry exists"); + } + // User did not specify any client config value. Thus we make a smart guess // about which configuration is to be used. // diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java index 9d5c23b06..93a05b5c4 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java @@ -1,41 +1,86 @@ package io.quarkiverse.cxf.devui; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.stream.Collectors; -import jakarta.enterprise.inject.spi.CDI; +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.xml.ws.BindingProvider; import io.quarkiverse.cxf.CXFClientInfo; import io.quarkiverse.cxf.CXFServletInfo; import io.quarkiverse.cxf.CXFServletInfos; +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkus.arc.Arc; public class CxfJsonRPCService { - private static CXFServletInfos cxfServletInfos; + private static List servletInfos = Collections.emptyList(); + private static List clientInjectionPoints = Collections.emptyList(); public List getServices() { - List servletInfos = cxfServletInfos != null ? new ArrayList<>(cxfServletInfos.getInfos()) - : new ArrayList<>(); - servletInfos.sort(Comparator.comparing(CXFServletInfo::getSei)); return servletInfos; } - public List getClients() { - List clientInfos = new ArrayList<>(allClientInfos()); - clientInfos.sort(Comparator.comparing(CXFClientInfo::getSei)); - return clientInfos; + public int getServiceCount() { + return servletInfos.size(); + } + + public int getClientCount() { + return clientInjectionPoints.size(); } - private static Collection allClientInfos() { - return CDI.current().select(CXFClientInfo.class).stream().collect(Collectors.toCollection(ArrayList::new)); + public List getClients() { + List result = new ArrayList<>(clientInjectionPoints.size()); + for (ClientInjectionPoint ip : clientInjectionPoints) { + + final Object rawClient = Arc.container().instance(ip.getSei(), new CxfClientAnnotationLiteral(ip.getConfigKey())) + .get(); + if (rawClient instanceof BindingProvider) { + final CXFClientInfo clientInfo = (CXFClientInfo) ((BindingProvider) rawClient).getRequestContext() + .get(CXFClientInfo.class.getName()); + final DevUiClientInfo devUiIInfo = new DevUiClientInfo(ip.getConfigKey(), ip.getSei().getName(), + clientInfo.getEndpointAddress(), clientInfo.getWsdlUrl()); + result.add(devUiIInfo); + } + } + result.sort(Comparator.comparing(DevUiClientInfo::getSei)); + return result; + } + + public static void setClientInjectionPoints(List injectionPoints) { + clientInjectionPoints = Collections.unmodifiableList(new ArrayList<>(injectionPoints)); } public static void setServletInfos(CXFServletInfos infos) { - if (cxfServletInfos == null) { - cxfServletInfos = infos; + List tmp = infos != null ? new ArrayList<>(infos.getInfos()) + : new ArrayList<>(); + tmp.sort(Comparator.comparing(CXFServletInfo::getSei)); + servletInfos = Collections.unmodifiableList(tmp); + } + + public static void shutdown() { + servletInfos = Collections.emptyList(); + clientInjectionPoints = Collections.emptyList(); + } + + public static final class CxfClientAnnotationLiteral extends AnnotationLiteral implements CXFClient { + private static final long serialVersionUID = 1L; + + private final String value; + + public CxfClientAnnotationLiteral(String value) { + super(); + this.value = value; } + + @Override + public String value() { + return value; + } + } + } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java new file mode 100644 index 000000000..fcaaaa698 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java @@ -0,0 +1,39 @@ +package io.quarkiverse.cxf.devui; + +public class DevUiClientInfo { + + private final String configKey; + private final String sei; + private final String address; + private final String wsdl; + + public DevUiClientInfo(String configKey, String sei, String address, String wsdl) { + super(); + this.configKey = configKey; + this.sei = sei; + this.address = address; + this.wsdl = wsdl; + } + + public String getConfigKey() { + return configKey; + } + + public String getSei() { + return sei; + } + + public String getAddress() { + return address; + } + + public String getWsdl() { + return wsdl; + } + + @Override + public String toString() { + return "DevUiClientInfo [configKey=" + configKey + ", sei=" + sei + ", address=" + address + ", wsdl=" + wsdl + "]"; + } + +} \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java new file mode 100644 index 000000000..9117e5cdf --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java @@ -0,0 +1,13 @@ +package io.quarkiverse.cxf.devui; + +import java.util.List; + +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkus.runtime.annotations.Recorder; + +@Recorder +public class DevUiRecorder { + public void addClientInjectionPoints(List injectionPoints) { + CxfJsonRPCService.setClientInjectionPoints(injectionPoints); + } +} diff --git a/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java new file mode 100644 index 000000000..3b8414d0f --- /dev/null +++ b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java @@ -0,0 +1,202 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.util.ArrayDeque; +import java.util.Objects; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + * + *

+ * + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + + private final ByteBufAllocator allocator; + + private final int minChunkSize; + + private final int capacity; + + private ByteBuf buffer; + + private ArrayDeque otherBuffers; + + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } +} diff --git a/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java new file mode 100644 index 000000000..1a92283e7 --- /dev/null +++ b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java @@ -0,0 +1,294 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.WriteListener; +import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.quarkiverse.cxf.transport.VertxReactiveRequestContext; +import io.quarkus.vertx.core.runtime.VertxBufferImpl; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + */ +public class VertxServletOutputStream extends ServletOutputStream { + + private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); + + private final VertxReactiveRequestContext context; + + protected final HttpServerRequest request; + + private final AppendBuffer appendBuffer; + + private boolean committed; + + private boolean closed; + + protected boolean waitingForDrain; + + protected boolean drainHandlerRegistered; + + protected boolean first = true; + + protected Throwable throwable; + + private ByteArrayOutputStream overflow; + + public VertxServletOutputStream(VertxReactiveRequestContext context) { + this.context = context; + this.request = context.getContext().request(); + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); + request.response().exceptionHandler(new Handler() { + + @Override + public void handle(Throwable event) { + throwable = event; + log.debugf(event, "IO Exception "); + //TODO: do we need this? + terminateResponse(); + request.connection().close(); + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + } + }); + context.getContext().addEndHandler(new Handler>() { + + @Override + public void handle(AsyncResult event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + terminateResponse(); + } + }); + } + + public void terminateResponse() { + } + + Buffer createBuffer(ByteBuf data) { + return new VertxBufferImpl(data); + } + + public void write(ByteBuf data, boolean last) throws IOException { + if (last && data == null) { + request.response().end((Handler>) null); + return; + } + //do all this in the same lock + synchronized (request.connection()) { + try { + boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); + if (bufferRequired) { + //just buffer everything + registerDrainHandler(); + if (overflow == null) { + overflow = new ByteArrayOutputStream(); + } + if (data.hasArray()) { + overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes()); + } else { + data.getBytes(data.readerIndex(), overflow, data.readableBytes()); + } + if (last) { + closed = true; + } + data.release(); + } else { + if (last) { + request.response().end(createBuffer(data), null); + } else { + request.response().write(createBuffer(data), null); + } + } + } catch (Exception e) { + if (data != null && data.refCnt() > 0) { + data.release(); + } + throw new IOException("Failed to write", e); + } + } + } + + private boolean awaitWriteable() throws IOException { + if (Context.isOnEventLoopThread()) { + return request.response().writeQueueFull(); + } + if (first) { + first = false; + return false; + } + assert Thread.holdsLock(request.connection()); + while (request.response().writeQueueFull()) { + if (throwable != null) { + throw new IOException(throwable); + } + if (request.response().closed()) { + throw new IOException("Connection has been closed"); + } + registerDrainHandler(); + try { + waitingForDrain = true; + request.connection().wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } finally { + waitingForDrain = false; + } + } + return false; + } + + private void registerDrainHandler() { + if (!drainHandlerRegistered) { + drainHandlerRegistered = true; + Handler handler = new Handler() { + + @Override + public void handle(Void event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + if (overflow != null) { + if (overflow.size() > 0) { + if (closed) { + request.response().end(Buffer.buffer(overflow.toByteArray()), null); + } else { + request.response().write(Buffer.buffer(overflow.toByteArray()), null); + } + overflow.reset(); + } + } + } + } + }; + request.response().drainHandler(handler); + request.response().closeHandler(handler); + } + } + + /** + * {@inheritDoc} + */ + public void write(final int b) throws IOException { + write(new byte[] { (byte) b }, 0, 1); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed) { + throw new IOException("Stream is closed"); + } + int rem = len; + int idx = off; + try { + while (rem > 0) { + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); + } + rem -= written; + idx += written; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { + prepareWrite(buffer, finished); + write(buffer, finished); + } + + private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { + if (!committed) { + committed = true; + if (finished) { + final HttpServerResponse response = request.response(); + if (!response.headWritten()) { + if (buffer == null) { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); + } else { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); + } + } + } else { + request.response().setChunked(true); + } + } + } + + /** + * {@inheritDoc} + */ + public void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + try { + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException { + if (closed) + return; + try { + writeBlocking(appendBuffer.clear(), true); + } catch (Exception e) { + throw new IOException(e); + } finally { + closed = true; + } + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteListener(WriteListener writeListener) { + throw new UnsupportedOperationException(); + } +}