• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CyclopsMC / IntegratedDynamics / 19640651880

24 Nov 2025 03:59PM UTC coverage: 44.824% (-0.3%) from 45.13%
19640651880

push

github

rubensworks
Bump mod version

2581 of 8552 branches covered (30.18%)

Branch coverage included in aggregate %.

11789 of 23507 relevant lines covered (50.15%)

2.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.5
/src/main/java/org/cyclops/integrateddynamics/core/network/IngredientObserver.java
1
package org.cyclops.integrateddynamics.core.network;
2

3
import com.google.common.collect.Iterators;
4
import com.google.common.collect.Lists;
5
import com.google.common.collect.Maps;
6
import com.google.common.collect.Sets;
7
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
8
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
9
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
10
import it.unimi.dsi.fastutil.ints.IntSet;
11
import net.neoforged.neoforge.common.NeoForge;
12
import net.neoforged.neoforge.event.server.ServerStoppingEvent;
13
import net.neoforged.neoforge.server.ServerLifecycleHooks;
14
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiff;
15
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiffManager;
16
import org.cyclops.integrateddynamics.GeneralConfig;
17
import org.cyclops.integrateddynamics.api.ingredient.IIngredientComponentStorageObservable;
18
import org.cyclops.integrateddynamics.api.network.IPositionedAddonsNetworkIngredients;
19
import org.cyclops.integrateddynamics.api.part.PartPos;
20
import org.cyclops.integrateddynamics.api.part.PartTarget;
21
import org.cyclops.integrateddynamics.api.part.PrioritizedPartPos;
22
import org.cyclops.integrateddynamics.core.network.diagnostics.NetworkDiagnostics;
23

24
import javax.annotation.Nullable;
25
import java.util.Iterator;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.Optional;
29
import java.util.Set;
30
import java.util.concurrent.ExecutionException;
31
import java.util.concurrent.ExecutorService;
32
import java.util.concurrent.Executors;
33
import java.util.concurrent.Future;
34
import java.util.function.Consumer;
35

36
/**
37
 * Instances of this class are able to watch ingredient positions and emit diffs.
38
 *
39
 * @author rubensworks
40
 */
41
public class IngredientObserver<T, M> {
42

43
    // Shared fixed-size thread pool on which asynchronous observation jobs are submitted.
    // The number of threads is read from GeneralConfig.ingredientNetworkObserverThreads.
    private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(GeneralConfig.ingredientNetworkObserverThreads);
3✔
44
    // Register a server-stopping listener that shuts the pool down, but only on dedicated servers.
    // NOTE(review): presumably the pool is kept alive otherwise because a non-dedicated server
    // can be started again within the same JVM - confirm.
    static {
45
        NeoForge.EVENT_BUS.addListener((Consumer<ServerStoppingEvent>) event -> {
3✔
46
            if (event.getServer().isDedicatedServer()) {
4!
47
                WORKER_POOL.shutdown();
×
48
            }
49
        });
1✔
50
    }
1✔
51

52
    private final IPositionedAddonsNetworkIngredients<T, M> network;
53
    private final ConcurrentWorldIngredientsProxy<T, M> worldProxy;
54

55
    private final Set<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> changeObservers;
56
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTickIntervals;
57
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTicks;
58
    private final Int2ObjectMap<Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>>> channeledDiffManagers;
59
    private final Int2ObjectMap<Set<PartPos>> pendingTickResets;
60

61
    private final Int2ObjectMap<List<PrioritizedPartPos>> lastRemoved;
62
    private final Map<PartPos, Integer> lastInventoryStates;
63
    private Future<?> lastObserverBarrier;
64
    private boolean runningObserverSync;
65
    private boolean initialObservation;
66

67
    /**
     * Create a new observer for the given ingredient network.
     * All bookkeeping collections start out empty, no observation job is pending,
     * and the next observation is flagged as the initial one.
     * @param network The network of which the positions will be observed.
     */
    public IngredientObserver(IPositionedAddonsNetworkIngredients<T, M> network) {
        // Job state
        this.initialObservation = true;
        this.runningObserverSync = false;
        this.lastObserverBarrier = null;

        // Network binding
        this.network = network;
        this.worldProxy = new ConcurrentWorldIngredientsProxy<>(network);

        // Registered listeners
        this.changeObservers = Sets.newIdentityHashSet();

        // Per-channel scheduling and diffing state
        this.observeTargetTickIntervals = new Int2ObjectOpenHashMap<>();
        this.observeTargetTicks = new Int2ObjectOpenHashMap<>();
        this.channeledDiffManagers = new Int2ObjectOpenHashMap<>();
        this.pendingTickResets = new Int2ObjectOpenHashMap<>();

        // Removal and inventory-state tracking
        this.lastRemoved = new Int2ObjectOpenHashMap<>();
        this.lastInventoryStates = Maps.newHashMap();
    }
82

83
    public IPositionedAddonsNetworkIngredients<T, M> getNetwork() {
84
        return network;
3✔
85
    }
86

87
    @Nullable
88
    public List<PrioritizedPartPos> getLastRemoved(int channel) {
89
        return lastRemoved.get(channel);
6✔
90
    }
91

92
    public void onPositionRemoved(int channel, PrioritizedPartPos pos) {
93
        List<PrioritizedPartPos> positions = this.lastRemoved.get(channel);
6✔
94
        if (positions == null) {
2✔
95
            positions = Lists.newLinkedList();
2✔
96
            this.lastRemoved.put(channel, positions);
6✔
97
        }
98
        positions.add(pos);
4✔
99
        this.lastInventoryStates.remove(pos.getPartPos());
6✔
100
    }
1✔
101

102
    /**
103
     * Add an observer for listing to index change events.
104
     * @param observer An index change observer.
105
     */
106
    public synchronized void addChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
107
        changeObservers.add(observer);
5✔
108
    }
1✔
109

110
    /**
111
     * Remove the given index change observer.
112
     * This will silently fail if the given observer was not registered.
113
     * @param observer An index change observer.
114
     */
115
    public synchronized void removeChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
116
        changeObservers.remove(observer);
×
117
    }
×
118

119
    protected int getCurrentTick() {
120
        return ServerLifecycleHooks.getCurrentServer().getTickCount();
3✔
121
    }
122

123
    protected void emitEvent(IIngredientComponentStorageObservable.StorageChangeEvent<T, M> event, boolean forceSync) {
124
        if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
4!
125
            // Make sure we are running on the main server thread to avoid concurrency exceptions
126
            ServerLifecycleHooks.getCurrentServer().submitAsync(() -> {
7✔
127
                for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
11✔
128
                    observer.onChange(event);
3✔
129
                }
1✔
130
            });
1✔
131
        } else {
132
            for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
×
133
                observer.onChange(event);
×
134
            }
×
135
        }
136
    }
1✔
137

138
    protected synchronized List<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> getObserversCopy() {
139
        return Lists.newArrayList(this.changeObservers);
4✔
140
    }
141

142
    protected int[] getChannels() {
143
        int[] networkChannels = getNetwork().getChannels();
4✔
144
        IntSet lastRemovedChannels = this.lastRemoved.keySet();
4✔
145
        if (lastRemovedChannels.size() == 0) {
3!
146
            return networkChannels;
2✔
147
        }
148
        // We use a set that maintains insertion order,
149
        // because we MUST iterate over the channels that have removals first!
150
        IntSet uniqueChannels = new IntLinkedOpenHashSet();
×
151
        for (int lastRemovedChannel : lastRemovedChannels) {
×
152
            uniqueChannels.add(lastRemovedChannel);
×
153
        }
×
154
        for (int networkChannel : networkChannels) {
×
155
            uniqueChannels.add(networkChannel);
×
156
        }
157
        return uniqueChannels.toIntArray();
×
158
    }
159

160
    /**
161
     * @param forceSync If observation should happen synchronously.
162
     * @return If an observation job was successfully started if it was needed.
163
     */
