diff --git a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java index 09765ca87..fa2202101 100644 --- a/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java +++ b/src/main/java/com/jozufozu/flywheel/api/backend/Engine.java @@ -5,6 +5,7 @@ import java.util.List; import com.jozufozu.flywheel.api.event.RenderContext; import com.jozufozu.flywheel.api.event.RenderStage; import com.jozufozu.flywheel.api.instancer.InstancerProvider; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import net.minecraft.client.Camera; @@ -28,4 +29,6 @@ public interface Engine extends InstancerProvider { void addDebugInfo(List info); void delete(); + + Plan planThisFrame(RenderContext context); } diff --git a/src/main/java/com/jozufozu/flywheel/api/task/Plan.java b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java new file mode 100644 index 000000000..3849792e0 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/api/task/Plan.java @@ -0,0 +1,42 @@ +package com.jozufozu.flywheel.api.task; + +import com.jozufozu.flywheel.lib.task.BarrierPlan; + +public interface Plan { + /** + * Submit this plan for execution. + *

+ * You must call {@code onCompletion.run()} when the plan has completed execution. + * + * @param taskExecutor The executor to use for submitting tasks. + * @param onCompletion A callback to run when the plan has completed execution, useful for chaining plans. + */ + void execute(TaskExecutor taskExecutor, Runnable onCompletion); + + default void execute(TaskExecutor taskExecutor) { + execute(taskExecutor, () -> { + }); + } + + /** + * Create a new plan that executes this plan, then the given plan. + * + * @param plan The plan to execute after this plan. + * @return The new, composed plan. + */ + default Plan then(Plan plan) { + // TODO: AbstractPlan? + return new BarrierPlan(this, plan); + } + + /** + * If possible, create a new plan that accomplishes everything + * this plan does but with a simpler execution schedule. + * + * @return A simplified plan, or this. + */ + default Plan maybeSimplify() { + // TODO: plan caching/simplification + return this; + } +} diff --git a/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java b/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java index ee4b722bd..abe9dc71c 100644 --- a/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/api/task/TaskExecutor.java @@ -9,4 +9,6 @@ public interface TaskExecutor extends Executor { void syncPoint(); int getThreadCount(); + + void scheduleForMainThread(Runnable runnable); } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java index fc5a58870..444622b2b 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/batching/BatchingEngine.java @@ -9,7 +9,9 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.jozufozu.flywheel.util.FlwUtil; import com.mojang.blaze3d.vertex.PoseStack; @@ -32,17 +34,34 @@ public class BatchingEngine implements Engine { public void beginFrame(TaskExecutor executor, RenderContext context) { transformManager.flush(); - Vec3 cameraPos = context.camera().getPosition(); + Vec3 cameraPos = context.camera() + .getPosition(); var stack = FlwUtil.copyPoseStack(context.stack()); stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); - // TODO: async task executor barriers executor.syncPoint(); submitTasks(executor, stack.last(), context.level()); } + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.of(transformManager::flush) + .then(planTransformers(context)); + } + + private Plan planTransformers(RenderContext context) { + Vec3 cameraPos = context.camera() + .getPosition(); + var stack = FlwUtil.copyPoseStack(context.stack()); + stack.translate(-cameraPos.x, -cameraPos.y, -cameraPos.z); + + // TODO: planify batching engine + return PlanUtil.of(); + } + private void submitTasks(TaskExecutor executor, PoseStack.Pose matrices, ClientLevel level) { - for (var transformSetEntry : transformManager.getTransformSetsView().entrySet()) { + for (var transformSetEntry : transformManager.getTransformSetsView() + .entrySet()) { var stage = transformSetEntry.getKey(); var transformSet = transformSetEntry.getValue(); diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java index 5c53a2261..7a2a0e2bc 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/indirect/IndirectEngine.java @@ -11,9 +11,11 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.gl.GlStateTracker; import com.jozufozu.flywheel.gl.GlTextureUnit; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.mojang.blaze3d.systems.RenderSystem; import net.minecraft.client.Camera; @@ -40,7 +42,16 @@ public class IndirectEngine implements Engine { @Override public void beginFrame(TaskExecutor executor, RenderContext context) { - try (var restoreState = GlStateTracker.getRestoreState()) { + flushDrawManager(); + } + + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.onMainThread(this::flushDrawManager); + } + + private void flushDrawManager() { + try (var state = GlStateTracker.getRestoreState()) { drawManager.flush(); } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java index 80e0a6b3a..d5dff8b41 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/engine/instancing/InstancingEngine.java @@ -12,6 +12,7 @@ import com.jozufozu.flywheel.api.instancer.Instancer; import com.jozufozu.flywheel.api.model.Model; import com.jozufozu.flywheel.api.struct.InstancePart; import com.jozufozu.flywheel.api.struct.StructType; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.backend.Pipelines; import com.jozufozu.flywheel.backend.compile.FlwCompiler; @@ -19,6 +20,7 @@ import com.jozufozu.flywheel.backend.engine.UniformBuffer; import com.jozufozu.flywheel.gl.GlStateTracker; import com.jozufozu.flywheel.gl.GlTextureUnit; import com.jozufozu.flywheel.lib.material.MaterialIndices; +import com.jozufozu.flywheel.lib.task.PlanUtil; import com.mojang.blaze3d.systems.RenderSystem; import net.minecraft.client.Camera; @@ -47,7 +49,16 @@ public class InstancingEngine implements Engine { @Override public void beginFrame(TaskExecutor executor, RenderContext context) { - try (var state = GlStateTracker.getRestoreState()) { + flushDrawManager(); + } + + @Override + public Plan planThisFrame(RenderContext context) { + return PlanUtil.onMainThread(this::flushDrawManager); + } + + private void flushDrawManager() { + try (var restoreState = GlStateTracker.getRestoreState()) { drawManager.flush(); } } diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java index 351b3d8b9..6a43248e0 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/ParallelTaskExecutor.java @@ -3,7 +3,9 @@ package com.jozufozu.flywheel.backend.task; import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; @@ -34,6 +36,7 @@ public class ParallelTaskExecutor implements TaskExecutor { private final List threads = new ArrayList<>(); private final Deque taskQueue = new ConcurrentLinkedDeque<>(); + private final Queue mainThreadQueue = new ConcurrentLinkedQueue<>(); private final Object taskNotifier = new Object(); private final Object tasksCompletedNotifier = new Object(); @@ -118,10 +121,19 @@ public class ParallelTaskExecutor implements TaskExecutor { } synchronized (taskNotifier) { - taskNotifier.notify(); + taskNotifier.notifyAll(); } } + @Override + public void scheduleForMainThread(Runnable runnable) { + if (!running.get()) { + throw new IllegalStateException("Executor is stopped"); + } + + mainThreadQueue.add(runnable); + } + /** * Wait for all running tasks to finish. */ @@ -130,7 +142,7 @@ public class ParallelTaskExecutor implements TaskExecutor { Runnable task; // Finish everyone else's work... - while ((task = taskQueue.pollLast()) != null) { + while ((task = pollForSyncPoint()) != null) { processTask(task); } @@ -146,6 +158,15 @@ public class ParallelTaskExecutor implements TaskExecutor { } } + @Nullable + private Runnable pollForSyncPoint() { + Runnable task = mainThreadQueue.poll(); + if (task != null) { + return task; + } + return taskQueue.pollLast(); + } + public void discardAndAwait() { // Discard everyone else's work... while (taskQueue.pollLast() != null) { diff --git a/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java b/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java index 6e273ab1b..f7730b406 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java +++ b/src/main/java/com/jozufozu/flywheel/backend/task/SerialTaskExecutor.java @@ -13,6 +13,11 @@ public class SerialTaskExecutor implements TaskExecutor { task.run(); } + @Override + public void scheduleForMainThread(Runnable runnable) { + runnable.run(); + } + @Override public void syncPoint() { } diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java index e974d0b69..435d66a86 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/InstanceWorld.java @@ -11,6 +11,7 @@ import com.jozufozu.flywheel.api.event.RenderStage; import com.jozufozu.flywheel.api.instance.DynamicInstance; import com.jozufozu.flywheel.api.instance.TickableInstance; import com.jozufozu.flywheel.api.instance.effect.Effect; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.backend.task.FlwTaskExecutor; import com.jozufozu.flywheel.backend.task.ParallelTaskExecutor; import com.jozufozu.flywheel.config.FlwCommands; @@ -19,6 +20,7 @@ import com.jozufozu.flywheel.impl.instancing.manager.BlockEntityInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.EffectInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.EntityInstanceManager; import com.jozufozu.flywheel.impl.instancing.manager.InstanceManager; +import com.jozufozu.flywheel.lib.task.PlanUtil; import net.minecraft.world.entity.Entity; import net.minecraft.world.level.LevelAccessor; @@ -68,9 +70,12 @@ public class InstanceWorld implements AutoCloseable { *

*/ public void tick(double cameraX, double cameraY, double cameraZ) { - blockEntities.tick(taskExecutor, cameraX, cameraY, cameraZ); - entities.tick(taskExecutor, cameraX, cameraY, cameraZ); - effects.tick(taskExecutor, cameraX, cameraY, cameraZ); + var blockEntityPlan = blockEntities.planThisTick(cameraX, cameraY, cameraZ); + var entityPlan = entities.planThisTick(cameraX, cameraY, cameraZ); + var effectPlan = effects.planThisTick(cameraX, cameraY, cameraZ); + + PlanUtil.of(blockEntityPlan, entityPlan, effectPlan) + .execute(taskExecutor); } /** @@ -82,17 +87,16 @@ public class InstanceWorld implements AutoCloseable { *

*/ public void beginFrame(RenderContext context) { - boolean originChanged = engine.updateRenderOrigin(context.camera()); - - if (originChanged) { - blockEntities.recreateAll(); - entities.recreateAll(); - effects.recreateAll(); - } - taskExecutor.syncPoint(); - if (!originChanged) { + getManagerPlan(context).then(engine.planThisFrame(context)) + .execute(taskExecutor); + } + + private Plan getManagerPlan(RenderContext context) { + if (engine.updateRenderOrigin(context.camera())) { + return PlanUtil.of(blockEntities::recreateAll, entities::recreateAll, effects::recreateAll); + } else { var cameraPos = context.camera() .getPosition(); double cameraX = cameraPos.x; @@ -100,12 +104,8 @@ public class InstanceWorld implements AutoCloseable { double cameraZ = cameraPos.z; FrustumIntersection culler = context.culler(); - blockEntities.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); - entities.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); - effects.beginFrame(taskExecutor, cameraX, cameraY, cameraZ, culler); + return PlanUtil.of(blockEntities.planThisFrame(cameraX, cameraY, cameraZ, culler), entities.planThisFrame(cameraX, cameraY, cameraZ, culler), effects.planThisFrame(cameraX, cameraY, cameraZ, culler)); } - - engine.beginFrame(taskExecutor, context); } /** diff --git a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java index ff0c5535d..702006a10 100644 --- a/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java +++ b/src/main/java/com/jozufozu/flywheel/impl/instancing/manager/InstanceManager.java @@ -1,22 +1,21 @@ package com.jozufozu.flywheel.impl.instancing.manager; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.Consumer; import org.joml.FrustumIntersection; import com.jozufozu.flywheel.api.instance.DynamicInstance; import com.jozufozu.flywheel.api.instance.Instance; import com.jozufozu.flywheel.api.instance.TickableInstance; -import com.jozufozu.flywheel.api.task.TaskExecutor; +import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.config.FlwConfig; import com.jozufozu.flywheel.impl.instancing.ratelimit.BandedPrimeLimiter; import com.jozufozu.flywheel.impl.instancing.ratelimit.DistanceUpdateLimiter; import com.jozufozu.flywheel.impl.instancing.ratelimit.NonLimiter; import com.jozufozu.flywheel.impl.instancing.storage.Storage; import com.jozufozu.flywheel.impl.instancing.storage.Transaction; +import com.jozufozu.flywheel.lib.task.PlanUtil; public abstract class InstanceManager { private final Queue> queue = new ConcurrentLinkedQueue<>(); @@ -115,21 +114,10 @@ public abstract class InstanceManager { } } - /** - * Ticks the InstanceManager. - * - *

- * {@link TickableInstance}s get ticked. - *
- * Queued updates are processed. - *

- */ - public void tick(TaskExecutor executor, double cameraX, double cameraY, double cameraZ) { + public Plan planThisTick(double cameraX, double cameraY, double cameraZ) { tickLimiter.tick(); processQueue(); - - var instances = getStorage().getTickableInstances(); - distributeWork(executor, instances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ)); + return PlanUtil.runOnAll(getStorage()::getTickableInstances, instance -> tickInstance(instance, cameraX, cameraY, cameraZ)); } protected void tickInstance(TickableInstance instance, double cameraX, double cameraY, double cameraZ) { @@ -138,12 +126,10 @@ public abstract class InstanceManager { } } - public void beginFrame(TaskExecutor executor, double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { + public Plan planThisFrame(double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { frameLimiter.tick(); processQueue(); - - var instances = getStorage().getDynamicInstances(); - distributeWork(executor, instances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum)); + return PlanUtil.runOnAll(getStorage()::getDynamicInstances, instance -> updateInstance(instance, cameraX, cameraY, cameraZ, frustum)); } protected void updateInstance(DynamicInstance instance, double cameraX, double cameraY, double cameraZ, FrustumIntersection frustum) { @@ -153,21 +139,4 @@ public abstract class InstanceManager { } } } - - private static void distributeWork(TaskExecutor executor, List instances, Consumer action) { - final int size = instances.size(); - final int threadCount = executor.getThreadCount(); - - if (threadCount == 1) { - executor.execute(() -> instances.forEach(action)); - } else { - final int stride = Math.max(size / (threadCount * 2), 1); - for (int start = 0; start < size; start += stride) { - int end = Math.min(start + stride, size); - - var sub = instances.subList(start, end); - executor.execute(() -> sub.forEach(action)); - } - } - } } diff --git a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java index 3bd003cd6..363b3ac9f 100644 --- a/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java +++ b/src/main/java/com/jozufozu/flywheel/lib/light/LightUpdater.java @@ -1,13 +1,9 @@ package com.jozufozu.flywheel.lib.light; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; -import com.jozufozu.flywheel.backend.task.FlwTaskExecutor; import com.jozufozu.flywheel.lib.box.ImmutableBox; -import com.jozufozu.flywheel.lib.task.WorkGroup; import com.jozufozu.flywheel.util.FlwUtil; import com.jozufozu.flywheel.util.WorldAttached; @@ -57,19 +53,6 @@ public class LightUpdater { } } - private void tickParallel() { - Queue listeners = new ConcurrentLinkedQueue<>(); - - WorkGroup.builder() - .addTasks(tickingListeners.stream(), listener -> { - if (listener.tickLightListener()) { - listeners.add(listener); - } - }) - .onComplete(() -> listeners.forEach(this::addListener)) - .execute(FlwTaskExecutor.get()); - } - /** * Add a listener. * diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java new file mode 100644 index 000000000..8b28e9361 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/BarrierPlan.java @@ -0,0 +1,11 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record BarrierPlan(Plan first, Plan second) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + first.execute(taskExecutor, () -> second.execute(taskExecutor, onCompletion)); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java new file mode 100644 index 000000000..c440b35df --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/NestedPlan.java @@ -0,0 +1,29 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record NestedPlan(List parallelPlans) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + if (parallelPlans.isEmpty()) { + onCompletion.run(); + return; + } + + var size = parallelPlans.size(); + + if (size == 1) { + parallelPlans.get(0) + .execute(taskExecutor, onCompletion); + return; + } + + var wait = new Synchronizer(size, onCompletion); + for (Plan plan : parallelPlans) { + plan.execute(taskExecutor, wait::decrementAndEventuallyRun); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java new file mode 100644 index 000000000..9ef4993fc --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/OnMainThreadPlan.java @@ -0,0 +1,15 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record OnMainThreadPlan(Runnable task) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + // TODO: detect if we're already on the render thread and just run the task directly + taskExecutor.scheduleForMainThread(() -> { + task.run(); + onCompletion.run(); + }); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java new file mode 100644 index 000000000..4d19f2e80 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/PlanUtil.java @@ -0,0 +1,29 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.jozufozu.flywheel.api.task.Plan; + +public class PlanUtil { + public static Plan runOnAll(Supplier> iterable, Consumer forEach) { + return new RunOnAllPlan<>(iterable, forEach); + } + + public static Plan of() { + return UnitPlan.INSTANCE; + } + + public static Plan of(Plan... plans) { + return new NestedPlan(List.of(plans)); + } + + public static Plan of(Runnable... tasks) { + return new SimplePlan(List.of(tasks)); + } + + public static Plan onMainThread(Runnable task) { + return new OnMainThreadPlan(task); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java new file mode 100644 index 000000000..3f36022e5 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/RunOnAllPlan.java @@ -0,0 +1,57 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record RunOnAllPlan(Supplier> listSupplier, Consumer action) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + taskExecutor.execute(() -> { + var list = listSupplier.get(); + final int size = list.size(); + + if (size == 0) { + onCompletion.run(); + } else if (size <= getChunkingThreshold(taskExecutor)) { + processList(list, onCompletion); + } else { + dispatchChunks(list, taskExecutor, onCompletion); + } + }); + } + + private void dispatchChunks(List suppliedList, TaskExecutor taskExecutor, Runnable onCompletion) { + final int size = suppliedList.size(); + final int threadCount = taskExecutor.getThreadCount(); + + final int chunkSize = (size + threadCount - 1) / threadCount; // ceiling division + final int chunkCount = (size + chunkSize - 1) / chunkSize; // ceiling division + + var synchronizer = new Synchronizer(chunkCount, onCompletion); + int remaining = size; + + while (remaining > 0) { + int end = remaining; + remaining -= chunkSize; + int start = Math.max(remaining, 0); + + var subList = suppliedList.subList(start, end); + taskExecutor.execute(() -> processList(subList, synchronizer::decrementAndEventuallyRun)); + } + } + + private void processList(List suppliedList, Runnable onCompletion) { + for (T t : suppliedList) { + action.accept(t); + } + onCompletion.run(); + } + + private static int getChunkingThreshold(TaskExecutor taskExecutor) { + return 512; + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java new file mode 100644 index 000000000..a20ec1192 --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/SimplePlan.java @@ -0,0 +1,24 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.List; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public record SimplePlan(List parallelTasks) implements Plan { + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + if (parallelTasks.isEmpty()) { + onCompletion.run(); + return; + } + + var synchronizer = new Synchronizer(parallelTasks.size(), onCompletion); + for (Runnable task : parallelTasks) { + taskExecutor.execute(() -> { + task.run(); + synchronizer.decrementAndEventuallyRun(); + }); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java b/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java new file mode 100644 index 000000000..51c9716be --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/Synchronizer.java @@ -0,0 +1,19 @@ +package com.jozufozu.flywheel.lib.task; + +import java.util.concurrent.atomic.AtomicInteger; + +public class Synchronizer { + private final AtomicInteger countDown; + private final Runnable onCompletion; + + public Synchronizer(int countDown, Runnable onCompletion) { + this.countDown = new AtomicInteger(countDown); + this.onCompletion = onCompletion; + } + + public void decrementAndEventuallyRun() { + if (countDown.decrementAndGet() == 0) { + onCompletion.run(); + } + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java b/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java new file mode 100644 index 000000000..8894b02dd --- /dev/null +++ b/src/main/java/com/jozufozu/flywheel/lib/task/UnitPlan.java @@ -0,0 +1,16 @@ +package com.jozufozu.flywheel.lib.task; + +import com.jozufozu.flywheel.api.task.Plan; +import com.jozufozu.flywheel.api.task.TaskExecutor; + +public class UnitPlan implements Plan { + public static final UnitPlan INSTANCE = new UnitPlan(); + + private UnitPlan() { + } + + @Override + public void execute(TaskExecutor taskExecutor, Runnable onCompletion) { + onCompletion.run(); + } +} diff --git a/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java b/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java deleted file mode 100644 index 8de35d7b5..000000000 --- a/src/main/java/com/jozufozu/flywheel/lib/task/WorkGroup.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.jozufozu.flywheel.lib.task; - -import java.util.Iterator; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Stream; - -import org.jetbrains.annotations.Nullable; - -public final class WorkGroup { - public static void run(Iterator tasks, Executor executor) { - tasks.forEachRemaining(executor::execute); - } - - public static void run(Iterator tasks, @Nullable Runnable finalizer, Executor executor) { - if (finalizer == null) { - run(tasks, executor); - return; - } - - AtomicInteger incompleteTaskCounter = new AtomicInteger(0); - tasks.forEachRemaining(task -> { - incompleteTaskCounter.incrementAndGet(); - executor.execute(() -> { - task.run(); - if (incompleteTaskCounter.decrementAndGet() == 0) { - executor.execute(finalizer); - } - }); - }); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - @Nullable - private Runnable finalizer; - private Stream tasks; - - public Builder() { - } - - public Builder addTasks(Stream iterable, Consumer consumer) { - return addTasks(iterable.map(it -> () -> consumer.accept(it))); - } - - public Builder addTasks(Stream tasks) { - if (this.tasks == null) { - this.tasks = tasks; - } else { - this.tasks = Stream.concat(this.tasks, tasks); - } - return this; - } - - public Builder onComplete(Runnable runnable) { - this.finalizer = runnable; - return this; - } - - public void execute(Executor executor) { - if (tasks == null) { - return; - } - - run(tasks.iterator(), finalizer, executor); - } - } -}