diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
index 685dbd92c..d5cd02f2e 100644
--- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
+++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.management.remote.JMXServiceURL;
 
@@ -70,6 +71,7 @@
 @ApplicationScoped
 public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
 
+    private static final String ALL_NAMESPACES = "*";
     private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY";
     private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC";
 
@@ -108,30 +110,42 @@ public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
                 @Override
                 protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
                         throws ConcurrentException {
-                    // TODO: add support for some wildcard indicating a single Informer for any
-                    // namespace that Cryostat has permissions to. This will need some restructuring
-                    // of how the namespaces within the discovery tree are mapped.
                     var result = new HashMap<String, SharedIndexInformer<Endpoints>>();
-                    kubeConfig
-                            .getWatchNamespaces()
-                            .forEach(
-                                    ns -> {
-                                        result.put(
-                                                ns,
-                                                client.endpoints()
-                                                        .inNamespace(ns)
-                                                        .inform(
-                                                                KubeApiDiscovery.this,
-                                                                informerResyncPeriod.toMillis()));
-                                        logger.debugv(
-                                                "Started Endpoints SharedInformer for namespace"
-                                                        + " \"{0}\" with resync period {1}",
-                                                ns, informerResyncPeriod);
-                                    });
+                    if (watchAllNamespaces()) {
+                        result.put(
+                                ALL_NAMESPACES,
+                                client.endpoints()
+                                        .inAnyNamespace()
+                                        .inform(
+                                                KubeApiDiscovery.this,
+                                                informerResyncPeriod.toMillis()));
+                    } else {
+                        kubeConfig
+                                .getWatchNamespaces()
+                                .forEach(
+                                        ns -> {
+                                            result.put(
+                                                    ns,
+                                                    client.endpoints()
+                                                            .inNamespace(ns)
+                                                            .inform(
+                                                                    KubeApiDiscovery.this,
+                                                                    informerResyncPeriod
+                                                                            .toMillis()));
+                                            logger.debugv(
+                                                    "Started Endpoints SharedInformer for namespace"
+                                                            + " \"{0}\" with resync period {1}",
+                                                    ns, informerResyncPeriod);
+                                        });
+                    }
                     return result;
                 }
             };
 
+    private boolean watchAllNamespaces() {
+        return kubeConfig.getWatchNamespaces().stream().anyMatch(ns -> ALL_NAMESPACES.equals(ns));
+    }
+
     void onStart(@Observes StartupEvent evt) {
         if (!enabled()) {
             return;
         }
@@ -144,18 +158,26 @@ void onStart(@Observes StartupEvent evt) {
         logger.debugv("Starting {0} client", REALM);
 
         safeGetInformers();
-        resyncWorker.scheduleAtFixedRate(
-                () -> {
-                    try {
-                        logger.debug("Resyncing");
-                        notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces()));
-                    } catch (Exception e) {
-                        logger.warn(e);
-                    }
-                },
-                0,
-                informerResyncPeriod.toMillis(),
-                TimeUnit.MILLISECONDS);
+        // TODO we should not need to force manual re-syncs this way - the Informer is already
+        // supposed to resync itself.
+        if (!watchAllNamespaces()) {
+            resyncWorker.scheduleAtFixedRate(
+                    () -> {
+                        try {
+                            logger.debug("Resyncing");
+                            notify(
+                                    NamespaceQueryEvent.from(
+                                            kubeConfig.getWatchNamespaces().stream()
+                                                    .filter(ns -> !ALL_NAMESPACES.equals(ns))
+                                                    .toList()));
+                        } catch (Exception e) {
+                            logger.warn(e);
+                        }
+                    },
+                    0,
+                    informerResyncPeriod.toMillis(),
+                    TimeUnit.MILLISECONDS);
+        }
     }
 
     void onStop(@Observes ShutdownEvent evt) {
@@ -226,6 +248,15 @@ List<TargetTuple> tuplesFromEndpoints(Endpoints endpoints) {
             for (EndpointPort port : subset.getPorts()) {
                 for (EndpointAddress addr : subset.getAddresses()) {
                     var ref = addr.getTargetRef();
+                    if (ref == null) {
+                        logger.debugv(
+                                "Endpoints object {0} in {1} with address {2} had a null"
+                                        + " targetRef",
+                                endpoints.getMetadata().getName(),
+                                endpoints.getMetadata().getNamespace(),
+                                addr.getIp());
+                        continue;
+                    }
                     tts.add(
                             new TargetTuple(
                                     ref,
@@ -295,8 +326,20 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {
             persistedTargets.add(node.target);
         }
 
+        Stream<Endpoints> endpoints;
+        if (watchAllNamespaces()) {
+            endpoints =
+                    safeGetInformers().get(ALL_NAMESPACES).getStore().list().stream()
+                            .filter(
+                                    ep ->
+                                            Objects.equals(
+                                                    ep.getMetadata().getNamespace(),
+                                                    namespace));
+        } else {
+            endpoints = safeGetInformers().get(namespace).getStore().list().stream();
+        }
         Set<TargetTuple> observedTargets =
-                safeGetInformers().get(namespace).getStore().list().stream()
+                endpoints
                         .map((endpoint) -> getTargetTuplesFrom(endpoint))
                         .flatMap(List::stream)
                         .filter((tuple) -> Objects.nonNull(tuple.objRef))