Proper task engine

- Manual threadpool
 - More control
This commit is contained in:
Jozufozu 2021-12-17 01:03:52 -08:00
parent 431ff92861
commit f651676dea
11 changed files with 245 additions and 66 deletions

View file

@ -0,0 +1,172 @@
package com.jozufozu.flywheel.backend.instancing;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import com.jozufozu.flywheel.backend.instancing.batching.WaitGroup;
import net.minecraft.util.Mth;
// https://github.com/CaffeineMC/sodium-fabric/blob/5d364ed5ba63f9067fcf72a078ca310bff4db3e9/src/main/java/me/jellysquid/mods/sodium/client/render/chunk/compile/ChunkBuilder.java
public class BatchExecutor implements TaskEngine {
private static final Logger LOGGER = LogManager.getLogger("BatchExecutor");
private final AtomicBoolean running = new AtomicBoolean(false);
private final WaitGroup wg = new WaitGroup();
private final Deque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
private final List<Thread> threads = new ArrayList<>();
private final Object jobNotifier = new Object();
private final int threadCount;
public BatchExecutor() {
threadCount = getOptimalThreadCount();
}
/**
* Spawns a number of work-stealing threads to process results in the build queue. If the builder is already
* running, this method does nothing and exits.
*/
public void startWorkers() {
if (this.running.getAndSet(true)) {
return;
}
if (!this.threads.isEmpty()) {
throw new IllegalStateException("Threads are still alive while in the STOPPED state");
}
for (int i = 0; i < this.threadCount; i++) {
Thread thread = new Thread(new WorkerRunnable(), "Engine Executor " + i);
thread.setPriority(Math.max(0, Thread.NORM_PRIORITY - 2));
thread.start();
this.threads.add(thread);
}
LOGGER.info("Started {} worker threads", this.threads.size());
}
public void stopWorkers() {
if (!this.running.getAndSet(false)) {
return;
}
if (this.threads.isEmpty()) {
throw new IllegalStateException("No threads are alive but the executor is in the RUNNING state");
}
synchronized (this.jobNotifier) {
this.jobNotifier.notifyAll();
}
try {
for (Thread thread : this.threads) {
thread.join();
}
} catch (InterruptedException ignored) {
}
this.threads.clear();
this.jobQueue.clear();
}
/**
* Submit a task to the pool.
*/
@Override
public void submit(@NotNull Runnable command) {
this.jobQueue.add(command);
this.wg.add(1);
synchronized (this.jobNotifier) {
this.jobNotifier.notify();
}
}
/**
* Wait for all running jobs to finish.
*/
@Override
public void syncPoint() {
Runnable job;
// Finish everyone else's work...
while ((job = getNextTask(false)) != null) {
processTask(job);
}
// and wait for any stragglers.
try {
this.wg.await();
} catch (InterruptedException ignored) {
}
}
@Nullable
private Runnable getNextTask(boolean block) {
Runnable job = this.jobQueue.poll();
if (job == null && block) {
synchronized (BatchExecutor.this.jobNotifier) {
try {
BatchExecutor.this.jobNotifier.wait();
} catch (InterruptedException ignored) {
}
}
}
return job;
}
private void processTask(Runnable job) {
try {
job.run();
} finally {
BatchExecutor.this.wg.done();
}
}
/**
* Returns the "optimal" number of threads to be used for chunk build tasks. This will always return at least one
* thread.
*/
private static int getOptimalThreadCount() {
return Mth.clamp(Math.max(getMaxThreadCount() / 3, getMaxThreadCount() - 6), 1, 10);
}
private static int getMaxThreadCount() {
return Runtime.getRuntime().availableProcessors();
}
private class WorkerRunnable implements Runnable {
private final AtomicBoolean running = BatchExecutor.this.running;
@Override
public void run() {
// Run until the chunk builder shuts down
while (this.running.get()) {
Runnable job = BatchExecutor.this.getNextTask(true);
if (job == null) {
continue;
}
processTask(job);
}
}
}
}

View file

@ -0,0 +1,21 @@
package com.jozufozu.flywheel.backend.instancing;
import org.jetbrains.annotations.NotNull;
public class ImmediateExecutor implements TaskEngine {
public static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
private ImmediateExecutor() {
}
@Override
public void submit(@NotNull Runnable command) {
command.run();
}
@Override
public void syncPoint() {
// noop
}
}

View file

@ -29,15 +29,20 @@ public class InstanceWorld {
protected final InstanceManager<Entity> entityInstanceManager;
protected final InstanceManager<BlockEntity> tileEntityInstanceManager;
protected final BatchExecutor executor;
public InstanceWorld() {
this.executor = new BatchExecutor();
this.executor.startWorkers();
FlwEngine engine = Backend.getInstance()
.getEngine();
switch (engine) {
case GL33 -> {
InstancingEngine<WorldProgram> manager = InstancingEngine.builder(Contexts.WORLD)
.build();
.build(this.executor);
entityInstanceManager = new EntityInstanceManager(manager);
tileEntityInstanceManager = new TileInstanceManager(manager);
@ -47,7 +52,7 @@ public class InstanceWorld {
this.engine = manager;
}
case BATCHING -> {
this.engine = new BatchingEngine();
this.engine = new BatchingEngine(this.executor);
entityInstanceManager = new EntityInstanceManager(this.engine);
tileEntityInstanceManager = new TileInstanceManager(this.engine);
}
@ -67,6 +72,7 @@ public class InstanceWorld {
* Free all acquired resources and invalidate this instance world.
*/
public void delete() {
this.executor.stopWorkers();
engine.delete();
entityInstanceManager.detachLightListeners();
tileEntityInstanceManager.detachLightListeners();

View file

@ -21,7 +21,5 @@ public interface RenderDispatcher {
*/
void beginFrame(Camera info);
default void delete() {
}
void delete();
}

View file

@ -0,0 +1,12 @@
package com.jozufozu.flywheel.backend.instancing;
import org.jetbrains.annotations.NotNull;
public interface TaskEngine {
void submit(@NotNull Runnable command);
/**
* Wait for all running jobs to finish.
*/
void syncPoint();
}

View file

@ -1,34 +0,0 @@
package com.jozufozu.flywheel.backend.instancing.batching;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
public class BatchExecutor implements Executor {
private final Executor internal;
private final WaitGroup wg;
public BatchExecutor(Executor internal) {
this.internal = internal;
wg = new WaitGroup();
}
@Override
public void execute(@NotNull Runnable command) {
wg.add(1);
internal.execute(() -> {
// wrapper function to decrement the wait group
try {
command.run();
} catch (Exception ignored) {
} finally {
wg.done();
}
});
}
public void await() throws InterruptedException {
wg.await();
}
}

View file

@ -2,11 +2,11 @@ package com.jozufozu.flywheel.backend.instancing.batching;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import com.jozufozu.flywheel.api.InstanceData;
import com.jozufozu.flywheel.api.MaterialGroup;
import com.jozufozu.flywheel.api.struct.StructType;
import com.jozufozu.flywheel.backend.instancing.TaskEngine;
import com.jozufozu.flywheel.backend.model.DirectBufferBuilder;
import com.jozufozu.flywheel.backend.model.DirectVertexConsumer;
import com.mojang.blaze3d.vertex.PoseStack;
@ -31,7 +31,7 @@ public class BatchedMaterialGroup implements MaterialGroup {
return (BatchedMaterial<D>) materials.computeIfAbsent(spec, BatchedMaterial::new);
}
public void render(PoseStack stack, MultiBufferSource source, Executor pool) {
public void render(PoseStack stack, MultiBufferSource source, TaskEngine pool) {
VertexConsumer buffer = source.getBuffer(state);
if (buffer instanceof DirectBufferBuilder direct) {
@ -41,7 +41,7 @@ public class BatchedMaterialGroup implements MaterialGroup {
}
}
private void renderParallel(PoseStack stack, Executor pool, DirectBufferBuilder direct) {
private void renderParallel(PoseStack stack, TaskEngine pool, DirectBufferBuilder direct) {
int vertexCount = calculateNeededVertices();
DirectVertexConsumer consumer = direct.intoDirectConsumer(vertexCount);
FormatContext context = new FormatContext(consumer.hasOverlay());

View file

@ -3,11 +3,10 @@ package com.jozufozu.flywheel.backend.instancing.batching;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import com.jozufozu.flywheel.api.MaterialGroup;
import com.jozufozu.flywheel.backend.RenderLayer;
import com.jozufozu.flywheel.backend.instancing.TaskEngine;
import com.jozufozu.flywheel.backend.instancing.Engine;
import com.jozufozu.flywheel.event.RenderLayerEvent;
import com.mojang.blaze3d.vertex.PoseStack;
@ -20,19 +19,16 @@ import net.minecraft.core.Vec3i;
public class BatchingEngine implements Engine {
protected BlockPos originCoordinate = BlockPos.ZERO;
protected final Map<RenderLayer, Map<RenderType, BatchedMaterialGroup>> layers;
protected final TaskEngine taskEngine;
private final BatchExecutor pool;
public BatchingEngine() {
public BatchingEngine(TaskEngine taskEngine) {
this.layers = new EnumMap<>(RenderLayer.class);
for (RenderLayer value : RenderLayer.values()) {
layers.put(value, new HashMap<>());
}
pool = new BatchExecutor(Executors.newWorkStealingPool(ForkJoinPool.getCommonPoolParallelism()));
this.taskEngine = taskEngine;
}
@Override
@ -42,7 +38,7 @@ public class BatchingEngine implements Engine {
@Override
public Vec3i getOriginCoordinate() {
return originCoordinate;
return BlockPos.ZERO;
}
@Override
@ -53,22 +49,25 @@ public class BatchingEngine implements Engine {
stack.translate(-event.camX, -event.camY, -event.camZ);
taskEngine.syncPoint();
for (Map.Entry<RenderType, BatchedMaterialGroup> entry : layers.get(event.getLayer()).entrySet()) {
BatchedMaterialGroup group = entry.getValue();
group.render(stack, buffers, pool);
group.render(stack, buffers, taskEngine);
}
try {
pool.await();
} catch (InterruptedException ignored) {
}
taskEngine.syncPoint();
stack.popPose();
event.buffers.bufferSource().endBatch();
}
@Override
public void delete() {
}
@Override
public void beginFrame(Camera info) {

View file

@ -1,11 +1,10 @@
package com.jozufozu.flywheel.backend.instancing.batching;
import java.util.concurrent.Executor;
import com.jozufozu.flywheel.api.InstanceData;
import com.jozufozu.flywheel.api.struct.BatchingTransformer;
import com.jozufozu.flywheel.api.struct.StructType;
import com.jozufozu.flywheel.backend.instancing.AbstractInstancer;
import com.jozufozu.flywheel.backend.instancing.TaskEngine;
import com.jozufozu.flywheel.backend.model.DirectVertexConsumer;
import com.jozufozu.flywheel.core.model.Model;
import com.jozufozu.flywheel.core.model.ModelTransformer;
@ -32,7 +31,7 @@ public class CPUInstancer<D extends InstanceData> extends AbstractInstancer<D> {
}
}
void submitTasks(PoseStack stack, Executor pool, DirectVertexConsumer consumer) {
void submitTasks(PoseStack stack, TaskEngine pool, DirectVertexConsumer consumer) {
int instances = numInstances();
while (instances > 0) {
@ -44,7 +43,7 @@ public class CPUInstancer<D extends InstanceData> extends AbstractInstancer<D> {
DirectVertexConsumer sub = consumer.split(verts);
pool.execute(() -> drawRange(stack, sub, start, end));
pool.submit(() -> drawRange(stack, sub, start, end));
}
}
@ -82,7 +81,6 @@ public class CPUInstancer<D extends InstanceData> extends AbstractInstancer<D> {
anyToRemove = false;
}
if (context.usesOverlay()) {
defaultParams.overlay();
}

View file

@ -9,6 +9,8 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
import com.jozufozu.flywheel.api.MaterialGroup;
import com.jozufozu.flywheel.backend.instancing.TaskEngine;
import com.jozufozu.flywheel.backend.instancing.ImmediateExecutor;
import com.jozufozu.flywheel.backend.RenderLayer;
import com.jozufozu.flywheel.backend.gl.GlVertexArray;
import com.jozufozu.flywheel.backend.gl.buffer.GlBufferType;
@ -33,6 +35,7 @@ public class InstancingEngine<P extends WorldProgram> implements Engine {
protected BlockPos originCoordinate = BlockPos.ZERO;
protected final TaskEngine taskEngine;
protected final WorldContext<P> context;
protected final GroupFactory<P> groupFactory;
protected final boolean ignoreOriginCoordinate;
@ -41,15 +44,16 @@ public class InstancingEngine<P extends WorldProgram> implements Engine {
private final WeakHashSet<OriginShiftListener> listeners;
public InstancingEngine(WorldContext<P> context) {
this(context, InstancedMaterialGroup::new, false);
public InstancingEngine(WorldContext<P> context, TaskEngine taskEngine) {
this(taskEngine, context, InstancedMaterialGroup::new, false);
}
public static <P extends WorldProgram> Builder<P> builder(WorldContext<P> context) {
return new Builder<>(context);
}
public InstancingEngine(WorldContext<P> context, GroupFactory<P> groupFactory, boolean ignoreOriginCoordinate) {
public InstancingEngine(TaskEngine taskEngine, WorldContext<P> context, GroupFactory<P> groupFactory, boolean ignoreOriginCoordinate) {
this.taskEngine = taskEngine;
this.context = context;
this.ignoreOriginCoordinate = ignoreOriginCoordinate;
@ -200,7 +204,11 @@ public class InstancingEngine<P extends WorldProgram> implements Engine {
}
public InstancingEngine<P> build() {
return new InstancingEngine<>(context, groupFactory, ignoreOriginCoordinate);
return build(ImmediateExecutor.INSTANCE);
}
public InstancingEngine<P> build(TaskEngine taskEngine) {
return new InstancingEngine<>(taskEngine, context, groupFactory, ignoreOriginCoordinate);
}
}
}

View file

@ -34,7 +34,6 @@ public class ModelTransformer {
modelMat.multiply(params.model);
Matrix3f normalMat;
if (params.fullNormalTransform) {
normalMat = input.last().normal().copy();
normalMat.mul(params.normal);