Skip to content

Commit

Permalink
Add TmpStorageProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
rohangarg committed Jan 6, 2025
1 parent d5eb94d commit b27c538
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.local;

import com.google.inject.Provider;
import org.apache.druid.java.util.common.FileUtils;

import java.io.File;

/**
* {@link LocalTmpStorageConfig} is a provider for temporary directories. A default implementation is binded in all services except
* Peon. For peons, a custom implementation is binded in CliPeon which uses the working directory of the peon to create
* a temporary storage. This interface will be guice injectable in all services.
* The cleaning up of the temporary files/directories created in this storage is handled by the caller.
*/
public interface LocalTmpStorageConfig
{
/**
* Get a temporary directory.
*
* @return a temporary directory
*/
File getTmpDir();

class DefaultLocalTmpStorageConfigProvider implements Provider<LocalTmpStorageConfig>
{
private final String prefix;

public DefaultLocalTmpStorageConfigProvider(String prefix)
{
this.prefix = prefix;
}

@Override
public LocalTmpStorageConfig get()
{
File result = FileUtils.createTempDir(prefix);
return () -> result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.storage.local;

import com.google.inject.Guice;
import com.google.inject.Injector;
import org.junit.Assert;
import org.junit.Test;

import java.util.UUID;

public class LocalTmpStorageConfigTest
{
@Test
public void testDefaultLocalTmpStorage()
{
String tmpString = UUID.randomUUID().toString();
Injector injector = Guice.createInjector(
binder -> binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider(tmpString))
);
LocalTmpStorageConfig localTmpStorageConfig = injector.getInstance(LocalTmpStorageConfig.class);
Assert.assertTrue(localTmpStorageConfig.getTmpDir().getAbsolutePath().contains(tmpString));
}
}
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.timeline.PruneLoadSpec;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -182,6 +183,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("broker"))
.in(LazySingleton.class);
},
new LookupModule(),
new SqlModule()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.eclipse.jetty.server.Server;
import org.joda.time.Duration;

Expand Down Expand Up @@ -293,6 +294,10 @@ public void configure(Binder binder)
binder.bind(CoordinatorCustomDutyGroups.class)
.toProvider(new CoordinatorCustomDutyGroupsProvider())
.in(LazySingleton.class);

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("coordinator"))
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -140,6 +141,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("historical"))
.in(LazySingleton.class);
},
new LookupModule()
);
Expand Down
4 changes: 4 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.eclipse.jetty.server.Server;

import java.util.List;
Expand Down Expand Up @@ -196,6 +197,9 @@ public void configure(Binder binder)

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("indexer"))
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -184,6 +185,10 @@ public void configure(Binder binder)
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

configureIntermediaryData(binder);

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("middle-manager"))
.in(LazySingleton.class);
}

private void configureIntermediaryData(Binder binder)
Expand Down
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.druid.server.security.AuthenticationUtils;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.tasklogs.TaskLogs;
import org.eclipse.jetty.rewrite.handler.RewriteHandler;
Expand Down Expand Up @@ -317,6 +318,10 @@ public void configure(Binder binder)

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("overlord"))
.in(LazySingleton.class);
}

private void configureTaskStorage(Binder binder)
Expand Down
16 changes: 16 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;

Expand Down Expand Up @@ -355,6 +356,21 @@ public BroadcastDatasourceLoadingSpec getBroadcastDatasourcesToLoad(final Task t
{
return task.getBroadcastDatasourceLoadingSpec();
}

@Provides
@LazySingleton
public LocalTmpStorageConfig getLocalTmpStorage()
{
File tmpDir = new File(taskDirPath, "tmp");
try {
org.apache.druid.java.util.common.FileUtils.mkdirp(tmpDir);
}
catch (IOException e) {
log.error("Failed to create tmp directory for the task");
throw new RuntimeException(e);
}
return () -> tmpDir;
}
},
new QueryablePeonModule(),
new IndexingServiceInputSourceModule(),
Expand Down
5 changes: 5 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.server.router.TieredBrokerHostSelector;
import org.apache.druid.server.router.TieredBrokerSelectorStrategiesProvider;
import org.apache.druid.server.router.TieredBrokerSelectorStrategy;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.eclipse.jetty.server.Server;

import java.util.List;
Expand Down Expand Up @@ -126,6 +127,10 @@ protected List<? extends Module> getModules()

Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));

binder.bind(LocalTmpStorageConfig.class)
.toProvider(new LocalTmpStorageConfig.DefaultLocalTmpStorageConfigProvider("router"))
.in(LazySingleton.class);
},
new LookupSerdeModule()
);
Expand Down
21 changes: 21 additions & 0 deletions services/src/test/java/org/apache/druid/cli/CliPeonTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.cli;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import org.apache.commons.io.FileUtils;
Expand All @@ -42,6 +43,7 @@
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.storage.local.LocalTmpStorageConfig;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
Expand All @@ -57,6 +59,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.easymock.EasyMock.mock;

Expand Down Expand Up @@ -143,6 +146,24 @@ public void testCliPeonHeartbeatDimensions() throws IOException
);
}

@Test
public void testCliPeonLocalTmpStorage() throws IOException
{
File file = temporaryFolder.newFile("task.json");
FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);

CliPeon runnable = new CliPeon();
runnable.taskAndStatusFile = ImmutableList.of(file.getParent(), "1");
Properties properties = new Properties();
runnable.configure(properties);
runnable.configure(properties, GuiceInjectors.makeStartupInjector());
Injector secondaryInjector = runnable.makeInjector();
Assert.assertNotNull(secondaryInjector);

LocalTmpStorageConfig localTmpStorageConfig = secondaryInjector.getInstance(LocalTmpStorageConfig.class);
Assert.assertEquals(new File(file.getParent(), "/tmp").getAbsolutePath(), localTmpStorageConfig.getTmpDir().getAbsolutePath());
}

private static class FakeCliPeon extends CliPeon
{
List<String> taskAndStatusFile = new ArrayList<>();
Expand Down

0 comments on commit b27c538

Please sign in to comment.