Unionized threads

- Allow the user to configure the number of worker threads
- Allow the user to disable parallelism entirely without a jvm flag
- Move thread count logic into FlwTaskExecutor and make
  ParallelTaskExecutor simply take a thread count as a parameter
This commit is contained in:
Jozufozu 2024-03-31 16:59:53 -07:00
parent d5c5b998bc
commit 506f2d5158
7 changed files with 41 additions and 22 deletions

View file

@ -66,5 +66,5 @@ public interface TaskExecutor extends Executor {
* *
* @return The number of threads this executor uses. * @return The number of threads this executor uses.
*/ */
int getThreadCount(); int threadCount();
} }

View file

@ -66,6 +66,10 @@ public class FlwConfig {
return client.limitUpdates.get(); return client.limitUpdates.get();
} }
public int workerThreads() {
return client.workerThreads.get();
}
public void registerSpecs(ModLoadingContext context) { public void registerSpecs(ModLoadingContext context) {
context.registerConfig(ModConfig.Type.CLIENT, clientSpec); context.registerConfig(ModConfig.Type.CLIENT, clientSpec);
} }
@ -73,6 +77,7 @@ public class FlwConfig {
public static class ClientConfig { public static class ClientConfig {
public final ConfigValue<String> backend; public final ConfigValue<String> backend;
public final BooleanValue limitUpdates; public final BooleanValue limitUpdates;
public final ForgeConfigSpec.IntValue workerThreads;
private ClientConfig(ForgeConfigSpec.Builder builder) { private ClientConfig(ForgeConfigSpec.Builder builder) {
backend = builder.comment("Select the backend to use.") backend = builder.comment("Select the backend to use.")
@ -80,6 +85,10 @@ public class FlwConfig {
limitUpdates = builder.comment("Enable or disable instance update limiting with distance.") limitUpdates = builder.comment("Enable or disable instance update limiting with distance.")
.define("limitUpdates", true); .define("limitUpdates", true);
workerThreads = builder.comment("The number of worker threads to use. Set to -1 to let Flywheel decide. Set to 0 to disable parallelism. Requires a game restart to take effect.")
.defineInRange("workerThreads", -1, -1, Runtime.getRuntime()
.availableProcessors());
} }
} }
} }

View file

@ -4,11 +4,12 @@ import org.apache.commons.lang3.concurrent.AtomicSafeInitializer;
import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.config.FlwConfig;
import com.mojang.blaze3d.systems.RenderSystem; import com.mojang.blaze3d.systems.RenderSystem;
public final class FlwTaskExecutor { import net.minecraft.util.Mth;
public static final boolean USE_SERIAL_EXECUTOR = System.getProperty("flw.useSerialExecutor") != null;
public final class FlwTaskExecutor {
private static final Initializer INITIALIZER = new Initializer(); private static final Initializer INITIALIZER = new Initializer();
private FlwTaskExecutor() { private FlwTaskExecutor() {
@ -22,14 +23,34 @@ public final class FlwTaskExecutor {
return ConcurrentUtils.initializeUnchecked(INITIALIZER); return ConcurrentUtils.initializeUnchecked(INITIALIZER);
} }
/**
* Returns the "optimal" number of threads to be used for tasks. This will always return at least one thread.
*/
private static int getOptimalThreadCount() {
return Mth.clamp(Math.max(getMaxThreadCount() / 3, getMaxThreadCount() - 6), 1, 10);
}
private static int getMaxThreadCount() {
return Runtime.getRuntime()
.availableProcessors();
}
private static class Initializer extends AtomicSafeInitializer<TaskExecutor> { private static class Initializer extends AtomicSafeInitializer<TaskExecutor> {
@Override @Override
protected TaskExecutor initialize() { protected TaskExecutor initialize() {
if (USE_SERIAL_EXECUTOR) { int threadCount = FlwConfig.get()
.workerThreads();
if (threadCount == 0) {
return SerialTaskExecutor.INSTANCE; return SerialTaskExecutor.INSTANCE;
} else if (threadCount < 0) {
threadCount = getOptimalThreadCount();
} else {
threadCount = Mth.clamp(threadCount, 1, getMaxThreadCount());
} }
ParallelTaskExecutor executor = new ParallelTaskExecutor("Flywheel", RenderSystem::isOnRenderThread); ParallelTaskExecutor executor = new ParallelTaskExecutor("Flywheel", threadCount, RenderSystem::isOnRenderThread);
executor.startWorkers(); executor.startWorkers();
return executor; return executor;
} }

View file

@ -38,14 +38,14 @@ public class ParallelTaskExecutor implements TaskExecutor {
private final ThreadGroupNotifier taskNotifier = new ThreadGroupNotifier(); private final ThreadGroupNotifier taskNotifier = new ThreadGroupNotifier();
private final WaitGroup waitGroup = new WaitGroup(); private final WaitGroup waitGroup = new WaitGroup();
public ParallelTaskExecutor(String name, BooleanSupplier mainThreadQuery) { public ParallelTaskExecutor(String name, int threadCount, BooleanSupplier mainThreadQuery) {
this.name = name; this.name = name;
this.mainThreadQuery = mainThreadQuery; this.mainThreadQuery = mainThreadQuery;
threadCount = getOptimalThreadCount(); this.threadCount = threadCount;
} }
@Override @Override
public int getThreadCount() { public int threadCount() {
return threadCount; return threadCount;
} }
@ -247,17 +247,6 @@ public class ParallelTaskExecutor implements TaskExecutor {
} }
} }
/**
* Returns the "optimal" number of threads to be used for tasks. This will always return at least one thread.
*/
private static int getOptimalThreadCount() {
return Mth.clamp(Math.max(getMaxThreadCount() / 3, getMaxThreadCount() - 6), 1, 10);
}
private static int getMaxThreadCount() {
return Runtime.getRuntime().availableProcessors();
}
private class WorkerThread extends Thread { private class WorkerThread extends Thread {
public WorkerThread(String name) { public WorkerThread(String name) {
super(name); super(name);

View file

@ -35,7 +35,7 @@ public class SerialTaskExecutor implements TaskExecutor {
} }
@Override @Override
public int getThreadCount() { public int threadCount() {
return 1; return 1;
} }

View file

@ -182,7 +182,7 @@ public final class Distribute {
} }
public static int sliceSize(TaskExecutor taskExecutor, int totalSize, int denominator) { public static int sliceSize(TaskExecutor taskExecutor, int totalSize, int denominator) {
return MoreMath.ceilingDiv(totalSize, taskExecutor.getThreadCount() * denominator); return MoreMath.ceilingDiv(totalSize, taskExecutor.threadCount() * denominator);
} }
private Distribute() { private Distribute() {

View file

@ -27,7 +27,7 @@ class PlanExecutionTest {
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
var currentThread = Thread.currentThread(); var currentThread = Thread.currentThread();
EXECUTOR = new ParallelTaskExecutor("PlanTest", () -> currentThread == Thread.currentThread()); EXECUTOR = new ParallelTaskExecutor("PlanTest", 2, () -> currentThread == Thread.currentThread());
EXECUTOR.startWorkers(); EXECUTOR.startWorkers();
} }