diff --git a/build.gradle b/build.gradle index e08f7fe2..cdae9234 100644 --- a/build.gradle +++ b/build.gradle @@ -121,6 +121,9 @@ configure (allprojects - project(":tests")) { implementation "com.electronwill.night-config:toml:${night_config_version}" implementation "org.threadly:threadly:${threadly_version}" implementation "net.objecthunter:exp4j:${exp4j_version}" + implementation "com.github.LlamaLad7:MixinExtras:${mixinextras_version}" + + annotationProcessor "com.github.LlamaLad7:MixinExtras:${mixinextras_version}" } } @@ -149,6 +152,7 @@ dependencies { include implementation("com.electronwill.night-config:core:${night_config_version}") include implementation("org.threadly:threadly:${threadly_version}") include implementation("net.objecthunter:exp4j:${exp4j_version}") + include implementation("com.github.LlamaLad7:MixinExtras:${mixinextras_version}") // PSA: Some older mods, compiled on Loom 0.2.1, might have outdated Maven POMs. // You may need to force-disable transitiveness on them. diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java index 4015f926..e19798c0 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/GlobalExecutors.java @@ -3,6 +3,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.ishland.c2me.base.ModuleEntryPoint; import com.ishland.c2me.base.common.util.C2MENormalWorkerThreadFactory; +import net.minecraft.util.thread.TaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -34,6 +35,8 @@ public class GlobalExecutors { public static final ExecutorService asyncScheduler = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ksched").build()); + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("c2me-sched").build()); + + public static final TaskExecutor asyncSchedulerTaskExecutor = TaskExecutor.create(asyncScheduler, "c2me-sched"); } diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/ModuleMixinPlugin.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/ModuleMixinPlugin.java index 98391bdc..7fd0863f 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/ModuleMixinPlugin.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/ModuleMixinPlugin.java @@ -1,5 +1,6 @@ package com.ishland.c2me.base.common; +import com.llamalad7.mixinextras.MixinExtrasBootstrap; import org.objectweb.asm.tree.ClassNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +20,7 @@ public class ModuleMixinPlugin implements IMixinConfigPlugin { @Override public void onLoad(String mixinPackage) { + MixinExtrasBootstrap.init(); LOGGER.info("Initializing {}", mixinPackage); final String[] split = mixinPackage.split("\\."); final String targetClass = String.join(".", Arrays.copyOf(split, split.length - 1)) + ".ModuleEntryPoint"; diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java new file mode 100644 index 00000000..313b2e86 --- /dev/null +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingManager.java @@ -0,0 +1,53 @@ +package com.ishland.c2me.base.common.scheduler; + +import com.ishland.c2me.base.common.structs.SimpleObjectPool; +import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap; +import it.unimi.dsi.fastutil.objects.ReferenceArraySet; +import net.minecraft.util.math.ChunkPos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Not thread safe. + */ +public class NeighborLockingManager { + + private static final Logger LOGGER = LoggerFactory.getLogger("NeighborLockingManager"); + + private final SimpleObjectPool> pool = new SimpleObjectPool<>( + pool -> new ReferenceArraySet<>(16), + ReferenceArraySet::clear, + 1024 + ); + private final Long2ReferenceOpenHashMap> activeLocks = new Long2ReferenceOpenHashMap<>(); + + public boolean isLocked(long pos) { + return activeLocks.containsKey(pos); + } + + public void acquireLock(long pos) { + if (isLocked(pos)) throw new IllegalStateException("Already locked"); + activeLocks.put(pos, pool.alloc()); + } + + public void releaseLock(long pos) { + if (!isLocked(pos)) throw new IllegalStateException("Not locked"); + final ReferenceArraySet runnables = activeLocks.remove(pos); + for (Runnable runnable : runnables) { + try { + runnable.run(); + } catch (Throwable t) { + LOGGER.error("Failed to notify lock release at chunk %s".formatted(new ChunkPos(pos)), t); + } + } + runnables.clear(); + pool.release(runnables); + } + + public void addReleaseListener(long pos, Runnable runnable) { + if (!isLocked(pos)) throw new IllegalStateException("Not locked"); + activeLocks.get(pos).add(runnable); + } + +} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java new file mode 100644 index 00000000..ef625e5a --- /dev/null +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/NeighborLockingTask.java @@ -0,0 +1,88 @@ +package com.ishland.c2me.base.common.scheduler; + +import com.google.common.base.Preconditions; +import com.ishland.c2me.base.common.GlobalExecutors; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; + +public class NeighborLockingTask implements ScheduledTask { + + private final SchedulingManager schedulingManager; + private final long center; + private final long[] names; + private final BooleanSupplier isCancelled; + private final Supplier> action; + private final String desc; + private final boolean async; + private final CompletableFuture future = new CompletableFuture<>(); + private boolean acquired = false; + + public NeighborLockingTask(SchedulingManager schedulingManager, long center, long[] names, BooleanSupplier isCancelled, Supplier> action, String desc, boolean async) { + this.schedulingManager = schedulingManager; + this.center = center; + this.names = names; + this.isCancelled = isCancelled; + this.action = action; + this.desc = desc; + this.async = async; + + this.schedulingManager.enqueue(this); + } + + + @Override + public boolean tryPrepare() { + final NeighborLockingManager lockingManager = this.schedulingManager.getNeighborLockingManager(); + for (long l : names) { + if (lockingManager.isLocked(l)) { + lockingManager.addReleaseListener(l, () -> this.schedulingManager.enqueue(this)); + return false; + } + } + for (long l : names) { + lockingManager.acquireLock(l); + } + acquired = true; + return true; + } + + @Override + public void runTask(Runnable postAction) { + Preconditions.checkNotNull(postAction); + if (!acquired) throw new IllegalStateException(); + final CompletableFuture future = this.action.get(); + Preconditions.checkNotNull(future, "future"); + future.handleAsync((result, throwable) -> { + this.schedulingManager.getExecutor().execute(() -> { + final NeighborLockingManager lockingManager = this.schedulingManager.getNeighborLockingManager(); + for (long l : names) { + lockingManager.releaseLock(l); + } + }); + try { + postAction.run(); + } catch (Throwable t) { + t.printStackTrace(); + } + if (throwable != null) this.future.completeExceptionally(throwable); + else this.future.complete(result); + return null; + }, GlobalExecutors.invokingExecutor); + } + + @Override + public long centerPos() { + return center; + } + + @Override + public boolean isAsync() { + return async; + } + + public CompletableFuture getFuture() { + return future; + } +} diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java index bf74a0f6..8b7c1339 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/ScheduledTask.java @@ -2,9 +2,9 @@ public interface ScheduledTask { - public boolean trySchedule(); + public boolean tryPrepare(); - public void addPostAction(Runnable postAction); + public void runTask(Runnable postAction); public long centerPos(); diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java index d0be79d6..e862e7c0 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingAsyncCombinedLock.java @@ -41,7 +41,7 @@ public SchedulingAsyncCombinedLock(AsyncNamedLock lock, long center, S } @Override - public boolean trySchedule() { + public boolean tryPrepare() { return tryAcquire(); } @@ -94,7 +94,7 @@ synchronized boolean tryAcquire() { } @Override - public void addPostAction(Runnable postAction) { + public void runTask(Runnable postAction) { Preconditions.checkNotNull(postAction); AsyncLock.LockToken token = this.acquiredToken; if (token == null) throw new IllegalStateException(); diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java index dbe31fdd..5dc4e6a8 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/scheduler/SchedulingManager.java @@ -17,6 +17,7 @@ public class SchedulingManager { private final DynamicPriorityQueue queue = new DynamicPriorityQueue<>(MAX_LEVEL + 1); private final Long2ReferenceOpenHashMap> pos2Tasks = new Long2ReferenceOpenHashMap<>(); private final Long2IntOpenHashMap prioritiesFromLevel = new Long2IntOpenHashMap(); + private final NeighborLockingManager neighborLockingManager = new NeighborLockingManager(); private final AtomicInteger scheduledCount = new AtomicInteger(0); private final AtomicBoolean scheduled = new AtomicBoolean(false); private ChunkPos currentSyncLoad = null; @@ -94,6 +95,14 @@ public void setCurrentSyncLoad(ChunkPos pos) { }); } + public NeighborLockingManager getNeighborLockingManager() { + return this.neighborLockingManager; + } + + public Executor getExecutor() { + return executor; + } + private void updateSyncLoadInternal(ChunkPos pos) { long startTime = System.nanoTime(); for (int xOff = -8; xOff <= 8; xOff++) { @@ -128,8 +137,8 @@ private ScheduleStatus scheduleExecutionInternal() { } private boolean schedule0(ScheduledTask task) { - if (task.trySchedule()) { - task.addPostAction(() -> { + if (task.tryPrepare()) { + task.runTask(() -> { scheduledCount.decrementAndGet(); scheduleExecution(); }); diff --git a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/SimpleObjectPool.java b/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java similarity index 97% rename from c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/SimpleObjectPool.java rename to c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java index b9890198..e338661d 100644 --- a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/SimpleObjectPool.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/common/structs/SimpleObjectPool.java @@ -1,4 +1,4 @@ -package com.ishland.c2me.opts.allocs.common; +package com.ishland.c2me.base.common.structs; import com.google.common.base.Preconditions; diff --git a/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java b/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java index 6bb20246..1cdd1778 100644 --- a/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java +++ b/c2me-base/src/main/java/com/ishland/c2me/base/mixin/scheduler/MixinThreadedAnvilChunkStorage.java @@ -13,7 +13,7 @@ @Mixin(ThreadedAnvilChunkStorage.class) public class MixinThreadedAnvilChunkStorage implements IVanillaChunkManager { - private final SchedulingManager c2me$schedulingManager = new SchedulingManager(GlobalExecutors.asyncScheduler, GlobalExecutors.GLOBAL_EXECUTOR_PARALLELISM + 1); + private final SchedulingManager c2me$schedulingManager = new SchedulingManager(GlobalExecutors.asyncSchedulerTaskExecutor::send, GlobalExecutors.GLOBAL_EXECUTOR_PARALLELISM * 2); @Override public SchedulingManager c2me$getSchedulingManager() { diff --git a/c2me-fixes-general-threading-issues/src/main/java/com/ishland/c2me/fixes/general/threading_issues/mixin/MixinChunkHolder.java b/c2me-fixes-general-threading-issues/src/main/java/com/ishland/c2me/fixes/general/threading_issues/mixin/MixinChunkHolder.java index b5d94e43..1351c4e9 100644 --- a/c2me-fixes-general-threading-issues/src/main/java/com/ishland/c2me/fixes/general/threading_issues/mixin/MixinChunkHolder.java +++ b/c2me-fixes-general-threading-issues/src/main/java/com/ishland/c2me/fixes/general/threading_issues/mixin/MixinChunkHolder.java @@ -107,7 +107,7 @@ public CompletableFuture> getChunkAt(ChunkSt @Redirect(method = "*", at = @At(value = "INVOKE", target = "Lnet/minecraft/server/world/ChunkHolder;combineSavingFuture(Ljava/util/concurrent/CompletableFuture;Ljava/lang/String;)V")) private void synchronizeCombineSavingFuture(ChunkHolder holder, CompletableFuture> then, String thenDesc) { synchronized (this) { - this.combineSavingFuture(then, thenDesc); + this.combineSavingFuture(then.exceptionally(unused -> null), thenDesc); } } @@ -118,7 +118,7 @@ private void synchronizeCombineSavingFuture(ChunkHolder holder, CompletableFutur @Overwrite public void combineSavingFuture(String string, CompletableFuture completableFuture) { synchronized (this) { - this.savingFuture = this.savingFuture.thenCombine(completableFuture, (chunk, object) -> chunk); + this.savingFuture = this.savingFuture.thenCombine(completableFuture.exceptionally(unused -> null), (chunk, object) -> chunk); } } diff --git a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NoTickSystem.java b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NoTickSystem.java index 8bbe45c6..17c79683 100644 --- a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NoTickSystem.java +++ b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NoTickSystem.java @@ -12,7 +12,7 @@ import org.threadly.concurrent.NoThreadScheduler; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class NoTickSystem { @@ -26,7 +26,7 @@ public class NoTickSystem { final NoThreadScheduler noThreadScheduler = new NoThreadScheduler(); private final AtomicBoolean isTicking = new AtomicBoolean(); - final ExecutorService executor = GlobalExecutors.asyncScheduler; + final Executor executor = GlobalExecutors.asyncSchedulerTaskExecutor::send; private volatile LongSet noTickOnlyChunksSnapshot = LongSets.EMPTY_SET; private volatile boolean pendingPurge = false; private volatile long age = 0; diff --git a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NormalTicketDistanceMap.java b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NormalTicketDistanceMap.java index 67634148..ca70a71b 100644 --- a/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NormalTicketDistanceMap.java +++ b/c2me-notickvd/src/main/java/com/ishland/c2me/notickvd/common/NormalTicketDistanceMap.java @@ -24,7 +24,7 @@ public NormalTicketDistanceMap(ChunkTicketManager chunkTicketManager) { @Override protected int getInitialLevel(long id) { - SortedArraySet> sortedArraySet = ((com.ishland.c2me.base.mixin.access.IChunkTicketManager) chunkTicketManager).getTicketsByPosition().get(id); + SortedArraySet> sortedArraySet = ticketsByPosition.get(id); if (sortedArraySet != null) { if (sortedArraySet.isEmpty()) return Integer.MAX_VALUE; for (ChunkTicket next : sortedArraySet) { diff --git a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java index 80d5e930..d6c50a8d 100644 --- a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java +++ b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/common/PooledFeatureContext.java @@ -1,5 +1,6 @@ package com.ishland.c2me.opts.allocs.common; +import com.ishland.c2me.base.common.structs.SimpleObjectPool; import net.minecraft.util.math.BlockPos; import net.minecraft.util.math.random.Random; import net.minecraft.world.StructureWorldAccess; diff --git a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java index d616e9ed..ecdeb5f7 100644 --- a/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java +++ b/c2me-opts-allocs/src/main/java/com/ishland/c2me/opts/allocs/mixin/object_pooling_caching/MixinConfiguredFeature.java @@ -1,7 +1,7 @@ package com.ishland.c2me.opts.allocs.mixin.object_pooling_caching; import com.ishland.c2me.opts.allocs.common.PooledFeatureContext; -import com.ishland.c2me.opts.allocs.common.SimpleObjectPool; +import com.ishland.c2me.base.common.structs.SimpleObjectPool; import net.minecraft.util.math.BlockPos; import net.minecraft.util.math.random.Random; import net.minecraft.world.StructureWorldAccess; diff --git a/c2me-opts-scheduling/src/main/java/com/ishland/c2me/opts/scheduling/mixin/task_scheduling/MixinThreadExecutor.java b/c2me-opts-scheduling/src/main/java/com/ishland/c2me/opts/scheduling/mixin/task_scheduling/MixinThreadExecutor.java deleted file mode 100644 index 7ffcf060..00000000 --- a/c2me-opts-scheduling/src/main/java/com/ishland/c2me/opts/scheduling/mixin/task_scheduling/MixinThreadExecutor.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.ishland.c2me.opts.scheduling.mixin.task_scheduling; - -import net.minecraft.util.thread.ThreadExecutor; -import org.slf4j.Logger; -import org.spongepowered.asm.mixin.Final; -import org.spongepowered.asm.mixin.Mixin; -import org.spongepowered.asm.mixin.Shadow; -import org.spongepowered.asm.mixin.injection.At; -import org.spongepowered.asm.mixin.injection.Redirect; - -@Mixin(ThreadExecutor.class) -public abstract class MixinThreadExecutor { - - @Shadow @Final private static Logger LOGGER; - - @Shadow public abstract String getName(); - - @Redirect(method = "runTask", at = @At(value = "INVOKE", target = "Lnet/minecraft/util/thread/ThreadExecutor;executeTask(Ljava/lang/Runnable;)V")) - private void redirectExecuteTask(ThreadExecutor threadExecutor, R task) { - try { - task.run(); - } catch (Throwable t) { - LOGGER.error("Error executing task on {}", this.getName(), t); - } - } - -} diff --git a/c2me-opts-scheduling/src/main/resources/c2me-opts-scheduling.mixins.json b/c2me-opts-scheduling/src/main/resources/c2me-opts-scheduling.mixins.json index 58259347..c9139c19 100644 --- a/c2me-opts-scheduling/src/main/resources/c2me-opts-scheduling.mixins.json +++ b/c2me-opts-scheduling/src/main/resources/c2me-opts-scheduling.mixins.json @@ -20,7 +20,6 @@ "shutdown.MixinServerEntityManager", "shutdown.MixinServerWorld", "task_scheduling.MixinEntityChunkDataAccess", - "task_scheduling.MixinThreadedAnvilChunkStorage", - "task_scheduling.MixinThreadExecutor" + "task_scheduling.MixinThreadedAnvilChunkStorage" ] } diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ChunkIoMainThreadTaskUtils.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ChunkIoMainThreadTaskUtils.java index bda8e77f..a91707c1 100644 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ChunkIoMainThreadTaskUtils.java +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ChunkIoMainThreadTaskUtils.java @@ -1,6 +1,7 @@ package com.ishland.c2me.threading.chunkio.common; import com.mojang.logging.LogUtils; +import it.unimi.dsi.fastutil.objects.ReferenceArrayList; import org.slf4j.Logger; import java.util.ArrayDeque; @@ -9,31 +10,40 @@ public class ChunkIoMainThreadTaskUtils { private static final Logger LOGGER = LogUtils.getLogger(); - private static final ThreadLocal> deserializeStack = ThreadLocal.withInitial(ArrayDeque::new); + private static final ThreadLocal>> deserializeStack = ThreadLocal.withInitial(ArrayDeque::new); private static final LinkedBlockingQueue mainThreadQueue = new LinkedBlockingQueue<>(); - public static void push() { - deserializeStack.get().push(new Object()); + public static void push(ReferenceArrayList queue) { + if (queue == null) { + throw new IllegalArgumentException("Queue cannot be null"); + } + deserializeStack.get().push(queue); } - public static void pop() { - deserializeStack.get().pop(); + public static void pop(ReferenceArrayList queue) { + if (queue == null) { + throw new IllegalArgumentException("Queue cannot be null"); + } + final ArrayDeque> stack = deserializeStack.get(); + if (stack.peek() != queue) throw new IllegalStateException("Unexpected queue"); + stack.pop(); } public static void executeMain(Runnable command) { - if (deserializeStack.get().isEmpty()) command.run(); - else mainThreadQueue.add(command); + final ArrayDeque> stack = deserializeStack.get(); + if (stack.isEmpty()) command.run(); + else stack.peek().add(command); } - public static void drainQueue() { - Runnable command; - while ((command = mainThreadQueue.poll()) != null) { + public static void drainQueue(ReferenceArrayList queue) { + for (Runnable command : queue) { try { command.run(); } catch (Throwable t) { LOGGER.error("Error while executing main thread task", t); } } + queue.clear(); } } diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ProtoChunkExtension.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ProtoChunkExtension.java index b86e9945..52afc0f4 100644 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ProtoChunkExtension.java +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/common/ProtoChunkExtension.java @@ -14,4 +14,7 @@ public interface ProtoChunkExtension { boolean getNeedBlending(); + void setInitialMainThreadComputeFuture(CompletableFuture future); + CompletableFuture getInitialMainThreadComputeFuture(); + } diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinChunkRegion.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinChunkRegion.java new file mode 100644 index 00000000..70231125 --- /dev/null +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinChunkRegion.java @@ -0,0 +1,34 @@ +package com.ishland.c2me.threading.chunkio.mixin; + +import com.ishland.c2me.threading.chunkio.common.ProtoChunkExtension; +import com.llamalad7.mixinextras.injector.wrapoperation.Operation; +import com.llamalad7.mixinextras.injector.wrapoperation.WrapOperation; +import net.minecraft.block.BlockState; +import net.minecraft.server.world.ServerWorld; +import net.minecraft.util.math.BlockPos; +import net.minecraft.world.ChunkRegion; +import net.minecraft.world.StructureWorldAccess; +import net.minecraft.world.chunk.Chunk; +import net.minecraft.world.chunk.ProtoChunk; +import org.spongepowered.asm.mixin.Mixin; +import org.spongepowered.asm.mixin.injection.At; + +import java.util.concurrent.CompletableFuture; + +@Mixin(ChunkRegion.class) +public abstract class MixinChunkRegion implements StructureWorldAccess { + + @WrapOperation(method = "setBlockState", at = @At(value = "INVOKE", target = "Lnet/minecraft/server/world/ServerWorld;onBlockChanged(Lnet/minecraft/util/math/BlockPos;Lnet/minecraft/block/BlockState;Lnet/minecraft/block/BlockState;)V")) + private void waitForFutureBeforeNotifyChanges(ServerWorld instance, BlockPos pos, BlockState oldBlock, BlockState newBlock, Operation operation) { + final Chunk chunk = this.getChunk(pos); + if (chunk instanceof ProtoChunk protoChunk) { + final CompletableFuture future = ((ProtoChunkExtension) protoChunk).getInitialMainThreadComputeFuture(); + if (future != null && !future.isDone()) { + future.thenRun(() -> operation.call(instance, pos, oldBlock, newBlock)); + return; + } + } + operation.call(instance, pos, oldBlock, newBlock); + } + +} diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinProtoChunk.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinProtoChunk.java index fbfe5ae6..f341eb15 100644 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinProtoChunk.java +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinProtoChunk.java @@ -17,6 +17,9 @@ public class MixinProtoChunk implements ProtoChunkExtension { @Unique private CompletableFuture blendingComputeFuture = CompletableFuture.completedFuture(null); + @Unique + private CompletableFuture initialMainThreadComputeFuture = CompletableFuture.completedFuture(null); + @Unique private boolean needBlending = false; @@ -66,4 +69,15 @@ public boolean getNeedBlending() { } return needBlending; } + + @Override + public void setInitialMainThreadComputeFuture(CompletableFuture future) { + this.initialMainThreadComputeFuture = future; + } + + @Override + public CompletableFuture getInitialMainThreadComputeFuture() { + return this.initialMainThreadComputeFuture; + } + } diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinServerChunkManager.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinServerChunkManager.java deleted file mode 100644 index 697172af..00000000 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinServerChunkManager.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.ishland.c2me.threading.chunkio.mixin; - -import com.ishland.c2me.threading.chunkio.common.ChunkIoMainThreadTaskUtils; -import net.minecraft.server.world.ServerChunkManager; -import org.spongepowered.asm.mixin.Mixin; -import org.spongepowered.asm.mixin.injection.At; -import org.spongepowered.asm.mixin.injection.Inject; -import org.spongepowered.asm.mixin.injection.callback.CallbackInfoReturnable; - -@Mixin(ServerChunkManager.class) -public class MixinServerChunkManager { - - @Inject(method = "executeQueuedTasks", at = @At(value = "RETURN")) - private void onExecuteTasks(CallbackInfoReturnable cir) { - ChunkIoMainThreadTaskUtils.drainQueue(); - } - -} diff --git a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java index c05d9826..fdde761d 100644 --- a/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java +++ b/c2me-threading-chunkio/src/main/java/com/ishland/c2me/threading/chunkio/mixin/MixinThreadedAnvilChunkStorage.java @@ -15,10 +15,12 @@ import com.ishland.c2me.threading.chunkio.common.ISerializingRegionBasedStorage; import com.ishland.c2me.threading.chunkio.common.ProtoChunkExtension; import com.ishland.c2me.threading.chunkio.common.TaskCancellationException; +import com.llamalad7.mixinextras.injector.ModifyReturnValue; import com.mojang.datafixers.DataFixer; import com.mojang.datafixers.util.Either; import it.unimi.dsi.fastutil.longs.Long2ByteMap; import it.unimi.dsi.fastutil.longs.Long2ByteMaps; +import it.unimi.dsi.fastutil.objects.ReferenceArrayList; import net.minecraft.SharedConstants; import net.minecraft.nbt.NbtCompound; import net.minecraft.nbt.NbtElement; @@ -57,6 +59,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Supplier; @@ -153,6 +156,8 @@ private CompletableFuture> loadChunk(ChunkPo } }); + final ReferenceArrayList mainThreadQueue = new ReferenceArrayList<>(); + final CompletableFuture> future = getUpdatedChunkNbtAtAsync(pos) .thenApply(optional -> optional.filter(nbtCompound -> { boolean bl = containsStatus(nbtCompound); @@ -164,11 +169,11 @@ private CompletableFuture> loadChunk(ChunkPo })) .thenApplyAsync(optional -> { if (optional.isPresent()) { - ChunkIoMainThreadTaskUtils.push(); + ChunkIoMainThreadTaskUtils.push(mainThreadQueue); try { return ChunkSerializer.deserialize(this.world, this.pointOfInterestStorage, pos, optional.get()); } finally { - ChunkIoMainThreadTaskUtils.pop(); + ChunkIoMainThreadTaskUtils.pop(mainThreadQueue); } } @@ -184,7 +189,7 @@ private CompletableFuture> loadChunk(ChunkPo return null; // unreachable } }) - .thenCombine(poiData, (protoChunk, tag) -> protoChunk) +// .thenCombine(poiData, (protoChunk, tag) -> protoChunk) // .thenCombine(blendingInfos, (protoChunk, bitSet) -> { // if (protoChunk != null) ((ProtoChunkExtension) protoChunk).setBlendingInfo(pos, bitSet); // return protoChunk; @@ -200,24 +205,22 @@ private CompletableFuture> loadChunk(ChunkPo ); } - try { - ((ISerializingRegionBasedStorage) this.pointOfInterestStorage).update(pos, poiData.join().orElse(null)); - } catch (Throwable t) { - if (Config.recoverFromErrors) { - LOGGER.error("Couldn't load poi data for chunk {}, poi data will be lost!", pos, t); - } else { - SneakyThrow.sneaky(t); + ((ProtoChunkExtension) protoChunk).setInitialMainThreadComputeFuture(poiData.thenAcceptAsync(poiDataNbt -> { + try { + ((ISerializingRegionBasedStorage) this.pointOfInterestStorage).update(pos, poiDataNbt.orElse(null)); + } catch (Throwable t) { + if (Config.recoverFromErrors) { + LOGGER.error("Couldn't load poi data for chunk {}, poi data will be lost!", pos, t); + } else { + SneakyThrow.sneaky(t); + } } - } - ChunkIoMainThreadTaskUtils.drainQueue(); - if (protoChunk != null) { - this.mark(pos, protoChunk.getStatus().getChunkType()); - return Either.left(protoChunk); - } else { - LOGGER.error("Why is protoChunk null? Trying to recover from this case..."); - return Either.left(this.getProtoChunk(pos)); - } - }, this.mainThreadExecutor); + ChunkIoMainThreadTaskUtils.drainQueue(mainThreadQueue); + }, this.mainThreadExecutor)); + + this.mark(pos, protoChunk.getStatus().getChunkType()); + return Either.left(protoChunk); + }, GlobalExecutors.invokingExecutor); future.exceptionally(throwable -> { LOGGER.error("Couldn't load chunk {}", pos, throwable); return null; @@ -288,6 +291,26 @@ private CompletableFuture> getUpdatedChunkNbt(ChunkPos chu }); } + @ModifyReturnValue(method = "getChunk", at = @At("RETURN")) + private CompletableFuture> postGetChunk(CompletableFuture> originalReturn, ChunkHolder holder, ChunkStatus requiredStatus) { + if (requiredStatus == ChunkStatus.FULL.getPrevious()) { + // wait for initial main thread tasks before proceeding to finish full chunk + return originalReturn.thenCompose(either -> { + if (either.left().isPresent()) { + final Chunk chunk = either.left().get(); + if (chunk instanceof ProtoChunk protoChunk) { + final CompletableFuture future = ((ProtoChunkExtension) protoChunk).getInitialMainThreadComputeFuture(); + if (future != null) { + return future.thenApply(v -> either); + } + } + } + return CompletableFuture.completedFuture(either); + }); + } + return originalReturn; + } + private ConcurrentLinkedQueue> saveFutures = new ConcurrentLinkedQueue<>(); @Dynamic @@ -360,7 +383,9 @@ private boolean asyncSave(ThreadedAnvilChunkStorage tacs, Chunk chunk, ChunkHold .handle((unused, throwable) -> { lockToken.releaseLock(); if (throwable != null) { - if (!(throwable instanceof TaskCancellationException)) { + Throwable actual = throwable; + while (actual instanceof CompletionException e) actual = e.getCause(); + if (!(actual instanceof TaskCancellationException)) { LOGGER.error("Failed to save chunk {},{} asynchronously, falling back to sync saving", chunkPos.x, chunkPos.z, throwable); final CompletableFuture savingFuture = holder.getSavingFuture(); if (savingFuture != originalSavingFuture) { diff --git a/c2me-threading-chunkio/src/main/resources/c2me-threading-chunkio.mixins.json b/c2me-threading-chunkio/src/main/resources/c2me-threading-chunkio.mixins.json index 02ef90c3..169f50c3 100644 --- a/c2me-threading-chunkio/src/main/resources/c2me-threading-chunkio.mixins.json +++ b/c2me-threading-chunkio/src/main/resources/c2me-threading-chunkio.mixins.json @@ -5,10 +5,10 @@ "plugin": "com.ishland.c2me.base.common.ModuleMixinPlugin", "mixins": [ "MixinBlender", + "MixinChunkRegion", "MixinChunkSerializer", "MixinProtoChunk", "MixinSerializingRegionBasedStorage", - "MixinServerChunkManager", "MixinStorageIoWorker", "MixinThreadedAnvilChunkStorage", "gc_free_serializer.MixinChunkDataSerializer" diff --git a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java index 39fe6840..c0a89ef3 100644 --- a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java +++ b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/common/ChunkStatusUtils.java @@ -4,16 +4,15 @@ import com.ibm.asyncutil.locks.AsyncLock; import com.ibm.asyncutil.locks.AsyncNamedLock; import com.ishland.c2me.base.common.GlobalExecutors; -import com.ishland.c2me.base.common.scheduler.SchedulingAsyncCombinedLock; +import com.ishland.c2me.base.common.scheduler.NeighborLockingTask; import com.ishland.c2me.base.common.scheduler.SchedulingManager; import com.mojang.datafixers.util.Either; +import it.unimi.dsi.fastutil.longs.LongArrayList; import net.minecraft.server.world.ChunkHolder; import net.minecraft.util.math.ChunkPos; import net.minecraft.world.chunk.Chunk; import net.minecraft.world.chunk.ChunkStatus; -import java.util.ArrayList; -import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.function.BooleanSupplier; import java.util.function.Function; @@ -32,13 +31,12 @@ public static ChunkStatusThreadingType getThreadingType(final ChunkStatus status || status.equals(ChunkStatus.STRUCTURE_REFERENCES) || status.equals(ChunkStatus.BIOMES) || status.equals(ChunkStatus.NOISE) + || status.equals(ChunkStatus.SPAWN) || status.equals(ChunkStatus.SURFACE) || status.equals(ChunkStatus.CARVERS) - || status.equals(ChunkStatus.LIQUID_CARVERS) +// || status.equals(ChunkStatus.LIQUID_CARVERS) // empty, don't need to parallelize || status.equals(ChunkStatus.HEIGHTMAPS)) { return PARALLELIZED; - } else if (status.equals(ChunkStatus.SPAWN)) { - return SINGLE_THREADED; } else if (status.equals(ChunkStatus.FEATURES)) { return Config.allowThreadedFeatures ? PARALLELIZED : SINGLE_THREADED; } @@ -58,21 +56,36 @@ public static CompletableFuture runChunkGenWithLock(ChunkPos target, Chun isCancelled = FALSE_SUPPLIER; } - ArrayList fetchedLocks = new ArrayList<>((2 * radius + 1) * (2 * radius + 1)); +// ArrayList fetchedLocks = new ArrayList<>((2 * radius + 1) * (2 * radius + 1)); +// for (int x = target.x - radius; x <= target.x + radius; x++) +// for (int z = target.z - radius; z <= target.z + radius; z++) +// fetchedLocks.add(new ChunkPos(x, z)); +// +// final SchedulingAsyncCombinedLock task = new SchedulingAsyncCombinedLock<>( +// chunkLock, +// target.toLong(), +// new HashSet<>(fetchedLocks), +// isCancelled, +// schedulingManager::enqueue, +// action, +// target.toString(), +// async); + + LongArrayList lockTargets = new LongArrayList((2 * radius + 1) * (2 * radius + 1)); for (int x = target.x - radius; x <= target.x + radius; x++) for (int z = target.z - radius; z <= target.z + radius; z++) - fetchedLocks.add(new ChunkPos(x, z)); + lockTargets.add(ChunkPos.toLong(x, z)); - final SchedulingAsyncCombinedLock lock = new SchedulingAsyncCombinedLock<>( - chunkLock, + final NeighborLockingTask task = new NeighborLockingTask<>( + schedulingManager, target.toLong(), - new HashSet<>(fetchedLocks), + lockTargets.toLongArray(), isCancelled, - schedulingManager::enqueue, action, - target.toString(), - async); - return lock.getFuture(); + "%s %s".formatted(target.toString(), status.toString()), + async + ); + return task.getFuture(); } public static boolean isCancelled(ChunkHolder holder, ChunkStatus targetStatus) { diff --git a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/cancellation/MixinThreadedAnvilChunkStorage.java b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/cancellation/MixinThreadedAnvilChunkStorage.java index 5b2f147a..800c3d87 100644 --- a/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/cancellation/MixinThreadedAnvilChunkStorage.java +++ b/c2me-threading-worldgen/src/main/java/com/ishland/c2me/threading/worldgen/mixin/cancellation/MixinThreadedAnvilChunkStorage.java @@ -1,5 +1,6 @@ package com.ishland.c2me.threading.worldgen.mixin.cancellation; +import com.llamalad7.mixinextras.injector.ModifyReturnValue; import com.mojang.datafixers.util.Either; import net.minecraft.server.world.ChunkHolder; import net.minecraft.server.world.ThreadedAnvilChunkStorage; @@ -11,9 +12,7 @@ import org.spongepowered.asm.mixin.Mixin; import org.spongepowered.asm.mixin.Shadow; import org.spongepowered.asm.mixin.injection.At; -import org.spongepowered.asm.mixin.injection.Inject; import org.spongepowered.asm.mixin.injection.Redirect; -import org.spongepowered.asm.mixin.injection.callback.CallbackInfoReturnable; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -37,13 +36,13 @@ private int redirectRemoveLightTicketDistance(ChunkStatus status) { return status == ChunkStatus.LIGHT ? ChunkStatus.getDistanceFromFull(ChunkStatus.STRUCTURE_STARTS) - 2 : ChunkStatus.getDistanceFromFull(status); } - @Inject(method = "getChunk", at = @At("RETURN"), cancellable = true) - private void injectCancellationHook(ChunkHolder holder, ChunkStatus requiredStatus, CallbackInfoReturnable>> cir) { - cir.setReturnValue(cir.getReturnValue().thenCompose(either -> { + @ModifyReturnValue(method = "getChunk", at = @At("RETURN")) + private CompletableFuture> injectCancellationHook(CompletableFuture> originalReturn, ChunkHolder holder, ChunkStatus requiredStatus) { + return originalReturn.thenCompose(either -> { if (either.right().isPresent()) { return CompletableFuture.supplyAsync(() -> { if (ChunkHolder.getTargetStatusForLevel(holder.getLevel()).isAtLeast(requiredStatus)) { - LOGGER.info("Chunk load {} raced, recovering", holder.getPos()); +// LOGGER.info("Chunk load {} raced, recovering", holder.getPos()); return this.getChunk(holder, requiredStatus); // recover from cancellation } else { return CompletableFuture.completedFuture(either); @@ -52,7 +51,7 @@ private void injectCancellationHook(ChunkHolder holder, ChunkStatus requiredStat } else { return CompletableFuture.completedFuture(either); } - })); + }); } } diff --git a/gradle.properties b/gradle.properties index 7a5f5686..41bb111c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,3 +16,4 @@ async_util_version=0.1.0 night_config_version=3.6.5 threadly_version=7.0 exp4j_version=0.4.8 +mixinextras_version=0.1.1 diff --git a/src/main/resources/fabric.mod.json b/src/main/resources/fabric.mod.json index b7498a45..543c0aab 100644 --- a/src/main/resources/fabric.mod.json +++ b/src/main/resources/fabric.mod.json @@ -23,7 +23,7 @@ ] }, "depends": { - "fabricloader": ">=0.14.0", + "fabricloader": ">=0.14.11", "java": ">=17", "minecraft": ">=1.19.4" },