Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async task handling improvement #209

Merged
merged 17 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-1214.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "dev/1.21.4" ]
pull_request:
branches: [ "ver/1.21.4" ]
branches: [ "dev/1.21.4" ]

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Original project: https://github.com/KaiijuMC/Kaiiju
Original license: GPLv3
Original project: https://github.com/Bloom-host/Petal

Co-authored-by: HaHaWTH <[email protected]>
Co-authored-by: Taiyou06 <[email protected]>
Co-authored-by: Altiami <[email protected]>

This patch was ported downstream from the Petal fork.

Makes most pathfinding-related work happen asynchronously
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package org.dreeam.leaf.async.path;

import com.destroystokyo.paper.util.SneakyThrow;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.pathfinder.Path;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

Expand All @@ -16,19 +21,59 @@
*/
public class AsyncPathProcessor {

private static final Executor pathProcessingExecutor = new ThreadPoolExecutor(
private static final String THREAD_PREFIX = "Leaf Async Pathfinding";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
1,
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingMaxThreads,
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
getQueueImpl(),
new ThreadFactoryBuilder()
.setNameFormat("Leaf Async Pathfinding Thread - %d")
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build()
.build(),
new RejectedTaskHandler()
);

private static class RejectedTaskHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
switch (org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingRejectPolicy) {
case FLUSH_ALL -> {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());

workQueue.drainTo(pendingTasks);

for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
case CALLER_RUNS -> rejectedTask.run();
}
}

if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help.");
lastWarnMillis = System.currentTimeMillis();
}
}
}

protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
return CompletableFuture.runAsync(path::process, pathProcessingExecutor);
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
.orTimeout(60L, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException e) {
LOGGER.warn("Async Pathfinding process timed out", e);
} else SneakyThrow.sneaky(throwable);
return null;
});
}

/**
Expand All @@ -48,4 +93,9 @@ public static void awaitProcessing(@Nullable Path path, Consumer<@Nullable Path>
afterProcessing.accept(path);
}
}
}

private static BlockingQueue<Runnable> getQueueImpl() {
final int queueCapacity = org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingQueueSize;

return new LinkedBlockingQueue<>(queueCapacity);
}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.dreeam.leaf.async.path;

import org.dreeam.leaf.config.LeafConfig;

import java.util.Locale;

public enum PathfindTaskRejectPolicy {
FLUSH_ALL,
CALLER_RUNS;

public static PathfindTaskRejectPolicy fromString(String policy) {
try {
return PathfindTaskRejectPolicy.valueOf(policy.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
LeafConfig.LOGGER.warn("Invalid pathfind task reject policy: {}, falling back to {}.", policy, FLUSH_ALL.toString());
return FLUSH_ALL;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

public class MultithreadedTracker {

private static final Logger LOGGER = LogManager.getLogger("MultithreadedTracker");
private static final String THREAD_PREFIX = "Leaf Async Tracker";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor(
getCorePoolSize(),
Expand Down Expand Up @@ -128,27 +129,23 @@ private static int getCorePoolSize() {
}

private static int getMaxPoolSize() {
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? Integer.MAX_VALUE : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
}

private static long getKeepAliveTime() {
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? 30L : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
}

private static BlockingQueue<Runnable> getQueueImpl() {
if (org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize) {
return new SynchronousQueue<>();
}

final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads * (Math.max(org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads, 4));
final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerQueueSize;

return new LinkedBlockingQueue<>(queueCapacity);
}

private static @NotNull ThreadFactory getThreadFactory() {
return new ThreadFactoryBuilder()
.setThreadFactory(MultithreadedTrackerThread::new)
.setNameFormat("Leaf Async Tracker Thread - %d")
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.dreeam.leaf.config.modules.async;

import org.dreeam.leaf.async.path.PathfindTaskRejectPolicy;
import org.dreeam.leaf.config.ConfigModules;
import org.dreeam.leaf.config.EnumConfigCategory;
import org.dreeam.leaf.config.LeafConfig;
Expand All @@ -13,12 +14,25 @@ public String getBasePath() {
public static boolean enabled = false;
public static int asyncPathfindingMaxThreads = 0;
public static int asyncPathfindingKeepalive = 60;
public static int asyncPathfindingQueueSize = 0;
public static PathfindTaskRejectPolicy asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.FLUSH_ALL;

@Override
public void onLoaded() {
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
asyncPathfindingMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncPathfindingMaxThreads);
asyncPathfindingKeepalive = config.getInt(getBasePath() + ".keepalive", asyncPathfindingKeepalive);
asyncPathfindingQueueSize = config.getInt(getBasePath() + ".queue-size", asyncPathfindingQueueSize);
asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.fromString(config.getString(getBasePath() + ".reject-policy", asyncPathfindingRejectPolicy.toString(), config.pickStringRegionBased(
"""
The policy to use when the queue is full and a new task is submitted.
FLUSH_ALL: All pending tasks will be run on server thread.
CALLER_RUNS: Newly submitted task will be run on server thread.""",
"""
当队列满时, 新提交的任务将使用以下策略处理.
FLUSH_ALL: 所有等待中的任务都将在主线程上运行.
CALLER_RUNS: 新提交的任务将在主线程上运行."""
)));

if (asyncPathfindingMaxThreads < 0)
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1);
Expand All @@ -28,5 +42,8 @@ else if (asyncPathfindingMaxThreads == 0)
asyncPathfindingMaxThreads = 0;
else
LeafConfig.LOGGER.info("Using {} threads for Async Pathfinding", asyncPathfindingMaxThreads);

if (asyncPathfindingQueueSize <= 0)
asyncPathfindingQueueSize = asyncPathfindingMaxThreads * Math.max(asyncPathfindingMaxThreads, 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public String getBasePath() {

public static boolean enabled = false;
public static boolean compatModeEnabled = false;
public static boolean autoResize = false;
public static int asyncEntityTrackerMaxThreads = 0;
public static int asyncEntityTrackerKeepalive = 60;
public static int asyncEntityTrackerQueueSize = 0;

@Override
public void onLoaded() {
Expand All @@ -33,16 +33,9 @@ public void onLoaded() {
"""
是否启用兼容模式,
如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项."""));
autoResize = config.getBoolean(getBasePath() + ".auto-resize", autoResize, config.pickStringRegionBased("""
Auto adjust thread pool size based on server load,
This will tweak thread pool size dynamically,
overrides max-threads and keepalive.""",
"""
根据服务器负载自动调整线程池大小,
这会使线程池大小动态调整,
覆盖设置 max-threads 和 keepalive."""));
asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads);
asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive);
asyncEntityTrackerQueueSize = config.getInt(getBasePath() + ".queue-size", asyncEntityTrackerQueueSize);

if (asyncEntityTrackerMaxThreads < 0)
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
Expand All @@ -53,5 +46,8 @@ else if (asyncEntityTrackerMaxThreads == 0)
asyncEntityTrackerMaxThreads = 0;
else
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads);

if (asyncEntityTrackerQueueSize <= 0)
asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * Math.max(asyncEntityTrackerMaxThreads, 4);
}
}