diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalTmpStorageConfig.java b/processing/src/main/java/org/apache/druid/storage/local/LocalTmpStorageConfig.java new file mode 100644 index 000000000000..f112cb25a085 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalTmpStorageConfig.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.local; + +import com.google.inject.Provider; +import org.apache.druid.java.util.common.FileUtils; + +import java.io.File; + +/** + * {@link LocalTmpStorageConfig} is a provider for temporary directories. A default implementation is binded in all services except + * Peon. For peons, a custom implementation is binded in CliPeon which uses the working directory of the peon to create + * a temporary storage. This interface will be guice injectable in all services. + * The cleaning up of the temporary files/directories created in this storage is handled by the caller. + */ +public interface LocalTmpStorageConfig +{ + /** + * Get a temporary directory. + * + * @return a temporary directory + */ + File getTmpDir(); + + class DefaultLocalTmpStorageConfigProvider implements Provider + { + private final String prefix; + + public DefaultLocalTmpStorageConfigProvider(String prefix) + { + this.prefix = prefix; + } + + @Override + public LocalTmpStorageConfig get() + { + File result = FileUtils.createTempDir(prefix); + return () -> result; + } + } +} diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalTmpStorageConfigTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalTmpStorageConfigTest.java new file mode 100644 index 000000000000..1f36a986e1bb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/storage/local/LocalTmpStorageConfigTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.local; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + +public class LocalTmpStorageConfigTest +{ + @Test + public void testDefaultLocalTmpStorage() + { + String tmpString = UUID.randomUUID().toString(); + Injector injector = Guice.createInjector( + binder -> binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider(tmpString)) + ); + LocalTmpStorageConfig localTmpStorageConfig = injector.getInstance(LocalTmpStorageConfig.class); + Assert.assertTrue(localTmpStorageConfig.getTmpDir().getAbsolutePath().contains(tmpString)); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 4a32bc6ffa83..09212ec75972 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -78,6 +78,7 @@ import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.calcite.schema.MetadataSegmentView; import org.apache.druid.sql.guice.SqlModule; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.timeline.PruneLoadSpec; import org.eclipse.jetty.server.Server; @@ -185,6 +186,10 @@ protected List getModules() Jerseys.addResource(binder, SelfDiscoveryResource.class); LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); + + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("broker")) + .in(LazySingleton.class); }, new LookupModule(), new SqlModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index f1f3d9f693e1..b1fc5f634047 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -130,6 +130,7 @@ import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig; import org.apache.druid.server.metrics.ServiceStatusMonitor; import org.apache.druid.server.router.TieredBrokerConfig; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.eclipse.jetty.server.Server; import org.joda.time.Duration; @@ -288,6 +289,9 @@ public void configure(Binder binder) binder.bind(new TypeLiteral>>() {}) .annotatedWith(Names.named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)) .toProvider(HeartbeatSupplier.class); + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("coordinator")) + .in(LazySingleton.class); } binder.bind(CoordinatorCustomDutyGroups.class) diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index ea8bbd994348..433d9ced54ef 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -59,6 +59,7 @@ import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.timeline.PruneLastCompactionState; import org.eclipse.jetty.server.Server; @@ -140,6 +141,10 @@ protected List getModules() Jerseys.addResource(binder, SelfDiscoveryResource.class); LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); + + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("historical")) + .in(LazySingleton.class); }, new LookupModule() ); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 5decef93018c..db5204b5f895 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -82,6 +82,7 @@ import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.eclipse.jetty.server.Server; import java.util.List; @@ -196,6 +197,9 @@ public void configure(Binder binder) Jerseys.addResource(binder, SelfDiscoveryResource.class); LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("indexer")) + .in(LazySingleton.class); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 2e542bc69745..0b2a8d02ad8d 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -81,6 +81,7 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.ServiceStatusMonitor; import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.timeline.PruneLastCompactionState; import org.eclipse.jetty.server.Server; @@ -184,6 +185,10 @@ public void configure(Binder binder) LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); configureIntermediaryData(binder); + + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("middle-manager")) + .in(LazySingleton.class); } private void configureIntermediaryData(Binder binder) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 67a36ed1ff54..81be83947cd8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -133,6 +133,7 @@ import org.apache.druid.server.security.AuthenticationUtils; import org.apache.druid.server.security.Authenticator; import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.tasklogs.TaskLogs; import org.eclipse.jetty.rewrite.handler.RewriteHandler; @@ -319,6 +320,10 @@ public void configure(Binder binder) Jerseys.addResource(binder, SelfDiscoveryResource.class); LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); + + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("overlord")) + .in(LazySingleton.class); } private void configureTaskStorage(Binder binder) diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 9d4bffecb5a8..307ba269a4f4 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -134,6 +134,7 @@ import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import org.apache.druid.server.metrics.ServiceStatusMonitor; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.apache.druid.tasklogs.TaskPayloadManager; import org.eclipse.jetty.server.Server; @@ -355,6 +356,21 @@ public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task t { return task.getBroadcastDatasourceLoadingSpec(); } + + @Provides + @LazySingleton + public LocalTmpStorageConfig getLocalTmpStorage() + { + File tmpDir = new File(taskDirPath, "tmp"); + try { + org.apache.druid.java.util.common.FileUtils.mkdirp(tmpDir); + } + catch (IOException e) { + log.error("Failed to create tmp directory for the task"); + throw new RuntimeException(e); + } + return () -> tmpDir; + } }, new QueryablePeonModule(), new IndexingServiceInputSourceModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index af0ca7c6eb24..a3344aa4e7fe 100644 --- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -56,6 +56,7 @@ import org.apache.druid.server.router.TieredBrokerHostSelector; import org.apache.druid.server.router.TieredBrokerSelectorStrategiesProvider; import org.apache.druid.server.router.TieredBrokerSelectorStrategy; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.eclipse.jetty.server.Server; import java.util.List; @@ -126,6 +127,10 @@ protected List getModules() Jerseys.addResource(binder, SelfDiscoveryResource.class); LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class)); + + binder.bind(LocalTmpStorageConfig.class) + .toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("router")) + .in(LazySingleton.class); }, new LookupSerdeModule() ); diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index c3ed57cac61d..456f70e7012b 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -19,6 +19,7 @@ package org.apache.druid.cli; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import org.apache.commons.io.FileUtils; @@ -42,6 +43,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.apache.druid.storage.local.LocalTmpStorageConfig; import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; @@ -57,6 +59,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import static org.easymock.EasyMock.mock; @@ -143,6 +146,24 @@ public void testCliPeonHeartbeatDimensions() throws IOException ); } + @Test + public void testCliPeonLocalTmpStorage() throws IOException + { + File file = temporaryFolder.newFile("task.json"); + FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8); + + CliPeon runnable = new CliPeon(); + runnable.taskAndStatusFile = ImmutableList.of(file.getParent(), "1"); + Properties properties = new Properties(); + runnable.configure(properties); + runnable.configure(properties, GuiceInjectors.makeStartupInjector()); + Injector secondaryInjector = runnable.makeInjector(); + Assert.assertNotNull(secondaryInjector); + + LocalTmpStorageConfig localTmpStorageConfig = secondaryInjector.getInstance(LocalTmpStorageConfig.class); + Assert.assertEquals(new File(file.getParent(), "/tmp").getAbsolutePath(), localTmpStorageConfig.getTmpDir().getAbsolutePath()); + } + private static class FakeCliPeon extends CliPeon { List taskAndStatusFile = new ArrayList<>();