From 19d757d7028b43569f15f4ad21431ec668296ec8 Mon Sep 17 00:00:00 2001 From: Jozufozu Date: Wed, 16 Mar 2022 23:43:01 -0700 Subject: [PATCH] Moving in parallel - LightUpdates now uses task engine to update listeners in parallel - Basic workgroup system to run something on the main thread after a group of tasks is complete --- .../backend/instancing/InstanceWorld.java | 2 +- .../instancing/InstancedRenderDispatcher.java | 22 ++-- .../instancing/ParallelTaskEngine.java | 103 +++++++++++++++++- .../jozufozu/flywheel/light/LightUpdater.java | 35 ++++-- 4 files changed, 139 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/jozufozu/flywheel/backend/instancing/InstanceWorld.java b/src/main/java/com/jozufozu/flywheel/backend/instancing/InstanceWorld.java index 23d248b8a..ac6b9037a 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/instancing/InstanceWorld.java +++ b/src/main/java/com/jozufozu/flywheel/backend/instancing/InstanceWorld.java @@ -32,7 +32,7 @@ public class InstanceWorld { protected final InstanceManager entityInstanceManager; protected final InstanceManager blockEntityInstanceManager; - protected final ParallelTaskEngine taskEngine; + public final ParallelTaskEngine taskEngine; public InstanceWorld(LevelAccessor levelAccessor) { Level world = (Level) levelAccessor; diff --git a/src/main/java/com/jozufozu/flywheel/backend/instancing/InstancedRenderDispatcher.java b/src/main/java/com/jozufozu/flywheel/backend/instancing/InstancedRenderDispatcher.java index ca423380b..908f2381b 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/instancing/InstancedRenderDispatcher.java +++ b/src/main/java/com/jozufozu/flywheel/backend/instancing/InstancedRenderDispatcher.java @@ -54,18 +54,24 @@ public class InstancedRenderDispatcher { } public static InstanceManager getBlockEntities(LevelAccessor world) { - if (Backend.isOn()) { - return instanceWorlds.get(world) - .getBlockEntityInstanceManager(); - } else { - throw new NullPointerException("Backend is off, cannot retrieve instance world."); - } + return getInstanceWorld(world).getBlockEntityInstanceManager(); } public static InstanceManager getEntities(LevelAccessor world) { + return getInstanceWorld(world).getEntityInstanceManager(); + } + + public static ParallelTaskEngine getTaskEngine(LevelAccessor world) { + return getInstanceWorld(world).taskEngine; + } + + /** + * Get or create the {@link InstanceWorld} for the given world. + * @throws NullPointerException if the backend is off + */ + public static InstanceWorld getInstanceWorld(LevelAccessor world) { if (Backend.isOn()) { - return instanceWorlds.get(world) - .getEntityInstanceManager(); + return instanceWorlds.get(world); } else { throw new NullPointerException("Backend is off, cannot retrieve instance world."); } diff --git a/src/main/java/com/jozufozu/flywheel/backend/instancing/ParallelTaskEngine.java b/src/main/java/com/jozufozu/flywheel/backend/instancing/ParallelTaskEngine.java index 19f5b2f59..cea0cbadb 100644 --- a/src/main/java/com/jozufozu/flywheel/backend/instancing/ParallelTaskEngine.java +++ b/src/main/java/com/jozufozu/flywheel/backend/instancing/ParallelTaskEngine.java @@ -5,6 +5,9 @@ import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -20,9 +23,15 @@ import net.minecraft.util.Mth; public class ParallelTaskEngine implements TaskEngine { private static final Logger LOGGER = LoggerFactory.getLogger("BatchExecutor"); + private final String name; + + /** + * If set to false, the engine will shut down. + */ private final AtomicBoolean running = new AtomicBoolean(false); private final WaitGroup wg = new WaitGroup(); + private final Deque syncTasks = new ConcurrentLinkedDeque<>(); private final Deque jobQueue = new ConcurrentLinkedDeque<>(); private final List threads = new ArrayList<>(); @@ -30,13 +39,15 @@ public class ParallelTaskEngine implements TaskEngine { private final int threadCount; - private final String name; - public ParallelTaskEngine(String name) { this.name = name; threadCount = getOptimalThreadCount(); } + public WorkGroupBuilder group(String name) { + return new WorkGroupBuilder(name); + } + /** * Spawns a number of work-stealing threads to process results in the build queue. If the builder is already * running, this method does nothing and exits. @@ -117,6 +128,10 @@ public class ParallelTaskEngine implements TaskEngine { this.wg.await(); } catch (InterruptedException ignored) { } + + while ((job = this.syncTasks.pollLast()) != null) { + job.run(); + } } @Nullable @@ -157,6 +172,7 @@ public class ParallelTaskEngine implements TaskEngine { private static int getMaxThreadCount() { return Runtime.getRuntime().availableProcessors(); } + private class WorkerRunnable implements Runnable { private final AtomicBoolean running = ParallelTaskEngine.this.running; @@ -176,4 +192,87 @@ public class ParallelTaskEngine implements TaskEngine { } } + + public class WorkGroupBuilder { + final String name; + + @Nullable + Runnable finalizer; + + Stream tasks; + + public WorkGroupBuilder(String name) { + this.name = name; + } + + public WorkGroupBuilder addTasks(Stream iterable, Consumer consumer) { + return addTasks(iterable.map(it -> () -> consumer.accept(it))); + } + + public WorkGroupBuilder addTasks(Stream tasks) { + if (this.tasks == null) { + this.tasks = tasks; + } else { + this.tasks = Stream.concat(this.tasks, tasks); + } + return this; + } + + public WorkGroupBuilder onComplete(Runnable runnable) { + this.finalizer = runnable; + return this; + } + + public void submit() { + if (this.tasks == null) { + return; + } + + WorkGroup workGroup = new WorkGroup(name, finalizer); + + tasks.map(task -> new WorkGroupTask(workGroup, task)).forEach(ParallelTaskEngine.this::submit); + } + + } + + private static class WorkGroupTask implements Runnable { + + private final WorkGroup parent; + private final Runnable wrapped; + + public WorkGroupTask(WorkGroup parent, Runnable wrapped) { + this.parent = parent; + this.wrapped = wrapped; + + this.parent.running.incrementAndGet(); + } + + @Override + public void run() { + this.wrapped.run(); + + this.parent.oneDown(); + } + } + + private class WorkGroup { + final String name; + + final Runnable finalizer; + + final AtomicInteger running = new AtomicInteger(0); + + public WorkGroup(String name, @Nullable Runnable finalizer) { + this.name = name; + this.finalizer = finalizer; + } + + public void oneDown() { + if (running.decrementAndGet() == 0) { + if (finalizer != null) { + ParallelTaskEngine.this.syncTasks.add(finalizer); + } + } + } + } } diff --git a/src/main/java/com/jozufozu/flywheel/light/LightUpdater.java b/src/main/java/com/jozufozu/flywheel/light/LightUpdater.java index cf6a8d491..7692126a9 100644 --- a/src/main/java/com/jozufozu/flywheel/light/LightUpdater.java +++ b/src/main/java/com/jozufozu/flywheel/light/LightUpdater.java @@ -1,18 +1,21 @@ package com.jozufozu.flywheel.light; -import java.util.HashMap; -import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; +import com.jozufozu.flywheel.backend.instancing.InstancedRenderDispatcher; +import com.jozufozu.flywheel.backend.instancing.ParallelTaskEngine; import com.jozufozu.flywheel.util.WeakHashSet; +import com.jozufozu.flywheel.util.WorldAttached; import com.jozufozu.flywheel.util.box.GridAlignedBB; import com.jozufozu.flywheel.util.box.ImmutableBox; import it.unimi.dsi.fastutil.longs.LongSet; import net.minecraft.core.BlockPos; import net.minecraft.core.SectionPos; -import net.minecraft.world.level.BlockAndTintGetter; +import net.minecraft.world.level.LevelAccessor; import net.minecraft.world.level.LightLayer; /** @@ -20,9 +23,11 @@ import net.minecraft.world.level.LightLayer; */ public class LightUpdater { - private static final Map light = new HashMap<>(); - public static LightUpdater get(BlockAndTintGetter world) { - return light.computeIfAbsent(world, LightUpdater::new); + private static final WorldAttached light = new WorldAttached<>(LightUpdater::new); + private final ParallelTaskEngine taskEngine; + + public static LightUpdater get(LevelAccessor world) { + return light.get(world); } private final LightProvider provider; @@ -31,7 +36,8 @@ public class LightUpdater { private final WeakContainmentMultiMap sections = new WeakContainmentMultiMap<>(); private final WeakContainmentMultiMap chunks = new WeakContainmentMultiMap<>(); - public LightUpdater(BlockAndTintGetter world) { + public LightUpdater(LevelAccessor world) { + taskEngine = InstancedRenderDispatcher.getTaskEngine(world); provider = new BasicProvider(world); } @@ -40,11 +46,16 @@ public class LightUpdater { } public void tick() { - for (MovingListener listener : movingListeners) { - if (listener.update(provider)) { - addListener(listener); - } - } + Queue listeners = new ConcurrentLinkedQueue<>(); + + taskEngine.group("LightUpdater") + .addTasks(movingListeners.stream(), listener -> { + if (listener.update(provider)) { + listeners.add(listener); + } + }) + .onComplete(() -> listeners.forEach(this::addListener)) + .submit(); } /**