• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CyclopsMC / IntegratedDynamics / 20620100972

31 Dec 2025 01:35PM UTC coverage: 53.056% (-0.02%) from 53.079%
20620100972

push

github

rubensworks
Merge remote-tracking branch 'origin/master-1.21-lts' into master-1.21

2859 of 8756 branches covered (32.65%)

Branch coverage included in aggregate %.

17427 of 29479 relevant lines covered (59.12%)

3.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.81
/src/main/java/org/cyclops/integrateddynamics/core/network/IngredientObserver.java
1
package org.cyclops.integrateddynamics.core.network;
2

3
import com.google.common.collect.Iterators;
4
import com.google.common.collect.Lists;
5
import com.google.common.collect.Maps;
6
import com.google.common.collect.Sets;
7
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
8
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
9
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
10
import it.unimi.dsi.fastutil.ints.IntSet;
11
import net.neoforged.neoforge.common.NeoForge;
12
import net.neoforged.neoforge.event.server.ServerStoppingEvent;
13
import net.neoforged.neoforge.server.ServerLifecycleHooks;
14
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiff;
15
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiffManager;
16
import org.cyclops.integrateddynamics.GeneralConfig;
17
import org.cyclops.integrateddynamics.api.ingredient.IIngredientComponentStorageObservable;
18
import org.cyclops.integrateddynamics.api.network.IPositionedAddonsNetworkIngredients;
19
import org.cyclops.integrateddynamics.api.part.PartPos;
20
import org.cyclops.integrateddynamics.api.part.PartTarget;
21
import org.cyclops.integrateddynamics.api.part.PrioritizedPartPos;
22
import org.cyclops.integrateddynamics.core.network.diagnostics.NetworkDiagnostics;
23

24
import javax.annotation.Nullable;
25
import java.util.*;
26
import java.util.concurrent.ExecutionException;
27
import java.util.concurrent.ExecutorService;
28
import java.util.concurrent.Executors;
29
import java.util.concurrent.Future;
30
import java.util.function.Consumer;
31

32
/**
33
 * Instances of this class are able to watch ingredient positions and emit diffs.
34
 *
35
 * @author rubensworks
36
 */
37
public class IngredientObserver<T, M> {
38

39
    private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(GeneralConfig.ingredientNetworkObserverThreads);
3✔
40
    static {
41
        NeoForge.EVENT_BUS.addListener((Consumer<ServerStoppingEvent>) event -> {
3✔
42
            if (event.getServer().isDedicatedServer()) {
4!
43
                WORKER_POOL.shutdown();
×
44
            }
45
        });
1✔
46
    }
1✔
47

48
    private final IPositionedAddonsNetworkIngredients<T, M> network;
49
    private final ConcurrentWorldIngredientsProxy<T, M> worldProxy;
50

51
    private final List<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> changeObservers;
52
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTickIntervals;
53
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTicks;
54
    private final Int2ObjectMap<Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>>> channeledDiffManagers;
55
    private final Int2ObjectMap<Set<PartPos>> pendingTickResets;
56

57
    private final Int2ObjectMap<List<PrioritizedPartPos>> lastRemoved;
58
    private final Map<PartPos, Integer> lastInventoryStates;
59
    private Future<?> lastObserverBarrier;
60
    private boolean runningObserverSync;
61
    private boolean initialObservation;
62

63
    public IngredientObserver(IPositionedAddonsNetworkIngredients<T, M> network) {
2✔
64
        this.network = network;
3✔
65
        this.worldProxy = new ConcurrentWorldIngredientsProxy<>(network);
6✔
66
        this.changeObservers = Lists.newArrayList();
3✔
67
        this.observeTargetTickIntervals = new Int2ObjectOpenHashMap<>();
5✔
68
        this.observeTargetTicks = new Int2ObjectOpenHashMap<>();
5✔
69
        this.channeledDiffManagers = new Int2ObjectOpenHashMap<>();
5✔
70
        this.pendingTickResets = new Int2ObjectOpenHashMap<>();
5✔
71
        this.lastRemoved = new Int2ObjectOpenHashMap<>();
5✔
72
        this.lastInventoryStates = Maps.newHashMap();
3✔
73

74
        this.lastObserverBarrier = null;
3✔
75
        this.runningObserverSync = false;
3✔
76
        this.initialObservation = true;
3✔
77
    }
1✔
78

79
    public IPositionedAddonsNetworkIngredients<T, M> getNetwork() {
80
        return network;
3✔
81
    }
82

83
    @Nullable
84
    public List<PrioritizedPartPos> getLastRemoved(int channel) {
85
        return lastRemoved.get(channel);
6✔
86
    }
87

88
    public void onPositionRemoved(int channel, PrioritizedPartPos pos) {
89
        List<PrioritizedPartPos> positions = this.lastRemoved.get(channel);
6✔
90
        if (positions == null) {
2✔
91
            positions = Lists.newLinkedList();
2✔
92
            this.lastRemoved.put(channel, positions);
6✔
93
        }
94
        positions.add(pos);
4✔
95
        this.lastInventoryStates.remove(pos.getPartPos());
6✔
96
    }
1✔
97

98
    /**
99
     * Add an observer for listing to index change events.
100
     * @param observer An index change observer.
101
     */
102
    public synchronized void addChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
103
        if (!changeObservers.contains(observer)) {
5!
104
            changeObservers.add(observer);
5✔
105
        }
106
    }
1✔
107

108
    /**
109
     * Remove the given index change observer.
110
     * This will silently fail if the given observer was not registered.
111
     * @param observer An index change observer.
112
     */
113
    public synchronized void removeChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
114
        changeObservers.remove(observer);
×
115
    }
×
116

117
    protected int getCurrentTick() {
118
        return ServerLifecycleHooks.getCurrentServer().getTickCount();
3✔
119
    }
120

121
    protected void emitEvent(IIngredientComponentStorageObservable.StorageChangeEvent<T, M> event, boolean forceSync) {
122
        if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
4!
123
            // Make sure we are running on the main server thread to avoid concurrency exceptions
124
            ServerLifecycleHooks.getCurrentServer().submitAsync(() -> {
7✔
125
                for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
11✔
126
                    observer.onChange(event);
3✔
127
                }
1✔
128
            });
1✔
129
        } else {
130
            for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
×
131
                observer.onChange(event);
×
132
            }
×
133
        }
134
    }
1✔
135

136
    protected synchronized List<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> getObserversCopy() {
137
        return Lists.newArrayList(this.changeObservers);
4✔
138
    }
139

140
    protected int[] getChannels() {
141
        int[] networkChannels = getNetwork().getChannels();
4✔
142
        IntSet lastRemovedChannels = this.lastRemoved.keySet();
4✔
143
        if (lastRemovedChannels.size() == 0) {
3!
144
            return networkChannels;
2✔
145
        }
146
        // We use a set that maintains insertion order,
147
        // because we MUST iterate over the channels that have removals first!
148
        IntSet uniqueChannels = new IntLinkedOpenHashSet();
×
149
        for (int lastRemovedChannel : lastRemovedChannels) {
×
150
            uniqueChannels.add(lastRemovedChannel);
×
151
        }
×
152
        for (int networkChannel : networkChannels) {
×
153
            uniqueChannels.add(networkChannel);
×
154
        }
155
        return uniqueChannels.toIntArray();
×
156
    }
157

158
    /**
159
     * @param forceSync If observation should happen synchronously.
160
     * @return If an observation job was successfully started if it was needed.
161
     */
162
    protected boolean observe(boolean forceSync) {
163
        if (!this.changeObservers.isEmpty()) {
4!
164
            // If we forcefully observe sync, make sure that no async observers are still running
165
            if (forceSync && GeneralConfig.ingredientNetworkObserverEnableMultithreading
2!
166
                    && this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) {
×
167
                try {
168
                    this.lastObserverBarrier.get();
×
169
                } catch (InterruptedException | ExecutionException e) {
×
170
                    e.printStackTrace();
×
171
                }
×
172
            }
173

174
            if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
4!
175
                // If we still have an uncompleted job (sync or async) from the previous tick, don't start a new one yet!
176
                if ((this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) || this.runningObserverSync) {
10!
177
                    return false;
×
178
                }
179

180
                // Run the world proxy in the world thread
181
                this.worldProxy.onWorldTick();
3✔
182

183
                // Schedule the observation job
184
                this.lastObserverBarrier = WORKER_POOL.submit(() -> {
7✔
185
                    for (int channel : getChannels()) {
17✔
186
                        observe(channel, false);
4✔
187
                    }
188
                    this.initialObservation = false;
3✔
189
                });
1✔
190
            } else {
191
                // If we have an uncompleted sync observer, don't start a new one yet!
192
                if (this.runningObserverSync) {
×
193
                    return false;
×
194
                }
195

196
                // Run the world proxy in the world thread
197
                this.worldProxy.onWorldTick();
×
198

199
                this.runningObserverSync = true;
×
200
                for (int channel : getChannels()) {
×
201
                    observe(channel, true);
×
202
                }
203
                this.runningObserverSync = false;
×
204
                this.initialObservation = false;
×
205
            }
206
        }
207
        return true;
2✔
208
    }
209

210
    protected synchronized Set<PrioritizedPartPos> getPositionsCopy(int channel) {
211
        return Sets.newHashSet(getNetwork().getPrioritizedPositions(channel));
6✔
212
    }
213

214
    protected void observe(int channel, boolean forceSync) {
215
        int currentTick = getCurrentTick();
3✔
216

217
        // Prepare ticking collections
218
        Map<PartPos, Integer> channelTargetTicks = observeTargetTicks.get(channel);
6✔
219
        if (channelTargetTicks == null) {
2!
220
            channelTargetTicks = Maps.newHashMap();
×
221
        }
222
        Map<PartPos, Integer> channelIntervals = this.observeTargetTickIntervals.get(channel);
6✔
223
        if (channelIntervals == null) {
2✔
224
            channelIntervals = Maps.newHashMap();
2✔
225
        }
226

227
        // Calculate diff of all positions
228
        Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>> diffManagers = this.channeledDiffManagers.get(channel);
6✔
229
        if (diffManagers == null) {
2✔
230
            diffManagers = Maps.newHashMap();
2✔
231
            this.channeledDiffManagers.put(channel, diffManagers);
6✔
232
        }
233

234
        // Check if we should diagnoze the observer
235
        boolean isBeingDiagnozed = NetworkDiagnostics.getInstance().isBeingDiagnozed();
3✔
236
        Map<PartPos, Long> lastSecondDurations = network.getLastSecondDurationIndex();
4✔
237
        if (!isBeingDiagnozed && !lastSecondDurations.isEmpty()) {
5!
238
            // Make sure we aren't using any unnecessary memory.
239
            lastSecondDurations.clear();
×
240
        }
241

242
        // Emit diffs for all current positions
243
        Set<PrioritizedPartPos> positions = getPositionsCopy(channel);
4✔
244
        for (PrioritizedPartPos partPos : positions) {
10✔
245
            // Get current time if diagnostics are enabled
246
            long startTime = 0;
2✔
247
            if (isBeingDiagnozed) {
2!
248
                startTime = System.nanoTime();
×
249
            }
250

251
            // Check if we should observe this position in this tick
252
            int lastTick = channelTargetTicks.getOrDefault(partPos.getPartPos(), currentTick);
9✔
253
            if (lastTick <= currentTick) {
3✔
254
                // Remove this position from the pending tick reset set
255
                synchronized (this.pendingTickResets) {
5✔
256
                    Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
6✔
257
                    if (pendingTickResetsChannel != null) {
2✔
258
                        pendingTickResetsChannel.remove(partPos.getPartPos());
5✔
259
                        if (pendingTickResetsChannel.isEmpty()) {
3✔
260
                            this.pendingTickResets.remove(channel);
5✔
261
                        }
262
                    }
263
                }
3✔
264

265
                // If an inventory state is exposed, check if it has changed since the last observation call.
266
                boolean skipPosition = false;
2✔
267

268
                // Skip position forcefully if it is not loaded
269
                if (!partPos.getPartPos().getPos().isLoaded()) {
5!
270
                    skipPosition = true;
×
271
                }
272

273
                if (!skipPosition) {
2!
274
                    Optional<Integer> newInventoryStateBoxed = this.worldProxy.getInventoryState(partPos.getPartPos());
6✔
275
                    if (newInventoryStateBoxed.isPresent()) {
3!
276
                        Integer lastState = this.lastInventoryStates.get(partPos.getPartPos());
×
277
                        int newState = newInventoryStateBoxed.get();
×
278
                        if (lastState != null && lastState == newState) {
×
279
                            // Skip this position if it hasn't not changed
280
                            skipPosition = true;
×
281
                        } else {
282
                            this.lastInventoryStates.put(partPos.getPartPos(), newState);
×
283
                        }
284
                    }
285
                }
286

287
                if (!skipPosition) {
2!
288
                    IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
5✔
289
                    if (diffManager == null) {
2✔
290
                        diffManager = new IngredientCollectionDiffManager<>(network.getComponent());
7✔
291
                        diffManagers.put(partPos, diffManager);
5✔
292
                    }
293

294
                    // Emit event of diff
295
                    Iterator<T> instances = this.worldProxy.getInstances(partPos.getPartPos()).iterator();
7✔
296
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(instances);
4✔
297
                    boolean hasChanges = false;
2✔
298
                    if (diff.hasAdditions()) {
3✔
299
                        hasChanges = true;
2✔
300
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
9✔
301
                                IIngredientComponentStorageObservable.Change.ADDITION, false, diff.getAdditions(), this.initialObservation), forceSync);
5✔
302
                    }
303
                    if (diff.hasDeletions()) {
3✔
304
                        hasChanges = true;
2✔
305
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
8✔
306
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
7✔
307
                    }
308

309
                    // Update the next tick value
310
                    int tickInterval = channelIntervals.getOrDefault(partPos.getPartPos(), GeneralConfig.ingredientNetworkObserverFrequencyMax);
9✔
311
                    // Decrease the frequency when changes were detected
312
                    // Increase the frequency when no changes were detected
313
                    // This will make it so that quickly changing storages will be observed
314
                    // more frequently than slowly changing storages
315
                    boolean tickIntervalChanged = false;
2✔
316
                    if (hasChanges) {
2✔
317
                        if (tickInterval > GeneralConfig.ingredientNetworkObserverFrequencyMin) {
3✔
318
                            tickIntervalChanged = true;
2✔
319
                            tickInterval = Math.max(GeneralConfig.ingredientNetworkObserverFrequencyMin, tickInterval - GeneralConfig.ingredientNetworkObserverFrequencyDecreaseFactor);
7✔
320
                        }
321
                    } else {
322
                        if (tickInterval < GeneralConfig.ingredientNetworkObserverFrequencyMax) {
3✔
323
                            tickIntervalChanged = true;
2✔
324
                            tickInterval = Math.min(GeneralConfig.ingredientNetworkObserverFrequencyMax, tickInterval + GeneralConfig.ingredientNetworkObserverFrequencyIncreaseFactor);
6✔
325
                        }
326
                    }
327
                    // No need to store the interval if it == 1, as the previous or default value will
328
                    // definitely also cause this part to tick in next tick.
329
                    // This makes these cases slightly faster, as no map updates are needed.
330
                    if (tickInterval != 1) {
3!
331
                        channelTargetTicks.put(partPos.getPartPos(), currentTick + tickInterval);
9✔
332

333
                    }
334
                    // Only update when the interval has changed.
335
                    // In most cases, this will remain the same.
336
                    if (tickIntervalChanged) {
2✔
337
                        if (tickInterval != GeneralConfig.ingredientNetworkObserverFrequencyMax) {
3!
338
                            channelIntervals.put(partPos.getPartPos(), tickInterval);
8✔
339
                        } else {
340
                            channelIntervals.remove(partPos.getPartPos());
×
341
                        }
342
                    }
343
                }
344
            }
345

346
            // Calculate duration if diagnostics are enabled
347
            if (isBeingDiagnozed) {
2!
348
                long duration = System.nanoTime() - startTime;
×
349
                PartPos interfacePos = PartTarget.fromCenter(partPos.getPartPos()).getTarget();
×
350
                Long lastDuration = lastSecondDurations.get(interfacePos);
×
351
                if (lastDuration != null) {
×
352
                    duration = duration + lastDuration;
×
353
                }
354
                lastSecondDurations.put(interfacePos, duration);
×
355
            }
356
        }
1✔
357

358
        // Emit deletions for all removed positions
359
        List<PrioritizedPartPos> lastRemovedPositions = this.lastRemoved.get(channel);
6✔
360
        if (lastRemovedPositions != null) {
2!
361
            for (PrioritizedPartPos partPos : lastRemovedPositions) {
×
362
                IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
×
363
                if (diffManager != null) {
×
364
                    // Emit event of diff with *empty* iterator
365
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(Iterators.forArray());
×
366
                    // No additions are possible
367
                    if (diff.hasDeletions()) {
×
368
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
×
369
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
×
370
                    }
371
                }
372
            }
×
373
            this.lastRemoved.remove(channel);
×
374
        }
375

376
        // Store our new ticking collections
377
        if (!channelTargetTicks.isEmpty()) {
3!
378
            observeTargetTicks.put(channel, channelTargetTicks);
6✔
379
        }
380
        if (!channelIntervals.isEmpty()) {
3✔
381
            observeTargetTickIntervals.put(channel, channelIntervals);
6✔
382
        }
383
    }
1✔
384

385
    public void resetTickInterval(int channel, PartPos targetPos) {
386
        // Reset the world proxy
387
        this.worldProxy.setRead(targetPos);
4✔
388

389
        // Reset the channel ticks
390
        Map<PartPos, Integer> channelTicks = this.observeTargetTicks.get(channel);
6✔
391
        if (channelTicks == null) {
2✔
392
            channelTicks = Maps.newHashMap();
2✔
393
            this.observeTargetTicks.put(channel, channelTicks);
6✔
394
        }
395
        channelTicks.put(targetPos, getCurrentTick() + GeneralConfig.ingredientNetworkObserverFrequencyForced);
9✔
396

397
        // Keep an overview of the pending positions per channel that require tick resets
398
        synchronized (this.pendingTickResets) {
5✔
399
            Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
6✔
400
            if (pendingTickResetsChannel == null) {
2✔
401
                pendingTickResetsChannel = Sets.newHashSet();
2✔
402
                this.pendingTickResets.put(channel, pendingTickResetsChannel);
6✔
403
            }
404
            pendingTickResetsChannel.add(targetPos);
4✔
405
        }
3✔
406
    }
1✔
407

408
    public boolean isTickResetPending(int channel) {
409
        synchronized (this.pendingTickResets) {
×
410
            return this.pendingTickResets.containsKey(channel);
×
411
        }
412
    }
413

414
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc