A refined plan

- Clean up and improve functional interfaces used in Plans.
- Allow safely running TaskExecutor#sync* methods off-thread.
- Formalize the concept of "main thread" in task executor.
- Improve tests for main-thread plans.
This commit is contained in:
Jozufozu 2023-11-29 17:08:06 -08:00
parent 5ff194cbc8
commit bf6401a867
25 changed files with 340 additions and 146 deletions

View file

@ -28,6 +28,8 @@ public interface TaskExecutor extends Executor {
* Wait for running tasks, so long as the given condition is met * Wait for running tasks, so long as the given condition is met
* ({@link BooleanSupplier#getAsBoolean()} returns {@code true}). * ({@link BooleanSupplier#getAsBoolean()} returns {@code true}).
* <br> * <br>
* If this method is called on the
* <br>
* This method is equivalent to {@code syncUntil(() -> !cond.getAsBoolean())}. * This method is equivalent to {@code syncUntil(() -> !cond.getAsBoolean())}.
* *
* @param cond The condition sync on. * @param cond The condition sync on.
@ -48,10 +50,18 @@ public interface TaskExecutor extends Executor {
/** /**
* Schedule a task to be run on the main thread. * Schedule a task to be run on the main thread.
* <br> * <br>
* This method may be called from any thread, but the runnable will only * This method may be called from any thread (including the main thread),
* be executed once somebody calls either {@link #syncPoint()} or * but the runnable will <em>only</em> be executed once somebody calls
* {@link #syncUntil(BooleanSupplier)}. * either {@link #syncPoint()} or {@link #syncUntil(BooleanSupplier)}
* on this task executor's main thread.
* @param runnable The task to run. * @param runnable The task to run.
*/ */
void scheduleForSync(Runnable runnable); void scheduleForMainThread(Runnable runnable);
/**
* Check whether the current thread is this task executor's main thread.
*
* @return {@code true} if the current thread is the main thread.
*/
boolean isMainThread();
} }

View file

@ -4,6 +4,7 @@ import org.apache.commons.lang3.concurrent.AtomicSafeInitializer;
import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.mojang.blaze3d.systems.RenderSystem;
public final class FlwTaskExecutor { public final class FlwTaskExecutor {
public static final boolean USE_SERIAL_EXECUTOR = System.getProperty("flw.useSerialExecutor") != null; public static final boolean USE_SERIAL_EXECUTOR = System.getProperty("flw.useSerialExecutor") != null;
@ -28,7 +29,7 @@ public final class FlwTaskExecutor {
return SerialTaskExecutor.INSTANCE; return SerialTaskExecutor.INSTANCE;
} }
ParallelTaskExecutor executor = new ParallelTaskExecutor("Flywheel"); ParallelTaskExecutor executor = new ParallelTaskExecutor("Flywheel", RenderSystem::isOnRenderThread);
executor.startWorkers(); executor.startWorkers();
return executor; return executor;
} }

View file

@ -14,7 +14,6 @@ import org.slf4j.Logger;
import com.jozufozu.flywheel.Flywheel; import com.jozufozu.flywheel.Flywheel;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.mojang.blaze3d.systems.RenderSystem;
import com.mojang.logging.LogUtils; import com.mojang.logging.LogUtils;
import net.minecraft.util.Mth; import net.minecraft.util.Mth;
@ -27,6 +26,8 @@ public class ParallelTaskExecutor implements TaskExecutor {
private final String name; private final String name;
private final int threadCount; private final int threadCount;
private final BooleanSupplier mainThreadQuery;
/** /**
* If set to false, the executor will shut down. * If set to false, the executor will shut down.
*/ */
@ -38,8 +39,9 @@ public class ParallelTaskExecutor implements TaskExecutor {
private final ThreadGroupNotifier taskNotifier = new ThreadGroupNotifier(); private final ThreadGroupNotifier taskNotifier = new ThreadGroupNotifier();
private final WaitGroup waitGroup = new WaitGroup(); private final WaitGroup waitGroup = new WaitGroup();
public ParallelTaskExecutor(String name) { public ParallelTaskExecutor(String name, BooleanSupplier mainThreadQuery) {
this.name = name; this.name = name;
this.mainThreadQuery = mainThreadQuery;
threadCount = getOptimalThreadCount(); threadCount = getOptimalThreadCount();
} }
@ -116,16 +118,17 @@ public class ParallelTaskExecutor implements TaskExecutor {
} }
@Override @Override
public void scheduleForSync(Runnable runnable) { public void scheduleForMainThread(Runnable runnable) {
if (!running.get()) { if (!running.get()) {
throw new IllegalStateException("Executor is stopped"); throw new IllegalStateException("Executor is stopped");
} }
if (RenderSystem.isOnRenderThread()) { mainThreadQueue.add(runnable);
runnable.run(); }
} else {
mainThreadQueue.add(runnable); @Override
} public boolean isMainThread() {
return mainThreadQuery.getAsBoolean();
} }
/** /**
@ -133,16 +136,18 @@ public class ParallelTaskExecutor implements TaskExecutor {
*/ */
@Override @Override
public void syncPoint() { public void syncPoint() {
boolean onMainThread = isMainThread();
while (true) { while (true) {
if (syncOneTask()) { if (syncOneTask(onMainThread)) {
// Done! Nothing left to do. // Done! Nothing left to do.
break; return;
} }
} }
} }
@Override @Override
public boolean syncUntil(BooleanSupplier cond) { public boolean syncUntil(BooleanSupplier cond) {
boolean onMainThread = isMainThread();
while (true) { while (true) {
if (cond.getAsBoolean()) { if (cond.getAsBoolean()) {
// The condition is already true! // The condition is already true!
@ -150,7 +155,7 @@ public class ParallelTaskExecutor implements TaskExecutor {
return true; return true;
} }
if (syncOneTask()) { if (syncOneTask(onMainThread)) {
// Out of tasks entirely. // Out of tasks entirely.
// The condition may have flipped though so return its result. // The condition may have flipped though so return its result.
return cond.getAsBoolean(); return cond.getAsBoolean();
@ -161,6 +166,7 @@ public class ParallelTaskExecutor implements TaskExecutor {
@Override @Override
public boolean syncWhile(BooleanSupplier cond) { public boolean syncWhile(BooleanSupplier cond) {
boolean onMainThread = isMainThread();
while (true) { while (true) {
if (!cond.getAsBoolean()) { if (!cond.getAsBoolean()) {
// The condition is already false! // The condition is already false!
@ -168,7 +174,7 @@ public class ParallelTaskExecutor implements TaskExecutor {
return true; return true;
} }
if (syncOneTask()) { if (syncOneTask(onMainThread)) {
// Out of tasks entirely. // Out of tasks entirely.
// The condition may have flipped though so return its result. // The condition may have flipped though so return its result.
return !cond.getAsBoolean(); return !cond.getAsBoolean();
@ -179,24 +185,49 @@ public class ParallelTaskExecutor implements TaskExecutor {
/** /**
* Attempt to process a single task. * Attempt to process a single task.
* *
* @param mainThread Whether this is being called from the main thread or not.
* @return {@code true} if the executor has nothing left to do. * @return {@code true} if the executor has nothing left to do.
*/ */
private boolean syncOneTask() { private boolean syncOneTask(boolean mainThread) {
return mainThread ? syncOneTaskMainThread() : syncOneTaskOffThread();
}
private boolean syncOneTaskMainThread() {
Runnable task; Runnable task;
if ((task = mainThreadQueue.poll()) != null) { if ((task = mainThreadQueue.poll()) != null) {
// Prioritize main thread tasks. // Prioritize main thread tasks.
processMainThreadTask(task); processMainThreadTask(task);
// Check again next loop.
return false;
} else if ((task = taskQueue.pollLast()) != null) { } else if ((task = taskQueue.pollLast()) != null) {
// then work on tasks from the queue. // Nothing in the mainThreadQueue, work on tasks from the normal queue.
processTask(task); processTask(task);
// Check again next loop.
return false;
} else { } else {
// then wait for the other threads to finish. // Nothing right now, wait for the other threads to finish.
boolean done = waitGroup.await(10_000); boolean done = waitGroup.await(10_000);
// If we timed-out tasks may have been added to the queue, so check again. // If we timed-out tasks may have been added to the queue, so check again.
// if they didn't, we're done. // if they didn't, we're done.
return done && mainThreadQueue.isEmpty(); return done && mainThreadQueue.isEmpty();
} }
return false; }
private boolean syncOneTaskOffThread() {
Runnable task;
if ((task = taskQueue.pollLast()) != null) {
// then work on tasks from the queue.
processTask(task);
// Check again next loop.
return false;
} else {
// Nothing right now, wait for the other threads to finish.
// If we timed-out tasks may have been added to the queue, so check again.
// if they didn't, we're done.
return waitGroup.await(10_000);
}
} }
private void processTask(Runnable task) { private void processTask(Runnable task) {

View file

@ -16,7 +16,7 @@ public class SerialTaskExecutor implements TaskExecutor {
} }
@Override @Override
public void scheduleForSync(Runnable runnable) { public void scheduleForMainThread(Runnable runnable) {
runnable.run(); runnable.run();
} }
@ -38,4 +38,9 @@ public class SerialTaskExecutor implements TaskExecutor {
public int getThreadCount() { public int getThreadCount() {
return 1; return 1;
} }
@Override
public boolean isMainThread() {
return true;
}
} }

View file

@ -59,7 +59,7 @@ public abstract class Storage<T> {
tickableVisuals.remove(visual); tickableVisuals.remove(visual);
dynamicVisuals.remove(visual); dynamicVisuals.remove(visual);
if (plannedVisuals.remove(visual)) { if (plannedVisuals.remove(visual)) {
framePlan.clear(); framePlan.triggerReInitialize();
} }
visual.delete(); visual.delete();
} }
@ -103,8 +103,8 @@ public abstract class Storage<T> {
tickableVisuals.clear(); tickableVisuals.clear();
dynamicVisuals.clear(); dynamicVisuals.clear();
plannedVisuals.clear(); plannedVisuals.clear();
framePlan.clear(); framePlan.triggerReInitialize();
tickPlan.clear(); tickPlan.triggerReInitialize();
visuals.values() visuals.values()
.forEach(Visual::delete); .forEach(Visual::delete);
visuals.clear(); visuals.clear();

View file

@ -47,7 +47,7 @@ public class VisualUpdatePlan<C> implements SimplyComposedPlan<C> {
return plan; return plan;
} }
public void clear() { public void triggerReInitialize() {
plan = UnitPlan.of(); plan = UnitPlan.of();
initialized = false; initialized = false;
} }

View file

@ -1,13 +0,0 @@
package com.jozufozu.flywheel.lib.task;
import com.jozufozu.flywheel.api.task.Plan;
/**
* A consumer like interface for use with {@link Plan}s.
*
* @param <C> The context type.
*/
@FunctionalInterface
public interface ContextConsumer<C> {
void accept(C context);
}

View file

@ -1,13 +0,0 @@
package com.jozufozu.flywheel.lib.task;
import com.jozufozu.flywheel.api.task.Plan;
/**
* A function like interface for use with {@link Plan}s.
* @param <C> The context type.
* @param <R> The return type.
*/
@FunctionalInterface
public interface ContextFunction<C, R> {
R apply(C context);
}

View file

@ -1,16 +0,0 @@
package com.jozufozu.flywheel.lib.task;
/**
* A {@link ContextConsumer} that ignores the context object.
*
* @param <C> The context type.
*/
@FunctionalInterface
public interface ContextRunnable<C> extends ContextConsumer<C> {
void run();
@Override
default void accept(C ignored) {
run();
}
}

View file

@ -1,17 +0,0 @@
package com.jozufozu.flywheel.lib.task;
/**
* A {@link ContextFunction} that ignores the context object.
*
* @param <C> The context type.
* @param <R> The return type.
*/
@FunctionalInterface
public interface ContextSupplier<C, R> extends ContextFunction<C, R> {
R get();
@Override
default R apply(C ignored) {
return get();
}
}

View file

@ -4,6 +4,7 @@ import java.util.Collection;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.SupplierWithContext;
/** /**
* A plan that executes many other plans provided dynamically. * A plan that executes many other plans provided dynamically.
@ -11,18 +12,19 @@ import com.jozufozu.flywheel.api.task.TaskExecutor;
* @param plans A function to get a collection of plans based on the context. * @param plans A function to get a collection of plans based on the context.
* @param <C> The type of the context object. * @param <C> The type of the context object.
*/ */
public record DynamicNestedPlan<C>(ContextFunction<C, Collection<? extends Plan<C>>> plans) implements SimplyComposedPlan<C> { public record DynamicNestedPlan<C>(
public static <C> Plan<C> of(ContextSupplier<C, Collection<? extends Plan<C>>> supplier) { SupplierWithContext<C, Collection<? extends Plan<C>>> plans) implements SimplyComposedPlan<C> {
public static <C> Plan<C> of(SupplierWithContext.Ignored<C, Collection<? extends Plan<C>>> supplier) {
return new DynamicNestedPlan<>(supplier); return new DynamicNestedPlan<>(supplier);
} }
public static <C> Plan<C> of(ContextFunction<C, Collection<? extends Plan<C>>> function) { public static <C> Plan<C> of(SupplierWithContext<C, Collection<? extends Plan<C>>> supplier) {
return new DynamicNestedPlan<>(function); return new DynamicNestedPlan<>(supplier);
} }
@Override @Override
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) { public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
var plans = this.plans.apply(context); var plans = this.plans.get(context);
if (plans.isEmpty()) { if (plans.isEmpty()) {
onCompletion.run(); onCompletion.run();

View file

@ -1,12 +1,11 @@
package com.jozufozu.flywheel.lib.task; package com.jozufozu.flywheel.lib.task;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.ConsumerWithContext;
import com.jozufozu.flywheel.lib.task.functional.SupplierWithContext;
/** /**
* A plan that executes code on each element of a provided list. * A plan that executes code on each element of a provided list.
@ -18,17 +17,26 @@ import com.jozufozu.flywheel.api.task.TaskExecutor;
* @param <T> The type of the list elements. * @param <T> The type of the list elements.
* @param <C> The type of the context object. * @param <C> The type of the context object.
*/ */
public record ForEachPlan<T, C>(Supplier<List<T>> listSupplier, BiConsumer<T, C> action) implements SimplyComposedPlan<C> { public record ForEachPlan<T, C>(SupplierWithContext<C, List<T>> listSupplier,
public static <T, C> Plan<C> of(Supplier<List<T>> iterable, BiConsumer<T, C> forEach) { ConsumerWithContext<T, C> action) implements SimplyComposedPlan<C> {
public static <T, C> Plan<C> of(SupplierWithContext<C, List<T>> iterable, ConsumerWithContext<T, C> forEach) {
return new ForEachPlan<>(iterable, forEach); return new ForEachPlan<>(iterable, forEach);
} }
public static <T, C> Plan<C> of(Supplier<List<T>> iterable, Consumer<T> forEach) { public static <T, C> Plan<C> of(SupplierWithContext<C, List<T>> iterable, ConsumerWithContext.Ignored<T, C> forEach) {
return of(iterable, (t, c) -> forEach.accept(t)); return new ForEachPlan<>(iterable, forEach);
}
public static <T, C> Plan<C> of(SupplierWithContext.Ignored<C, List<T>> iterable, ConsumerWithContext<T, C> forEach) {
return new ForEachPlan<>(iterable, forEach);
}
public static <T, C> Plan<C> of(SupplierWithContext.Ignored<C, List<T>> iterable, ConsumerWithContext.Ignored<T, C> forEach) {
return new ForEachPlan<>(iterable, forEach);
} }
@Override @Override
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) { public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, context, onCompletion, listSupplier.get(), action)); taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, context, onCompletion, listSupplier.get(context), action));
} }
} }

View file

@ -1,11 +1,11 @@
package com.jozufozu.flywheel.lib.task; package com.jozufozu.flywheel.lib.task;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.ConsumerWithContext;
import com.jozufozu.flywheel.lib.task.functional.SupplierWithContext;
/** /**
* A plan that executes code over many slices of a provided list. * A plan that executes code over many slices of a provided list.
@ -17,14 +17,26 @@ import com.jozufozu.flywheel.api.task.TaskExecutor;
* @param <T> The type of the list elements. * @param <T> The type of the list elements.
* @param <C> The type of the context object. * @param <C> The type of the context object.
*/ */
public record ForEachSlicePlan<T, C>(Supplier<List<T>> listSupplier, public record ForEachSlicePlan<T, C>(SupplierWithContext<C, List<T>> listSupplier,
BiConsumer<List<T>, C> action) implements SimplyComposedPlan<C> { ConsumerWithContext<List<T>, C> action) implements SimplyComposedPlan<C> {
public static <T, C> Plan<C> of(Supplier<List<T>> iterable, BiConsumer<List<T>, C> forEach) { public static <T, C> Plan<C> of(SupplierWithContext<C, List<T>> iterable, ConsumerWithContext<List<T>, C> forEach) {
return new ForEachSlicePlan<>(iterable, forEach);
}
public static <T, C> Plan<C> of(SupplierWithContext<C, List<T>> iterable, ConsumerWithContext.Ignored<List<T>, C> forEach) {
return new ForEachSlicePlan<>(iterable, forEach);
}
public static <T, C> Plan<C> of(SupplierWithContext.Ignored<C, List<T>> iterable, ConsumerWithContext<List<T>, C> forEach) {
return new ForEachSlicePlan<>(iterable, forEach);
}
public static <T, C> Plan<C> of(SupplierWithContext.Ignored<C, List<T>> iterable, ConsumerWithContext.Ignored<List<T>, C> forEach) {
return new ForEachSlicePlan<>(iterable, forEach); return new ForEachSlicePlan<>(iterable, forEach);
} }
@Override @Override
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) { public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
taskExecutor.execute(() -> PlanUtil.distributeSlices(taskExecutor, context, onCompletion, listSupplier.get(), action)); taskExecutor.execute(() -> PlanUtil.distributeSlices(taskExecutor, context, onCompletion, listSupplier.get(context), action));
} }
} }

View file

@ -1,9 +1,8 @@
package com.jozufozu.flywheel.lib.task; package com.jozufozu.flywheel.lib.task;
import java.util.function.Predicate;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.BooleanSupplierWithContext;
/** /**
* Executes one plan or another, depending on a dynamically evaluated condition. * Executes one plan or another, depending on a dynamically evaluated condition.
@ -12,14 +11,19 @@ import com.jozufozu.flywheel.api.task.TaskExecutor;
* @param onFalse The plan to execute if the condition is false. * @param onFalse The plan to execute if the condition is false.
* @param <C> The type of the context object. * @param <C> The type of the context object.
*/ */
public record IfElsePlan<C>(Predicate<C> condition, Plan<C> onTrue, Plan<C> onFalse) implements SimplyComposedPlan<C> { public record IfElsePlan<C>(BooleanSupplierWithContext<C> condition, Plan<C> onTrue,
public static <C> Builder<C> on(Predicate<C> condition) { Plan<C> onFalse) implements SimplyComposedPlan<C> {
public static <C> Builder<C> on(BooleanSupplierWithContext<C> condition) {
return new Builder<>(condition);
}
public static <C> Builder<C> on(BooleanSupplierWithContext.Ignored<C> condition) {
return new Builder<>(condition); return new Builder<>(condition);
} }
@Override @Override
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) { public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
if (condition.test(context)) { if (condition.getAsBoolean(context)) {
onTrue.execute(taskExecutor, context, onCompletion); onTrue.execute(taskExecutor, context, onCompletion);
} else { } else {
onFalse.execute(taskExecutor, context, onCompletion); onFalse.execute(taskExecutor, context, onCompletion);
@ -40,11 +44,11 @@ public record IfElsePlan<C>(Predicate<C> condition, Plan<C> onTrue, Plan<C> onFa
} }
public static class Builder<C> { public static class Builder<C> {
private final Predicate<C> condition; private final BooleanSupplierWithContext<C> condition;
private Plan<C> onTrue = UnitPlan.of(); private Plan<C> onTrue = UnitPlan.of();
private Plan<C> onFalse = UnitPlan.of(); private Plan<C> onFalse = UnitPlan.of();
public Builder(Predicate<C> condition) { public Builder(BooleanSupplierWithContext<C> condition) {
this.condition = condition; this.condition = condition;
} }

View file

@ -1,12 +1,15 @@
package com.jozufozu.flywheel.lib.task; package com.jozufozu.flywheel.lib.task;
import java.util.function.Function;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.SupplierWithContext;
public record MapContextPlan<C, D>(Function<C, D> map, Plan<D> plan) implements SimplyComposedPlan<C> { public record MapContextPlan<C, D>(SupplierWithContext<C, D> map, Plan<D> plan) implements SimplyComposedPlan<C> {
public static <C, D> Builder<C, D> map(Function<C, D> map) { public static <C, D> Builder<C, D> map(SupplierWithContext<C, D> map) {
return new Builder<>(map);
}
public static <C, D> Builder<C, D> get(SupplierWithContext.Ignored<C, D> map) {
return new Builder<>(map); return new Builder<>(map);
} }
@ -28,9 +31,9 @@ public record MapContextPlan<C, D>(Function<C, D> map, Plan<D> plan) implements
} }
public static class Builder<C, D> { public static class Builder<C, D> {
private final Function<C, D> map; private final SupplierWithContext<C, D> map;
public Builder(Function<C, D> map) { public Builder(SupplierWithContext<C, D> map) {
this.map = map; this.map = map;
} }

View file

@ -7,6 +7,7 @@ import java.util.List;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.RunnableWithContext;
public record NestedPlan<C>(List<Plan<C>> parallelPlans) implements SimplyComposedPlan<C> { public record NestedPlan<C>(List<Plan<C>> parallelPlans) implements SimplyComposedPlan<C> {
@SafeVarargs @SafeVarargs
@ -54,7 +55,7 @@ public record NestedPlan<C>(List<Plan<C>> parallelPlans) implements SimplyCompos
.simplify(); .simplify();
} }
var simplifiedTasks = new ArrayList<ContextConsumer<C>>(); var simplifiedTasks = new ArrayList<RunnableWithContext<C>>();
var simplifiedPlans = new ArrayList<Plan<C>>(); var simplifiedPlans = new ArrayList<Plan<C>>();
var toVisit = new ArrayDeque<>(parallelPlans); var toVisit = new ArrayDeque<>(parallelPlans);
while (!toVisit.isEmpty()) { while (!toVisit.isEmpty()) {

View file

@ -5,19 +5,20 @@ import java.util.List;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.RunnableWithContext;
public record SimplePlan<C>(List<ContextConsumer<C>> parallelTasks) implements SimplyComposedPlan<C> { public record SimplePlan<C>(List<RunnableWithContext<C>> parallelTasks) implements SimplyComposedPlan<C> {
@SafeVarargs @SafeVarargs
public static <C> SimplePlan<C> of(ContextRunnable<C>... tasks) { public static <C> SimplePlan<C> of(RunnableWithContext.Ignored<C>... tasks) {
return new SimplePlan<>(List.of(tasks)); return new SimplePlan<>(List.of(tasks));
} }
@SafeVarargs @SafeVarargs
public static <C> SimplePlan<C> of(ContextConsumer<C>... tasks) { public static <C> SimplePlan<C> of(RunnableWithContext<C>... tasks) {
return new SimplePlan<>(List.of(tasks)); return new SimplePlan<>(List.of(tasks));
} }
public static <C> SimplePlan<C> of(List<ContextConsumer<C>> tasks) { public static <C> SimplePlan<C> of(List<RunnableWithContext<C>> tasks) {
return new SimplePlan<>(tasks); return new SimplePlan<>(tasks);
} }
@ -28,15 +29,13 @@ public record SimplePlan<C>(List<ContextConsumer<C>> parallelTasks) implements S
return; return;
} }
taskExecutor.execute(() -> { taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, context, onCompletion, parallelTasks, RunnableWithContext::run));
PlanUtil.distribute(taskExecutor, context, onCompletion, parallelTasks, ContextConsumer::accept);
});
} }
@Override @Override
public Plan<C> and(Plan<C> plan) { public Plan<C> and(Plan<C> plan) {
if (plan instanceof SimplePlan<C> simple) { if (plan instanceof SimplePlan<C> simple) {
return of(ImmutableList.<ContextConsumer<C>>builder() return of(ImmutableList.<RunnableWithContext<C>>builder()
.addAll(parallelTasks) .addAll(parallelTasks)
.addAll(simple.parallelTasks) .addAll(simple.parallelTasks)
.build()); .build());

View file

@ -2,20 +2,26 @@ package com.jozufozu.flywheel.lib.task;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.api.task.TaskExecutor; import com.jozufozu.flywheel.api.task.TaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.RunnableWithContext;
public record SyncedPlan<C>(ContextConsumer<C> task) implements SimplyComposedPlan<C> { public record SyncedPlan<C>(RunnableWithContext<C> task) implements SimplyComposedPlan<C> {
public static <C> Plan<C> of(ContextConsumer<C> task) { public static <C> Plan<C> of(RunnableWithContext<C> task) {
return new SyncedPlan<>(task); return new SyncedPlan<>(task);
} }
public static <C> Plan<C> of(ContextRunnable<C> task) { public static <C> Plan<C> of(RunnableWithContext.Ignored<C> task) {
return new SyncedPlan<>(task); return new SyncedPlan<>(task);
} }
@Override @Override
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) { public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
taskExecutor.scheduleForSync(() -> { if (taskExecutor.isMainThread()) {
task.accept(context); task.run(context);
onCompletion.run();
return;
}
taskExecutor.scheduleForMainThread(() -> {
task.run(context);
onCompletion.run(); onCompletion.run();
}); });
} }

View file

@ -0,0 +1,35 @@
package com.jozufozu.flywheel.lib.task.functional;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
/**
* A boolean supplier like interface for use with {@link com.jozufozu.flywheel.api.task.Plan Plans} and their contexts.
*
* @param <C> The context type.
*/
@FunctionalInterface
public interface BooleanSupplierWithContext<C> extends Predicate<C> {
boolean getAsBoolean(C context);
@Override
default boolean test(C c) {
return getAsBoolean(c);
}
/**
* A {@link BooleanSupplierWithContext} that ignores the context object.
*
* @param <C> The (ignored) context type.
*/
@FunctionalInterface
interface Ignored<C> extends BooleanSupplierWithContext<C>, BooleanSupplier {
@Override
boolean getAsBoolean();
@Override
default boolean getAsBoolean(C ignored) {
return getAsBoolean();
}
}
}

View file

@ -0,0 +1,34 @@
package com.jozufozu.flywheel.lib.task.functional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* A consumer like interface for use with {@link com.jozufozu.flywheel.api.task.Plan Plans} and their contexts.
* <br>
* The subinterface {@link Ignored} is provided for consumers that do not need the context object.
*
* @param <T> The type to actually consume.
* @param <C> The context type.
*/
@FunctionalInterface
public interface ConsumerWithContext<T, C> extends BiConsumer<T, C> {
void accept(T t, C context);
/**
* A {@link ConsumerWithContext} that ignores the context object.
*
* @param <T> The type to actually consume.
* @param <C> The (ignored) context type.
*/
@FunctionalInterface
interface Ignored<T, C> extends ConsumerWithContext<T, C>, Consumer<T> {
@Override
void accept(T t);
@Override
default void accept(T t, C ignored) {
accept(t);
}
}
}

View file

@ -0,0 +1,35 @@
package com.jozufozu.flywheel.lib.task.functional;
import java.util.function.Consumer;
/**
* A runnable like interface for use with {@link com.jozufozu.flywheel.api.task.Plan Plans} and their contexts.
* <br>
* The subinterface {@link Ignored} is provided for runnables that do not need the context object.
* @param <C> The context type.
*/
@FunctionalInterface
public interface RunnableWithContext<C> extends Consumer<C> {
void run(C context);
@Override
default void accept(C c) {
run(c);
}
/**
* A {@link RunnableWithContext} that ignores the context object.
*
* @param <C> The (ignored) context type.
*/
@FunctionalInterface
interface Ignored<C> extends RunnableWithContext<C>, Runnable {
@Override
void run();
@Override
default void run(C ignored) {
run();
}
}
}

View file

@ -0,0 +1,38 @@
package com.jozufozu.flywheel.lib.task.functional;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A supplier like interface for use with {@link com.jozufozu.flywheel.api.task.Plan Plans} and their contexts.
* <br>
* The subinterface {@link Ignored} is provided for suppliers that do not need the context object.
* @param <C> The context type.
* @param <R> The return type.
*/
@FunctionalInterface
public interface SupplierWithContext<C, R> extends Function<C, R> {
R get(C context);
@Override
default R apply(C c) {
return get(c);
}
/**
* A {@link SupplierWithContext} that ignores the context object.
*
* @param <C> The (ignored) context type.
* @param <R> The return type.
*/
@FunctionalInterface
interface Ignored<C, R> extends SupplierWithContext<C, R>, Supplier<R> {
@Override
R get();
@Override
default R get(C ignored) {
return get();
}
}
}

View file

@ -0,0 +1,8 @@
/**
* Functional interfaces accepting a context object for use with {@link com.jozufozu.flywheel.api.task.Plan Plans}.
* <br>
* Each interface in this package has a subinterface that ignores the context object. Plans then call the parent
* interface, but do not need to create additional closure objects to translate when the consumer wishes to ignore
* the context object.
*/
package com.jozufozu.flywheel.lib.task.functional;

View file

@ -5,9 +5,9 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -15,22 +15,26 @@ import org.junit.jupiter.params.provider.ValueSource;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.impl.task.ParallelTaskExecutor; import com.jozufozu.flywheel.impl.task.ParallelTaskExecutor;
import com.jozufozu.flywheel.lib.task.functional.RunnableWithContext;
import com.jozufozu.flywheel.lib.util.Unit; import com.jozufozu.flywheel.lib.util.Unit;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
class PlanExecutionTest { class PlanExecutionTest {
protected static final ParallelTaskExecutor EXECUTOR = new ParallelTaskExecutor("PlanTest"); protected static ParallelTaskExecutor EXECUTOR;
@BeforeAll @BeforeEach
public static void setUp() { public void setUp() {
var currentThread = Thread.currentThread();
EXECUTOR = new ParallelTaskExecutor("PlanTest", () -> currentThread == Thread.currentThread());
EXECUTOR.startWorkers(); EXECUTOR.startWorkers();
} }
@AfterAll @AfterEach
public static void tearDown() { public void tearDown() {
EXECUTOR.stopWorkers(); EXECUTOR.stopWorkers();
EXECUTOR = null;
} }
@ParameterizedTest @ParameterizedTest
@ -73,12 +77,12 @@ class PlanExecutionTest {
var lock = new Object(); var lock = new Object();
var sequence = new IntArrayList(8); var sequence = new IntArrayList(8);
ContextRunnable<Unit> addOne = () -> { RunnableWithContext.Ignored<Unit> addOne = () -> {
synchronized (lock) { synchronized (lock) {
sequence.add(1); sequence.add(1);
} }
}; };
ContextRunnable<Unit> addTwo = () -> { RunnableWithContext.Ignored<Unit> addTwo = () -> {
synchronized (lock) { synchronized (lock) {
sequence.add(2); sequence.add(2);
} }
@ -185,12 +189,26 @@ class PlanExecutionTest {
} }
@Test @Test
void mainThreadPlan() { void mainThreadPlanRunsImmediately() {
var done = new AtomicBoolean(false); var done = new AtomicBoolean(false);
var plan = SyncedPlan.of(() -> done.set(true)); var plan = SyncedPlan.of(() -> done.set(true));
plan.execute(EXECUTOR, Unit.INSTANCE); plan.execute(EXECUTOR, Unit.INSTANCE);
Assertions.assertTrue(done.get());
}
@Test
void mainThreadPlanIsNotCalledOffThread() {
var done = new AtomicBoolean(false);
var plan = SyncedPlan.of(() -> {
done.set(true);
});
// call execute from within a worker thread
EXECUTOR.execute(() -> plan.execute(EXECUTOR, Unit.INSTANCE));
Assertions.assertFalse(done.get()); Assertions.assertFalse(done.get());
EXECUTOR.syncPoint(); EXECUTOR.syncPoint();
@ -222,16 +240,18 @@ class PlanExecutionTest {
var first = new NamedFlag("ready right away"); var first = new NamedFlag("ready right away");
var second = new NamedFlag("ready after 2s"); var second = new NamedFlag("ready after 2s");
RaisePlan.raise(first) var plan = RaisePlan.raise(first)
.then(SimplePlan.of(() -> { .then(SimplePlan.of(() -> {
// sleep to add delay between raising the first flag and raising the second flag
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
})) }))
.then(RaisePlan.raise(second)) .then(RaisePlan.raise(second));
.execute(EXECUTOR, Unit.INSTANCE);
EXECUTOR.execute(() -> plan.execute(EXECUTOR, Unit.INSTANCE));
Assertions.assertTrue(EXECUTOR.syncUntil(first::isRaised), "First flag should be raised since we submitted a plan that raises it."); Assertions.assertTrue(EXECUTOR.syncUntil(first::isRaised), "First flag should be raised since we submitted a plan that raises it.");
@ -260,7 +280,7 @@ class PlanExecutionTest {
} }
public static void runAndWait(Plan<Unit> plan) { public static void runAndWait(Plan<Unit> plan) {
new TestBarrier<Unit>(plan, Unit.INSTANCE).runAndWait(); new TestBarrier<>(plan, Unit.INSTANCE).runAndWait();
} }
public static <C> void runAndWait(Plan<C> plan, C ctx) { public static <C> void runAndWait(Plan<C> plan, C ctx) {

View file

@ -4,11 +4,12 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.jozufozu.flywheel.api.task.Plan; import com.jozufozu.flywheel.api.task.Plan;
import com.jozufozu.flywheel.lib.task.functional.RunnableWithContext;
import com.jozufozu.flywheel.lib.util.Unit; import com.jozufozu.flywheel.lib.util.Unit;
public class PlanSimplificationTest { public class PlanSimplificationTest {
public static final ContextRunnable<Unit> NOOP = () -> { public static final RunnableWithContext.Ignored<Unit> NOOP = () -> {
}; };
public static final Plan<Unit> SIMPLE = SimplePlan.of(NOOP); public static final Plan<Unit> SIMPLE = SimplePlan.of(NOOP);