mirror of
https://github.com/Jozufozu/Flywheel.git
synced 2025-01-28 05:44:59 +01:00
Moving in parallel
- LightUpdates now uses task engine to update listeners in parallel - Basic workgroup system to run something on the main thread after a group of tasks is complete
This commit is contained in:
parent
35f513a2a0
commit
0c53a55104
4 changed files with 139 additions and 23 deletions
|
@ -32,7 +32,7 @@ public class InstanceWorld {
|
||||||
protected final InstanceManager<Entity> entityInstanceManager;
|
protected final InstanceManager<Entity> entityInstanceManager;
|
||||||
protected final InstanceManager<BlockEntity> blockEntityInstanceManager;
|
protected final InstanceManager<BlockEntity> blockEntityInstanceManager;
|
||||||
|
|
||||||
protected final ParallelTaskEngine taskEngine;
|
public final ParallelTaskEngine taskEngine;
|
||||||
|
|
||||||
public InstanceWorld(LevelAccessor levelAccessor) {
|
public InstanceWorld(LevelAccessor levelAccessor) {
|
||||||
Level world = (Level) levelAccessor;
|
Level world = (Level) levelAccessor;
|
||||||
|
|
|
@ -54,18 +54,24 @@ public class InstancedRenderDispatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static InstanceManager<BlockEntity> getBlockEntities(LevelAccessor world) {
|
public static InstanceManager<BlockEntity> getBlockEntities(LevelAccessor world) {
|
||||||
if (Backend.isOn()) {
|
return getInstanceWorld(world).getBlockEntityInstanceManager();
|
||||||
return instanceWorlds.get(world)
|
|
||||||
.getBlockEntityInstanceManager();
|
|
||||||
} else {
|
|
||||||
throw new NullPointerException("Backend is off, cannot retrieve instance world.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static InstanceManager<Entity> getEntities(LevelAccessor world) {
|
public static InstanceManager<Entity> getEntities(LevelAccessor world) {
|
||||||
|
return getInstanceWorld(world).getEntityInstanceManager();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ParallelTaskEngine getTaskEngine(LevelAccessor world) {
|
||||||
|
return getInstanceWorld(world).taskEngine;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or create the {@link InstanceWorld} for the given world.
|
||||||
|
* @throws NullPointerException if the backend is off
|
||||||
|
*/
|
||||||
|
public static InstanceWorld getInstanceWorld(LevelAccessor world) {
|
||||||
if (Backend.isOn()) {
|
if (Backend.isOn()) {
|
||||||
return instanceWorlds.get(world)
|
return instanceWorlds.get(world);
|
||||||
.getEntityInstanceManager();
|
|
||||||
} else {
|
} else {
|
||||||
throw new NullPointerException("Backend is off, cannot retrieve instance world.");
|
throw new NullPointerException("Backend is off, cannot retrieve instance world.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,9 @@ import java.util.Deque;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
@ -20,9 +23,15 @@ import net.minecraft.util.Mth;
|
||||||
public class ParallelTaskEngine implements TaskEngine {
|
public class ParallelTaskEngine implements TaskEngine {
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger("BatchExecutor");
|
private static final Logger LOGGER = LoggerFactory.getLogger("BatchExecutor");
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If set to false, the engine will shut down.
|
||||||
|
*/
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
private final WaitGroup wg = new WaitGroup();
|
private final WaitGroup wg = new WaitGroup();
|
||||||
|
|
||||||
|
private final Deque<Runnable> syncTasks = new ConcurrentLinkedDeque<>();
|
||||||
private final Deque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
|
private final Deque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
|
||||||
private final List<Thread> threads = new ArrayList<>();
|
private final List<Thread> threads = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -30,13 +39,15 @@ public class ParallelTaskEngine implements TaskEngine {
|
||||||
|
|
||||||
private final int threadCount;
|
private final int threadCount;
|
||||||
|
|
||||||
private final String name;
|
|
||||||
|
|
||||||
public ParallelTaskEngine(String name) {
|
public ParallelTaskEngine(String name) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
threadCount = getOptimalThreadCount();
|
threadCount = getOptimalThreadCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public WorkGroupBuilder group(String name) {
|
||||||
|
return new WorkGroupBuilder(name);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spawns a number of work-stealing threads to process results in the build queue. If the builder is already
|
* Spawns a number of work-stealing threads to process results in the build queue. If the builder is already
|
||||||
* running, this method does nothing and exits.
|
* running, this method does nothing and exits.
|
||||||
|
@ -117,6 +128,10 @@ public class ParallelTaskEngine implements TaskEngine {
|
||||||
this.wg.await();
|
this.wg.await();
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while ((job = this.syncTasks.pollLast()) != null) {
|
||||||
|
job.run();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -157,6 +172,7 @@ public class ParallelTaskEngine implements TaskEngine {
|
||||||
private static int getMaxThreadCount() {
|
private static int getMaxThreadCount() {
|
||||||
return Runtime.getRuntime().availableProcessors();
|
return Runtime.getRuntime().availableProcessors();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class WorkerRunnable implements Runnable {
|
private class WorkerRunnable implements Runnable {
|
||||||
|
|
||||||
private final AtomicBoolean running = ParallelTaskEngine.this.running;
|
private final AtomicBoolean running = ParallelTaskEngine.this.running;
|
||||||
|
@ -176,4 +192,87 @@ public class ParallelTaskEngine implements TaskEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class WorkGroupBuilder {
|
||||||
|
final String name;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
Runnable finalizer;
|
||||||
|
|
||||||
|
Stream<Runnable> tasks;
|
||||||
|
|
||||||
|
public WorkGroupBuilder(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> WorkGroupBuilder addTasks(Stream<T> iterable, Consumer<T> consumer) {
|
||||||
|
return addTasks(iterable.map(it -> () -> consumer.accept(it)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkGroupBuilder addTasks(Stream<Runnable> tasks) {
|
||||||
|
if (this.tasks == null) {
|
||||||
|
this.tasks = tasks;
|
||||||
|
} else {
|
||||||
|
this.tasks = Stream.concat(this.tasks, tasks);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkGroupBuilder onComplete(Runnable runnable) {
|
||||||
|
this.finalizer = runnable;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void submit() {
|
||||||
|
if (this.tasks == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkGroup workGroup = new WorkGroup(name, finalizer);
|
||||||
|
|
||||||
|
tasks.map(task -> new WorkGroupTask(workGroup, task)).forEach(ParallelTaskEngine.this::submit);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class WorkGroupTask implements Runnable {
|
||||||
|
|
||||||
|
private final WorkGroup parent;
|
||||||
|
private final Runnable wrapped;
|
||||||
|
|
||||||
|
public WorkGroupTask(WorkGroup parent, Runnable wrapped) {
|
||||||
|
this.parent = parent;
|
||||||
|
this.wrapped = wrapped;
|
||||||
|
|
||||||
|
this.parent.running.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
this.wrapped.run();
|
||||||
|
|
||||||
|
this.parent.oneDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class WorkGroup {
|
||||||
|
final String name;
|
||||||
|
|
||||||
|
final Runnable finalizer;
|
||||||
|
|
||||||
|
final AtomicInteger running = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public WorkGroup(String name, @Nullable Runnable finalizer) {
|
||||||
|
this.name = name;
|
||||||
|
this.finalizer = finalizer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void oneDown() {
|
||||||
|
if (running.decrementAndGet() == 0) {
|
||||||
|
if (finalizer != null) {
|
||||||
|
ParallelTaskEngine.this.syncTasks.add(finalizer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,21 @@
|
||||||
package com.jozufozu.flywheel.light;
|
package com.jozufozu.flywheel.light;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.Queue;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import com.jozufozu.flywheel.backend.instancing.InstancedRenderDispatcher;
|
||||||
|
import com.jozufozu.flywheel.backend.instancing.ParallelTaskEngine;
|
||||||
import com.jozufozu.flywheel.util.WeakHashSet;
|
import com.jozufozu.flywheel.util.WeakHashSet;
|
||||||
|
import com.jozufozu.flywheel.util.WorldAttached;
|
||||||
import com.jozufozu.flywheel.util.box.GridAlignedBB;
|
import com.jozufozu.flywheel.util.box.GridAlignedBB;
|
||||||
import com.jozufozu.flywheel.util.box.ImmutableBox;
|
import com.jozufozu.flywheel.util.box.ImmutableBox;
|
||||||
|
|
||||||
import it.unimi.dsi.fastutil.longs.LongSet;
|
import it.unimi.dsi.fastutil.longs.LongSet;
|
||||||
import net.minecraft.core.BlockPos;
|
import net.minecraft.core.BlockPos;
|
||||||
import net.minecraft.core.SectionPos;
|
import net.minecraft.core.SectionPos;
|
||||||
import net.minecraft.world.level.BlockAndTintGetter;
|
import net.minecraft.world.level.LevelAccessor;
|
||||||
import net.minecraft.world.level.LightLayer;
|
import net.minecraft.world.level.LightLayer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,9 +23,11 @@ import net.minecraft.world.level.LightLayer;
|
||||||
*/
|
*/
|
||||||
public class LightUpdater {
|
public class LightUpdater {
|
||||||
|
|
||||||
private static final Map<BlockAndTintGetter, LightUpdater> light = new HashMap<>();
|
private static final WorldAttached<LightUpdater> light = new WorldAttached<>(LightUpdater::new);
|
||||||
public static LightUpdater get(BlockAndTintGetter world) {
|
private final ParallelTaskEngine taskEngine;
|
||||||
return light.computeIfAbsent(world, LightUpdater::new);
|
|
||||||
|
public static LightUpdater get(LevelAccessor world) {
|
||||||
|
return light.get(world);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final LightProvider provider;
|
private final LightProvider provider;
|
||||||
|
@ -31,7 +36,8 @@ public class LightUpdater {
|
||||||
private final WeakContainmentMultiMap<LightListener> sections = new WeakContainmentMultiMap<>();
|
private final WeakContainmentMultiMap<LightListener> sections = new WeakContainmentMultiMap<>();
|
||||||
private final WeakContainmentMultiMap<LightListener> chunks = new WeakContainmentMultiMap<>();
|
private final WeakContainmentMultiMap<LightListener> chunks = new WeakContainmentMultiMap<>();
|
||||||
|
|
||||||
public LightUpdater(BlockAndTintGetter world) {
|
public LightUpdater(LevelAccessor world) {
|
||||||
|
taskEngine = InstancedRenderDispatcher.getTaskEngine(world);
|
||||||
provider = new BasicProvider(world);
|
provider = new BasicProvider(world);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,11 +46,16 @@ public class LightUpdater {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void tick() {
|
public void tick() {
|
||||||
for (MovingListener listener : movingListeners) {
|
Queue<LightListener> listeners = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
taskEngine.group("LightUpdater")
|
||||||
|
.addTasks(movingListeners.stream(), listener -> {
|
||||||
if (listener.update(provider)) {
|
if (listener.update(provider)) {
|
||||||
addListener(listener);
|
listeners.add(listener);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
.onComplete(() -> listeners.forEach(this::addListener))
|
||||||
|
.submit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue