diff --git a/src/main/java/io/vertx/serviceresolver/impl/ServiceResolverImpl.java b/src/main/java/io/vertx/serviceresolver/impl/ServiceResolverImpl.java index 31cb8cb..69c861e 100644 --- a/src/main/java/io/vertx/serviceresolver/impl/ServiceResolverImpl.java +++ b/src/main/java/io/vertx/serviceresolver/impl/ServiceResolverImpl.java @@ -8,14 +8,14 @@ public class ServiceResolverImpl implements ServiceResolver { - private final BiFunction> provider; + private final BiFunction> provider; - public ServiceResolverImpl(BiFunction> provider) { + public ServiceResolverImpl(BiFunction> provider) { this.provider = provider; } @Override - public AddressResolver resolver(Vertx vertx) { + public AddressResolver resolver(Vertx vertx) { return provider.apply(vertx, this); } } diff --git a/src/main/java/io/vertx/serviceresolver/kube/impl/KubeResolverImpl.java b/src/main/java/io/vertx/serviceresolver/kube/impl/KubeResolverImpl.java index e534cb9..53297b6 100644 --- a/src/main/java/io/vertx/serviceresolver/kube/impl/KubeResolverImpl.java +++ b/src/main/java/io/vertx/serviceresolver/kube/impl/KubeResolverImpl.java @@ -19,7 +19,6 @@ import io.vertx.core.net.Address; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.resolver.address.AddressResolver; -import io.vertx.core.spi.resolver.address.Endpoint; import io.vertx.serviceresolver.ServiceAddress; import io.vertx.serviceresolver.kube.KubeResolverOptions; @@ -28,7 +27,7 @@ import static io.vertx.core.http.HttpMethod.GET; -public class KubeResolverImpl implements AddressResolver { +public class KubeResolverImpl implements AddressResolver, B> { final KubeResolverOptions options; final String host; @@ -60,7 +59,7 @@ public ServiceAddress tryCast(Address address) { } @Override - public Future resolve(Function> factory, ServiceAddress address) { + public Future> resolve(Function factory, ServiceAddress address) { return httpClient .request(GET, port, host, "/api/v1/namespaces/" + namespace + "/endpoints") .compose(req -> { @@ -84,7 +83,7 @@ public Future resolve(Function { String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion"); - KubeServiceState state = new KubeServiceState(factory, this, vertx, resourceVersion, address.name()); + KubeServiceState state = new KubeServiceState<>(factory, this, vertx, resourceVersion, address.name()); JsonArray items = response.getJsonArray("items"); for (int i = 0;i < items.size();i++) { JsonObject item = items.getJsonObject(i); @@ -93,14 +92,14 @@ public Future resolve(Function { if (ar.succeeded()) { - KubeServiceState res = ar.result(); + KubeServiceState res = ar.result(); res.connectWebSocket(); } }); } @Override - public List> endpoints(KubeServiceState state) { + public List endpoints(KubeServiceState state) { return state.endpoints.get(); } diff --git a/src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java b/src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java index e192768..27d3e12 100644 --- a/src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java +++ b/src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java @@ -17,7 +17,6 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.net.SocketAddress; -import io.vertx.core.spi.resolver.address.Endpoint; import java.util.ArrayList; import java.util.Collections; @@ -25,18 +24,18 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -class KubeServiceState { +class KubeServiceState { final String name; final Vertx vertx; final KubeResolverImpl resolver; - final Function> endpointFactory; + final Function endpointFactory; String lastResourceVersion; boolean disposed; WebSocket ws; - AtomicReference>> endpoints = new AtomicReference<>(Collections.emptyList()); + AtomicReference> endpoints = new AtomicReference<>(Collections.emptyList()); - KubeServiceState(Function> endpointFactory, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) { + KubeServiceState(Function endpointFactory, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) { this.endpointFactory = endpointFactory; this.name = name; this.resolver = resolver; @@ -102,7 +101,7 @@ void handleEndpoints(JsonObject item) { if (this.name.equals(name)) { JsonArray subsets = item.getJsonArray("subsets"); if (subsets != null) { - List> endpoints = new ArrayList<>(); + List endpoints = new ArrayList<>(); for (int j = 0;j < subsets.size();j++) { List podIps = new ArrayList<>(); JsonObject subset = subsets.getJsonObject(j); diff --git a/src/main/java/io/vertx/serviceresolver/srv/impl/SrvResolverImpl.java b/src/main/java/io/vertx/serviceresolver/srv/impl/SrvResolverImpl.java index 4efc106..9d13595 100644 --- a/src/main/java/io/vertx/serviceresolver/srv/impl/SrvResolverImpl.java +++ b/src/main/java/io/vertx/serviceresolver/srv/impl/SrvResolverImpl.java @@ -17,7 +17,6 @@ import io.vertx.core.net.Address; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.resolver.address.AddressResolver; -import io.vertx.core.spi.resolver.address.Endpoint; import io.vertx.serviceresolver.ServiceAddress; import io.vertx.serviceresolver.srv.SrvResolverOptions; @@ -25,7 +24,7 @@ import java.util.List; import java.util.function.Function; -public class SrvResolverImpl implements AddressResolver { +public class SrvResolverImpl implements AddressResolver, B> { Vertx vertx; DnsClient client; @@ -45,19 +44,21 @@ public ServiceAddress tryCast(Address address) { } @Override - public Future resolve(Function> factory, ServiceAddress address) { + public Future> resolve(Function factory, ServiceAddress address) { Future> fut = client.resolveSRV(address.name()); return fut.map(records -> { - List> endpoints = new ArrayList<>(); + long ttl = 10_000_000; + List endpoints = new ArrayList<>(); for (SrvRecord record : records) { endpoints.add(factory.apply(record)); + ttl = Math.min(ttl, record.ttl()); } - return new SrvServiceState(System.currentTimeMillis(), endpoints); + return new SrvServiceState<>(endpoints, System.currentTimeMillis() + 1000 * ttl); }); } @Override - public List> endpoints(SrvServiceState state) { + public List endpoints(SrvServiceState state) { return state.endpoints; } diff --git a/src/main/java/io/vertx/serviceresolver/srv/impl/SrvServiceState.java b/src/main/java/io/vertx/serviceresolver/srv/impl/SrvServiceState.java index 4242993..57b5bab 100644 --- a/src/main/java/io/vertx/serviceresolver/srv/impl/SrvServiceState.java +++ b/src/main/java/io/vertx/serviceresolver/srv/impl/SrvServiceState.java @@ -10,28 +10,19 @@ */ package io.vertx.serviceresolver.srv.impl; -import io.vertx.core.dns.SrvRecord; -import io.vertx.core.spi.resolver.address.Endpoint; - import java.util.List; -class SrvServiceState { +class SrvServiceState { - final long timestamp; - final List> endpoints; + final List endpoints; + final long expirationMs; - public SrvServiceState(long timestamp, List> endpoints) { - this.timestamp = timestamp; + public SrvServiceState(List endpoints, long expirationMs) { this.endpoints = endpoints; + this.expirationMs = expirationMs; } boolean isValid() { - long now = System.currentTimeMillis(); - for (Endpoint endpoint : endpoints) { - if (now > endpoint.get().ttl() * 1000 + timestamp) { - return false; - } - } - return true; + return System.currentTimeMillis() <= expirationMs; } } diff --git a/src/test/java/io/vertx/serviceresolver/srv/SrvServiceResolverTest.java b/src/test/java/io/vertx/serviceresolver/srv/SrvServiceResolverTest.java index ac490d4..217080a 100644 --- a/src/test/java/io/vertx/serviceresolver/srv/SrvServiceResolverTest.java +++ b/src/test/java/io/vertx/serviceresolver/srv/SrvServiceResolverTest.java @@ -1,14 +1,11 @@ package io.vertx.serviceresolver.srv; -import io.vertx.core.VertxOptions; -import io.vertx.core.dns.AddressResolverOptions; import io.vertx.ext.unit.TestContext; import io.vertx.serviceresolver.ServiceAddress; import io.vertx.serviceresolver.ServiceResolverTestBase; import io.vertx.test.fakedns.FakeDNSServer; import org.apache.directory.server.dns.messages.*; import org.apache.directory.server.dns.store.DnsAttribute; -import org.junit.Ignore; import org.junit.Test; import org.apache.directory.server.dns.store.RecordStore; diff --git a/src/test/java/mock/MockResolver.java b/src/test/java/mock/MockResolver.java index 9279ca3..530ca51 100644 --- a/src/test/java/mock/MockResolver.java +++ b/src/test/java/mock/MockResolver.java @@ -4,7 +4,6 @@ import io.vertx.core.net.Address; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.resolver.address.AddressResolver; -import io.vertx.core.spi.resolver.address.Endpoint; import io.vertx.serviceresolver.impl.*; import io.vertx.serviceresolver.ServiceAddress; import io.vertx.serviceresolver.ServiceResolver; @@ -14,7 +13,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Function; -public class MockResolver implements AddressResolver { +public class MockResolver implements AddressResolver, B> { public static ServiceResolver create(MockController controller) { return new ServiceResolverImpl((vertx, lookup) -> { @@ -48,12 +47,12 @@ public SocketAddress addressOfEndpoint(SocketAddress endpoint) { } @Override - public Future resolve(Function> factory, ServiceAddress address) { + public Future> resolve(Function factory, ServiceAddress address) { List endpoints = templates.get(address.name()); if (endpoints == null) { return Future.failedFuture("No addresses for service svc"); } - MockServiceState state = new MockServiceState(); + MockServiceState state = new MockServiceState<>(); // state.set(endpoints); return Future.succeededFuture(state); } @@ -64,7 +63,7 @@ public void dispose(MockServiceState state) { } @Override - public List> endpoints(MockServiceState state) { + public List endpoints(MockServiceState state) { throw new UnsupportedOperationException(); } diff --git a/src/test/java/mock/MockServiceState.java b/src/test/java/mock/MockServiceState.java index 8d58271..000a109 100644 --- a/src/test/java/mock/MockServiceState.java +++ b/src/test/java/mock/MockServiceState.java @@ -1,6 +1,6 @@ package mock; -public class MockServiceState { +public class MockServiceState { boolean disposed;