Skip to content

Commit

Permalink
Support duplicate context duplication.
Browse files Browse the repository at this point in the history
Motivation:

Duplicating a duplicated context is supported but the duplication semantic is not defined.

Changes:

This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null.
  • Loading branch information
vietj committed Jan 15, 2025
1 parent d5f613c commit 221fb10
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 24 deletions.
6 changes: 3 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {

static final boolean DISABLE_TIMINGS = SysProps.DISABLE_CONTEXT_TIMINGS.getBoolean();

private final VertxInternal owner;
private final VertxImpl owner;
private final JsonObject config;
private final DeploymentContext deployment;
private final CloseFuture closeFuture;
Expand All @@ -51,7 +51,7 @@ public final class ContextImpl extends ContextBase implements ContextInternal {
final WorkerPool workerPool;
final WorkerTaskQueue executeBlockingTasks;

public ContextImpl(VertxInternal vertx,
public ContextImpl(VertxImpl vertx,
Object[] locals,
EventLoopExecutor eventLoop,
ThreadingModel threadingModel,
Expand Down Expand Up @@ -113,7 +113,7 @@ public EventLoop nettyEventLoop() {
return eventLoop.eventLoop;
}

public VertxInternal owner() {
public VertxImpl owner() {
return owner;
}

Expand Down
19 changes: 14 additions & 5 deletions vertx-core/src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,27 @@

import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Function;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

public static <T> ContextLocal<T> create(Class<T> type, Function<T, T> duplicator) {
synchronized (LocalSeq.class) {
int idx = LocalSeq.locals.size();
ContextLocal<T> local = new ContextLocalImpl<>(idx, duplicator);
LocalSeq.locals.add(local);
return local;
}
}

final int index;
final Function<T, T> duplicator;

public ContextLocalImpl(int index) {
public ContextLocalImpl(int index, Function<T, T> duplicator) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
this.duplicator = duplicator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
DuplicatedContext duplicate = new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
delegate.owner().duplicate(this, duplicate);
return duplicate;
}

@Override
Expand Down
25 changes: 16 additions & 9 deletions vertx-core/src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
*/
package io.vertx.core.impl;

import io.vertx.core.internal.ContextInternal;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -18,20 +23,22 @@
public class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);
static final List<ContextLocal<?>> locals = new ArrayList<>();

static {
reset();
}

/**
* Hook for testing purposes
*/
public static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
public synchronized static void reset() {
// 0 : reserved slot for local context map
locals.clear();
locals.add(ContextInternal.LOCAL_MAP);
}

static int next() {
return seq.getAndIncrement();
synchronized static ContextLocal<?>[] get() {
return locals.toArray(new ContextLocal[0]);
}
}
26 changes: 23 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.vertx.core.net.impl.*;
import io.vertx.core.impl.transports.NioTransport;
import io.vertx.core.spi.context.executor.EventExecutorProvider;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
Expand Down Expand Up @@ -143,7 +145,8 @@ private static ThreadFactory virtualThreadFactory() {
private final FileResolver fileResolver;
private final EventExecutorProvider eventExecutorProvider;
private final Map<ServerID, NetServerInternal> sharedNetServers = new HashMap<>();
private final int contextLocals;
private final ContextLocal<?>[] contextLocals;
private final List<ContextLocal<?>> contextLocalsList;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -202,6 +205,7 @@ private static ThreadFactory virtualThreadFactory() {
ThreadFactory virtualThreadFactory = virtualThreadFactory();

contextLocals = LocalSeq.get();
contextLocalsList = Collections.unmodifiableList(Arrays.asList(contextLocals));
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
Expand Down Expand Up @@ -563,10 +567,10 @@ public boolean cancelTimer(long id) {
}

private Object[] createContextLocals() {
if (contextLocals == 0) {
if (contextLocals.length == 0) {
return EMPTY_CONTEXT_LOCALS;
} else {
return new Object[contextLocals];
return new Object[contextLocals.length];
}
}

Expand Down Expand Up @@ -936,6 +940,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
return hostnameResolver.nettyAddressResolverGroup();
}

@Override
public List<ContextLocal<?>> contextLocals() {
return contextLocalsList;
}

@Override
public FileResolver fileResolver() {
return fileResolver;
Expand Down Expand Up @@ -1317,6 +1326,17 @@ public <C> C createSharedResource(String resourceKey, String resourceName, Close
return SharedResourceHolder.createSharedResource(this, resourceKey, resourceName, closeFuture, supplier);
}

void duplicate(ContextBase src, ContextBase dst) {
for (int i = 0;i < contextLocals.length;i++) {
ContextLocalImpl<?> contextLocal = (ContextLocalImpl<?>) contextLocals[i];
Object local = AccessMode.CONCURRENT.get(src.locals, i);
if (local != null) {
local = ((Function)contextLocal.duplicator).apply(local);
}
AccessMode.CONCURRENT.put(dst.locals, i, local);
}
}

/**
* Reads the version from the {@code vertx-version.txt} file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0, ConcurrentHashMap::new);

/**
* @return the current context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.transport.Transport;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.file.FileResolver;
Expand All @@ -33,6 +34,7 @@
import java.lang.ref.Cleaner;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -305,6 +307,11 @@ default <T> Future<T> executeBlockingInternal(Callable<T> blockingCodeHandler) {
*/
AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup();

/**
* @return an immutable list of this vertx instance context locals
*/
List<ContextLocal<?>> contextLocals();

BlockedThreadChecker blockedThreadChecker();

CloseFuture closeFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.VerticleFactory;
Expand All @@ -42,9 +43,9 @@
import java.lang.ref.Cleaner;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -385,6 +386,11 @@ public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
return delegate.nettyAddressResolverGroup();
}

@Override
public List<ContextLocal<?>> contextLocals() {
return delegate.contextLocals();
}

@Override
public BlockedThreadChecker blockedThreadChecker() {
return delegate.blockedThreadChecker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;

import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -35,7 +36,16 @@ public interface ContextLocal<T> {
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type) {
return new ContextLocalImpl<>();
return ContextLocalImpl.create(type, Function.identity());
}

/**
* Registers a context local storage.
*
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type, Function<T, T> duplicator) {
return ContextLocalImpl.create(type, duplicator);
}

/**
Expand Down
23 changes: 23 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/context/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1204,4 +1204,27 @@ public void testInterruptTask(ContextInternal context, Consumer<Runnable> actor)
assertTrue((System.currentTimeMillis() - now) < 2000);
assertTrue(interrupted.get());
}

@Test
public void testNestedDuplicate() {
ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
ctx.putLocal("foo", "bar");
Object expected = new Object();
ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected);
ContextInternal duplicate = ctx.duplicate();
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
ctx.removeLocal("foo");
ctx.removeLocal(contextLocal, AccessMode.CONCURRENT);
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
}

@Test
public void testContextLocals() {
List<ContextLocal<?>> locals = ((VertxInternal) vertx).contextLocals();
assertSame(ContextInternal.LOCAL_MAP, locals.get(0));
assertSame(contextLocal, locals.get(1));
assertSame(locals, ((VertxInternal) vertx).contextLocals());
}
}

0 comments on commit 221fb10

Please sign in to comment.