Skip to content

Commit

Permalink
KAFKA-17411: Create local state Standbys on start (apache#16922)
Browse files Browse the repository at this point in the history
Instead of waiting until Tasks are assigned to us, we pre-emptively
create a StandbyTask for each non-empty Task directory found on-disk.

We do this before starting any StreamThreads, and on our first
assignment (after joining the consumer group), we recycle any of these
StandbyTasks that were assigned to us, either as an Active or a
Standby.

We can't just use these "initial Standbys" as-is, because they were
constructed outside the context of a StreamThread, so we first have to
update each of them with the context (log context, ChangelogReader, and
source topics) of the thread that it has been assigned to.

The motivation for this is to (in a later commit) read StateStore
offsets for unowned Tasks from the StateStore itself, rather than the
.checkpoint file, which we plan to deprecate and remove.

There are a few additional benefits:

Initializing these Tasks on start-up, instead of on-assignment, will
reduce the time between a member joining the consumer group and beginning
processing. This is especially important when active tasks are being moved
over, for example, as part of a rolling restart.

If a Task has corrupt data on-disk, it will be discovered on startup and
wiped under EOS. This is preferable to wiping the state after being
assigned the Task, because another instance may have non-corrupt data and
would not need to restore (as much).

There is a potential performance impact: we open all on-disk Task
StateStores, and keep them all open until we have our first assignment.
This could require large amounts of memory, in particular when there are
a large number of local state stores on-disk.

However, since old local state for Tasks we don't own is automatically
cleaned up after a period of time, in practice, we will almost always
only be dealing with the state that was last assigned to the local
instance.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Bruno Cadonna <[email protected]>, Matthias Sax <[email protected]>
  • Loading branch information
nicktelford authored Oct 29, 2024
1 parent 7366f04 commit 571f508
Show file tree
Hide file tree
Showing 9 changed files with 551 additions and 17 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>

<suppress checks="MethodLength"
files="KTableImpl.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class KafkaStreams implements AutoCloseable {
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
private final LogContext logContext;

GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
Expand Down Expand Up @@ -677,6 +678,9 @@ private void maybeSetRunning() {
return;
}

// all (alive) threads have received their assignment, so close any remaining startup tasks: they're no longer needed
stateDirectory.closeStartupTasks();

setState(State.RUNNING);
}

Expand Down Expand Up @@ -999,7 +1003,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
} else {
clientId = userClientId;
}
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);

Expand Down Expand Up @@ -1411,6 +1415,9 @@ private static HostInfo parseHostInfo(final String endPoint) {
*/
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
log.debug("Initializing STANDBY tasks for existing local state");
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);

log.debug("Starting Streams client");

if (globalStreamThread != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -166,11 +167,11 @@ public String toString() {

private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

private final String logPrefix;
private String logPrefix;

private final TaskId taskId;
private final boolean eosEnabled;
private final ChangelogRegister changelogReader;
private ChangelogRegister changelogReader;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;

Expand Down Expand Up @@ -222,6 +223,38 @@ public ProcessorStateManager(final TaskId taskId,
log.debug("Created state store manager for task {}", taskId);
}

/**
 * Factory used by {@link StateDirectory} to build the state manager of a "startup" task, i.e. a task that is
 * created for existing local state before any thread owns it.
 *
 * <p>The returned manager is created with task type {@code STANDBY}, a {@code null} changelog reader and an empty
 * {@link HashSet} of source partitions. Those missing pieces are filled in later by
 * {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
 */
static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
                                                           final boolean eosEnabled,
                                                           final LogContext logContext,
                                                           final StateDirectory stateDirectory,
                                                           final Map<String, String> storeToChangelogTopic,
                                                           final boolean stateUpdaterEnabled) {
    return new ProcessorStateManager(
        taskId,
        TaskType.STANDBY,
        eosEnabled,
        logContext,
        stateDirectory,
        null,               // changelog reader: not known until a thread takes over the task
        storeToChangelogTopic,
        new HashSet<>(0),   // source partitions: empty for now, populated on assignment
        stateUpdaterEnabled
    );
}

/**
 * Finishes the initialization of a state manager that was created for a startup task, using the context of the
 * StreamThread the task has now been assigned to. The manager's logger, log prefix, changelog reader and source
 * partitions are all replaced with the values supplied here.
 *
 * @param logContext       log context from which the new logger and log prefix are derived
 * @param changelogReader  changelog reader to use from now on
 * @param sourcePartitions partitions that replace the currently tracked source partitions
 * @throws IllegalStateException if this state manager already has a changelog reader
 */
void assignToStreamThread(final LogContext logContext,
                          final ChangelogRegister changelogReader,
                          final Collection<TopicPartition> sourcePartitions) {
    final boolean alreadyHasReader = this.changelogReader != null;
    if (alreadyHasReader) {
        throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
    }

    // drop the partitions tracked so far; the new ones are added once the rest of the context is in place
    this.sourcePartitions.clear();

    // switch over to the supplied logging context and changelog reader
    this.log = logContext.logger(ProcessorStateManager.class);
    this.logPrefix = logContext.logPrefix();
    this.changelogReader = changelogReader;

    this.sourcePartitions.addAll(sourcePartitions);
}

void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
Expand Down Expand Up @@ -314,7 +347,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
}

private void maybeRegisterStoreWithChangelogReader(final String storeName) {
if (isLoggingEnabled(storeName)) {
if (isLoggingEnabled(storeName) && changelogReader != null) {
changelogReader.register(getStorePartition(storeName), this);
}
}
Expand Down Expand Up @@ -569,7 +602,7 @@ public void flushCache() {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);

if (!stateUpdaterEnabled) {
if (!stateUpdaterEnabled && changelogReader != null) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}

Expand Down Expand Up @@ -610,7 +643,7 @@ public void close() throws ProcessorStateException {
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);

if (!stateUpdaterEnabled) {
if (!stateUpdaterEnabled && changelogReader != null) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -45,11 +50,16 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -100,6 +110,9 @@ public StateDirectoryProcessFile() {
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;

private final StreamsConfig config;
private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();

/**
* Ensures that the state base directory as well as the application's sub-directory are created.
*
Expand All @@ -118,6 +131,7 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
this.hasPersistentStores = hasPersistentStores;
this.hasNamedTopologies = hasNamedTopologies;
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.config = config;
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
stateDir = new File(baseDir, appId);
Expand Down Expand Up @@ -182,6 +196,104 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}

/**
 * Creates and initializes a {@link StandbyTask} for each non-empty task directory whose sub-topology still has
 * state with changelogs, and records the successfully initialized tasks in {@code tasksForLocalState}.
 *
 * <p>Nothing is created when there are no persistent stores or no non-empty task directories. A task whose
 * initialization throws {@link TaskCorruptedException} is suspended and closed dirty, and is not recorded.
 *
 * @param topologyMetadata used to build each task's sub-topology and look up its task config
 * @param streamsMetrics   metrics instance handed to the cache, processor context and tasks
 * @param logContext       log context handed to the cache and to each task's state manager
 */
public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                                   final StreamsMetricsImpl streamsMetrics,
                                   final LogContext logContext) {
    final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
    if (!hasPersistentStores || nonEmptyTaskDirectories.isEmpty()) {
        return;
    }

    // a single cache instance (constructed with 0 as its second argument) is shared by all startup tasks
    final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
    final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
    final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());

    // discover all non-empty task directories in StateDirectory
    for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
        final String dirName = taskDirectory.file().getName();
        final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
        final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);

        // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
        // because it's possible that the topology has changed since that data was written, and is now stateless
        // this therefore prevents us from creating unnecessary Tasks just because of some left-over state
        if (!subTopology.hasStateWithChangelogs()) {
            continue;
        }

        final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
            id,
            eosEnabled,
            logContext,
            this,
            subTopology.storeToChangelogTopic(),
            stateUpdaterEnabled
        );

        final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
            id,
            config,
            stateManager,
            streamsMetrics,
            dummyCache
        );

        final Task task = new StandbyTask(
            id,
            new HashSet<>(),
            subTopology,
            topologyMetadata.taskConfig(id),
            streamsMetrics,
            stateManager,
            this,
            dummyCache,
            context
        );

        try {
            task.initializeIfNeeded();

            // only tasks that initialized successfully are made available as startup tasks
            tasksForLocalState.put(id, task);
        } catch (final TaskCorruptedException e) {
            // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
            task.suspend();
            task.closeDirty();
        }
    }
}

/**
 * @return {@code true} if at least one startup task is still held in {@code tasksForLocalState}
 */
public boolean hasStartupTasks() {
    final boolean noneRemaining = tasksForLocalState.isEmpty();
    return !noneRemaining;
}

/**
 * Removes the startup task with the given id from {@code tasksForLocalState} and hands it to the caller.
 * When a task was removed, the entry for that id in {@code lockedTasksToOwner} is replaced with the calling
 * thread.
 *
 * @param taskId id of the startup task to claim
 * @return the removed task, or {@code null} if no startup task was held for {@code taskId}
 */
public Task removeStartupTask(final TaskId taskId) {
    final Task claimed = tasksForLocalState.remove(taskId);
    if (claimed == null) {
        return null;
    }

    // the calling thread takes over the recorded lock ownership of the task it just claimed
    lockedTasksToOwner.replace(taskId, Thread.currentThread());
    return claimed;
}

/**
 * Closes every startup task that is still held, by delegating to the predicate-based overload with a predicate
 * that accepts all tasks.
 */
public void closeStartupTasks() {
    final Predicate<Task> acceptEveryTask = task -> true;
    closeStartupTasks(acceptEveryTask);
}

/**
 * Closes every startup task accepted by {@code predicate}.
 *
 * <p>Matching tasks are first removed ("drained") from {@code tasksForLocalState}, and only tasks that this call
 * actually removed are closed. Each drained task is then suspended and closed clean.
 *
 * <p>Because drained tasks are no longer reachable through {@code tasksForLocalState}, a failure while closing
 * one task must not prevent the others from being closed. All drained tasks are therefore attempted; the first
 * {@link RuntimeException} is rethrown afterwards, with any later ones attached as suppressed exceptions.
 *
 * @param predicate selects which of the held startup tasks should be closed
 */
private void closeStartupTasks(final Predicate<Task> predicate) {
    if (tasksForLocalState.isEmpty()) {
        return;
    }

    // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
    final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
    for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
        if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
            // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
            // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
            drainedTasks.add(entry.getValue());
        }
    }

    // now that we have exclusive ownership of the drained tasks, close them; keep going on failure, since
    // nobody else can reach these tasks any more and skipping them would leave them open
    RuntimeException firstException = null;
    for (final Task task : drainedTasks) {
        try {
            task.suspend();
            task.closeClean();
        } catch (final RuntimeException e) {
            if (firstException == null) {
                firstException = e;
            } else if (e != firstException) {
                // guard against self-suppression, which Throwable#addSuppressed rejects
                firstException.addSuppressed(e);
            }
        }
    }

    if (firstException != null) {
        throw firstException;
    }
}

public UUID initializeProcessId() {
if (!hasPersistentStores) {
final UUID processId = UUID.randomUUID();
Expand Down Expand Up @@ -379,9 +491,17 @@ synchronized void unlock(final TaskId taskId) {
}
}

/**
 * Looks up the thread currently recorded in {@code lockedTasksToOwner} for the given task.
 * Exposed for tests.
 *
 * @param taskId id of the task to look up
 * @return the value stored for {@code taskId} in {@code lockedTasksToOwner}
 */
Thread lockOwner(final TaskId taskId) {
    final Thread owner = lockedTasksToOwner.get(taskId);
    return owner;
}

@Override
public void close() {
if (hasPersistentStores) {
closeStartupTasks();
try {
stateDirLock.release();
stateDirLockChannel.close();
Expand Down Expand Up @@ -499,6 +619,7 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionA
);
if (namedTopologyDirs != null) {
for (final File namedTopologyDir : namedTopologyDirs) {
closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
final File[] contents = namedTopologyDir.listFiles();
if (contents != null && contents.length == 0) {
try {
Expand Down Expand Up @@ -536,6 +657,7 @@ public void clearLocalStateForNamedTopology(final String topologyName) {
log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
}
try {
closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
Utils.delete(namedTopologyDir);
} catch (final IOException e) {
log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);
Expand Down
Loading

0 comments on commit 571f508

Please sign in to comment.