From fb11f290101dcd752b7370932bebff4565fd52f5 Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Sat, 8 Apr 2023 18:06:19 -0700 Subject: [PATCH 1/5] Parallel planning primitives - Engines/InstanceManagers now provide Plans for frames/ticks - TODO: batching is broken - TODO: plan caching - Add Plan interface which can be used to compose complex systems of parallel execution - Add Synchronizer to facilitate barriering - Remove WorkGroup - Summary of different primitives below, names need work :ioa: - BarrierPlan: executes one plan then another - NestedPlan: executes many plans at once - OnMainThreadPlan: waits for a syncPoint to execute - RunOnAllPlan: dispatches work over a Supplier - SimplePlan: executes many Runnables at once - UnitPlan: does nothing --- .../jozufozu/flywheel/api/backend/Engine.java | 3 + .../com/jozufozu/flywheel/api/task/Plan.java | 42 +++++++++++ .../flywheel/api/task/TaskExecutor.java | 2 + .../engine/batching/BatchingEngine.java | 25 ++++++- .../engine/indirect/IndirectEngine.java | 13 +++- .../engine/instancing/InstancingEngine.java | 13 +++- .../backend/task/ParallelTaskExecutor.java | 25 ++++++- .../backend/task/SerialTaskExecutor.java | 5 ++ .../impl/instancing/InstanceWorld.java | 34 ++++----- .../instancing/manager/InstanceManager.java | 43 ++--------- .../flywheel/lib/light/LightUpdater.java | 17 ----- .../flywheel/lib/task/BarrierPlan.java | 11 +++ .../flywheel/lib/task/NestedPlan.java | 29 ++++++++ .../flywheel/lib/task/OnMainThreadPlan.java | 15 ++++ .../jozufozu/flywheel/lib/task/PlanUtil.java | 29 ++++++++ .../flywheel/lib/task/RunOnAllPlan.java | 57 +++++++++++++++ .../flywheel/lib/task/SimplePlan.java | 24 +++++++ .../flywheel/lib/task/Synchronizer.java | 19 +++++ .../jozufozu/flywheel/lib/task/UnitPlan.java | 16 +++++ .../jozufozu/flywheel/lib/task/WorkGroup.java | 72 ------------------- 20 files changed, 344 insertions(+), 150 deletions(-) create mode 100644 src/main/java/com/jozufozu/flywheel/api/task/Plan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java delete mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java diff --git a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java index 09765ca87..fa2202101 100644 --- a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java +++ b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java @@ -5,6 +5,7 @@ import java.util.List; import com.jozufozu.flywheel.api.event.RenderContext; import com.jozufozu.flywheel.api.event.RenderStage; import com.jozufozu.flywheel.api.instancer.InstancerProvider; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import net.minecraft.client.Camera; @@ -28,4 +29,6 @@ public interface Engine extends InstancerProvider { void addDebugInfo(List info); void delete(); + + Plan planThisFrame(RenderContext context); } diff --git a/src/main/java/com/jozufozu/flywheel/api/task/Plan.java b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java new file mode 100644 index 000000000..3849792e0 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java @@ -0,0 +1,42 @@ +package com.jozufozu.flywheel.api.task; + +import com.jozufozu.flywheel.lib.task.BarrierPlan; + +public interface Plan { + /** + * Submit this plan for execution. + *

+ * You must call {@code onCompletion.run()} when the plan has completed execution. + * + * @param taskExecutor The executor to use for submitting tasks. + * @param onCompletion A callback to run when the plan has completed execution, useful for chaining plans. + */ + void execute(TaskExecutor taskExecutor, Runnable onCompletion); + + default void execute(TaskExecutor taskExecutor) { + execute(taskExecutor, () -> { + }); + } + + /** + * Create a new plan that executes this plan, then the given plan. + * + * @param plan The plan to execute after this plan. + * @return The new, composed plan. + */ + default Plan then(Plan plan) { + // TODO: AbstractPlan? + return new BarrierPlan(this, plan); + } + + /** + * If possible, create a new plan that accomplishes everything + * this plan does but with a simpler execution schedule. + * + * @return A simplified plan, or this. + */ + default Plan maybeSimplify() { + // TODO: plan caching/simplification + return this; + } +} diff --git a/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java b/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java index ee4b722bd..abe9dc71c 100644 --- a/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java @@ -9,4 +9,6 @@ public interface TaskExecutor extends Executor { void syncPoint(); int getThreadCount(); + + void scheduleForMainThread(Runnable runnable); } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java index fc5a58870..444622b2b 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java @@ -9,7 +9,9 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.jozufozu.flywheel.util.FlwUtil; import com.mojang.blaze3d.vertex.PoseStack; @@ -32,17 +34,34 @@ public class BatchingEngine implements Engine { public void beginFrame(TaskExecutor executor, RenderContext context) { transformManager.flush(); - Vec3 cameraPos = context.camera().getPosition(); + Vec3 cameraPos = context.camera() + .getPosition(); var stack = FlwUtil.copyPoseStack(context.stack()); stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); - // TODO: async task executor barriers executor.syncPoint(); submitTasks(executor, stack.last(), context.level()); } + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.of(transformManager::flush) + .then(planTransformers(context)); + } + + private Plan planTransformers(RenderContext context) { + Vec3 cameraPos = context.camera() + .getPosition(); + var stack = FlwUtil.copyPoseStack(context.stack()); + stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); + + // TODO: planify batching engine + return PlanUtil.of(); + } + private void submitTasks(TaskExecutor executor, PoseStack.Pose matrices, ClientLevel level) { - for (var transformSetEntry : transformManager.getTransformSetsView().entrySet()) { + for (var transformSetEntry : transformManager.getTransformSetsView() + .entrySet()) { var stage = transformSetEntry.getKey(); var transformSet = transformSetEntry.getValue(); diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java index 5c53a2261..7a2a0e2bc 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java @@ -11,9 +11,11 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.gl.GlStateTracker; import com.jozufozu.flywheel.gl.GlTextureUnit; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.mojang.blaze3d.systems.RenderSystem; import net.minecraft.client.Camera; @@ -40,7 +42,16 @@ public class IndirectEngine implements Engine { @Override public void beginFrame(TaskExecutor executor, RenderContext context) { - try (var restoreState = GlStateTracker.getRestoreState()) { + flushDrawManager(); + } + + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.onMainThread(this::flushDrawManager); + } + + private void flushDrawManager() { + try (var state = GlStateTracker.getRestoreState()) { drawManager.flush(); } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java index 80e0a6b3a..d5dff8b41 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java @@ -12,6 +12,7 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.backend.Pipelines; import com.jozufozu.flywheel.backend.compile.FlwCompiler; @@ -19,6 +20,7 @@ import com.jozufozu.flywheel.backend.engine.UniformBuffer; import com.jozufozu.flywheel.gl.GlStateTracker; import com.jozufozu.flywheel.gl.GlTextureUnit; import com.jozufozu.flywheel.lib.material.MaterialIndices; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.mojang.blaze3d.systems.RenderSystem; import net.minecraft.client.Camera; @@ -47,7 +49,16 @@ public class InstancingEngine implements Engine { @Override public void beginFrame(TaskExecutor executor, RenderContext context) { - try (var state = GlStateTracker.getRestoreState()) { + flushDrawManager(); + } + + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.onMainThread(this::flushDrawManager); + } + + private void flushDrawManager() { + try (var restoreState = GlStateTracker.getRestoreState()) { drawManager.flush(); } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java index 351b3d8b9..6a43248e0 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java @@ -3,7 +3,9 @@ package com.jozufozu.flywheel.backend.task; import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; @@ -34,6 +36,7 @@ public class ParallelTaskExecutor implements TaskExecutor { private final List threads = new ArrayList<>(); private final Deque taskQueue = new ConcurrentLinkedDeque<>(); + private final Queue mainThreadQueue = new ConcurrentLinkedQueue<>(); private final Object taskNotifier = new Object(); private final Object tasksCompletedNotifier = new Object(); @@ -118,10 +121,19 @@ public class ParallelTaskExecutor implements TaskExecutor { } synchronized (taskNotifier) { - taskNotifier.notify(); + taskNotifier.notifyAll(); } } + @Override + public void scheduleForMainThread(Runnable runnable) { + if (!running.get()) { + throw new IllegalStateException("Executor is stopped"); + } + + mainThreadQueue.add(runnable); + } + /** * Wait for all running tasks to finish. */ @@ -130,7 +142,7 @@ public class ParallelTaskExecutor implements TaskExecutor { Runnable task; // Finish everyone else's work... - while ((task = taskQueue.pollLast()) != null) { + while ((task = pollForSyncPoint()) != null) { processTask(task); } @@ -146,6 +158,15 @@ public class ParallelTaskExecutor implements TaskExecutor { } } + @Nullable + private Runnable pollForSyncPoint() { + Runnable task = mainThreadQueue.poll(); + if (task != null) { + return task; + } + return taskQueue.pollLast(); + } + public void discardAndAwait() { // Discard everyone else's work... while (taskQueue.pollLast() != null) { diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java index 6e273ab1b..f7730b406 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java @@ -13,6 +13,11 @@ public class SerialTaskExecutor implements TaskExecutor { task.run(); } + @Override + public void scheduleForMainThread(Runnable runnable) { + runnable.run(); + } + @Override public void syncPoint() { } diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java index e974d0b69..435d66a86 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java @@ -11,6 +11,7 @@ import com.jozufozu.flywheel.api.event.RenderStage; import com.jozufozu.flywheel.api.instance.DynamicInstance; import com.jozufozu.flywheel.api.instance.TickableInstance; import com.jozufozu.flywheel.api.instance.effect.Effect; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.backend.task.FlwTaskExecutor; import com.jozufozu.flywheel.backend.task.ParallelTaskExecutor; import com.jozufozu.flywheel.config.FlwCommands; @@ -19,6 +20,7 @@ import com.jozufozu.flywheel.impl.instancing.manager.BlockEntityInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.EffectInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.EntityInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.InstanceManager; +import com.jozufozu.flywheel.lib.task.PlanUtil; import net.minecraft.world.entity.Entity; import net.minecraft.world.level.LevelAccessor; @@ -68,9 +70,12 @@ public class InstanceWorld implements AutoCloseable { *

*/ public void tick(double cameraX, double cameraY, double cameraZ) { - blockEntities.tick(taskExecutor, cameraX, cameraY, cameraZ); - entities.tick(taskExecutor, cameraX, cameraY, cameraZ); - effects.tick(taskExecutor, cameraX, cameraY, cameraZ); + var blockEntityPlan = blockEntities.planThisTick(cameraX, cameraY, cameraZ); + var entityPlan = entities.planThisTick(cameraX, cameraY, cameraZ); + var effectPlan = effects.planThisTick(cameraX, cameraY, cameraZ); + + PlanUtil.of(blockEntityPlan, entityPlan, effectPlan) + .execute(taskExecutor); } /** @@ -82,17 +87,16 @@ public class InstanceWorld implements AutoCloseable { *

*/ public void beginFrame(RenderContext context) { - boolean originChanged = engine.updateRenderOrigin(context.camera()); - - if (originChanged) { - blockEntities.recreateAll(); - entities.recreateAll(); - effects.recreateAll(); - } - taskExecutor.syncPoint(); - if (!originChanged) { + getManagerPlan(context).then(engine.planThisFrame(context)) + .execute(taskExecutor); + } + + private Plan getManagerPlan(RenderContext context) { + if (engine.updateRenderOrigin(context.camera())) { + return PlanUtil.of(blockEntities::recreateAll, entities::recreateAll, effects::recreateAll); + } else { var cameraPos = context.camera() .getPosition(); double cameraX = cameraPos.x; @@ -100,12 +104,8 @@ public class InstanceWorld implements AutoCloseable { double cameraZ = cameraPos.z; FrustumIntersection culler = context.culler(); - blockEntities.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); - entities.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); - effects.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); + return PlanUtil.of(blockEntities.planThisFrame(cameraX, cameraY, cameraZ, culler), entities.planThisFrame(cameraX, cameraY, cameraZ, culler), effects.planThisFrame(cameraX, cameraY, cameraZ, culler)); } - - engine.beginFrame(taskExecutor, context); } /** diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java index ff0c5535d..702006a10 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java @@ -1,22 +1,21 @@ package com.jozufozu.flywheel.impl.instancing.manager; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Consumer; import org.joml.FrustumIntersection; import com.jozufozu.flywheel.api.instance.DynamicInstance; import com.jozufozu.flywheel.api.instance.Instance; import com.jozufozu.flywheel.api.instance.TickableInstance; -import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.config.FlwConfig; import com.jozufozu.flywheel.impl.instancing.ratelimit.BandedPrimeLimiter; import com.jozufozu.flywheel.impl.instancing.ratelimit.DistanceUpdateLimiter; import com.jozufozu.flywheel.impl.instancing.ratelimit.NonLimiter; import com.jozufozu.flywheel.impl.instancing.storage.Storage; import com.jozufozu.flywheel.impl.instancing.storage.Transaction; +import com.jozufozu.flywheel.lib.task.PlanUtil; public abstract class InstanceManager { private final Queue> queue = new ConcurrentLinkedQueue<>(); @@ -115,21 +114,10 @@ public abstract class InstanceManager { } } - /** - * Ticks the InstanceManager. - * - *

- * {@link TickableInstance}s get ticked. - *
- * Queued updates are processed. - *

- */ - public void tick(TaskExecutor executor, double cameraX, double cameraY, double cameraZ) { + public Plan planThisTick(double cameraX, double cameraY, double cameraZ) { tickLimiter.tick(); processQueue(); - - var instances = getStorage().getTickableInstances(); - distributeWork(executor, instances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ)); + return PlanUtil.runOnAll(getStorage()::getTickableInstances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ)); } protected void tickInstance(TickableInstance instance, double cameraX, double cameraY, double cameraZ) { @@ -138,12 +126,10 @@ public abstract class InstanceManager { } } - public void beginFrame(TaskExecutor executor, double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { + public Plan planThisFrame(double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { frameLimiter.tick(); processQueue(); - - var instances = getStorage().getDynamicInstances(); - distributeWork(executor, instances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum)); + return PlanUtil.runOnAll(getStorage()::getDynamicInstances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum)); } protected void updateInstance(DynamicInstance instance, double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { @@ -153,21 +139,4 @@ public abstract class InstanceManager { } } } - - private static void distributeWork(TaskExecutor executor, List instances, Consumer action) { - final int size = instances.size(); - final int threadCount = executor.getThreadCount(); - - if (threadCount == 1) { - executor.execute(() -> instances.forEach(action)); - } else { - final int stride = Math.max(size / (threadCount * 2), 1); - for (int start = 0; start < size; start += stride) { - int end = Math.min(start + stride, size); - - var sub = instances.subList(start, end); - executor.execute(() -> sub.forEach(action)); - } - } - } } diff --git a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java index 3bd003cd6..363b3ac9f 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java +++ b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java @@ -1,13 +1,9 @@ package com.jozufozu.flywheel.lib.light; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; -import com.jozufozu.flywheel.backend.task.FlwTaskExecutor; import com.jozufozu.flywheel.lib.box.ImmutableBox; -import com.jozufozu.flywheel.lib.task.WorkGroup; import com.jozufozu.flywheel.util.FlwUtil; import com.jozufozu.flywheel.util.WorldAttached; @@ -57,19 +53,6 @@ public class LightUpdater { } } - private void tickParallel() { - Queue listeners = new ConcurrentLinkedQueue<>(); - - WorkGroup.builder() - .addTasks(tickingListeners.stream(), listener -> { - if (listener.tickLightListener()) { - listeners.add(listener); - } - }) - .onComplete(() -> listeners.forEach(this::addListener)) - .execute(FlwTaskExecutor.get()); - } - /** * Add a listener. * diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java new file mode 100644 index 000000000..8b28e9361 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java @@ -0,0 +1,11 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record BarrierPlan(Plan first, Plan second) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + first.execute(taskExecutor, () -> second.execute(taskExecutor, onCompletion)); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java new file mode 100644 index 000000000..c440b35df --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java @@ -0,0 +1,29 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record NestedPlan(List parallelPlans) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + if (parallelPlans.isEmpty()) { + onCompletion.run(); + return; + } + + var size = parallelPlans.size(); + + if (size == 1) { + parallelPlans.get(0) + .execute(taskExecutor, onCompletion); + return; + } + + var wait = new Synchronizer(size, onCompletion); + for (Plan plan : parallelPlans) { + plan.execute(taskExecutor, wait::decrementAndEventuallyRun); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java new file mode 100644 index 000000000..9ef4993fc --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java @@ -0,0 +1,15 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record OnMainThreadPlan(Runnable task) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + // TODO: detect if we're already on the render thread and just run the task directly + taskExecutor.scheduleForMainThread(() -> { + task.run(); + onCompletion.run(); + }); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java new file mode 100644 index 000000000..4d19f2e80 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java @@ -0,0 +1,29 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.jozufozu.flywheel.api.task.Plan; + +public class PlanUtil { + public static Plan runOnAll(Supplier> iterable, Consumer forEach) { + return new RunOnAllPlan<>(iterable, forEach); + } + + public static Plan of() { + return UnitPlan.INSTANCE; + } + + public static Plan of(Plan... plans) { + return new NestedPlan(List.of(plans)); + } + + public static Plan of(Runnable... tasks) { + return new SimplePlan(List.of(tasks)); + } + + public static Plan onMainThread(Runnable task) { + return new OnMainThreadPlan(task); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java new file mode 100644 index 000000000..3f36022e5 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java @@ -0,0 +1,57 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record RunOnAllPlan(Supplier> listSupplier, Consumer action) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + taskExecutor.execute(() -> { + var list = listSupplier.get(); + final int size = list.size(); + + if (size == 0) { + onCompletion.run(); + } else if (size <= getChunkingThreshold(taskExecutor)) { + processList(list, onCompletion); + } else { + dispatchChunks(list, taskExecutor, onCompletion); + } + }); + } + + private void dispatchChunks(List suppliedList, TaskExecutor taskExecutor, Runnable onCompletion) { + final int size = suppliedList.size(); + final int threadCount = taskExecutor.getThreadCount(); + + final int chunkSize = (size + threadCount - 1) / threadCount; // ceiling division + final int chunkCount = (size + chunkSize - 1) / chunkSize; // ceiling division + + var synchronizer = new Synchronizer(chunkCount, onCompletion); + int remaining = size; + + while (remaining > 0) { + int end = remaining; + remaining -= chunkSize; + int start = Math.max(remaining, 0); + + var subList = suppliedList.subList(start, end); + taskExecutor.execute(() -> processList(subList, synchronizer::decrementAndEventuallyRun)); + } + } + + private void processList(List suppliedList, Runnable onCompletion) { + for (T t : suppliedList) { + action.accept(t); + } + onCompletion.run(); + } + + private static int getChunkingThreshold(TaskExecutor taskExecutor) { + return 512; + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java new file mode 100644 index 000000000..a20ec1192 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java @@ -0,0 +1,24 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record SimplePlan(List parallelTasks) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + if (parallelTasks.isEmpty()) { + onCompletion.run(); + return; + } + + var synchronizer = new Synchronizer(parallelTasks.size(), onCompletion); + for (Runnable task : parallelTasks) { + taskExecutor.execute(() -> { + task.run(); + synchronizer.decrementAndEventuallyRun(); + }); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java b/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java new file mode 100644 index 000000000..51c9716be --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java @@ -0,0 +1,19 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.concurrent.atomic.AtomicInteger; + +public class Synchronizer { + private final AtomicInteger countDown; + private final Runnable onCompletion; + + public Synchronizer(int countDown, Runnable onCompletion) { + this.countDown = new AtomicInteger(countDown); + this.onCompletion = onCompletion; + } + + public void decrementAndEventuallyRun() { + if (countDown.decrementAndGet() == 0) { + onCompletion.run(); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java new file mode 100644 index 000000000..8894b02dd --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java @@ -0,0 +1,16 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public class UnitPlan implements Plan { + public static final UnitPlan INSTANCE = new UnitPlan(); + + private UnitPlan() { + } + + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + onCompletion.run(); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java b/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java deleted file mode 100644 index 8de35d7b5..000000000 --- a/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.jozufozu.flywheel.lib.task; - -import java.util.Iterator; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Stream; - -import org.jetbrains.annotations.Nullable; - -public final class WorkGroup { - public static void run(Iterator tasks, Executor executor) { - tasks.forEachRemaining(executor::execute); - } - - public static void run(Iterator tasks, @Nullable Runnable finalizer, Executor executor) { - if (finalizer == null) { - run(tasks, executor); - return; - } - - AtomicInteger incompleteTaskCounter = new AtomicInteger(0); - tasks.forEachRemaining(task -> { - incompleteTaskCounter.incrementAndGet(); - executor.execute(() -> { - task.run(); - if (incompleteTaskCounter.decrementAndGet() == 0) { - executor.execute(finalizer); - } - }); - }); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - @Nullable - private Runnable finalizer; - private Stream tasks; - - public Builder() { - } - - public Builder addTasks(Stream iterable, Consumer consumer) { - return addTasks(iterable.map(it -> () -> consumer.accept(it))); - } - - public Builder addTasks(Stream tasks) { - if (this.tasks == null) { - this.tasks = tasks; - } else { - this.tasks = Stream.concat(this.tasks, tasks); - } - return this; - } - - public Builder onComplete(Runnable runnable) { - this.finalizer = runnable; - return this; - } - - public void execute(Executor executor) { - if (tasks == null) { - return; - } - - run(tasks.iterator(), finalizer, executor); - } - } -} From 1627874e333a8654989738c04956ca1a7fb84705 Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Sun, 9 Apr 2023 14:15:29 -0700 Subject: [PATCH 2/5] Putting our plan to the test - Implement plan simplification - Add unit tests for plan execution and simplification --- build.gradle | 5 + .../flywheel/lib/task/BarrierPlan.java | 15 ++ .../flywheel/lib/task/NestedPlan.java | 65 ++++++ .../jozufozu/flywheel/lib/task/PlanUtil.java | 2 +- .../flywheel/lib/task/SimplePlan.java | 13 ++ .../flywheel/lib/task/PlanExecutionTest.java | 192 ++++++++++++++++++ .../lib/task/PlanSimplificationTest.java | 109 ++++++++++ 7 files changed, 400 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/jozufozu/flywheel/lib/task/PlanExecutionTest.java create mode 100644 src/test/java/com/jozufozu/flywheel/lib/task/PlanSimplificationTest.java diff --git a/build.gradle b/build.gradle index 60cd5e602..2d388dd93 100644 --- a/build.gradle +++ b/build.gradle @@ -129,6 +129,7 @@ minecraft.runs.all { // ^--------------------------------------------------------------------^ dependencies { + testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' minecraft "net.minecraftforge:forge:${minecraft_version}-${forge_version}" jarJar('org.joml:joml:1.10.5') { @@ -154,6 +155,10 @@ dependencies { } } +test { + useJUnitPlatform() +} + mixin { add sourceSets.main, 'flywheel.refmap.json' } diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java index 8b28e9361..c304a0867 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java @@ -8,4 +8,19 @@ public record BarrierPlan(Plan first, Plan second) implements Plan { public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { first.execute(taskExecutor, () -> second.execute(taskExecutor, onCompletion)); } + + @Override + public Plan maybeSimplify() { + var first = this.first.maybeSimplify(); + var second = this.second.maybeSimplify(); + + if (first == UnitPlan.INSTANCE) { + return second; + } + if (second == UnitPlan.INSTANCE) { + return first; + } + + return new BarrierPlan(first, second); + } } diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java index c440b35df..cec6812e9 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java @@ -1,11 +1,17 @@ package com.jozufozu.flywheel.lib.task; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; public record NestedPlan(List parallelPlans) implements Plan { + public static NestedPlan of(Plan... plans) { + return new NestedPlan(List.of(plans)); + } + @Override public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { if (parallelPlans.isEmpty()) { @@ -26,4 +32,63 @@ public record NestedPlan(List parallelPlans) implements Plan { plan.execute(taskExecutor, wait::decrementAndEventuallyRun); } } + + @Override + public Plan maybeSimplify() { + if (parallelPlans.isEmpty()) { + return UnitPlan.INSTANCE; + } + + if (parallelPlans.size() == 1) { + return parallelPlans.get(0) + .maybeSimplify(); + } + + var simplifiedTasks = new ArrayList(); + var simplifiedPlans = new ArrayList(); + + var toVisit = new ArrayDeque<>(parallelPlans); + while (!toVisit.isEmpty()) { + var plan = toVisit.pop() + .maybeSimplify(); + + if (plan == UnitPlan.INSTANCE) { + continue; + } + + if (plan instanceof SimplePlan simplePlan) { + // merge all simple plans into one + simplifiedTasks.addAll(simplePlan.parallelTasks()); + } else if (plan instanceof NestedPlan nestedPlan) { + // inline and re-visit nested plans + toVisit.addAll(nestedPlan.parallelPlans()); + } else { + // /shrug + simplifiedPlans.add(plan); + } + } + + if (simplifiedTasks.isEmpty() && simplifiedPlans.isEmpty()) { + // everything got simplified away + return UnitPlan.INSTANCE; + } + + if (simplifiedTasks.isEmpty()) { + // no simple plan to create + if (simplifiedPlans.size() == 1) { + // we only contained one complex plan, so we can just return that + return simplifiedPlans.get(0); + } + return new NestedPlan(simplifiedPlans); + } + + if (simplifiedPlans.isEmpty()) { + // we only contained simple plans, so we can just return one + return new SimplePlan(simplifiedTasks); + } + + // we have both simple and complex plans, so we need to create a nested plan + simplifiedPlans.add(new SimplePlan(simplifiedTasks)); + return new NestedPlan(simplifiedPlans); + } } diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java index 4d19f2e80..6cdbdf196 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java @@ -20,7 +20,7 @@ public class PlanUtil { } public static Plan of(Runnable... tasks) { - return new SimplePlan(List.of(tasks)); + return SimplePlan.of(tasks); } public static Plan onMainThread(Runnable task) { diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java index a20ec1192..45734fbbc 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java @@ -6,6 +6,10 @@ import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; public record SimplePlan(List parallelTasks) implements Plan { + public static Plan of(Runnable... tasks) { + return new SimplePlan(List.of(tasks)); + } + @Override public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { if (parallelTasks.isEmpty()) { @@ -21,4 +25,13 @@ public record SimplePlan(List parallelTasks) implements Plan { }); } } + + @Override + public Plan maybeSimplify() { + if (parallelTasks.isEmpty()) { + return UnitPlan.INSTANCE; + } + + return this; + } } diff --git a/src/test/java/com/jozufozu/flywheel/lib/task/PlanExecutionTest.java b/src/test/java/com/jozufozu/flywheel/lib/task/PlanExecutionTest.java new file mode 100644 index 000000000..6f572dee7 --- /dev/null +++ b/src/test/java/com/jozufozu/flywheel/lib/task/PlanExecutionTest.java @@ -0,0 +1,192 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.backend.task.ParallelTaskExecutor; + +import it.unimi.dsi.fastutil.ints.IntArrayList; + +class PlanExecutionTest { + + protected static final ParallelTaskExecutor EXECUTOR = new ParallelTaskExecutor("PlanTest"); + + @BeforeAll + public static void setUp() { + EXECUTOR.startWorkers(); + } + + @AfterAll + public static void tearDown() { + EXECUTOR.stopWorkers(); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + void testSynchronizer(int countDown) { + var done = new AtomicBoolean(false); + var synchronizer = new Synchronizer(countDown, () -> done.set(true)); + + for (int i = 0; i < countDown - 1; i++) { + synchronizer.decrementAndEventuallyRun(); + Assertions.assertFalse(done.get(), "Done early at " + i); + } + + synchronizer.decrementAndEventuallyRun(); + Assertions.assertTrue(done.get()); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + void simpleBarrierSequencing(int barriers) { + var sequence = new IntArrayList(barriers + 1); + var expected = new IntArrayList(barriers + 1); + + var plan = SimplePlan.of(() -> sequence.add(1)); + expected.add(1); + + for (int i = 0; i < barriers; i++) { + final int sequenceNum = i + 2; + expected.add(sequenceNum); + plan = plan.then(SimplePlan.of(() -> sequence.add(sequenceNum))); + } + + runAndWait(plan); + + Assertions.assertEquals(expected, sequence); + } + + @RepeatedTest(10) + void wideBarrierSequencing() { + var lock = new Object(); + var sequence = new IntArrayList(8); + + Runnable addOne = () -> { + synchronized (lock) { + sequence.add(1); + } + }; + Runnable addTwo = () -> { + synchronized (lock) { + sequence.add(2); + } + }; + + var plan = SimplePlan.of(addOne, addOne, addOne, addOne) + .then(SimplePlan.of(addTwo, addTwo, addTwo, addTwo)); + + runAndWait(plan); + + assertExpectedSequence(sequence, 1, 1, 1, 1, 2, 2, 2, 2); + } + + @Test + void simpleNestedPlan() { + var sequence = new IntArrayList(2); + var plan = NestedPlan.of(SimplePlan.of(() -> sequence.add(1))); + runAndWait(plan); + assertExpectedSequence(sequence, 1); + } + + @Test + void manyNestedPlans() { + var counter = new AtomicInteger(0); + var count4 = NestedPlan.of(SimplePlan.of(counter::incrementAndGet, counter::incrementAndGet), SimplePlan.of(counter::incrementAndGet, counter::incrementAndGet)); + + runAndWait(count4); + Assertions.assertEquals(4, counter.get()); + + counter.set(0); + + var count8Barrier = NestedPlan.of(count4, count4); + runAndWait(count8Barrier); + Assertions.assertEquals(8, counter.get()); + } + + @Test + void unitPlan() { + var done = new AtomicBoolean(false); + + UnitPlan.INSTANCE.execute(null, () -> done.set(true)); + + Assertions.assertTrue(done.get()); + } + + @Test + void emptyPlan() { + var done = new AtomicBoolean(false); + + SimplePlan.of() + .execute(null, () -> done.set(true)); + Assertions.assertTrue(done.get()); + + done.set(false); + NestedPlan.of() + .execute(null, () -> done.set(true)); + Assertions.assertTrue(done.get()); + } + + @Test + void mainThreadPlan() { + var done = new AtomicBoolean(false); + var plan = new OnMainThreadPlan(() -> done.set(true)); + + plan.execute(EXECUTOR); + + Assertions.assertFalse(done.get()); + + EXECUTOR.syncPoint(); + + Assertions.assertTrue(done.get()); + } + + private static void assertExpectedSequence(IntArrayList sequence, int... expected) { + Assertions.assertArrayEquals(expected, sequence.toIntArray()); + } + + public static void runAndWait(Plan plan) { + new TestBarrier(plan).runAndWait(); + } + + private static final class TestBarrier { + private final Plan plan; + private boolean done = false; + + private TestBarrier(Plan plan) { + this.plan = plan; + } + + public void runAndWait() { + plan.execute(EXECUTOR, this::doneWithPlan); + + synchronized (this) { + // early exit in case the plan is already done for e.g. UnitPlan + if (done) { + return; + } + + try { + wait(); + } catch (InterruptedException ignored) { + // noop + } + } + } + + public void doneWithPlan() { + synchronized (this) { + done = true; + notifyAll(); + } + } + } +} diff --git a/src/test/java/com/jozufozu/flywheel/lib/task/PlanSimplificationTest.java b/src/test/java/com/jozufozu/flywheel/lib/task/PlanSimplificationTest.java new file mode 100644 index 000000000..ec051599d --- /dev/null +++ b/src/test/java/com/jozufozu/flywheel/lib/task/PlanSimplificationTest.java @@ -0,0 +1,109 @@ +package com.jozufozu.flywheel.lib.task; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.jozufozu.flywheel.api.task.Plan; + +public class PlanSimplificationTest { + + public static final Runnable NOOP = () -> { + }; + public static final Plan SIMPLE = SimplePlan.of(NOOP); + + @Test + void emptyPlans() { + var empty = NestedPlan.of(); + Assertions.assertEquals(empty.maybeSimplify(), UnitPlan.INSTANCE); + + var simpleEmpty = SimplePlan.of(); + Assertions.assertEquals(simpleEmpty.maybeSimplify(), UnitPlan.INSTANCE); + } + + @Test + void nestedSimplePlans() { + var twoSimple = NestedPlan.of(SimplePlan.of(NOOP, NOOP, NOOP), SIMPLE); + Assertions.assertEquals(twoSimple.maybeSimplify(), SimplePlan.of(NOOP, NOOP, NOOP, NOOP)); + + var threeSimple = NestedPlan.of(SIMPLE, SIMPLE, SIMPLE); + Assertions.assertEquals(threeSimple.maybeSimplify(), SimplePlan.of(NOOP, NOOP, NOOP)); + } + + @Test + void oneNestedPlan() { + var oneSimple = NestedPlan.of(SIMPLE); + + Assertions.assertEquals(oneSimple.maybeSimplify(), SIMPLE); + + var mainThreadNoop = new OnMainThreadPlan(NOOP); + var oneMainThread = NestedPlan.of(mainThreadNoop); + + Assertions.assertEquals(oneMainThread.maybeSimplify(), mainThreadNoop); + + var barrier = new BarrierPlan(SIMPLE, SIMPLE); + var oneBarrier = NestedPlan.of(barrier); + + Assertions.assertEquals(oneBarrier.maybeSimplify(), barrier); + } + + @Test + void nestedNestedPlan() { + var outer = NestedPlan.of(SIMPLE); + var outermost = NestedPlan.of(outer); + + Assertions.assertEquals(outermost.maybeSimplify(), SIMPLE); + } + + @Test + void nestedUnitPlan() { + var onlyUnit = NestedPlan.of(UnitPlan.INSTANCE, UnitPlan.INSTANCE, UnitPlan.INSTANCE); + Assertions.assertEquals(onlyUnit.maybeSimplify(), UnitPlan.INSTANCE); + + var unitAndSimple = NestedPlan.of(UnitPlan.INSTANCE, UnitPlan.INSTANCE, SIMPLE); + Assertions.assertEquals(unitAndSimple.maybeSimplify(), SIMPLE); + } + + @Test + void complexNesting() { + var mainThreadNoop = new OnMainThreadPlan(NOOP); + + var nested = NestedPlan.of(mainThreadNoop, SIMPLE); + Assertions.assertEquals(nested.maybeSimplify(), nested); // cannot simplify + + var barrier = new BarrierPlan(SIMPLE, SIMPLE); + var complex = NestedPlan.of(barrier, nested); + Assertions.assertEquals(complex.maybeSimplify(), NestedPlan.of(barrier, mainThreadNoop, SIMPLE)); + } + + @Test + void nestedNoSimple() { + var mainThreadNoop = new OnMainThreadPlan(NOOP); + var barrier = new BarrierPlan(SIMPLE, SIMPLE); + var oneMainThread = NestedPlan.of(mainThreadNoop, NestedPlan.of(mainThreadNoop, barrier, barrier)); + + Assertions.assertEquals(oneMainThread.maybeSimplify(), NestedPlan.of(mainThreadNoop, mainThreadNoop, barrier, barrier)); + } + + @Test + void manyNestedButJustOneAfterSimplification() { + var barrier = new BarrierPlan(SIMPLE, SIMPLE); + var oneMainThread = NestedPlan.of(barrier, NestedPlan.of(UnitPlan.INSTANCE, UnitPlan.INSTANCE)); + + Assertions.assertEquals(oneMainThread.maybeSimplify(), barrier); + } + + @Test + void barrierPlan() { + var doubleUnit = new BarrierPlan(UnitPlan.INSTANCE, UnitPlan.INSTANCE); + Assertions.assertEquals(doubleUnit.maybeSimplify(), UnitPlan.INSTANCE); + + var simpleThenUnit = new BarrierPlan(SIMPLE, UnitPlan.INSTANCE); + Assertions.assertEquals(simpleThenUnit.maybeSimplify(), SIMPLE); + + var unitThenSimple = new BarrierPlan(UnitPlan.INSTANCE, SIMPLE); + Assertions.assertEquals(unitThenSimple.maybeSimplify(), SIMPLE); + + var simpleThenSimple = new BarrierPlan(SIMPLE, SIMPLE); + Assertions.assertEquals(simpleThenSimple.maybeSimplify(), new BarrierPlan(SIMPLE, SIMPLE)); + } +} From 76dcf3fed63e8961afbf9a3ed18640e91bcf87bd Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Sun, 9 Apr 2023 14:36:35 -0700 Subject: [PATCH 3/5] The botched batching plan - Directly port over submitTasks to use Plan - Could probably make better use of the plan primitives, but oh well - Remove Engine#beginFrame --- .../jozufozu/flywheel/api/backend/Engine.java | 1 - .../engine/batching/BatchingEngine.java | 29 ++++++------------- .../engine/batching/TransformCall.java | 11 +++++-- .../engine/indirect/IndirectEngine.java | 5 ---- .../engine/instancing/InstancingEngine.java | 5 ---- .../impl/instancing/InstanceWorld.java | 2 ++ .../flywheel/lib/task/RunOnAllPlan.java | 1 + 7 files changed, 20 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java index fa2202101..4246f1afd 100644 --- a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java +++ b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java @@ -12,7 +12,6 @@ import net.minecraft.client.Camera; import net.minecraft.core.Vec3i; public interface Engine extends InstancerProvider { - void beginFrame(TaskExecutor executor, RenderContext context); void renderStage(TaskExecutor executor, RenderContext context, RenderStage stage); diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java index 444622b2b..9869da03d 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java @@ -1,5 +1,6 @@ package com.jozufozu.flywheel.backend.engine.batching; +import java.util.ArrayList; import java.util.List; import com.jozufozu.flywheel.api.backend.Engine; @@ -11,12 +12,11 @@ import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.lib.task.NestedPlan; import com.jozufozu.flywheel.lib.task.PlanUtil; import com.jozufozu.flywheel.util.FlwUtil; -import com.mojang.blaze3d.vertex.PoseStack; import net.minecraft.client.Camera; -import net.minecraft.client.multiplayer.ClientLevel; import net.minecraft.core.BlockPos; import net.minecraft.core.Vec3i; import net.minecraft.world.phys.Vec3; @@ -30,19 +30,6 @@ public class BatchingEngine implements Engine { return transformManager.getInstancer(type, model, stage); } - @Override - public void beginFrame(TaskExecutor executor, RenderContext context) { - transformManager.flush(); - - Vec3 cameraPos = context.camera() - .getPosition(); - var stack = FlwUtil.copyPoseStack(context.stack()); - stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); - - executor.syncPoint(); - submitTasks(executor, stack.last(), context.level()); - } - @Override public Plan planThisFrame(RenderContext context) { return PlanUtil.of(transformManager::flush) @@ -55,11 +42,11 @@ public class BatchingEngine implements Engine { var stack = FlwUtil.copyPoseStack(context.stack()); stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); - // TODO: planify batching engine - return PlanUtil.of(); - } + var matrices = stack.last(); + var level = context.level(); + + var plans = new ArrayList(); - private void submitTasks(TaskExecutor executor, PoseStack.Pose matrices, ClientLevel level) { for (var transformSetEntry : transformManager.getTransformSetsView() .entrySet()) { var stage = transformSetEntry.getKey(); @@ -84,11 +71,13 @@ public class BatchingEngine implements Engine { int startVertex = 0; for (var transformCall : transformCalls) { - transformCall.submitTasks(executor, buffer, startVertex, matrices, level); + plans.add(transformCall.getPlan(buffer, startVertex, matrices, level)); startVertex += transformCall.getTotalVertexCount(); } } } + + return new NestedPlan(plans); } @Override diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/TransformCall.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/TransformCall.java index 61f1c61ba..6055e5e61 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/TransformCall.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/TransformCall.java @@ -1,13 +1,15 @@ package com.jozufozu.flywheel.backend.engine.batching; +import java.util.ArrayList; import java.util.List; import com.jozufozu.flywheel.api.material.Material; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructVertexTransformer; -import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.vertex.MutableVertexList; import com.jozufozu.flywheel.api.vertex.ReusableVertexList; +import com.jozufozu.flywheel.lib.task.SimplePlan; import com.jozufozu.flywheel.lib.vertex.VertexTransformations; import com.mojang.blaze3d.vertex.PoseStack; import com.mojang.math.Matrix3f; @@ -40,9 +42,11 @@ public class TransformCall

{ instancer.update(); } - public void submitTasks(TaskExecutor executor, DrawBuffer buffer, int startVertex, PoseStack.Pose matrices, ClientLevel level) { + public Plan getPlan(DrawBuffer buffer, int startVertex, PoseStack.Pose matrices, ClientLevel level) { int instances = instancer.getInstanceCount(); + var out = new ArrayList(); + while (instances > 0) { int end = instances; instances -= 512; @@ -52,8 +56,9 @@ public class TransformCall

{ ReusableVertexList sub = buffer.slice(startVertex, vertexCount); startVertex += vertexCount; - executor.execute(() -> transformRange(sub, start, end, matrices, level)); + out.add(() -> transformRange(sub, start, end, matrices, level)); } + return new SimplePlan(out); } public void transformRange(ReusableVertexList vertexList, int from, int to, PoseStack.Pose matrices, ClientLevel level) { diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java index 7a2a0e2bc..23665914e 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java @@ -40,11 +40,6 @@ public class IndirectEngine implements Engine { return drawManager.getInstancer(type, model, stage); } - @Override - public void beginFrame(TaskExecutor executor, RenderContext context) { - flushDrawManager(); - } - @Override public Plan planThisFrame(RenderContext context) { return PlanUtil.onMainThread(this::flushDrawManager); diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java index d5dff8b41..a55104daf 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java @@ -47,11 +47,6 @@ public class InstancingEngine implements Engine { return drawManager.getInstancer(type, model, stage); } - @Override - public void beginFrame(TaskExecutor executor, RenderContext context) { - flushDrawManager(); - } - @Override public Plan planThisFrame(RenderContext context) { return PlanUtil.onMainThread(this::flushDrawManager); diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java index 435d66a86..f184e0883 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java @@ -75,6 +75,7 @@ public class InstanceWorld implements AutoCloseable { var effectPlan = effects.planThisTick(cameraX, cameraY, cameraZ); PlanUtil.of(blockEntityPlan, entityPlan, effectPlan) + .maybeSimplify() .execute(taskExecutor); } @@ -90,6 +91,7 @@ public class InstanceWorld implements AutoCloseable { taskExecutor.syncPoint(); getManagerPlan(context).then(engine.planThisFrame(context)) + .maybeSimplify() .execute(taskExecutor); } diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java index 3f36022e5..598af1a9b 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java @@ -10,6 +10,7 @@ import com.jozufozu.flywheel.api.task.TaskExecutor; public record RunOnAllPlan(Supplier> listSupplier, Consumer action) implements Plan { @Override public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + // TODO: unit tests, fix CME? taskExecutor.execute(() -> { var list = listSupplier.get(); final int size = list.size(); From f29dcbc4864d5f48cd2b4288155439763a9d7f69 Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Thu, 13 Apr 2023 15:17:02 -0700 Subject: [PATCH 4/5] Weaving threads - Restore WaitGroup abstraction - Simplify WaitGroup to make proper use of atomics - Fix logic error in ParallelTaskExecutor causing task counter to go below zero when executing main thread tasks - Use ConcurrentHashMap in models to allow parallel access - Reduce chunk size in RunOnAllPlan - Only queue in InstanceManager - Defer syncPoint within renderStage - AbstractStorage return immutable view for tickable/dynamic instances - Process InstanceManager queues off-thread --- .../com/jozufozu/flywheel/api/task/Plan.java | 13 +++- .../engine/batching/BatchingDrawTracker.java | 5 ++ .../engine/batching/BatchingEngine.java | 4 ++ .../engine/indirect/IndirectCullingGroup.java | 3 + .../engine/indirect/IndirectDrawManager.java | 9 +++ .../engine/indirect/IndirectEngine.java | 6 ++ .../engine/instancing/InstancingEngine.java | 2 + .../backend/task/ParallelTaskExecutor.java | 72 +++++++------------ .../flywheel/handler/EntityWorldHandler.java | 2 +- .../impl/instancing/InstanceWorld.java | 9 ++- .../instancing/InstancedRenderDispatcher.java | 2 +- .../instancing/manager/InstanceManager.java | 51 ++++--------- .../instancing/storage/AbstractStorage.java | 7 +- .../flywheel/lib/light/LightUpdater.java | 26 ++++++- .../jozufozu/flywheel/lib/model/Models.java | 8 +-- .../flywheel/lib/task/NestedPlan.java | 11 ++- .../jozufozu/flywheel/lib/task/PlanUtil.java | 5 -- .../flywheel/lib/task/RunOnAllPlan.java | 26 ++++--- .../flywheel/lib/task/SimplePlan.java | 2 +- .../jozufozu/flywheel/lib/task/WaitGroup.java | 59 +++++++++++++++ .../instancemanage/InstanceAddMixin.java | 2 +- .../instancemanage/InstanceRemoveMixin.java | 2 +- .../instancemanage/InstanceUpdateMixin.java | 2 +- .../lib/task/PlanCompositionTest.java | 20 ++++++ .../flywheel/lib/task/WaitGroupTest.java | 14 ++++ 25 files changed, 241 insertions(+), 121 deletions(-) create mode 100644 src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java create mode 100644 src/test/java/com/jozufozu/flywheel/lib/task/PlanCompositionTest.java create mode 100644 src/test/java/com/jozufozu/flywheel/lib/task/WaitGroupTest.java diff --git a/src/main/java/com/jozufozu/flywheel/api/task/Plan.java b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java index 3849792e0..5334c454c 100644 --- a/src/main/java/com/jozufozu/flywheel/api/task/Plan.java +++ b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java @@ -1,6 +1,7 @@ package com.jozufozu.flywheel.api.task; import com.jozufozu.flywheel.lib.task.BarrierPlan; +import com.jozufozu.flywheel.lib.task.NestedPlan; public interface Plan { /** @@ -22,13 +23,23 @@ public interface Plan { * Create a new plan that executes this plan, then the given plan. * * @param plan The plan to execute after this plan. - * @return The new, composed plan. + * @return The composed plan. */ default Plan then(Plan plan) { // TODO: AbstractPlan? return new BarrierPlan(this, plan); } + /** + * Create a new plan that executes this plan and the given plan in parallel. + * + * @param plan The plan to execute in parallel with this plan. + * @return The composed plan. + */ + default Plan and(Plan plan) { + return NestedPlan.of(this, plan); + } + /** * If possible, create a new plan that accomplishes everything * this plan does but with a simpler execution schedule. diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingDrawTracker.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingDrawTracker.java index b423dcc96..aae68fd53 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingDrawTracker.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingDrawTracker.java @@ -81,4 +81,9 @@ public class BatchingDrawTracker { buffers.clear(); } } + + public boolean hasStage(RenderStage stage) { + return !activeBuffers.get(stage) + .isEmpty(); + } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java index 9869da03d..af370c683 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java @@ -82,6 +82,10 @@ public class BatchingEngine implements Engine { @Override public void renderStage(TaskExecutor executor, RenderContext context, RenderStage stage) { + if (!drawTracker.hasStage(stage)) { + return; + } + executor.syncPoint(); drawTracker.draw(stage); } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectCullingGroup.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectCullingGroup.java index 9636b512b..358b6d578 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectCullingGroup.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectCullingGroup.java @@ -185,4 +185,7 @@ public class IndirectCullingGroup

{ meshPool.delete(); } + public boolean hasStage(RenderStage stage) { + return drawSet.contains(stage); + } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectDrawManager.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectDrawManager.java index 49ab8402c..4f69cbcf8 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectDrawManager.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectDrawManager.java @@ -73,6 +73,15 @@ public class IndirectDrawManager { initializedInstancers.add(instancer); } + public boolean hasStage(RenderStage stage) { + for (var list : renderLists.values()) { + if (list.hasStage(stage)) { + return true; + } + } + return false; + } + private record UninitializedInstancer(IndirectInstancer instancer, Model model, RenderStage stage) { } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java index 23665914e..0047c124b 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java @@ -53,6 +53,12 @@ public class IndirectEngine implements Engine { @Override public void renderStage(TaskExecutor executor, RenderContext context, RenderStage stage) { + if (!drawManager.hasStage(stage)) { + return; + } + + executor.syncPoint(); + try (var restoreState = GlStateTracker.getRestoreState()) { setup(); diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java index a55104daf..9a14d1cf0 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java @@ -66,6 +66,8 @@ public class InstancingEngine implements Engine { return; } + executor.syncPoint(); + try (var state = GlStateTracker.getRestoreState()) { setup(); diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java index 6a43248e0..9de43e445 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java @@ -13,6 +13,7 @@ import org.slf4j.Logger; import com.jozufozu.flywheel.Flywheel; import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.lib.task.WaitGroup; import com.mojang.logging.LogUtils; import net.minecraft.util.Mth; @@ -29,17 +30,13 @@ public class ParallelTaskExecutor implements TaskExecutor { * If set to false, the executor will shut down. */ private final AtomicBoolean running = new AtomicBoolean(false); - /** - * Synchronized via {@link #tasksCompletedNotifier}. - */ - private int incompleteTaskCounter = 0; private final List threads = new ArrayList<>(); private final Deque taskQueue = new ConcurrentLinkedDeque<>(); private final Queue mainThreadQueue = new ConcurrentLinkedQueue<>(); private final Object taskNotifier = new Object(); - private final Object tasksCompletedNotifier = new Object(); + private final WaitGroup waitGroup = new WaitGroup(); public ParallelTaskExecutor(String name) { this.name = name; @@ -102,10 +99,6 @@ public class ParallelTaskExecutor implements TaskExecutor { threads.clear(); taskQueue.clear(); - synchronized (tasksCompletedNotifier) { - incompleteTaskCounter = 0; - tasksCompletedNotifier.notifyAll(); - } } @Override @@ -114,12 +107,9 @@ public class ParallelTaskExecutor implements TaskExecutor { throw new IllegalStateException("Executor is stopped"); } + waitGroup.add(); taskQueue.add(task); - synchronized (tasksCompletedNotifier) { - incompleteTaskCounter++; - } - synchronized (taskNotifier) { taskNotifier.notifyAll(); } @@ -140,19 +130,17 @@ public class ParallelTaskExecutor implements TaskExecutor { @Override public void syncPoint() { Runnable task; - // Finish everyone else's work... - while ((task = pollForSyncPoint()) != null) { - processTask(task); - } - - // and wait for any stragglers. - synchronized (tasksCompletedNotifier) { - while (incompleteTaskCounter > 0) { - try { - tasksCompletedNotifier.wait(); - } catch (InterruptedException e) { - // + while (true) { + if ((task = mainThreadQueue.poll()) != null) { + processMainThreadTask(task); + } else if ((task = taskQueue.pollLast()) != null) { + processTask(task); + } else { + // and wait for any stragglers. + waitGroup.await(); + if (mainThreadQueue.isEmpty()) { + break; } } } @@ -170,23 +158,12 @@ public class ParallelTaskExecutor implements TaskExecutor { public void discardAndAwait() { // Discard everyone else's work... while (taskQueue.pollLast() != null) { - synchronized (tasksCompletedNotifier) { - if (--incompleteTaskCounter == 0) { - tasksCompletedNotifier.notifyAll(); - } - } + waitGroup.done(); } // and wait for any stragglers. - synchronized (tasksCompletedNotifier) { - while (incompleteTaskCounter > 0) { - try { - tasksCompletedNotifier.wait(); - } catch (InterruptedException e) { - // - } - } - } + waitGroup.await(); + mainThreadQueue.clear(); } @Nullable @@ -213,11 +190,15 @@ public class ParallelTaskExecutor implements TaskExecutor { } catch (Exception e) { Flywheel.LOGGER.error("Error running task", e); } finally { - synchronized (tasksCompletedNotifier) { - if (--incompleteTaskCounter == 0) { - tasksCompletedNotifier.notifyAll(); - } - } + waitGroup.done(); + } + } + + private void processMainThreadTask(Runnable task) { + try { + task.run(); + } catch (Exception e) { + Flywheel.LOGGER.error("Error running main thread task", e); } } @@ -233,7 +214,6 @@ public class ParallelTaskExecutor implements TaskExecutor { } private class WorkerThread extends Thread { - private final AtomicBoolean running = ParallelTaskExecutor.this.running; public WorkerThread(String name) { super(name); @@ -242,7 +222,7 @@ public class ParallelTaskExecutor implements TaskExecutor { @Override public void run() { // Run until the executor shuts down - while (running.get()) { + while (ParallelTaskExecutor.this.running.get()) { Runnable task = getNextTask(); if (task == null) { diff --git a/src/main/java/com/jozufozu/flywheel/handler/EntityWorldHandler.java b/src/main/java/com/jozufozu/flywheel/handler/EntityWorldHandler.java index 9cefeb4a3..b004a07f6 100644 --- a/src/main/java/com/jozufozu/flywheel/handler/EntityWorldHandler.java +++ b/src/main/java/com/jozufozu/flywheel/handler/EntityWorldHandler.java @@ -28,7 +28,7 @@ public class EntityWorldHandler { if (FlwUtil.canUseInstancing(level)) { InstancedRenderDispatcher.getEntities(level) - .remove(event.getEntity()); + .queueRemove(event.getEntity()); } } } diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java index f184e0883..5eb709ae8 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java @@ -70,11 +70,11 @@ public class InstanceWorld implements AutoCloseable { *

*/ public void tick(double cameraX, double cameraY, double cameraZ) { - var blockEntityPlan = blockEntities.planThisTick(cameraX, cameraY, cameraZ); - var entityPlan = entities.planThisTick(cameraX, cameraY, cameraZ); - var effectPlan = effects.planThisTick(cameraX, cameraY, cameraZ); + taskExecutor.syncPoint(); - PlanUtil.of(blockEntityPlan, entityPlan, effectPlan) + blockEntities.planThisTick(cameraX, cameraY, cameraZ) + .and(entities.planThisTick(cameraX, cameraY, cameraZ)) + .and(effects.planThisTick(cameraX, cameraY, cameraZ)) .maybeSimplify() .execute(taskExecutor); } @@ -114,7 +114,6 @@ public class InstanceWorld implements AutoCloseable { * Draw all instances for the given stage. */ public void renderStage(RenderContext context, RenderStage stage) { - taskExecutor.syncPoint(); engine.renderStage(taskExecutor, context, stage); } diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstancedRenderDispatcher.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstancedRenderDispatcher.java index 11aaaabd3..f5180b8dd 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstancedRenderDispatcher.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstancedRenderDispatcher.java @@ -160,7 +160,7 @@ public class InstancedRenderDispatcher { // Block entities are loaded while chunks are baked. // Entities are loaded with the level, so when chunks are reloaded they need to be re-added. ClientLevelExtension.getAllLoadedEntities(level) - .forEach(world.getEntities()::add); + .forEach(world.getEntities()::queueAdd); } public static void addDebugInfo(List info) { diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java index 702006a10..d9756583f 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java @@ -6,7 +6,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.joml.FrustumIntersection; import com.jozufozu.flywheel.api.instance.DynamicInstance; -import com.jozufozu.flywheel.api.instance.Instance; import com.jozufozu.flywheel.api.instance.TickableInstance; import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.config.FlwConfig; @@ -15,7 +14,8 @@ import com.jozufozu.flywheel.impl.instancing.ratelimit.DistanceUpdateLimiter; import com.jozufozu.flywheel.impl.instancing.ratelimit.NonLimiter; import com.jozufozu.flywheel.impl.instancing.storage.Storage; import com.jozufozu.flywheel.impl.instancing.storage.Transaction; -import com.jozufozu.flywheel.lib.task.PlanUtil; +import com.jozufozu.flywheel.lib.task.RunOnAllPlan; +import com.jozufozu.flywheel.lib.task.SimplePlan; public abstract class InstanceManager { private final Queue> queue = new ConcurrentLinkedQueue<>(); @@ -47,14 +47,6 @@ public abstract class InstanceManager { return getStorage().getAllInstances().size(); } - public void add(T obj) { - if (!getStorage().willAccept(obj)) { - return; - } - - getStorage().add(obj); - } - public void queueAdd(T obj) { if (!getStorage().willAccept(obj)) { return; @@ -63,33 +55,10 @@ public abstract class InstanceManager { queue.add(Transaction.add(obj)); } - public void remove(T obj) { - getStorage().remove(obj); - } - public void queueRemove(T obj) { queue.add(Transaction.remove(obj)); } - /** - * Update the instance associated with an object. - * - *

- * By default this is the only hook an {@link Instance} has to change its internal state. This is the lowest frequency - * update hook {@link Instance} gets. For more frequent updates, see {@link TickableInstance} and - * {@link DynamicInstance}. - *

- * - * @param obj the object to update. - */ - public void update(T obj) { - if (!getStorage().willAccept(obj)) { - return; - } - - getStorage().update(obj); - } - public void queueUpdate(T obj) { if (!getStorage().willAccept(obj)) { return; @@ -115,9 +84,11 @@ public abstract class InstanceManager { } public Plan planThisTick(double cameraX, double cameraY, double cameraZ) { - tickLimiter.tick(); - processQueue(); - return PlanUtil.runOnAll(getStorage()::getTickableInstances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ)); + return SimplePlan.of(() -> { + tickLimiter.tick(); + processQueue(); + }) + .then(RunOnAllPlan.of(getStorage()::getTickableInstances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ))); } protected void tickInstance(TickableInstance instance, double cameraX, double cameraY, double cameraZ) { @@ -127,9 +98,11 @@ public abstract class InstanceManager { } public Plan planThisFrame(double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { - frameLimiter.tick(); - processQueue(); - return PlanUtil.runOnAll(getStorage()::getDynamicInstances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum)); + return SimplePlan.of(() -> { + frameLimiter.tick(); + processQueue(); + }) + .then(RunOnAllPlan.of(getStorage()::getDynamicInstances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum))); } protected void updateInstance(DynamicInstance instance, double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/storage/AbstractStorage.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/storage/AbstractStorage.java index ce811413a..5a27fad34 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/storage/AbstractStorage.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/storage/AbstractStorage.java @@ -1,6 +1,7 @@ package com.jozufozu.flywheel.impl.instancing.storage; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import com.jozufozu.flywheel.api.backend.Engine; @@ -11,7 +12,9 @@ import com.jozufozu.flywheel.api.instance.TickableInstance; public abstract class AbstractStorage implements Storage { protected final Engine engine; protected final List tickableInstances = new ArrayList<>(); + protected final List tickableInstancesView = Collections.unmodifiableList(tickableInstances); protected final List dynamicInstances = new ArrayList<>(); + protected final List dynamicInstancesView = Collections.unmodifiableList(dynamicInstances); protected AbstractStorage(Engine engine) { this.engine = engine; @@ -19,12 +22,12 @@ public abstract class AbstractStorage implements Storage { @Override public List getTickableInstances() { - return tickableInstances; + return tickableInstancesView; } @Override public List getDynamicInstances() { - return dynamicInstances; + return dynamicInstancesView; } protected void setup(Instance instance) { diff --git a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java index 363b3ac9f..c7b0381dc 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java +++ b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java @@ -1,6 +1,8 @@ package com.jozufozu.flywheel.lib.light; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; import com.jozufozu.flywheel.lib.box.ImmutableBox; @@ -26,6 +28,8 @@ public class LightUpdater { private final WeakContainmentMultiMap listenersBySection = new WeakContainmentMultiMap<>(); private final Set tickingListeners = FlwUtil.createWeakHashSet(); + private final Queue queue = new ConcurrentLinkedQueue<>(); + public static LightUpdater get(LevelAccessor level) { if (LightUpdated.receivesLightUpdates(level)) { // The level is valid, add it to the map. @@ -41,8 +45,8 @@ public class LightUpdater { } public void tick() { + processQueue(); tickSerial(); - //tickParallel(); } private void tickSerial() { @@ -59,8 +63,20 @@ public class LightUpdater { * @param listener The object that wants to receive light update notifications. */ public void addListener(LightListener listener) { - if (listener instanceof TickingLightListener) + queue.add(listener); + } + + private synchronized void processQueue() { + LightListener listener; + while ((listener = queue.poll()) != null) { + doAdd(listener); + } + } + + private void doAdd(LightListener listener) { + if (listener instanceof TickingLightListener) { tickingListeners.add(((TickingLightListener) listener)); + } ImmutableBox box = listener.getVolume(); @@ -94,9 +110,13 @@ public class LightUpdater { * @param pos The section position where light changed. */ public void onLightUpdate(LightLayer type, SectionPos pos) { + processQueue(); + Set listeners = listenersBySection.get(pos.asLong()); - if (listeners == null || listeners.isEmpty()) return; + if (listeners == null || listeners.isEmpty()) { + return; + } listeners.removeIf(LightListener::isInvalid); diff --git a/src/main/java/com/jozufozu/flywheel/lib/model/Models.java b/src/main/java/com/jozufozu/flywheel/lib/model/Models.java index fe55c0147..9899fc7d6 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/model/Models.java +++ b/src/main/java/com/jozufozu/flywheel/lib/model/Models.java @@ -1,8 +1,8 @@ package com.jozufozu.flywheel.lib.model; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.jozufozu.flywheel.api.event.ReloadRenderersEvent; import com.jozufozu.flywheel.api.model.Model; @@ -16,9 +16,9 @@ import net.minecraft.core.Direction; import net.minecraft.world.level.block.state.BlockState; public final class Models { - private static final Map BLOCK_STATE = new HashMap<>(); - private static final Map PARTIAL = new HashMap<>(); - private static final Map, Model> PARTIAL_DIR = new HashMap<>(); + private static final Map BLOCK_STATE = new ConcurrentHashMap<>(); + private static final Map PARTIAL = new ConcurrentHashMap<>(); + private static final Map, Model> PARTIAL_DIR = new ConcurrentHashMap<>(); public static Model block(BlockState state) { return BLOCK_STATE.computeIfAbsent(state, it -> new BlockModelBuilder(it).build()); diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java index cec6812e9..e03bf0f8c 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java @@ -4,12 +4,13 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import com.google.common.collect.ImmutableList; import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; public record NestedPlan(List parallelPlans) implements Plan { public static NestedPlan of(Plan... plans) { - return new NestedPlan(List.of(plans)); + return new NestedPlan(ImmutableList.copyOf(plans)); } @Override @@ -33,6 +34,14 @@ public record NestedPlan(List parallelPlans) implements Plan { } } + @Override + public Plan and(Plan plan) { + return new NestedPlan(ImmutableList.builder() + .addAll(parallelPlans) + .add(plan) + .build()); + } + @Override public Plan maybeSimplify() { if (parallelPlans.isEmpty()) { diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java index 6cdbdf196..efa8b8280 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java @@ -1,15 +1,10 @@ package com.jozufozu.flywheel.lib.task; import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; import com.jozufozu.flywheel.api.task.Plan; public class PlanUtil { - public static Plan runOnAll(Supplier> iterable, Consumer forEach) { - return new RunOnAllPlan<>(iterable, forEach); - } public static Plan of() { return UnitPlan.INSTANCE; diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java index 598af1a9b..fb9123028 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java @@ -8,16 +8,19 @@ import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; public record RunOnAllPlan(Supplier> listSupplier, Consumer action) implements Plan { + public static Plan of(Supplier> iterable, Consumer forEach) { + return new RunOnAllPlan<>(iterable, forEach); + } + @Override public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { - // TODO: unit tests, fix CME? taskExecutor.execute(() -> { var list = listSupplier.get(); final int size = list.size(); if (size == 0) { onCompletion.run(); - } else if (size <= getChunkingThreshold(taskExecutor)) { + } else if (size <= getChunkingThreshold()) { processList(list, onCompletion); } else { dispatchChunks(list, taskExecutor, onCompletion); @@ -27,12 +30,9 @@ public record RunOnAllPlan(Supplier> listSupplier, Consumer action private void dispatchChunks(List suppliedList, TaskExecutor taskExecutor, Runnable onCompletion) { final int size = suppliedList.size(); - final int threadCount = taskExecutor.getThreadCount(); + final int chunkSize = getChunkSize(taskExecutor, size); - final int chunkSize = (size + threadCount - 1) / threadCount; // ceiling division - final int chunkCount = (size + chunkSize - 1) / chunkSize; // ceiling division - - var synchronizer = new Synchronizer(chunkCount, onCompletion); + var synchronizer = new Synchronizer(ceilingDiv(size, chunkSize), onCompletion); int remaining = size; while (remaining > 0) { @@ -45,6 +45,14 @@ public record RunOnAllPlan(Supplier> listSupplier, Consumer action } } + private static int getChunkSize(TaskExecutor taskExecutor, int totalSize) { + return ceilingDiv(totalSize, taskExecutor.getThreadCount() * 32); + } + + private static int ceilingDiv(int numerator, int denominator) { + return (numerator + denominator - 1) / denominator; + } + private void processList(List suppliedList, Runnable onCompletion) { for (T t : suppliedList) { action.accept(t); @@ -52,7 +60,7 @@ public record RunOnAllPlan(Supplier> listSupplier, Consumer action onCompletion.run(); } - private static int getChunkingThreshold(TaskExecutor taskExecutor) { - return 512; + private static int getChunkingThreshold() { + return 256; } } diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java index 45734fbbc..e3c68506f 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java @@ -18,7 +18,7 @@ public record SimplePlan(List parallelTasks) implements Plan { } var synchronizer = new Synchronizer(parallelTasks.size(), onCompletion); - for (Runnable task : parallelTasks) { + for (var task : parallelTasks) { taskExecutor.execute(() -> { task.run(); synchronizer.decrementAndEventuallyRun(); diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java b/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java new file mode 100644 index 000000000..28ce14729 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java @@ -0,0 +1,59 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.concurrent.atomic.AtomicInteger; + +// https://stackoverflow.com/questions/29655531 +public class WaitGroup { + private final AtomicInteger counter = new AtomicInteger(0); + + public void add() { + add(1); + } + + public void add(int i) { + if (i == 0) { + return; + } + + counter.addAndGet(i); + } + + public void done() { + var result = counter.decrementAndGet(); + if (result == 0) { + synchronized (this) { + this.notifyAll(); + } + } else if (result < 0) { + throw new IllegalStateException("WaitGroup counter is negative!"); + } + } + + public void await() { + try { + awaitInternal(); + } catch (InterruptedException ignored) { + // noop + } + } + + private void awaitInternal() throws InterruptedException { + // var start = System.nanoTime(); + while (counter.get() > 0) { + // spin in place to avoid sleeping the main thread + // synchronized (this) { + // this.wait(timeoutMs); + // } + } + // var end = System.nanoTime(); + // var elapsed = end - start; + // + // if (elapsed > 1000000) { + // Flywheel.LOGGER.info("Waited " + StringUtil.formatTime(elapsed)); + // } + } + + public void _reset() { + counter.set(0); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceAddMixin.java b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceAddMixin.java index 601bb9b72..96490f6a4 100644 --- a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceAddMixin.java +++ b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceAddMixin.java @@ -25,7 +25,7 @@ public class InstanceAddMixin { private void flywheel$onBlockEntityAdded(BlockEntity blockEntity, CallbackInfo ci) { if (level.isClientSide && FlwUtil.canUseInstancing(level)) { InstancedRenderDispatcher.getBlockEntities(level) - .add(blockEntity); + .queueAdd(blockEntity); } } } diff --git a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceRemoveMixin.java b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceRemoveMixin.java index 4a6df32db..3361a6dcf 100644 --- a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceRemoveMixin.java +++ b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceRemoveMixin.java @@ -24,7 +24,7 @@ public class InstanceRemoveMixin { private void flywheel$removeInstance(CallbackInfo ci) { if (level instanceof ClientLevel && FlwUtil.canUseInstancing(level)) { InstancedRenderDispatcher.getBlockEntities(level) - .remove((BlockEntity) (Object) this); + .queueRemove((BlockEntity) (Object) this); } } diff --git a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceUpdateMixin.java b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceUpdateMixin.java index 363e1dc0e..df2f51d77 100644 --- a/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceUpdateMixin.java +++ b/src/main/java/com/jozufozu/flywheel/mixin/instancemanage/InstanceUpdateMixin.java @@ -35,6 +35,6 @@ public class InstanceUpdateMixin { } InstancedRenderDispatcher.getBlockEntities(level) - .update(blockEntity); + .queueUpdate(blockEntity); } } diff --git a/src/test/java/com/jozufozu/flywheel/lib/task/PlanCompositionTest.java b/src/test/java/com/jozufozu/flywheel/lib/task/PlanCompositionTest.java new file mode 100644 index 000000000..31331c55a --- /dev/null +++ b/src/test/java/com/jozufozu/flywheel/lib/task/PlanCompositionTest.java @@ -0,0 +1,20 @@ +package com.jozufozu.flywheel.lib.task; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.jozufozu.flywheel.api.task.Plan; + +public class PlanCompositionTest { + + public static final Runnable NOOP = () -> { + }; + public static final Plan SIMPLE = SimplePlan.of(NOOP); + + @Test + void nestedPlanAnd() { + var empty = NestedPlan.of(SIMPLE); + + Assertions.assertEquals(NestedPlan.of(SIMPLE, SIMPLE), empty.and(SIMPLE)); + } +} diff --git a/src/test/java/com/jozufozu/flywheel/lib/task/WaitGroupTest.java b/src/test/java/com/jozufozu/flywheel/lib/task/WaitGroupTest.java new file mode 100644 index 000000000..779170299 --- /dev/null +++ b/src/test/java/com/jozufozu/flywheel/lib/task/WaitGroupTest.java @@ -0,0 +1,14 @@ +package com.jozufozu.flywheel.lib.task; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class WaitGroupTest { + @Test + public void testExtraDone() { + WaitGroup wg = new WaitGroup(); + wg.add(); + wg.done(); + Assertions.assertThrows(IllegalStateException.class, wg::done); + } +} From 0861d8bfd220020f15093ad8009d5edac3d3e35a Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Fri, 14 Apr 2023 17:14:12 -0700 Subject: [PATCH 5/5] Needs to wait - Commit to non-blocking waitgroup impl - Debug log when await takes suspiciously long --- .../backend/task/ParallelTaskExecutor.java | 26 ++++++----- .../jozufozu/flywheel/lib/task/WaitGroup.java | 43 ++++++++----------- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java index 9de43e445..7e7e16d68 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -99,10 +100,12 @@ public class ParallelTaskExecutor implements TaskExecutor { threads.clear(); taskQueue.clear(); + mainThreadQueue.clear(); + waitGroup._reset(); } @Override - public void execute(Runnable task) { + public void execute(@NotNull Runnable task) { if (!running.get()) { throw new IllegalStateException("Executor is stopped"); } @@ -130,39 +133,35 @@ public class ParallelTaskExecutor implements TaskExecutor { @Override public void syncPoint() { Runnable task; - // Finish everyone else's work... while (true) { if ((task = mainThreadQueue.poll()) != null) { + // Prioritize main thread tasks. processMainThreadTask(task); } else if ((task = taskQueue.pollLast()) != null) { + // then work on tasks from the queue. processTask(task); } else { - // and wait for any stragglers. + // then wait for the other threads to finish. waitGroup.await(); + // at this point there will be no more tasks in the queue, but + // one of the worker threads may have submitted a main thread task. if (mainThreadQueue.isEmpty()) { + // if they didn't, we're done. break; } } } } - @Nullable - private Runnable pollForSyncPoint() { - Runnable task = mainThreadQueue.poll(); - if (task != null) { - return task; - } - return taskQueue.pollLast(); - } - public void discardAndAwait() { // Discard everyone else's work... while (taskQueue.pollLast() != null) { waitGroup.done(); } - // and wait for any stragglers. + // ...wait for any stragglers... waitGroup.await(); + // ...and clear the main thread queue. mainThreadQueue.clear(); } @@ -183,7 +182,6 @@ public class ParallelTaskExecutor implements TaskExecutor { return task; } - // TODO: task context private void processTask(Runnable task) { try { task.run(); diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java b/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java index 28ce14729..0d34ab324 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java +++ b/src/main/java/com/jozufozu/flywheel/lib/task/WaitGroup.java @@ -2,8 +2,14 @@ package com.jozufozu.flywheel.lib.task; import java.util.concurrent.atomic.AtomicInteger; -// https://stackoverflow.com/questions/29655531 +import org.slf4j.Logger; + +import com.jozufozu.flywheel.util.StringUtil; +import com.mojang.logging.LogUtils; + public class WaitGroup { + private static final Logger LOGGER = LogUtils.getLogger(); + private final AtomicInteger counter = new AtomicInteger(0); public void add() { @@ -19,38 +25,25 @@ public class WaitGroup { } public void done() { - var result = counter.decrementAndGet(); - if (result == 0) { - synchronized (this) { - this.notifyAll(); - } - } else if (result < 0) { + if (counter.decrementAndGet() < 0) { throw new IllegalStateException("WaitGroup counter is negative!"); } } public void await() { - try { - awaitInternal(); - } catch (InterruptedException ignored) { - // noop - } - } - - private void awaitInternal() throws InterruptedException { - // var start = System.nanoTime(); + // TODO: comprehensive performance tracking for tasks + long start = System.nanoTime(); + int count = 0; while (counter.get() > 0) { // spin in place to avoid sleeping the main thread - // synchronized (this) { - // this.wait(timeoutMs); - // } + count++; + } + long end = System.nanoTime(); + long elapsed = end - start; + + if (elapsed > 1000000) { // > 1ms + LOGGER.debug("Waited " + StringUtil.formatTime(elapsed) + ", looped " + count + " times"); } - // var end = System.nanoTime(); - // var elapsed = end - start; - // - // if (elapsed > 1000000) { - // Flywheel.LOGGER.info("Waited " + StringUtil.formatTime(elapsed)); - // } } public void _reset() {