164
    protected boolean observe(boolean forceSync) {
165
        if (!this.changeObservers.isEmpty()) {
4!
166
            // If we forcefully observe sync, make sure that no async observers are still running
167
            if (forceSync && GeneralConfig.ingredientNetworkObserverEnableMultithreading
2!
168
                    && this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) {
×
169
                try {
170
                    this.lastObserverBarrier.get();
×
171
                } catch (InterruptedException | ExecutionException e) {
×
172
                    e.printStackTrace();
×
173
                }
×
174
            }
175

176
            if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
4!
177
                // If we still have an uncompleted job (sync or async) from the previous tick, don't start a new one yet!
178
                if ((this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) || this.runningObserverSync) {
10!
179
                    return false;
×
180
                }
181

182
                // Run the world proxy in the world thread
183
                this.worldProxy.onWorldTick();
3✔
184

185
                // Schedule the observation job
186
                this.lastObserverBarrier = WORKER_POOL.submit(() -> {
7✔
187
                    for (int channel : getChannels()) {
17✔
188
                        observe(channel, false);
4✔
189
                    }
190
                    this.initialObservation = false;
3✔
191
                });
1✔
192
            } else {
193
                // If we have an uncompleted sync observer, don't start a new one yet!
194
                if (this.runningObserverSync) {
×
195
                    return false;
×
196
                }
197

198
                // Run the world proxy in the world thread
199
                this.worldProxy.onWorldTick();
×
200

201
                this.runningObserverSync = true;
×
202
                for (int channel : getChannels()) {
×
203
                    observe(channel, true);
×
204
                }
205
                this.runningObserverSync = false;
×
206
                this.initialObservation = false;
×
207
            }
208
        }
209
        return true;
2✔
210
    }
211

212
    protected synchronized Set<PrioritizedPartPos> getPositionsCopy(int channel) {
213
        return Sets.newHashSet(getNetwork().getPrioritizedPositions(channel));
6✔
214
    }
215

