diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5d4372a4..5a429d9e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,9 +10,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 0 + submodules: true - name: Set up JDK 17 uses: actions/setup-java@v2 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9e7c53c2..74f070f0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,6 +17,7 @@ jobs: - uses: actions/checkout@v2 with: fetch-depth: 0 + submodules: true - name: Set up JDK 17 uses: actions/setup-java@v2 diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..8132eca3 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "FlowSched"] + path = FlowSched + url = https://github.com/RelativityMC/FlowSched.git diff --git a/FlowSched b/FlowSched new file mode 160000 index 00000000..a0c7df23 --- /dev/null +++ b/FlowSched @@ -0,0 +1 @@ +Subproject commit a0c7df23ae2fb6faf40868859cc600d3e9b4c286 diff --git a/build.gradle b/build.gradle index 337d8808..aefaed5d 100644 --- a/build.gradle +++ b/build.gradle @@ -4,6 +4,7 @@ plugins { id "me.champeau.jmh" version "0.7.1" id 'com.modrinth.minotaur' version '2.+' apply false id 'com.matthewprenger.cursegradle' version '1.4.0' apply false + id 'com.github.johnrengelman.shadow' version '8.1.1' apply false } @SuppressWarnings('unused') @@ -36,6 +37,8 @@ configure(allprojects) { archivesBaseName = "${project.archives_base_name}-mc${project.minecraft_version}" +clean.dependsOn gradle.includedBuild('FlowSched').task(':clean') + configure (allprojects - project(":tests")) { if (project != project(":") && project.parent != project(":")) return @@ -118,6 +121,7 @@ configure (allprojects - project(":tests")) { implementation "com.electronwill.night-config:toml:${night_config_version}" implementation "org.threadly:threadly:${threadly_version}" implementation "net.objecthunter:exp4j:${exp4j_version}" + implementation "com.ishland.flowsched:flowsched" } } @@ -146,6 +150,7 @@ dependencies { include implementation("com.electronwill.night-config:core:${night_config_version}") include implementation("org.threadly:threadly:${threadly_version}") include implementation("net.objecthunter:exp4j:${exp4j_version}") +// include implementation("com.ishland.flowsched:flowsched") // PSA: Some older mods, compiled on Loom 0.2.1, might have outdated Maven POMs. // You may need to force-disable transitiveness on them. @@ -196,8 +201,9 @@ dependencies { (subprojects - project(":tests")).forEach { if (it.parent != project(":")) return - api project(path: ":${it.name}", configuration: "namedElements") - include project("${it.name}:") + def projectName = it.name + api project(path: ":${projectName}", configuration: "namedElements") + include project("${projectName}:") } } } diff --git a/c2me-base/build.gradle b/c2me-base/build.gradle new file mode 100644 index 00000000..925f6c1d --- /dev/null +++ b/c2me-base/build.gradle @@ -0,0 +1,24 @@ +apply plugin: 'com.github.johnrengelman.shadow' + +configurations { + shadowInclude +} + +dependencies { + shadowInclude("com.ishland.flowsched:flowsched") { + transitive false + } +} + +shadowJar { + archiveClassifier = "all-dev" + configurations = [ project.configurations.shadowInclude ] +} + +remapJar { + input = shadowJar.archiveFile + archiveFileName = shadowJar.archiveFileName.get().replaceAll("-dev\\.jar\$", ".jar") + addNestedDependencies = true + dependsOn shadowJar +} + diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java index 324a7083..62758004 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java @@ -2,34 +2,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.ishland.c2me.base.ModuleEntryPoint; -import com.ishland.c2me.base.common.util.C2MENormalWorkerThreadFactory; +import com.ishland.flowsched.executor.ExecutorManager; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class GlobalExecutors { // private static final C2MEForkJoinWorkerThreadFactory factory = new C2MEForkJoinWorkerThreadFactory("c2me", "C2ME worker #%d", Thread.NORM_PRIORITY - 1); - private static final C2MENormalWorkerThreadFactory factory = new C2MENormalWorkerThreadFactory("c2me", "C2ME worker #%d", Thread.NORM_PRIORITY - 1); public static final int GLOBAL_EXECUTOR_PARALLELISM = (int) ModuleEntryPoint.globalExecutorParallelism; -// public static final ForkJoinPool executor = new ForkJoinPool( -// GLOBAL_EXECUTOR_PARALLELISM, -// factory, -// null, -// true -// ); - public static final ExecutorService executor = Executors.newFixedThreadPool(GLOBAL_EXECUTOR_PARALLELISM, factory); - public static final Executor invokingExecutor = r -> { - if (Thread.currentThread().getThreadGroup() == factory.getThreadGroup()) { - r.run(); - } else { - executor.execute(r); - } - }; + private static final AtomicInteger prioritizedSchedulerCounter = new AtomicInteger(0); + public static final ExecutorManager prioritizedScheduler = new ExecutorManager(GlobalExecutors.GLOBAL_EXECUTOR_PARALLELISM, thread -> { + thread.setDaemon(true); + thread.setName("c2me-prioritized-%d".formatted(prioritizedSchedulerCounter.getAndIncrement())); + }); public static final ExecutorService asyncScheduler = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/AbstractPosAwarePrioritizedTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/AbstractPosAwarePrioritizedTask.java new file mode 100644 index 00000000..334da353 --- /dev/null +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/AbstractPosAwarePrioritizedTask.java @@ -0,0 +1,36 @@ +package com.ishland.c2me.base.common.scheduler; + +import com.ishland.flowsched.executor.Task; +import it.unimi.dsi.fastutil.objects.ReferenceArrayList; + +import java.util.Objects; + +public abstract class AbstractPosAwarePrioritizedTask implements Task { + + protected final ReferenceArrayList postExec = new ReferenceArrayList<>(4); + private final long pos; + private int priority = Integer.MAX_VALUE; + + public AbstractPosAwarePrioritizedTask(long pos) { + this.pos = pos; + } + + @Override + public int priority() { + return this.priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public long getPos() { + return this.pos; + } + + public void addPostExec(Runnable runnable) { + synchronized (this.postExec) { + postExec.add(Objects.requireNonNull(runnable)); + } + } +} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/LockTokenImpl.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/LockTokenImpl.java new file mode 100644 index 00000000..ad0e2ea4 --- /dev/null +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/LockTokenImpl.java @@ -0,0 +1,6 @@ +package com.ishland.c2me.base.common.scheduler; + +import com.ishland.flowsched.executor.LockToken; + +public record LockTokenImpl(int ownerTag, long pos) implements LockToken { +} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java deleted file mode 100644 index 313b2e86..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.ishland.c2me.base.common.scheduler; - -import com.ishland.c2me.base.common.structs.SimpleObjectPool; -import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap; -import it.unimi.dsi.fastutil.objects.ReferenceArraySet; -import net.minecraft.util.math.ChunkPos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Not thread safe. - */ -public class NeighborLockingManager { - - private static final Logger LOGGER = LoggerFactory.getLogger("NeighborLockingManager"); - - private final SimpleObjectPool> pool = new SimpleObjectPool<>( - pool -> new ReferenceArraySet<>(16), - ReferenceArraySet::clear, - 1024 - ); - private final Long2ReferenceOpenHashMap> activeLocks = new Long2ReferenceOpenHashMap<>(); - - public boolean isLocked(long pos) { - return activeLocks.containsKey(pos); - } - - public void acquireLock(long pos) { - if (isLocked(pos)) throw new IllegalStateException("Already locked"); - activeLocks.put(pos, pool.alloc()); - } - - public void releaseLock(long pos) { - if (!isLocked(pos)) throw new IllegalStateException("Not locked"); - final ReferenceArraySet runnables = activeLocks.remove(pos); - for (Runnable runnable : runnables) { - try { - runnable.run(); - } catch (Throwable t) { - LOGGER.error("Failed to notify lock release at chunk %s".formatted(new ChunkPos(pos)), t); - } - } - runnables.clear(); - pool.release(runnables); - } - - public void addReleaseListener(long pos, Runnable runnable) { - if (!isLocked(pos)) throw new IllegalStateException("Not locked"); - activeLocks.get(pos).add(runnable); - } - -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java deleted file mode 100644 index ef625e5a..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.ishland.c2me.base.common.scheduler; - -import com.google.common.base.Preconditions; -import com.ishland.c2me.base.common.GlobalExecutors; - -import java.util.concurrent.CompletableFuture; -import java.util.function.BooleanSupplier; -import java.util.function.Supplier; - -public class NeighborLockingTask implements ScheduledTask { - - private final SchedulingManager schedulingManager; - private final long center; - private final long[] names; - private final BooleanSupplier isCancelled; - private final Supplier> action; - private final String desc; - private final boolean async; - private final CompletableFuture future = new CompletableFuture<>(); - private boolean acquired = false; - - public NeighborLockingTask(SchedulingManager schedulingManager, long center, long[] names, BooleanSupplier isCancelled, Supplier> action, String desc, boolean async) { - this.schedulingManager = schedulingManager; - this.center = center; - this.names = names; - this.isCancelled = isCancelled; - this.action = action; - this.desc = desc; - this.async = async; - - this.schedulingManager.enqueue(this); - } - - - @Override - public boolean tryPrepare() { - final NeighborLockingManager lockingManager = this.schedulingManager.getNeighborLockingManager(); - for (long l : names) { - if (lockingManager.isLocked(l)) { - lockingManager.addReleaseListener(l, () -> this.schedulingManager.enqueue(this)); - return false; - } - } - for (long l : names) { - lockingManager.acquireLock(l); - } - acquired = true; - return true; - } - - @Override - public void runTask(Runnable postAction) { - Preconditions.checkNotNull(postAction); - if (!acquired) throw new IllegalStateException(); - final CompletableFuture future = this.action.get(); - Preconditions.checkNotNull(future, "future"); - future.handleAsync((result, throwable) -> { - this.schedulingManager.getExecutor().execute(() -> { - final NeighborLockingManager lockingManager = this.schedulingManager.getNeighborLockingManager(); - for (long l : names) { - lockingManager.releaseLock(l); - } - }); - try { - postAction.run(); - } catch (Throwable t) { - t.printStackTrace(); - } - if (throwable != null) this.future.completeExceptionally(throwable); - else this.future.complete(result); - return null; - }, GlobalExecutors.invokingExecutor); - } - - @Override - public long centerPos() { - return center; - } - - @Override - public boolean isAsync() { - return async; - } - - public CompletableFuture getFuture() { - return future; - } -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/PriorityUtils.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/PriorityUtils.java deleted file mode 100644 index d85728fb..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/PriorityUtils.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.ishland.c2me.base.common.scheduler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicInteger; - -public class PriorityUtils { - - static final Logger LOGGER = LoggerFactory.getLogger("C2ME Priority System"); - - // int32 S0000000 000MNNNN LLLLLLLL DDDDDDDD - // S: sign bit always 0 - // M: clear if in sync load range and set if not - // N: distance to sync load chunk - // L: load level - // D: distance to nearest player - - private static final AtomicInteger priorityChanges = new AtomicInteger(0); - - public static void notifyPriorityChange() { -// priorityChanges.incrementAndGet(); - } - - public static void notifyPriorityChange(int amount) { -// priorityChanges.addAndGet(amount); - } - - public static int priorityChangeSerial() { - return priorityChanges.get(); - } - -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java index 8b7c1339..7118b67c 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java @@ -1,13 +1,52 @@ package com.ishland.c2me.base.common.scheduler; -public interface ScheduledTask { +import com.ishland.flowsched.executor.LockToken; - public boolean tryPrepare(); +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; - public void runTask(Runnable postAction); +public class ScheduledTask extends AbstractPosAwarePrioritizedTask { - public long centerPos(); + private final Supplier> action; + private final LockToken[] lockTokens; + private final CompletableFuture future = new CompletableFuture<>(); - public boolean isAsync(); + public ScheduledTask(long pos, Supplier> action, LockToken[] lockTokens) { + super(pos); + this.action = action; + this.lockTokens = lockTokens; + } + @Override + public void run(Runnable releaseLocks) { + action.get().whenComplete((t, throwable) -> { + releaseLocks.run(); + if (throwable != null) { + future.completeExceptionally(throwable); + } else { + future.complete(t); + } + for (Runnable runnable : this.postExec) { + try { + runnable.run(); + } catch (Throwable t1) { + t1.printStackTrace(); + } + } + }); + } + + @Override + public void propagateException(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public LockToken[] lockTokens() { + return this.lockTokens; + } + + public CompletableFuture getFuture() { + return this.future; + } } diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java deleted file mode 100644 index e862e7c0..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java +++ /dev/null @@ -1,137 +0,0 @@ -package com.ishland.c2me.base.common.scheduler; - -import com.google.common.base.Preconditions; -import com.ibm.asyncutil.locks.AsyncLock; -import com.ibm.asyncutil.locks.AsyncNamedLock; -import com.ishland.c2me.base.common.GlobalExecutors; -import net.minecraft.util.math.ChunkPos; - -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.BooleanSupplier; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -public class SchedulingAsyncCombinedLock implements ScheduledTask { - - private final AsyncNamedLock lock; - private final long center; - private final ChunkPos[] names; - private final BooleanSupplier isCancelled; - private final Consumer> readdForExecution; - private final Supplier> action; - private final String desc; - private final CompletableFuture future = new CompletableFuture<>(); - private final boolean async; - private AsyncLock.LockToken acquiredToken; - - public SchedulingAsyncCombinedLock(AsyncNamedLock lock, long center, Set names, BooleanSupplier isCancelled, Consumer> readdForExecution, Supplier> action, String desc, boolean async) { - this.lock = lock; - this.center = center; - this.names = names.toArray(ChunkPos[]::new); - this.isCancelled = isCancelled; - this.readdForExecution = readdForExecution; - this.action = action; - this.desc = desc; - this.async = async; - - this.readdForExecution.accept(this); - } - - @Override - public boolean tryPrepare() { - return tryAcquire(); - } - - synchronized boolean tryAcquire() { -// if (this.isCancelled.getAsBoolean()) { -//// System.out.println(String.format("Cancelling tasks for %s", this.desc)); -// this.future.completeExceptionally(new CancellationException()); -// return false; -// } - - final LockEntry[] tryLocks = new LockEntry[names.length]; - boolean allAcquired = true; - for (int i = 0, namesLength = names.length; i < namesLength; i++) { - ChunkPos name = names[i]; - final LockEntry entry = new LockEntry(name, this.lock.tryLock(name)); - tryLocks[i] = entry; - if (entry.lockToken.isEmpty()) { - allAcquired = false; - break; - } - } - if (allAcquired) { - this.acquiredToken = () -> { - for (LockEntry entry : tryLocks) { - //noinspection OptionalGetWithoutIsPresent - entry.lockToken.get().releaseLock(); // if it isn't present then something is really wrong - } - }; - return true; - } else { - boolean triedRelock = false; - for (LockEntry entry : tryLocks) { - if (entry == null) continue; - entry.lockToken.ifPresent(AsyncLock.LockToken::releaseLock); - if (!triedRelock && entry.lockToken.isEmpty()) { - this.lock.acquireLock(entry.name).thenAccept(lockToken -> { - lockToken.releaseLock(); - this.readdForExecution.accept(this); - }); - triedRelock = true; - } - } - if (!triedRelock) { - // shouldn't happen at all... - System.err.println("Some issue occurred while doing locking, retrying"); - return this.tryAcquire(); - } - return false; - } - } - - @Override - public void runTask(Runnable postAction) { - Preconditions.checkNotNull(postAction); - AsyncLock.LockToken token = this.acquiredToken; - if (token == null) throw new IllegalStateException(); - final CompletableFuture future = this.action.get(); - Preconditions.checkNotNull(future, "future"); - future.handleAsync((result, throwable) -> { - try { - token.releaseLock(); - } catch (Throwable t) { - t.printStackTrace(); - } - try { - postAction.run(); - } catch (Throwable t) { - t.printStackTrace(); - } - if (throwable != null) this.future.completeExceptionally(throwable); - else this.future.complete(result); - return null; - }, GlobalExecutors.invokingExecutor); - } - - @Override - public long centerPos() { - return center; - } - - @Override - public boolean isAsync() { - return this.async; - } - - public CompletableFuture getFuture() { - return this.future.thenApply(Function.identity()); - } - - private record LockEntry(ChunkPos name, - Optional lockToken) { - } -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java index f56fcb7b..bc616850 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java @@ -1,6 +1,7 @@ package com.ishland.c2me.base.common.scheduler; -import com.ishland.c2me.base.common.structs.DynamicPriorityQueue; +import com.ishland.c2me.base.common.GlobalExecutors; +import com.ishland.flowsched.structs.SimpleObjectPool; import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectArraySet; @@ -8,53 +9,71 @@ import net.minecraft.util.math.ChunkPos; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class SchedulingManager { + private static final AtomicInteger COUNTER = new AtomicInteger(0); + public static final int MAX_LEVEL = ChunkLevels.INACCESSIBLE + 1; - private final DynamicPriorityQueue queue = new DynamicPriorityQueue<>(MAX_LEVEL + 1); - private final Long2ReferenceOpenHashMap> pos2Tasks = new Long2ReferenceOpenHashMap<>(); + private final Long2ReferenceOpenHashMap> pos2Tasks = new Long2ReferenceOpenHashMap<>(); + private final SimpleObjectPool> pos2TasksPool = new SimpleObjectPool<>(unused -> new ObjectArraySet<>(), ObjectArraySet::clear, ObjectArraySet::clear, 2048); private final Long2IntOpenHashMap prioritiesFromLevel = new Long2IntOpenHashMap(); - private final NeighborLockingManager neighborLockingManager = new NeighborLockingManager(); - private final AtomicInteger scheduledCount = new AtomicInteger(0); - private final AtomicBoolean scheduled = new AtomicBoolean(false); + private final Object schedulingMutex = new Object(); + private final int id = COUNTER.getAndIncrement(); private ChunkPos currentSyncLoad = null; private final Executor executor; - private final int maxScheduled; { prioritiesFromLevel.defaultReturnValue(MAX_LEVEL); } - public SchedulingManager(Executor executor, int maxScheduled) { + public SchedulingManager(Executor executor) { this.executor = executor; - this.maxScheduled = maxScheduled; } - public void enqueue(ScheduledTask task) { - this.executor.execute(() -> { - if (task.isAsync()) { - schedule0(task); - } else { - queue.enqueue(task, prioritiesFromLevel.get(task.centerPos())); - pos2Tasks.computeIfAbsent(task.centerPos(), unused -> new ObjectArraySet<>()).add(task); - scheduleExecution(); + public void enqueue(AbstractPosAwarePrioritizedTask task) { + synchronized (this.schedulingMutex) { + final long pos = task.getPos(); + final ObjectArraySet locks = this.pos2Tasks.computeIfAbsent(pos, unused -> this.pos2TasksPool.alloc()); + locks.add(task); + updatePriorityInternal(pos); + } + task.addPostExec(() -> { + synchronized (this.schedulingMutex) { + final ObjectArraySet tasks = this.pos2Tasks.get(task.getPos()); + if (tasks != null) { + tasks.remove(task); + if (tasks.isEmpty()) { + this.pos2Tasks.remove(task.getPos()); + this.pos2TasksPool.release(tasks); + } + } } }); + GlobalExecutors.prioritizedScheduler.schedule(task); + } + + public void enqueue(long pos, Runnable command) { + this.enqueue(new WrappingTask(pos, command)); + } + + public Executor positionedExecutor(long pos) { + return command -> this.enqueue(pos, command); } public void updatePriorityFromLevel(long pos, int level) { this.executor.execute(() -> { - if (prioritiesFromLevel.get(pos) == level) return; - if (level < MAX_LEVEL) { - prioritiesFromLevel.put(pos, level); - } else { - prioritiesFromLevel.remove(pos); + synchronized (this.schedulingMutex) { + if (prioritiesFromLevel.get(pos) == level) return; + if (level < MAX_LEVEL) { + prioritiesFromLevel.put(pos, level); + } else { + prioritiesFromLevel.remove(pos); + } + updatePriorityInternal(pos); } - updatePriorityInternal(pos); }); } @@ -73,34 +92,33 @@ private void updatePriorityInternal(long pos) { fromSyncLoad = MAX_LEVEL; } int priority = Math.min(fromLevel, fromSyncLoad); - final ObjectArraySet locks = this.pos2Tasks.get(pos); + final ObjectArraySet locks = this.pos2Tasks.get(pos); if (locks != null) { - for (ScheduledTask lock : locks) { - queue.changePriority(lock, priority); + for (AbstractPosAwarePrioritizedTask lock : locks) { + lock.setPriority(priority); + GlobalExecutors.prioritizedScheduler.notifyPriorityChange(lock); } } } public void setCurrentSyncLoad(ChunkPos pos) { executor.execute(() -> { - if (this.currentSyncLoad != null) { - final ChunkPos lastSyncLoad = this.currentSyncLoad; - this.currentSyncLoad = null; - updateSyncLoadInternal(lastSyncLoad); - } - if (pos != null) { - this.currentSyncLoad = pos; - updateSyncLoadInternal(pos); + synchronized (this.schedulingMutex) { + if (this.currentSyncLoad != null) { + final ChunkPos lastSyncLoad = this.currentSyncLoad; + this.currentSyncLoad = null; + updateSyncLoadInternal(lastSyncLoad); + } + if (pos != null) { + this.currentSyncLoad = pos; + updateSyncLoadInternal(pos); + } } }); } - public NeighborLockingManager getNeighborLockingManager() { - return this.neighborLockingManager; - } - - public Executor getExecutor() { - return executor; + public int getId() { + return this.id; } private void updateSyncLoadInternal(ChunkPos pos) { @@ -113,40 +131,6 @@ private void updateSyncLoadInternal(ChunkPos pos) { long endTime = System.nanoTime(); } - private void scheduleExecution() { - if (scheduledCount.get() < maxScheduled && scheduled.compareAndSet(false, true)) { - this.executor.execute(() -> { - ScheduleStatus status; - while (scheduledCount.get() < maxScheduled && (status = scheduleExecutionInternal()).success) { - if (!status.async) scheduledCount.incrementAndGet(); - } - scheduled.set(false); - }); - } - } - - private ScheduleStatus scheduleExecutionInternal() { - final ScheduledTask task = queue.dequeue(); - if (task != null) { - this.pos2Tasks.get(task.centerPos()).remove(task); - runPos2TasksMaintenance(task.centerPos()); - boolean scheduled1 = schedule0(task); - if (scheduled1) return ScheduleStatus.SCHEDULED; - } - return ScheduleStatus.NOT_SCHEDULED; - } - - private boolean schedule0(ScheduledTask task) { - if (task.tryPrepare()) { - task.runTask(() -> { - scheduledCount.decrementAndGet(); - scheduleExecution(); - }); - return true; - } - return false; - } - private static int chebyshev(ChunkPos a, ChunkPos b) { return Math.max(Math.abs(a.x - b.x), Math.abs(a.z - b.z)); } @@ -169,11 +153,4 @@ private enum ScheduleStatus { } }; - private void runPos2TasksMaintenance(long pos) { - final ObjectArraySet locks = this.pos2Tasks.get(pos); - if (locks != null && locks.isEmpty()) { - this.pos2Tasks.remove(pos); - } - } - } diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/WrappingTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/WrappingTask.java new file mode 100644 index 00000000..c65fff2f --- /dev/null +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/WrappingTask.java @@ -0,0 +1,43 @@ +package com.ishland.c2me.base.common.scheduler; + +import com.ishland.flowsched.executor.LockToken; + +import java.util.Objects; + +public class WrappingTask extends AbstractPosAwarePrioritizedTask { + + private static final LockToken[] EMPTY_LOCK_TOKENS = new LockToken[0]; + + private final Runnable wrapped; + + public WrappingTask(long pos, Runnable wrapped) { + super(pos); + this.wrapped = Objects.requireNonNull(wrapped); + } + + @Override + public void run(Runnable releaseLocks) { + try { + wrapped.run(); + } finally { + releaseLocks.run(); + for (Runnable runnable : this.postExec) { + try { + runnable.run(); + } catch (Throwable t1) { + t1.printStackTrace(); + } + } + } + } + + @Override + public void propagateException(Throwable t) { + t.printStackTrace(); + } + + @Override + public LockToken[] lockTokens() { + return EMPTY_LOCK_TOKENS; + } +} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/DynamicPriorityQueue.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/DynamicPriorityQueue.java deleted file mode 100644 index 9813abf5..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/DynamicPriorityQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.ishland.c2me.base.common.structs; - -import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; -import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet; - -/** - * A priority queue with fixed number of priorities and allows changing priorities of elements. - * Not thread-safe. - * - * @param the type of elements held in this collection - */ -public class DynamicPriorityQueue { - - private final ObjectLinkedOpenHashSet[] priorities; - private final Object2IntMap priorityMap = new Object2IntOpenHashMap<>(); - - private int currentMinPriority = 0; - - public DynamicPriorityQueue(int priorityCount) { - //noinspection unchecked - this.priorities = new ObjectLinkedOpenHashSet[priorityCount]; - for (int i = 0; i < priorityCount; i++) { - this.priorities[i] = new ObjectLinkedOpenHashSet<>(); - } - } - - public void enqueue(E element, int priority) { - if (priority < 0 || priority >= priorities.length) - throw new IllegalArgumentException("Priority out of range"); - if (priorityMap.containsKey(element)) - throw new IllegalArgumentException("Element already in queue"); - - priorities[priority].add(element); - priorityMap.put(element, priority); - if (priority < currentMinPriority) - currentMinPriority = priority; - } - - public void changePriority(E element, int priority) { - if (priority < 0 || priority >= priorities.length) - throw new IllegalArgumentException("Priority out of range"); - if (!priorityMap.containsKey(element)) return; // ignored - - int oldPriority = priorityMap.getInt(element); - if (oldPriority == priority) return; // nothing to do - - priorities[oldPriority].remove(element); - priorities[priority].add(element); - priorityMap.put(element, priority); - - if (priority < currentMinPriority) currentMinPriority = priority; - } - - public E dequeue() { - while (currentMinPriority < priorities.length) { - ObjectLinkedOpenHashSet priority = this.priorities[currentMinPriority]; - if (priority.isEmpty()) { - currentMinPriority++; - continue; - } - E element = priority.removeFirst(); - priorityMap.removeInt(element); - return element; - } - return null; - } - - public boolean contains(E element) { - return priorityMap.containsKey(element); - } - - public void remove(E element) { - if (!priorityMap.containsKey(element)) - return; // ignore - int priority = priorityMap.getInt(element); - priorities[priority].remove(element); - priorityMap.removeInt(element); - } - - public int size() { - return priorityMap.size(); - } - -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java deleted file mode 100644 index e338661d..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.ishland.c2me.base.common.structs; - -import com.google.common.base.Preconditions; - -import java.util.Objects; -import java.util.function.Consumer; -import java.util.function.Function; - -public class SimpleObjectPool { - - private final Function, T> constructor; - private final Consumer initializer; - private final int size; - - private final Object[] cachedObjects; - private int allocatedCount = 0; - - public SimpleObjectPool(Function, T> constructor, Consumer initializer, int size) { - this.constructor = Objects.requireNonNull(constructor); - this.initializer = Objects.requireNonNull(initializer); - Preconditions.checkArgument(size > 0); - this.cachedObjects = new Object[size]; - this.size = size; - - for (int i = 0; i < size; i++) { - final T object = constructor.apply(this); - this.cachedObjects[i] = object; - } - } - - public T alloc() { - final T object; - synchronized (this) { - if (this.allocatedCount >= this.size) { // oversized, falling back to normal alloc - object = this.constructor.apply(this); - return object; - } - - // get an object from the array - final int ordinal = this.allocatedCount++; - object = (T) this.cachedObjects[ordinal]; - this.cachedObjects[ordinal] = null; - } - - this.initializer.accept(object); // initialize the object - - return object; - } - - public void release(T object) { - synchronized (this) { - if (this.allocatedCount == 0) return; // pool is full - this.cachedObjects[--this.allocatedCount] = object; // store the object into the pool - } - } - -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/util/AsyncCombinedLock.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/util/AsyncCombinedLock.java deleted file mode 100644 index 0fdaef81..00000000 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/util/AsyncCombinedLock.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.ishland.c2me.base.common.util; - -import com.ibm.asyncutil.locks.AsyncLock; -import com.ibm.asyncutil.locks.AsyncNamedLock; -import com.ishland.c2me.base.common.GlobalExecutors; -import net.minecraft.util.math.ChunkPos; - -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -public class AsyncCombinedLock { - - private final AsyncNamedLock lock; - private final ChunkPos[] names; - private final CompletableFuture future = new CompletableFuture<>(); - - public AsyncCombinedLock(AsyncNamedLock lock, Set names) { - this.lock = lock; - this.names = names.toArray(ChunkPos[]::new); - this.tryAcquire(); - } - - private synchronized void tryAcquire() { // TODO optimize logic further - final LockEntry[] tryLocks = new LockEntry[names.length]; - boolean allAcquired = true; - for (int i = 0, namesLength = names.length; i < namesLength; i++) { - ChunkPos name = names[i]; - final LockEntry entry = new LockEntry(name, this.lock.tryLock(name)); - tryLocks[i] = entry; - if (entry.lockToken.isEmpty()) { - allAcquired = false; - break; - } - } - if (allAcquired) { - future.complete(() -> { - for (LockEntry entry : tryLocks) { - //noinspection OptionalGetWithoutIsPresent - entry.lockToken.get().releaseLock(); // if it isn't present then something is really wrong - } - }); - } else { - boolean triedRelock = false; - for (LockEntry entry : tryLocks) { - if (entry == null) continue; - entry.lockToken.ifPresent(AsyncLock.LockToken::releaseLock); - if (!triedRelock && entry.lockToken.isEmpty()) { - this.lock.acquireLock(entry.name).thenCompose(lockToken -> { - lockToken.releaseLock(); - return CompletableFuture.runAsync(this::tryAcquire, GlobalExecutors.executor); - }); - triedRelock = true; - } - } - if (!triedRelock) { - // shouldn't happen at all... - System.err.println("Some issue occurred while doing locking, retrying"); - this.tryAcquire(); - } - } - } - - public CompletableFuture getFuture() { - return future.thenApply(Function.identity()); - } - - private record LockEntry(ChunkPos name, - Optional lockToken) { - } -} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java b/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java index b998fb58..e202ff92 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java @@ -13,7 +13,7 @@ @Mixin(ThreadedAnvilChunkStorage.class) public class MixinThreadedAnvilChunkStorage implements IVanillaChunkManager { - private final SchedulingManager c2me$schedulingManager = new SchedulingManager(GlobalExecutors.asyncScheduler, GlobalExecutors.GLOBAL_EXECUTOR_PARALLELISM * 2); + private final SchedulingManager c2me$schedulingManager = new SchedulingManager(GlobalExecutors.asyncScheduler); @Override public SchedulingManager c2me$getSchedulingManager() { diff --git a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/PlayerNoTickDistanceMap.java b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/PlayerNoTickDistanceMap.java index ba82e8db..c87d886e 100644 --- a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/PlayerNoTickDistanceMap.java +++ b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/PlayerNoTickDistanceMap.java @@ -1,7 +1,7 @@ package com.ishland.c2me.notickvd.common; -import com.ishland.c2me.base.common.structs.DynamicPriorityQueue; import com.ishland.c2me.base.mixin.access.IThreadedAnvilChunkStorage; +import com.ishland.flowsched.structs.DynamicPriorityQueue; import com.mojang.logging.LogUtils; import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; import it.unimi.dsi.fastutil.longs.LongIterator; diff --git a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java index d6c50a8d..fa370022 100644 --- a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java +++ b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java @@ -1,6 +1,6 @@ package com.ishland.c2me.opts.allocs.common; -import com.ishland.c2me.base.common.structs.SimpleObjectPool; +import com.ishland.flowsched.structs.SimpleObjectPool; import net.minecraft.util.math.BlockPos; import net.minecraft.util.math.random.Random; import net.minecraft.world.StructureWorldAccess; @@ -13,7 +13,7 @@ public class PooledFeatureContext extends FeatureContext { - public static final ThreadLocal>> POOL = ThreadLocal.withInitial(() -> new SimpleObjectPool<>(unused -> new PooledFeatureContext<>(), unused -> {}, 2048)); + public static final ThreadLocal>> POOL = ThreadLocal.withInitial(() -> new SimpleObjectPool<>(unused -> new PooledFeatureContext<>(), PooledFeatureContext::reInit, PooledFeatureContext::reInit, 2048)); private Optional> feature; private StructureWorldAccess world; diff --git a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java index ecdeb5f7..8730c4aa 100644 --- a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java +++ b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java @@ -1,7 +1,7 @@ package com.ishland.c2me.opts.allocs.mixin.object_pooling_caching; import com.ishland.c2me.opts.allocs.common.PooledFeatureContext; -import com.ishland.c2me.base.common.structs.SimpleObjectPool; +import com.ishland.flowsched.structs.SimpleObjectPool; import net.minecraft.util.math.BlockPos; import net.minecraft.util.math.random.Random; import net.minecraft.world.StructureWorldAccess; @@ -36,7 +36,6 @@ public boolean generate(StructureWorldAccess world, ChunkGenerator chunkGenerato context.reInit(Optional.empty(), world, chunkGenerator, random, origin, this.config); return this.feature.generate(context); } finally { - context.reInit(); pool.release(context); } } diff --git a/c2me-rewrites-chunkio/src/main/java/com/ishland/c2me/rewrites/chunkio/common/C2MEStorageThread.java b/c2me-rewrites-chunkio/src/main/java/com/ishland/c2me/rewrites/chunkio/common/C2MEStorageThread.java index fbdb3704..210a4791 100644 --- a/c2me-rewrites-chunkio/src/main/java/com/ishland/c2me/rewrites/chunkio/common/C2MEStorageThread.java +++ b/c2me-rewrites-chunkio/src/main/java/com/ishland/c2me/rewrites/chunkio/common/C2MEStorageThread.java @@ -201,14 +201,14 @@ private boolean handlePendingReads() { future.complete(null); } else if (cached.left().isPresent()) { if (scanner != null) { - GlobalExecutors.executor.execute(() -> { + GlobalExecutors.prioritizedScheduler.schedule(() -> { try { cached.left().get().accept(scanner); future.complete(null); } catch (Throwable t) { future.completeExceptionally(t); } - }); + }, 16); } else { future.complete(cached.left().get()); } @@ -227,7 +227,7 @@ private boolean handlePendingReads() { SneakyThrow.sneaky(e); return null; // unreachable } - }, GlobalExecutors.executor) + }, GlobalExecutors.prioritizedScheduler.executor(16)) .thenAccept(future::complete) .exceptionally(throwable -> { future.completeExceptionally(throwable); @@ -293,7 +293,7 @@ private void scheduleChunkRead(long pos, CompletableFuture future, SneakyThrow.sneaky(t); return null; // Unreachable anyway } - }, GlobalExecutors.executor).handle((compound, throwable) -> { + }, GlobalExecutors.prioritizedScheduler.executor(16)).handle((compound, throwable) -> { if (throwable != null) future.completeExceptionally(throwable); else future.complete(compound); return null; @@ -337,7 +337,7 @@ private void writeChunk(long pos, Either nbt) { SneakyThrow.sneaky(t); return null; // Unreachable anyway } - }, GlobalExecutors.executor).thenAcceptAsync(bytes -> { + }, GlobalExecutors.prioritizedScheduler.executor(16)).thenAcceptAsync(bytes -> { if (nbt == this.cache.get(pos)) { // only write if match to avoid overwrites try { final ChunkPos pos1 = new ChunkPos(pos); diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java index 14e5845a..5c81a17f 100644 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java @@ -3,6 +3,7 @@ import com.ibm.asyncutil.locks.AsyncNamedLock; import com.ishland.c2me.base.common.GlobalExecutors; import com.ishland.c2me.base.common.registry.SerializerAccess; +import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager; import com.ishland.c2me.base.common.theinterface.IDirectStorage; import com.ishland.c2me.base.common.util.SneakyThrow; import com.ishland.c2me.base.mixin.access.IVersionedChunkStorage; @@ -38,6 +39,7 @@ import net.minecraft.world.poi.PointOfInterestStorage; import net.minecraft.world.storage.StorageIoWorker; import net.minecraft.world.storage.VersionedChunkStorage; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.spongepowered.asm.mixin.Dynamic; import org.spongepowered.asm.mixin.Final; @@ -141,6 +143,12 @@ private CompletableFuture> loadChunk(ChunkPo scheduledChunks.add(pos); } + return chunkLock.acquireLock(pos).toCompletableFuture() + .thenCompose(lockToken -> c2me$loadChunk0(pos).whenComplete((chunkUnloadedEither, throwable) -> lockToken.releaseLock())); + } + + @NotNull + private CompletableFuture> c2me$loadChunk0(ChunkPos pos) { final CompletableFuture> poiData = ((IAsyncChunkStorage) ((com.ishland.c2me.base.mixin.access.ISerializingRegionBasedStorage) this.pointOfInterestStorage).getWorker()).getNbtAtAsync(pos) .exceptionally(throwable -> { @@ -176,7 +184,7 @@ private CompletableFuture> loadChunk(ChunkPo } return null; - }, GlobalExecutors.executor) + }, ((IVanillaChunkManager) this).c2me$getSchedulingManager().positionedExecutor(pos.toLong())) .exceptionally(throwable -> { //noinspection IfStatementWithIdenticalBranches if (Config.recoverFromErrors) { @@ -192,7 +200,7 @@ private CompletableFuture> loadChunk(ChunkPo // if (protoChunk != null) ((ProtoChunkExtension) protoChunk).setBlendingInfo(pos, bitSet); // return protoChunk; // }) - .thenApplyAsync(protoChunk -> { + .thenApply(protoChunk -> { // blending protoChunk = protoChunk != null ? protoChunk : (ProtoChunk) this.getProtoChunk(pos); if (protoChunk.getBelowZeroRetrogen() != null || protoChunk.getStatus().getChunkType() == ChunkStatus.ChunkType.PROTOCHUNK) { @@ -218,7 +226,7 @@ private CompletableFuture> loadChunk(ChunkPo this.mark(pos, protoChunk.getStatus().getChunkType()); return Either.left(protoChunk); - }, GlobalExecutors.invokingExecutor); + }); future.exceptionally(throwable -> { LOGGER.error("Couldn't load chunk {}", pos, throwable); return null; @@ -229,39 +237,6 @@ private CompletableFuture> loadChunk(ChunkPo } }); return future; - - // [VanillaCopy] - for reference - /* - return CompletableFuture.supplyAsync(() -> { - try { - this.world.getProfiler().visit("chunkLoad"); - CompoundTag compoundTag = this.getUpdatedChunkNbt(pos); - if (compoundTag != null) { - boolean bl = compoundTag.contains("Level", 10) && compoundTag.getCompound("Level").contains("Status", 8); - if (bl) { - Chunk chunk = ChunkSerializer.deserialize(this.world, this.structureManager, this.pointOfInterestStorage, pos, compoundTag); - this.method_27053(pos, chunk.getStatus().getChunkType()); - return Either.left(chunk); - } - - LOGGER.error((String)"Chunk file at {} is missing level data, skipping", (Object)pos); - } - } catch (CrashException var5) { - Throwable throwable = var5.getCause(); - if (!(throwable instanceof IOException)) { - this.method_27054(pos); - throw var5; - } - - LOGGER.error((String)"Couldn't load chunk {}", (Object)pos, (Object)throwable); - } catch (Exception var6) { - LOGGER.error((String)"Couldn't load chunk {}", (Object)pos, (Object)var6); - } - - this.method_27054(pos); - return Either.left(new ProtoChunk(pos, UpgradeData.NO_UPGRADE_DATA, this.world)); - }, this.mainThreadExecutor); - */ } private CompletableFuture> getUpdatedChunkNbtAtAsync(ChunkPos pos) { @@ -359,7 +334,7 @@ private boolean asyncSave(ThreadedAnvilChunkStorage tacs, Chunk chunk, ChunkHold } finally { AsyncSerializationManager.pop(scope); } - }, GlobalExecutors.executor) + }, GlobalExecutors.prioritizedScheduler.executor(16) /* boost priority as we are serializing an unloaded chunk */) .thenAccept((either) -> { if (either.left().isPresent()) { this.setNbt(chunkPos, either.left().get()); @@ -397,7 +372,7 @@ private boolean asyncSave(ThreadedAnvilChunkStorage tacs, Chunk chunk, ChunkHold @Inject(method = "tick", at = @At("HEAD")) private void onTick(CallbackInfo info) { - GlobalExecutors.executor.execute(() -> saveFutures.removeIf(CompletableFuture::isDone)); + GlobalExecutors.asyncScheduler.execute(() -> saveFutures.removeIf(CompletableFuture::isDone)); } @Override diff --git a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java index e43fd0b2..c3950422 100644 --- a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java +++ b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java @@ -1,25 +1,20 @@ package com.ishland.c2me.threading.worldgen.common; import com.google.common.base.Preconditions; -import com.ibm.asyncutil.locks.AsyncLock; -import com.ibm.asyncutil.locks.AsyncNamedLock; -import com.ishland.c2me.base.common.GlobalExecutors; -import com.ishland.c2me.base.common.scheduler.NeighborLockingTask; +import com.ishland.c2me.base.common.scheduler.LockTokenImpl; +import com.ishland.c2me.base.common.scheduler.ScheduledTask; import com.ishland.c2me.base.common.scheduler.SchedulingManager; -import com.mojang.datafixers.util.Either; -import it.unimi.dsi.fastutil.longs.LongArrayList; +import com.ishland.flowsched.executor.LockToken; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import net.minecraft.server.world.ChunkHolder; import net.minecraft.server.world.ChunkLevels; import net.minecraft.util.math.ChunkPos; -import net.minecraft.world.chunk.Chunk; import net.minecraft.world.chunk.ChunkStatus; import java.util.concurrent.CompletableFuture; import java.util.function.BooleanSupplier; -import java.util.function.Function; import java.util.function.Supplier; -import static com.ishland.c2me.threading.worldgen.common.ChunkStatusUtils.ChunkStatusThreadingType.AS_IS; import static com.ishland.c2me.threading.worldgen.common.ChunkStatusUtils.ChunkStatusThreadingType.PARALLELIZED; import static com.ishland.c2me.threading.worldgen.common.ChunkStatusUtils.ChunkStatusThreadingType.SINGLE_THREADED; @@ -40,53 +35,30 @@ public static ChunkStatusThreadingType getThreadingType(final ChunkStatus status return Config.allowThreadedFeatures ? PARALLELIZED : SINGLE_THREADED; } else if (status.equals(ChunkStatus.INITIALIZE_LIGHT) || status.equals(ChunkStatus.LIGHT)) { - return AS_IS; + return PARALLELIZED; } - return AS_IS; + return PARALLELIZED; } - public static CompletableFuture runChunkGenWithLock(ChunkPos target, ChunkStatus status, ChunkHolder holder, int radius, SchedulingManager schedulingManager, boolean async, AsyncNamedLock chunkLock, Supplier> action) { + public static CompletableFuture runChunkGenWithLock(ChunkPos target, ChunkStatus status, int radius, SchedulingManager schedulingManager, ChunkStatusThreadingType threadingType, Supplier> action) { Preconditions.checkNotNull(status); // if (radius == 0) // return StageSupport.tryWith(chunkLock.acquireLock(target), unused -> action.get()).toCompletableFuture().thenCompose(Function.identity()); - BooleanSupplier isCancelled; - - if (holder != null) { - isCancelled = () -> isCancelled(holder, status); - } else { - isCancelled = FALSE_SUPPLIER; - } - -// ArrayList fetchedLocks = new ArrayList<>((2 * radius + 1) * (2 * radius + 1)); -// for (int x = target.x - radius; x <= target.x + radius; x++) -// for (int z = target.z - radius; z <= target.z + radius; z++) -// fetchedLocks.add(new ChunkPos(x, z)); -// -// final SchedulingAsyncCombinedLock task = new SchedulingAsyncCombinedLock<>( -// chunkLock, -// target.toLong(), -// new HashSet<>(fetchedLocks), -// isCancelled, -// schedulingManager::enqueue, -// action, -// target.toString(), -// async); - - LongArrayList lockTargets = new LongArrayList((2 * radius + 1) * (2 * radius + 1)); + ObjectArrayList lockTargets = new ObjectArrayList<>((2 * radius + 1) * (2 * radius + 1) + 1); for (int x = target.x - radius; x <= target.x + radius; x++) for (int z = target.z - radius; z <= target.z + radius; z++) - lockTargets.add(ChunkPos.toLong(x, z)); + lockTargets.add(new LockTokenImpl(schedulingManager.getId(), ChunkPos.toLong(x, z))); + + if (threadingType == SINGLE_THREADED) { + lockTargets.add(new LockTokenImpl(schedulingManager.getId(), ChunkPos.MARKER)); + } - final NeighborLockingTask task = new NeighborLockingTask<>( - schedulingManager, + final ScheduledTask task = new ScheduledTask<>( target.toLong(), - lockTargets.toLongArray(), - isCancelled, action, - "%s %s".formatted(target.toString(), status.toString()), - async - ); + lockTargets.toArray(LockToken[]::new)); + schedulingManager.enqueue(task); return task.getFuture(); } @@ -96,33 +68,9 @@ public static boolean isCancelled(ChunkHolder holder, ChunkStatus targetStatus) public enum ChunkStatusThreadingType { - PARALLELIZED() { - @Override - public CompletableFuture> runTask(AsyncLock lock, Supplier>> completableFuture) { - return CompletableFuture.supplyAsync(completableFuture, GlobalExecutors.executor).thenCompose(Function.identity()); - } - }, - SINGLE_THREADED() { - @Override - public CompletableFuture> runTask(AsyncLock lock, Supplier>> completableFuture) { - Preconditions.checkNotNull(lock); - return lock.acquireLock().toCompletableFuture().thenComposeAsync(lockToken -> { - try { - return completableFuture.get(); - } finally { - lockToken.releaseLock(); - } - }, GlobalExecutors.executor); - } - }, - AS_IS() { - @Override - public CompletableFuture> runTask(AsyncLock lock, Supplier>> completableFuture) { - return completableFuture.get(); - } - }; - - public abstract CompletableFuture> runTask(AsyncLock lock, Supplier>> completableFuture); + PARALLELIZED(), + SINGLE_THREADED, + AS_IS; } } diff --git a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinChunkStatus.java b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinChunkStatus.java index 706f516f..8da25761 100644 --- a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinChunkStatus.java +++ b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinChunkStatus.java @@ -1,5 +1,6 @@ package com.ishland.c2me.threading.worldgen.mixin; +import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager; import com.ishland.c2me.base.common.scheduler.ThreadLocalWorldGenSchedulingState; import com.ishland.c2me.base.common.util.SneakyThrow; import com.ishland.c2me.base.mixin.access.IThreadedAnvilChunkStorage; @@ -7,8 +8,6 @@ import com.ishland.c2me.threading.worldgen.common.ChunkStatusUtils; import com.ishland.c2me.threading.worldgen.common.Config; import com.ishland.c2me.threading.worldgen.common.IChunkStatus; -import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager; -import com.ishland.c2me.threading.worldgen.common.IWorldGenLockable; import com.mojang.datafixers.util.Either; import net.minecraft.server.world.ChunkHolder; import net.minecraft.server.world.ServerLightingProvider; @@ -128,12 +127,10 @@ public CompletableFuture> runGenerationTask( completableFuture = ChunkStatusUtils.runChunkGenWithLock( targetChunk.getPos(), thiz, - holder, lockRadius, ((IVanillaChunkManager) tacs).c2me$getSchedulingManager(), - (Object) this == ChunkStatus.LIGHT, // lighting is async so don't hold the slot TODO make this check less dirty - ((IWorldGenLockable) world).getWorldGenChunkLock(), - () -> ChunkStatusUtils.getThreadingType(thiz).runTask(((IWorldGenLockable) world).getWorldGenSingleThreadedLock(), generationTask)) + ChunkStatusUtils.getThreadingType(thiz), + generationTask) .exceptionally(t -> { Throwable actual = t; while (actual instanceof CompletionException) actual = t.getCause(); diff --git a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinThreadedAnvilChunkStorage.java b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinThreadedAnvilChunkStorage.java index 3c74b072..b9193d54 100644 --- a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinThreadedAnvilChunkStorage.java +++ b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/MixinThreadedAnvilChunkStorage.java @@ -1,6 +1,6 @@ package com.ishland.c2me.threading.worldgen.mixin; -import com.ishland.c2me.base.common.GlobalExecutors; +import com.ishland.c2me.base.common.scheduler.IVanillaChunkManager; import com.ishland.c2me.base.common.scheduler.ThreadLocalWorldGenSchedulingState; import com.ishland.c2me.threading.worldgen.common.Config; import com.mojang.datafixers.util.Either; @@ -73,7 +73,7 @@ private CompletableFuture, ChunkHolder.Unloaded>> redirectGet .thenComposeAsync(unused -> this.getRegion(chunkHolder, margin, distanceToStatus), r -> { if (Config.asyncScheduling) { if (this.mainThreadExecutor.isOnThread()) { - GlobalExecutors.executor.execute(r); + ((IVanillaChunkManager) this).c2me$getSchedulingManager().enqueue(chunkHolder.getPos().toLong(), r); } else { r.run(); } diff --git a/settings.gradle b/settings.gradle index 1efa844f..5395174d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,6 +10,12 @@ pluginManagement { rootProject.name = "c2me" // avoid weirdness when building the project using another directory name +includeBuild('FlowSched') { + dependencySubstitution { + substitute module('com.ishland.flowsched:flowsched') using project(':') + } +} + include 'tests' include 'tests:test-mod' include 'tests:vanilla-test'