Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate Periodic Realtime Segment Metadata Queries: Task Now Publish Schema for Seamless Coordinator Updates #15475

Merged
merged 40 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
43098ba
Changes to publish realtime segment schema changes in segment announc…
findingrish Nov 28, 2023
095c7ea
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Nov 28, 2023
828d150
Fix checkstyle issues
findingrish Nov 28, 2023
10856d6
Revert changes in CombiningSequenceTest
findingrish Nov 28, 2023
91582fc
Minor changes
findingrish Nov 28, 2023
c0de275
Fix build
findingrish Nov 29, 2023
abeb6a0
Add javadocs
findingrish Nov 29, 2023
fc6249b
minor change
findingrish Nov 29, 2023
1c6af13
Minor change
findingrish Nov 29, 2023
e9b83d4
Update SegmentSchemas pojo, dev testing
findingrish Nov 30, 2023
fac850e
Add tests
findingrish Dec 3, 2023
050c708
Remove forbidden apis
findingrish Dec 3, 2023
a8a90a8
Remove forbidden api usage
findingrish Dec 3, 2023
219e270
Fix tests
findingrish Dec 3, 2023
6394616
Update logic to build sink schema
findingrish Dec 5, 2023
1315922
Explicitly add __time column in Sink#getSignature
findingrish Dec 5, 2023
f9edb58
Remove forbidden api usage
findingrish Dec 5, 2023
3a00d9e
Fix spotbug
findingrish Dec 5, 2023
03a384d
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Dec 6, 2023
f32d390
Rename config for CentralizedDatasourceSchema feature
findingrish Dec 6, 2023
e6cbc92
Minor changes
findingrish Dec 6, 2023
6aac669
Fix checkstyle
findingrish Dec 8, 2023
bae54c4
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Dec 8, 2023
b5a656e
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Dec 11, 2023
2518495
Add guardrail to prevent enabling the feature with zk based segment a…
findingrish Dec 11, 2023
ec56554
checkstyle
findingrish Dec 11, 2023
c110e17
Enable segment schema announcement for realtime segment in IT
findingrish Dec 12, 2023
b0f8637
minor change
findingrish Dec 13, 2023
141158c
null executor service in StreamAppenderator#SinkSchemaAnnouncer if fe…
findingrish Dec 13, 2023
92adfa3
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Dec 13, 2023
63d43f5
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Jan 3, 2024
1ffdf80
Address feedback
findingrish Jan 3, 2024
74150f8
Minor change
findingrish Jan 3, 2024
2986317
Throw exception in Peons if feature is enabled alongwith zk based seg…
findingrish Jan 3, 2024
dc7b0ba
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Jan 4, 2024
9316cb0
Minor changes
findingrish Jan 4, 2024
0513e80
Rename method in DataSegmentAnnouncer
findingrish Jan 4, 2024
2704d1e
Merge remote-tracking branch 'upstream/master' into schema_annoucement
findingrish Jan 8, 2024
9fcfb62
Address feedback
findingrish Jan 8, 2024
4923414
Add test to achieve coverage
findingrish Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -291,6 +292,12 @@ public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}

@Override
public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
{
return CallbackAction.CONTINUE;
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class TaskToolbox

private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
Expand Down Expand Up @@ -171,7 +173,8 @@ public TaskToolbox(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
String attemptId
String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
Expand Down Expand Up @@ -215,6 +218,7 @@ public TaskToolbox(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public SegmentLoaderConfig getSegmentLoaderConfig()
Expand Down Expand Up @@ -487,6 +491,11 @@ public RuntimeInfo getAdjustedRuntimeInfo()
return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), appenderatorsManager);
}

public CentralizedDatasourceSchemaConfig getCentralizedTableSchemaConfig()
{
return centralizedDatasourceSchemaConfig;
}

/**
* Create {@link AdjustedRuntimeInfo} based on the given {@link RuntimeInfo} and {@link AppenderatorsManager}. This
* is a way to allow code to properly apportion the amount of processors and heap available to the entire JVM.
Expand Down Expand Up @@ -553,6 +562,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder()
{
Expand Down Expand Up @@ -598,6 +608,7 @@ public Builder(TaskToolbox other)
this.intermediaryDataManager = other.intermediaryDataManager;
this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
this.shuffleClient = other.shuffleClient;
this.centralizedDatasourceSchemaConfig = other.centralizedDatasourceSchemaConfig;
}

public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
Expand Down Expand Up @@ -840,6 +851,12 @@ public Builder attemptId(final String attemptId)
return this;
}

public Builder centralizedTableSchemaConfig(final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
return this;
}

public TaskToolbox build()
{
return new TaskToolbox(
Expand Down Expand Up @@ -882,7 +899,8 @@ public TaskToolbox build()
supervisorTaskClientProvider,
shuffleClient,
taskLogPusher,
attemptId
attemptId,
centralizedDatasourceSchemaConfig
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -114,6 +115,7 @@ public class TaskToolboxFactory
private final ShuffleClient shuffleClient;
private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

@Inject
public TaskToolboxFactory(
Expand Down Expand Up @@ -155,7 +157,8 @@ public TaskToolboxFactory(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@AttemptId String attemptId
@AttemptId String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoadConfig;
Expand Down Expand Up @@ -197,6 +200,7 @@ public TaskToolboxFactory(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public TaskToolbox build(Task task)
Expand Down Expand Up @@ -260,6 +264,7 @@ public TaskToolbox build(TaskConfig config, Task task)
.shuffleClient(shuffleClient)
.taskLogPusher(taskLogPusher)
.attemptId(attemptId)
.centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,8 @@ private Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
Expand Down Expand Up @@ -310,6 +311,16 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}
}

@Override
public void announceSegmentSchema(String taskId, SegmentSchemas sinksSchema, SegmentSchemas sinksSchemaChange)
{
}

@Override
public void unannouceTask(String taskId)
{
}
};

// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
Expand Down Expand Up @@ -155,7 +156,8 @@ public void setUp() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
Expand Down Expand Up @@ -1644,7 +1645,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
Expand Down Expand Up @@ -1017,7 +1018,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);

return toolboxFactory.build(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public Appenderator createRealtimeAppenderatorForTask(
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
realtimeAppenderator = Appenderators.createRealtime(
Expand All @@ -93,7 +95,8 @@ public Appenderator createRealtimeAppenderatorForTask(
cachePopulatorStats,
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates
useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
return realtimeAppenderator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
Expand Down Expand Up @@ -135,7 +136,8 @@ public void setup() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
Expand Down Expand Up @@ -665,7 +666,8 @@ public void announceSegment(DataSegment segment)
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
Expand Down Expand Up @@ -115,7 +116,8 @@ public TestTaskToolboxFactory(
bob.supervisorTaskClientProvider,
bob.shuffleClient,
bob.taskLogPusher,
bob.attemptId
bob.attemptId,
bob.centralizedDatasourceSchemaConfig
);
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder setConfig(TaskConfig config)
{
Expand Down Expand Up @@ -387,5 +390,10 @@ public Builder setAttemptId(String attemptId)
this.attemptId = attemptId;
return this;
}

public void setCentralizedTableSchemaConfig(CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -700,7 +701,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.test;

import com.google.common.collect.Sets;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;

Expand Down Expand Up @@ -57,4 +58,13 @@ public void unannounceSegments(Iterable<DataSegment> segments)
}
}

@Override
public void announceSegmentSchema(String taskId, SegmentSchemas segmentSchemas, SegmentSchemas segmentSchemasChange)
{
}

@Override
public void unannouceTask(String taskId)
{
}
}
Loading
Loading