216
    /**
     * Observe all positions of the given channel once.
     *
     * For every current position that is due in this tick, the instances exposed by the world proxy
     * are diffed against the previous observation, and addition and deletion events are emitted.
     * The next observation tick and the tick interval of that position are then updated.
     * Afterwards, deletion events are emitted for the positions that were recorded as removed,
     * and the updated ticking collections are stored.
     *
     * @param channel The channel to observe.
     * @param forceSync Passed on when emitting events, to control whether observers are notified synchronously.
     */
    protected void observe(int channel, boolean forceSync) {
217
        int currentTick = getCurrentTick();
3✔
218

219
        // Prepare ticking collections
220
        Map<PartPos, Integer> channelTargetTicks = observeTargetTicks.get(channel);
6✔
221
        if (channelTargetTicks == null) {
2!
222
            channelTargetTicks = Maps.newHashMap();
×
223
        }
224
        Map<PartPos, Integer> channelIntervals = this.observeTargetTickIntervals.get(channel);
6✔
225
        if (channelIntervals == null) {
2✔
226
            channelIntervals = Maps.newHashMap();
2✔
227
        }
228

229
        // Calculate diff of all positions
230
        Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>> diffManagers = this.channeledDiffManagers.get(channel);
6✔
231
        if (diffManagers == null) {
2✔
232
            diffManagers = Maps.newHashMap();
2✔
233
            this.channeledDiffManagers.put(channel, diffManagers);
6✔
234
        }
235

236
        // Check if we should diagnose the observer
237
        boolean isBeingDiagnozed = NetworkDiagnostics.getInstance().isBeingDiagnozed();
3✔
238
        Map<PartPos, Long> lastSecondDurations = network.getLastSecondDurationIndex();
4✔
239
        if (!isBeingDiagnozed && !lastSecondDurations.isEmpty()) {
5!
240
            // Make sure we aren't using any unnecessary memory.
241
            lastSecondDurations.clear();
×
242
        }
243

244
        // Emit diffs for all current positions
245
        Set<PrioritizedPartPos> positions = getPositionsCopy(channel);
4✔
246
        for (PrioritizedPartPos partPos : positions) {
10✔
247
            // Get current time if diagnostics are enabled
248
            long startTime = 0;
2✔
249
            if (isBeingDiagnozed) {
2!
250
                startTime = System.nanoTime();
×
251
            }
252

253
            // Check if we should observe this position in this tick
            // (positions without a stored target tick are always observed)
254
            int lastTick = channelTargetTicks.getOrDefault(partPos.getPartPos(), currentTick);
9✔
255
            if (lastTick <= currentTick) {
3✔
256
                // Remove this position from the pending tick reset set
257
                synchronized (this.pendingTickResets) {
5✔
258
                    Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
6✔
259
                    if (pendingTickResetsChannel != null) {
2!
260
                        pendingTickResetsChannel.remove(partPos.getPartPos());
5✔
261
                        if (pendingTickResetsChannel.isEmpty()) {
3✔
262
                            this.pendingTickResets.remove(channel);
5✔
263
                        }
264
                    }
265
                }
3✔
266

267
                // If an inventory state is exposed, check if it has changed since the last observation call.
268
                boolean skipPosition = false;
2✔
269

270
                // Skip position forcefully if it is not loaded
271
                if (!partPos.getPartPos().getPos().isLoaded()) {
5!
272
                    skipPosition = true;
×
273
                }
274

275
                if (!skipPosition) {
2!
276
                    Optional<Integer> newInventoryStateBoxed = this.worldProxy.getInventoryState(partPos.getPartPos());
6✔
277
                    if (newInventoryStateBoxed.isPresent()) {
3!
278
                        Integer lastState = this.lastInventoryStates.get(partPos.getPartPos());
×
279
                        int newState = newInventoryStateBoxed.get();
×
280
                        if (lastState != null && lastState == newState) {
×
281
                            // Skip this position if it has not changed
282
                            skipPosition = true;
×
283
                        } else {
284
                            this.lastInventoryStates.put(partPos.getPartPos(), newState);
×
285
                        }
286
                    }
287
                }
288

289
                if (!skipPosition) {
2!
290
                    IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
5✔
291
                    if (diffManager == null) {
2✔
292
                        diffManager = new IngredientCollectionDiffManager<>(network.getComponent());
7✔
293
                        diffManagers.put(partPos, diffManager);
5✔
294
                    }
295

296
                    // Emit event of diff
297
                    Iterator<T> instances = this.worldProxy.getInstances(partPos.getPartPos()).iterator();
7✔
298
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(instances);
4✔
299
                    boolean hasChanges = false;
2✔
300
                    if (diff.hasAdditions()) {
3✔
301
                        hasChanges = true;
2✔
302
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
9✔
303
                                IIngredientComponentStorageObservable.Change.ADDITION, false, diff.getAdditions(), this.initialObservation), forceSync);
5✔
304
                    }
305
                    if (diff.hasDeletions()) {
3✔
306
                        hasChanges = true;
2✔
307
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
8✔
308
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
7✔
309
                    }
310

311
                    // Update the next tick value
312
                    int tickInterval = channelIntervals.getOrDefault(partPos.getPartPos(), GeneralConfig.ingredientNetworkObserverFrequencyMax);
9✔
313
                    // Decrease the tick interval when changes were detected
314
                    // Increase the tick interval when no changes were detected
315
                    // This will make it so that quickly changing storages will be observed
316
                    // more frequently than slowly changing storages
317
                    boolean tickIntervalChanged = false;
2✔
318
                    if (hasChanges) {
2✔
319
                        if (tickInterval > GeneralConfig.ingredientNetworkObserverFrequencyMin) {
3✔
320
                            tickIntervalChanged = true;
2✔
321
                            tickInterval = Math.max(GeneralConfig.ingredientNetworkObserverFrequencyMin, tickInterval - GeneralConfig.ingredientNetworkObserverFrequencyDecreaseFactor);
7✔
322
                        }
323
                    } else {
324
                        if (tickInterval < GeneralConfig.ingredientNetworkObserverFrequencyMax) {
3!
325
                            tickIntervalChanged = true;
×
326
                            tickInterval = Math.min(GeneralConfig.ingredientNetworkObserverFrequencyMax, tickInterval + GeneralConfig.ingredientNetworkObserverFrequencyIncreaseFactor);
×
327
                        }
328
                    }
329
                    // No need to store the interval if it == 1, as the previous or default value will
330
                    // definitely also cause this part to tick in next tick.
331
                    // This makes these cases slightly faster, as no map updates are needed.
332
                    if (tickInterval != 1) {
3!
333
                        channelTargetTicks.put(partPos.getPartPos(), currentTick + tickInterval);
9✔
334

335
                    }
336
                    // Only update when the interval has changed.
337
                    // In most cases, this will remain the same.
                    // An interval equal to the maximum is the default, so it is removed instead of stored.
338
                    if (tickIntervalChanged) {
2✔
339
                        if (tickInterval != GeneralConfig.ingredientNetworkObserverFrequencyMax) {
3!
340
                            channelIntervals.put(partPos.getPartPos(), tickInterval);
8✔
341
                        } else {
342
                            channelIntervals.remove(partPos.getPartPos());
×
343
                        }
344
                    }
345
                }
346
            }
347

348
            // Calculate duration if diagnostics are enabled
            // (durations are accumulated per target position of the observed part)
349
            if (isBeingDiagnozed) {
2!
350
                long duration = System.nanoTime() - startTime;
×
351
                PartPos interfacePos = PartTarget.fromCenter(partPos.getPartPos()).getTarget();
×
352
                Long lastDuration = lastSecondDurations.get(interfacePos);
×
353
                if (lastDuration != null) {
×
354
                    duration = duration + lastDuration;
×
355
                }
356
                lastSecondDurations.put(interfacePos, duration);
×
357
            }
358
        }
1✔
359

360
        // Emit deletions for all removed positions
361
        List<PrioritizedPartPos> lastRemovedPositions = this.lastRemoved.get(channel);
6✔
362
        if (lastRemovedPositions != null) {
2!
363
            for (PrioritizedPartPos partPos : lastRemovedPositions) {
×
364
                IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
×
365
                if (diffManager != null) {
×
366
                    // Emit event of diff with *empty* iterator
367
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(Iterators.forArray());
×
368
                    // No additions are possible
369
                    if (diff.hasDeletions()) {
×
370
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
×
371
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
×
372
                    }
373
                }
374
            }
×
375
            this.lastRemoved.remove(channel);
×
376
        }
377

378
        // Store our new ticking collections
379
        if (!channelTargetTicks.isEmpty()) {
3!
380
            observeTargetTicks.put(channel, channelTargetTicks);
6✔
381
        }
382
        if (!channelIntervals.isEmpty()) {
3✔
383
            observeTargetTickIntervals.put(channel, channelIntervals);
6✔
384
        }
385
    }
1✔
386

387
    public void resetTickInterval(int channel, PartPos targetPos) {
388
        // Reset the world proxy
389
        this.worldProxy.setRead(targetPos);
4✔
390

391
        // Reset the channel ticks
392
        Map<PartPos, Integer> channelTicks = this.observeTargetTicks.get(channel);
6✔
393
        if (channelTicks == null) {
2✔
394
            channelTicks = Maps.newHashMap();
2✔
395
            this.observeTargetTicks.put(channel, channelTicks);
6✔
396
        }
397
        channelTicks.put(targetPos, getCurrentTick() + GeneralConfig.ingredientNetworkObserverFrequencyForced);
9✔
398

399
        // Keep an overview of the pending positions per channel that require tick resets
400
        synchronized (this.pendingTickResets) {
5✔
401
            Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
6✔
402
            if (pendingTickResetsChannel == null) {
2✔
403
                pendingTickResetsChannel = Sets.newHashSet();
2✔
404
                this.pendingTickResets.put(channel, pendingTickResetsChannel);
6✔
405
            }
406
            pendingTickResetsChannel.add(targetPos);
4✔
407
        }
3✔
408
    }
1✔
409

410
    public boolean isTickResetPending(int channel) {
411
        synchronized (this.pendingTickResets) {
×
412
            return this.pendingTickResets.containsKey(channel);
×
413
        }
414
    }
415

416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc