mirror of
https://github.com/Jozufozu/Flywheel.git
synced 2024-11-10 12:34:11 +01:00
Spring cleaning
- Move PlanStorage to lib as PlanMap and add documentation - Remove FastPlanStorage in favor of ForEachPlan - Move plan distribution next to other distribution methods - Rename PlanUtil -> Distribute
This commit is contained in:
parent
e5a0e35141
commit
31f4ffe354
@ -1,35 +0,0 @@
|
||||
package com.jozufozu.flywheel.impl.visualization.storage;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
import com.jozufozu.flywheel.lib.task.PlanUtil;
|
||||
import com.jozufozu.flywheel.lib.task.SimplyComposedPlan;
|
||||
import com.jozufozu.flywheel.lib.task.functional.ConsumerWithContext;
|
||||
|
||||
public class FastPlanStorage<T, C> implements SimplyComposedPlan<C> {
|
||||
private final List<T> objects = new ArrayList<>();
|
||||
private final ConsumerWithContext<T, C> consumer;
|
||||
|
||||
public FastPlanStorage(ConsumerWithContext<T, C> consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public void add(T object) {
|
||||
objects.add(object);
|
||||
}
|
||||
|
||||
public void remove(T object) {
|
||||
objects.remove(object);
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
objects.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
|
||||
PlanUtil.distribute(taskExecutor, context, onCompletion, objects, consumer);
|
||||
}
|
||||
}
|
@ -11,7 +11,7 @@ import com.jozufozu.flywheel.api.task.Plan;
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
import com.jozufozu.flywheel.api.visual.LitVisual;
|
||||
import com.jozufozu.flywheel.api.visual.VisualFrameContext;
|
||||
import com.jozufozu.flywheel.lib.task.PlanUtil;
|
||||
import com.jozufozu.flywheel.lib.task.Distribute;
|
||||
import com.jozufozu.flywheel.lib.task.SimplyComposedPlan;
|
||||
import com.jozufozu.flywheel.lib.task.Synchronizer;
|
||||
|
||||
@ -56,7 +56,7 @@ public class LitVisualStorage {
|
||||
for (long section : sectionsUpdatedThisFrame) {
|
||||
var visuals = sections2Visuals.get(section);
|
||||
if (visuals != null && !visuals.isEmpty()) {
|
||||
taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, updateId, sync, visuals, Updater::updateLight));
|
||||
taskExecutor.execute(() -> Distribute.tasks(taskExecutor, updateId, sync, visuals, Updater::updateLight));
|
||||
} else {
|
||||
sync.decrementAndEventuallyRun();
|
||||
}
|
||||
|
@ -1,72 +0,0 @@
|
||||
package com.jozufozu.flywheel.impl.visualization.storage;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.jozufozu.flywheel.api.task.Plan;
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
import com.jozufozu.flywheel.lib.task.PlanUtil;
|
||||
import com.jozufozu.flywheel.lib.task.SimplyComposedPlan;
|
||||
import com.jozufozu.flywheel.lib.task.Synchronizer;
|
||||
|
||||
public class PlanStorage<T, C> implements SimplyComposedPlan<C> {
|
||||
private final List<T> objects = new ArrayList<>();
|
||||
private final List<Plan<C>> plans = new ArrayList<>();
|
||||
|
||||
public void add(T object, Plan<C> plan) {
|
||||
objects.add(object);
|
||||
plans.add(plan);
|
||||
}
|
||||
|
||||
public void remove(T object) {
|
||||
int index = objects.indexOf(object);
|
||||
|
||||
if (index != -1) {
|
||||
objects.remove(index);
|
||||
plans.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
objects.clear();
|
||||
plans.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
|
||||
final int size = plans.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
final int sliceSize = PlanUtil.sliceSize(taskExecutor, size, 8);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
for (var t : plans) {
|
||||
t.execute(taskExecutor, context, synchronizer);
|
||||
}
|
||||
} else if (sliceSize == 1) {
|
||||
for (var t : plans) {
|
||||
taskExecutor.execute(() -> t.execute(taskExecutor, context, synchronizer));
|
||||
}
|
||||
} else {
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = plans.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
for (var t : subList) {
|
||||
t.execute(taskExecutor, context, synchronizer);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,6 +1,8 @@
|
||||
package com.jozufozu.flywheel.impl.visualization.storage;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ -14,7 +16,9 @@ import com.jozufozu.flywheel.api.visual.Visual;
|
||||
import com.jozufozu.flywheel.api.visual.VisualFrameContext;
|
||||
import com.jozufozu.flywheel.api.visual.VisualTickContext;
|
||||
import com.jozufozu.flywheel.api.visualization.VisualizationContext;
|
||||
import com.jozufozu.flywheel.lib.task.ForEachPlan;
|
||||
import com.jozufozu.flywheel.lib.task.NestedPlan;
|
||||
import com.jozufozu.flywheel.lib.task.PlanMap;
|
||||
import com.jozufozu.flywheel.lib.visual.SimpleDynamicVisual;
|
||||
import com.jozufozu.flywheel.lib.visual.SimpleTickableVisual;
|
||||
|
||||
@ -23,10 +27,10 @@ import it.unimi.dsi.fastutil.objects.Reference2ObjectOpenHashMap;
|
||||
|
||||
public abstract class Storage<T> {
|
||||
protected final Supplier<VisualizationContext> visualizationContextSupplier;
|
||||
protected final PlanStorage<DynamicVisual, VisualFrameContext> dynamicVisuals = new PlanStorage<>();
|
||||
protected final FastPlanStorage<SimpleDynamicVisual, VisualFrameContext> fastDynamicVisuals = new FastPlanStorage<>(SimpleDynamicVisual::beginFrame);
|
||||
protected final PlanStorage<TickableVisual, VisualTickContext> tickableVisuals = new PlanStorage<>();
|
||||
protected final FastPlanStorage<SimpleTickableVisual, VisualTickContext> fastTickableVisuals = new FastPlanStorage<>(SimpleTickableVisual::tick);
|
||||
protected final PlanMap<DynamicVisual, VisualFrameContext> dynamicVisuals = new PlanMap<>();
|
||||
protected final PlanMap<TickableVisual, VisualTickContext> tickableVisuals = new PlanMap<>();
|
||||
protected final List<SimpleDynamicVisual> simpleDynamicVisuals = new ArrayList<>();
|
||||
protected final List<SimpleTickableVisual> simpleTickableVisuals = new ArrayList<>();
|
||||
protected final LitVisualStorage litVisuals = new LitVisualStorage();
|
||||
|
||||
private final Map<T, Visual> visuals = new Reference2ObjectOpenHashMap<>();
|
||||
@ -56,14 +60,14 @@ public abstract class Storage<T> {
|
||||
|
||||
if (visual instanceof TickableVisual tickable) {
|
||||
if (visual instanceof SimpleTickableVisual simpleTickable) {
|
||||
fastTickableVisuals.remove(simpleTickable);
|
||||
simpleTickableVisuals.remove(simpleTickable);
|
||||
} else {
|
||||
tickableVisuals.remove(tickable);
|
||||
}
|
||||
}
|
||||
if (visual instanceof DynamicVisual dynamic) {
|
||||
if (visual instanceof SimpleDynamicVisual simpleDynamic) {
|
||||
fastDynamicVisuals.remove(simpleDynamic);
|
||||
simpleDynamicVisuals.remove(simpleDynamic);
|
||||
} else {
|
||||
dynamicVisuals.remove(dynamic);
|
||||
}
|
||||
@ -94,9 +98,9 @@ public abstract class Storage<T> {
|
||||
|
||||
public void recreateAll(float partialTick) {
|
||||
tickableVisuals.clear();
|
||||
fastTickableVisuals.clear();
|
||||
dynamicVisuals.clear();
|
||||
fastDynamicVisuals.clear();
|
||||
simpleTickableVisuals.clear();
|
||||
simpleDynamicVisuals.clear();
|
||||
litVisuals.clear();
|
||||
visuals.replaceAll((obj, visual) -> {
|
||||
visual.delete();
|
||||
@ -133,11 +137,11 @@ public abstract class Storage<T> {
|
||||
protected abstract Visual createRaw(T obj);
|
||||
|
||||
public Plan<VisualFrameContext> framePlan() {
|
||||
return NestedPlan.of(dynamicVisuals, fastDynamicVisuals, litVisuals.plan());
|
||||
return NestedPlan.of(dynamicVisuals, litVisuals.plan(), ForEachPlan.of(() -> simpleDynamicVisuals, SimpleDynamicVisual::beginFrame));
|
||||
}
|
||||
|
||||
public Plan<VisualTickContext> tickPlan() {
|
||||
return NestedPlan.of(tickableVisuals, fastTickableVisuals);
|
||||
return NestedPlan.of(tickableVisuals, ForEachPlan.of(() -> simpleTickableVisuals, SimpleTickableVisual::tick));
|
||||
}
|
||||
|
||||
public void enqueueLightUpdateSections(LongSet sections) {
|
||||
@ -149,7 +153,7 @@ public abstract class Storage<T> {
|
||||
|
||||
if (visual instanceof TickableVisual tickable) {
|
||||
if (visual instanceof SimpleTickableVisual simpleTickable) {
|
||||
fastTickableVisuals.add(simpleTickable);
|
||||
simpleTickableVisuals.add(simpleTickable);
|
||||
} else {
|
||||
tickableVisuals.add(tickable, tickable.planTick());
|
||||
}
|
||||
@ -157,7 +161,7 @@ public abstract class Storage<T> {
|
||||
|
||||
if (visual instanceof DynamicVisual dynamic) {
|
||||
if (visual instanceof SimpleDynamicVisual simpleDynamic) {
|
||||
fastDynamicVisuals.add(simpleDynamic);
|
||||
simpleDynamicVisuals.add(simpleDynamic);
|
||||
} else {
|
||||
dynamicVisuals.add(dynamic, dynamic.planFrame());
|
||||
}
|
||||
|
190
src/main/java/com/jozufozu/flywheel/lib/task/Distribute.java
Normal file
190
src/main/java/com/jozufozu/flywheel/lib/task/Distribute.java
Normal file
@ -0,0 +1,190 @@
|
||||
package com.jozufozu.flywheel.lib.task;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import com.jozufozu.flywheel.api.task.Plan;
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
import com.jozufozu.flywheel.lib.math.MoreMath;
|
||||
|
||||
public final class Distribute {
|
||||
/**
|
||||
* Distribute the given list of tasks across the threads of the task executor.
|
||||
*
|
||||
* <p>An effort is made to balance the load across threads while also ensuring each
|
||||
* runnable passed to the executor is large enough to amortize the cost of scheduling it.</p>
|
||||
*
|
||||
* @param taskExecutor The task executor to run on.
|
||||
* @param context The context to pass to each task.
|
||||
* @param onCompletion The action to run when all tasks are complete.
|
||||
* @param list The list of objects to run tasks on.
|
||||
* @param action The action to run on each object.
|
||||
* @param <C> The context type.
|
||||
* @param <T> The object type.
|
||||
*/
|
||||
public static <C, T> void tasks(TaskExecutor taskExecutor, C context, Runnable onCompletion, List<T> list, BiConsumer<T, C> action) {
|
||||
final int size = list.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
final int sliceSize = sliceSize(taskExecutor, size);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
for (T t : list) {
|
||||
action.accept(t, context);
|
||||
}
|
||||
onCompletion.run();
|
||||
} else if (sliceSize == 1) {
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
for (T t : list) {
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(t, context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
} else {
|
||||
var synchronizer = new Synchronizer(MoreMath.ceilingDiv(size, sliceSize), onCompletion);
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = list.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
for (T t : subList) {
|
||||
action.accept(t, context);
|
||||
}
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Distribute the given list of tasks in chunks across the threads of the task executor.
|
||||
*
|
||||
* <p>Unlike {@link #tasks(TaskExecutor, Object, Runnable, List, BiConsumer)}, this method
|
||||
* gives the action a list of objects to work on, rather than a single object. This may be handy
|
||||
* for when you can share some thread local objects between individual elements of the list.</p>
|
||||
*
|
||||
* <p>An effort is made to balance the load across threads while also ensuring each
|
||||
* runnable passed to the executor is large enough to amortize the cost of scheduling it.</p>
|
||||
*
|
||||
* @param taskExecutor The task executor to run on.
|
||||
* @param context The context to pass to each task.
|
||||
* @param onCompletion The action to run when all tasks are complete.
|
||||
* @param list The list of objects to run tasks on.
|
||||
* @param action The action to run on each slice.
|
||||
* @param <C> The context type.
|
||||
* @param <T> The object type.
|
||||
*/
|
||||
public static <C, T> void slices(TaskExecutor taskExecutor, C context, Runnable onCompletion, List<T> list, BiConsumer<List<T>, C> action) {
|
||||
final int size = list.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
final int sliceSize = sliceSize(taskExecutor, size);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
action.accept(list, context);
|
||||
onCompletion.run();
|
||||
} else if (sliceSize == 1) {
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
for (T t : list) {
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(Collections.singletonList(t), context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
} else {
|
||||
var synchronizer = new Synchronizer(MoreMath.ceilingDiv(size, sliceSize), onCompletion);
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = list.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(subList, context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Distribute the given list of plans across the threads of the task executor.
|
||||
*
|
||||
* <p>Plan scheduling is normally lightweight compared to the cost of execution,
|
||||
* but when many hundreds or thousands of plans need to be scheduled it may be beneficial
|
||||
* to parallelize. This method does exactly that, distributing larger chunks of plans to
|
||||
* be scheduled in batches.</p>
|
||||
*
|
||||
* <p>An effort is made to balance the load across threads while also ensuring each
|
||||
* runnable passed to the executor is large enough to amortize the cost of scheduling it.</p>
|
||||
*
|
||||
* @param taskExecutor The task executor to run on.
|
||||
* @param context The context to pass to the plans.
|
||||
* @param onCompletion The action to run when all plans are complete.
|
||||
* @param plans The list of plans to execute.
|
||||
* @param <C> The context type.
|
||||
*/
|
||||
public static <C> void plans(TaskExecutor taskExecutor, C context, Runnable onCompletion, List<Plan<C>> plans) {
|
||||
final int size = plans.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
final int sliceSize = sliceSize(taskExecutor, size, 8);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
for (var t : plans) {
|
||||
t.execute(taskExecutor, context, synchronizer);
|
||||
}
|
||||
} else if (sliceSize == 1) {
|
||||
for (var t : plans) {
|
||||
taskExecutor.execute(() -> t.execute(taskExecutor, context, synchronizer));
|
||||
}
|
||||
} else {
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = plans.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
for (var t : subList) {
|
||||
t.execute(taskExecutor, context, synchronizer);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static int sliceSize(TaskExecutor taskExecutor, int totalSize) {
|
||||
return sliceSize(taskExecutor, totalSize, 32);
|
||||
}
|
||||
|
||||
public static int sliceSize(TaskExecutor taskExecutor, int totalSize, int denominator) {
|
||||
return MoreMath.ceilingDiv(totalSize, taskExecutor.getThreadCount() * denominator);
|
||||
}
|
||||
|
||||
private Distribute() {
|
||||
}
|
||||
}
|
@ -37,6 +37,6 @@ public record ForEachPlan<T, C>(SupplierWithContext<C, List<T>> listSupplier,
|
||||
|
||||
@Override
|
||||
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
|
||||
taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, context, onCompletion, listSupplier.get(context), action));
|
||||
taskExecutor.execute(() -> Distribute.tasks(taskExecutor, context, onCompletion, listSupplier.get(context), action));
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,6 @@ public record ForEachSlicePlan<T, C>(SupplierWithContext<C, List<T>> listSupplie
|
||||
|
||||
@Override
|
||||
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
|
||||
taskExecutor.execute(() -> PlanUtil.distributeSlices(taskExecutor, context, onCompletion, listSupplier.get(context), action));
|
||||
taskExecutor.execute(() -> Distribute.slices(taskExecutor, context, onCompletion, listSupplier.get(context), action));
|
||||
}
|
||||
}
|
||||
|
44
src/main/java/com/jozufozu/flywheel/lib/task/PlanMap.java
Normal file
44
src/main/java/com/jozufozu/flywheel/lib/task/PlanMap.java
Normal file
@ -0,0 +1,44 @@
|
||||
package com.jozufozu.flywheel.lib.task;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.jozufozu.flywheel.api.task.Plan;
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
|
||||
/**
|
||||
* A plan that executes a dynamic list of plans in parallel.
|
||||
*
|
||||
* <p>The plans can be added/removed by association with a key object.</p>
|
||||
*
|
||||
* @param <K> The key type
|
||||
* @param <C> The context type
|
||||
*/
|
||||
public class PlanMap<K, C> implements SimplyComposedPlan<C> {
|
||||
private final List<K> keys = new ArrayList<>();
|
||||
private final List<Plan<C>> values = new ArrayList<>();
|
||||
|
||||
public void add(K object, Plan<C> plan) {
|
||||
keys.add(object);
|
||||
values.add(plan);
|
||||
}
|
||||
|
||||
public void remove(K object) {
|
||||
int index = keys.indexOf(object);
|
||||
|
||||
if (index != -1) {
|
||||
keys.remove(index);
|
||||
values.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
keys.clear();
|
||||
values.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(TaskExecutor taskExecutor, C context, Runnable onCompletion) {
|
||||
Distribute.plans(taskExecutor, context, onCompletion, values);
|
||||
}
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
package com.jozufozu.flywheel.lib.task;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import com.jozufozu.flywheel.api.task.TaskExecutor;
|
||||
import com.jozufozu.flywheel.lib.math.MoreMath;
|
||||
|
||||
public final class PlanUtil {
|
||||
public static <C, T> void distribute(TaskExecutor taskExecutor, C context, Runnable onCompletion, List<T> list, BiConsumer<T, C> action) {
|
||||
final int size = list.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
final int sliceSize = sliceSize(taskExecutor, size);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
for (T t : list) {
|
||||
action.accept(t, context);
|
||||
}
|
||||
onCompletion.run();
|
||||
} else if (sliceSize == 1) {
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
for (T t : list) {
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(t, context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
} else {
|
||||
var synchronizer = new Synchronizer(MoreMath.ceilingDiv(size, sliceSize), onCompletion);
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = list.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
for (T t : subList) {
|
||||
action.accept(t, context);
|
||||
}
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static <C, T> void distributeSlices(TaskExecutor taskExecutor, C context, Runnable onCompletion, List<T> list, BiConsumer<List<T>, C> action) {
|
||||
final int size = list.size();
|
||||
|
||||
if (size == 0) {
|
||||
onCompletion.run();
|
||||
return;
|
||||
}
|
||||
|
||||
final int sliceSize = sliceSize(taskExecutor, size);
|
||||
|
||||
if (size <= sliceSize) {
|
||||
action.accept(list, context);
|
||||
onCompletion.run();
|
||||
} else if (sliceSize == 1) {
|
||||
var synchronizer = new Synchronizer(size, onCompletion);
|
||||
for (T t : list) {
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(Collections.singletonList(t), context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
} else {
|
||||
var synchronizer = new Synchronizer(MoreMath.ceilingDiv(size, sliceSize), onCompletion);
|
||||
int remaining = size;
|
||||
|
||||
while (remaining > 0) {
|
||||
int end = remaining;
|
||||
remaining -= sliceSize;
|
||||
int start = Math.max(remaining, 0);
|
||||
|
||||
var subList = list.subList(start, end);
|
||||
taskExecutor.execute(() -> {
|
||||
action.accept(subList, context);
|
||||
synchronizer.decrementAndEventuallyRun();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static int sliceSize(TaskExecutor taskExecutor, int totalSize) {
|
||||
return sliceSize(taskExecutor, totalSize, 32);
|
||||
}
|
||||
|
||||
public static int sliceSize(TaskExecutor taskExecutor, int totalSize, int denominator) {
|
||||
return MoreMath.ceilingDiv(totalSize, taskExecutor.getThreadCount() * denominator);
|
||||
}
|
||||
|
||||
private PlanUtil() {
|
||||
}
|
||||
}
|
@ -29,7 +29,7 @@ public record SimplePlan<C>(List<RunnableWithContext<C>> parallelTasks) implemen
|
||||
return;
|
||||
}
|
||||
|
||||
taskExecutor.execute(() -> PlanUtil.distribute(taskExecutor, context, onCompletion, parallelTasks, RunnableWithContext::run));
|
||||
taskExecutor.execute(() -> Distribute.tasks(taskExecutor, context, onCompletion, parallelTasks, RunnableWithContext::run));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user