• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CyclopsMC / IntegratedDynamics / 20210191346

14 Dec 2025 03:32PM UTC coverage: 19.514% (-33.5%) from 53.061%
20210191346

push

github

rubensworks
Remove deprecations

663 of 8728 branches covered (7.6%)

Branch coverage included in aggregate %.

6786 of 29445 relevant lines covered (23.05%)

1.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/org/cyclops/integrateddynamics/core/network/IngredientObserver.java
1
package org.cyclops.integrateddynamics.core.network;
2

3
import com.google.common.collect.Iterators;
4
import com.google.common.collect.Lists;
5
import com.google.common.collect.Maps;
6
import com.google.common.collect.Sets;
7
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
8
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
9
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
10
import it.unimi.dsi.fastutil.ints.IntSet;
11
import net.neoforged.neoforge.common.NeoForge;
12
import net.neoforged.neoforge.event.server.ServerStoppingEvent;
13
import net.neoforged.neoforge.server.ServerLifecycleHooks;
14
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiff;
15
import org.cyclops.cyclopscore.ingredient.collection.diff.IngredientCollectionDiffManager;
16
import org.cyclops.integrateddynamics.GeneralConfig;
17
import org.cyclops.integrateddynamics.api.ingredient.IIngredientComponentStorageObservable;
18
import org.cyclops.integrateddynamics.api.network.IPositionedAddonsNetworkIngredients;
19
import org.cyclops.integrateddynamics.api.part.PartPos;
20
import org.cyclops.integrateddynamics.api.part.PartTarget;
21
import org.cyclops.integrateddynamics.api.part.PrioritizedPartPos;
22
import org.cyclops.integrateddynamics.core.network.diagnostics.NetworkDiagnostics;
23

24
import javax.annotation.Nullable;
25
import java.util.Iterator;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.Optional;
29
import java.util.Set;
30
import java.util.concurrent.ExecutionException;
31
import java.util.concurrent.ExecutorService;
32
import java.util.concurrent.Executors;
33
import java.util.concurrent.Future;
34
import java.util.function.Consumer;
35

36
/**
37
 * Instances of this class are able to watch ingredient positions and emit diffs.
38
 *
39
 * @author rubensworks
40
 */
41
public class IngredientObserver<T, M> {
42

43
    private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(GeneralConfig.ingredientNetworkObserverThreads);
×
44
    static {
45
        NeoForge.EVENT_BUS.addListener((Consumer<ServerStoppingEvent>) event -> {
×
46
            if (event.getServer().isDedicatedServer()) {
×
47
                WORKER_POOL.shutdown();
×
48
            }
49
        });
×
50
    }
×
51

52
    private final IPositionedAddonsNetworkIngredients<T, M> network;
53
    private final ConcurrentWorldIngredientsProxy<T, M> worldProxy;
54

55
    private final Set<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> changeObservers;
56
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTickIntervals;
57
    private final Int2ObjectMap<Map<PartPos, Integer>> observeTargetTicks;
58
    private final Int2ObjectMap<Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>>> channeledDiffManagers;
59
    private final Int2ObjectMap<Set<PartPos>> pendingTickResets;
60

61
    private final Int2ObjectMap<List<PrioritizedPartPos>> lastRemoved;
62
    private final Map<PartPos, Integer> lastInventoryStates;
63
    private Future<?> lastObserverBarrier;
64
    private boolean runningObserverSync;
65
    private boolean initialObservation;
66

67
    public IngredientObserver(IPositionedAddonsNetworkIngredients<T, M> network) {
×
68
        this.network = network;
×
69
        this.worldProxy = new ConcurrentWorldIngredientsProxy<>(network);
×
70
        this.changeObservers = Sets.newIdentityHashSet();
×
71
        this.observeTargetTickIntervals = new Int2ObjectOpenHashMap<>();
×
72
        this.observeTargetTicks = new Int2ObjectOpenHashMap<>();
×
73
        this.channeledDiffManagers = new Int2ObjectOpenHashMap<>();
×
74
        this.pendingTickResets = new Int2ObjectOpenHashMap<>();
×
75
        this.lastRemoved = new Int2ObjectOpenHashMap<>();
×
76
        this.lastInventoryStates = Maps.newHashMap();
×
77

78
        this.lastObserverBarrier = null;
×
79
        this.runningObserverSync = false;
×
80
        this.initialObservation = true;
×
81
    }
×
82

83
    public IPositionedAddonsNetworkIngredients<T, M> getNetwork() {
84
        return network;
×
85
    }
86

87
    @Nullable
88
    public List<PrioritizedPartPos> getLastRemoved(int channel) {
89
        return lastRemoved.get(channel);
×
90
    }
91

92
    public void onPositionRemoved(int channel, PrioritizedPartPos pos) {
93
        List<PrioritizedPartPos> positions = this.lastRemoved.get(channel);
×
94
        if (positions == null) {
×
95
            positions = Lists.newLinkedList();
×
96
            this.lastRemoved.put(channel, positions);
×
97
        }
98
        positions.add(pos);
×
99
        this.lastInventoryStates.remove(pos.getPartPos());
×
100
    }
×
101

102
    /**
103
     * Add an observer for listing to index change events.
104
     * @param observer An index change observer.
105
     */
106
    public synchronized void addChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
107
        changeObservers.add(observer);
×
108
    }
×
109

110
    /**
111
     * Remove the given index change observer.
112
     * This will silently fail if the given observer was not registered.
113
     * @param observer An index change observer.
114
     */
115
    public synchronized void removeChangeObserver(IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer) {
116
        changeObservers.remove(observer);
×
117
    }
×
118

119
    protected int getCurrentTick() {
120
        return ServerLifecycleHooks.getCurrentServer().getTickCount();
×
121
    }
122

123
    protected void emitEvent(IIngredientComponentStorageObservable.StorageChangeEvent<T, M> event, boolean forceSync) {
124
        if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
×
125
            // Make sure we are running on the main server thread to avoid concurrency exceptions
126
            ServerLifecycleHooks.getCurrentServer().submitAsync(() -> {
×
127
                for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
×
128
                    observer.onChange(event);
×
129
                }
×
130
            });
×
131
        } else {
132
            for (IIngredientComponentStorageObservable.IIndexChangeObserver<T, M> observer : getObserversCopy()) {
×
133
                observer.onChange(event);
×
134
            }
×
135
        }
136
    }
×
137

138
    protected synchronized List<IIngredientComponentStorageObservable.IIndexChangeObserver<T, M>> getObserversCopy() {
139
        return Lists.newArrayList(this.changeObservers);
×
140
    }
141

142
    protected int[] getChannels() {
143
        int[] networkChannels = getNetwork().getChannels();
×
144
        IntSet lastRemovedChannels = this.lastRemoved.keySet();
×
145
        if (lastRemovedChannels.size() == 0) {
×
146
            return networkChannels;
×
147
        }
148
        // We use a set that maintains insertion order,
149
        // because we MUST iterate over the channels that have removals first!
150
        IntSet uniqueChannels = new IntLinkedOpenHashSet();
×
151
        for (int lastRemovedChannel : lastRemovedChannels) {
×
152
            uniqueChannels.add(lastRemovedChannel);
×
153
        }
×
154
        for (int networkChannel : networkChannels) {
×
155
            uniqueChannels.add(networkChannel);
×
156
        }
157
        return uniqueChannels.toIntArray();
×
158
    }
159

160
    /**
161
     * @param forceSync If observation should happen synchronously.
162
     * @return If an observation job was successfully started if it was needed.
163
     */
164
    protected boolean observe(boolean forceSync) {
165
        if (!this.changeObservers.isEmpty()) {
×
166
            // If we forcefully observe sync, make sure that no async observers are still running
167
            if (forceSync && GeneralConfig.ingredientNetworkObserverEnableMultithreading
×
168
                    && this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) {
×
169
                try {
170
                    this.lastObserverBarrier.get();
×
171
                } catch (InterruptedException | ExecutionException e) {
×
172
                    e.printStackTrace();
×
173
                }
×
174
            }
175

176
            if (GeneralConfig.ingredientNetworkObserverEnableMultithreading && !forceSync) {
×
177
                // If we still have an uncompleted job (sync or async) from the previous tick, don't start a new one yet!
178
                if ((this.lastObserverBarrier != null && !this.lastObserverBarrier.isDone()) || this.runningObserverSync) {
×
179
                    return false;
×
180
                }
181

182
                // Run the world proxy in the world thread
183
                this.worldProxy.onWorldTick();
×
184

185
                // Schedule the observation job
186
                this.lastObserverBarrier = WORKER_POOL.submit(() -> {
×
187
                    for (int channel : getChannels()) {
×
188
                        observe(channel, false);
×
189
                    }
190
                    this.initialObservation = false;
×
191
                });
×
192
            } else {
193
                // If we have an uncompleted sync observer, don't start a new one yet!
194
                if (this.runningObserverSync) {
×
195
                    return false;
×
196
                }
197

198
                // Run the world proxy in the world thread
199
                this.worldProxy.onWorldTick();
×
200

201
                this.runningObserverSync = true;
×
202
                for (int channel : getChannels()) {
×
203
                    observe(channel, true);
×
204
                }
205
                this.runningObserverSync = false;
×
206
                this.initialObservation = false;
×
207
            }
208
        }
209
        return true;
×
210
    }
211

212
    protected synchronized Set<PrioritizedPartPos> getPositionsCopy(int channel) {
213
        return Sets.newHashSet(getNetwork().getPrioritizedPositions(channel));
×
214
    }
215

216
    protected void observe(int channel, boolean forceSync) {
217
        int currentTick = getCurrentTick();
×
218

219
        // Prepare ticking collections
220
        Map<PartPos, Integer> channelTargetTicks = observeTargetTicks.get(channel);
×
221
        if (channelTargetTicks == null) {
×
222
            channelTargetTicks = Maps.newHashMap();
×
223
        }
224
        Map<PartPos, Integer> channelIntervals = this.observeTargetTickIntervals.get(channel);
×
225
        if (channelIntervals == null) {
×
226
            channelIntervals = Maps.newHashMap();
×
227
        }
228

229
        // Calculate diff of all positions
230
        Map<PrioritizedPartPos, IngredientCollectionDiffManager<T, M>> diffManagers = this.channeledDiffManagers.get(channel);
×
231
        if (diffManagers == null) {
×
232
            diffManagers = Maps.newHashMap();
×
233
            this.channeledDiffManagers.put(channel, diffManagers);
×
234
        }
235

236
        // Check if we should diagnoze the observer
237
        boolean isBeingDiagnozed = NetworkDiagnostics.getInstance().isBeingDiagnozed();
×
238
        Map<PartPos, Long> lastSecondDurations = network.getLastSecondDurationIndex();
×
239
        if (!isBeingDiagnozed && !lastSecondDurations.isEmpty()) {
×
240
            // Make sure we aren't using any unnecessary memory.
241
            lastSecondDurations.clear();
×
242
        }
243

244
        // Emit diffs for all current positions
245
        Set<PrioritizedPartPos> positions = getPositionsCopy(channel);
×
246
        for (PrioritizedPartPos partPos : positions) {
×
247
            // Get current time if diagnostics are enabled
248
            long startTime = 0;
×
249
            if (isBeingDiagnozed) {
×
250
                startTime = System.nanoTime();
×
251
            }
252

253
            // Check if we should observe this position in this tick
254
            int lastTick = channelTargetTicks.getOrDefault(partPos.getPartPos(), currentTick);
×
255
            if (lastTick <= currentTick) {
×
256
                // Remove this position from the pending tick reset set
257
                synchronized (this.pendingTickResets) {
×
258
                    Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
×
259
                    if (pendingTickResetsChannel != null) {
×
260
                        pendingTickResetsChannel.remove(partPos.getPartPos());
×
261
                        if (pendingTickResetsChannel.isEmpty()) {
×
262
                            this.pendingTickResets.remove(channel);
×
263
                        }
264
                    }
265
                }
×
266

267
                // If an inventory state is exposed, check if it has changed since the last observation call.
268
                boolean skipPosition = false;
×
269

270
                // Skip position forcefully if it is not loaded
271
                if (!partPos.getPartPos().getPos().isLoaded()) {
×
272
                    skipPosition = true;
×
273
                }
274

275
                if (!skipPosition) {
×
276
                    Optional<Integer> newInventoryStateBoxed = this.worldProxy.getInventoryState(partPos.getPartPos());
×
277
                    if (newInventoryStateBoxed.isPresent()) {
×
278
                        Integer lastState = this.lastInventoryStates.get(partPos.getPartPos());
×
279
                        int newState = newInventoryStateBoxed.get();
×
280
                        if (lastState != null && lastState == newState) {
×
281
                            // Skip this position if it hasn't not changed
282
                            skipPosition = true;
×
283
                        } else {
284
                            this.lastInventoryStates.put(partPos.getPartPos(), newState);
×
285
                        }
286
                    }
287
                }
288

289
                if (!skipPosition) {
×
290
                    IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
×
291
                    if (diffManager == null) {
×
292
                        diffManager = new IngredientCollectionDiffManager<>(network.getComponent());
×
293
                        diffManagers.put(partPos, diffManager);
×
294
                    }
295

296
                    // Emit event of diff
297
                    Iterator<T> instances = this.worldProxy.getInstances(partPos.getPartPos()).iterator();
×
298
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(instances);
×
299
                    boolean hasChanges = false;
×
300
                    if (diff.hasAdditions()) {
×
301
                        hasChanges = true;
×
302
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
×
303
                                IIngredientComponentStorageObservable.Change.ADDITION, false, diff.getAdditions(), this.initialObservation), forceSync);
×
304
                    }
305
                    if (diff.hasDeletions()) {
×
306
                        hasChanges = true;
×
307
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
×
308
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
×
309
                    }
310

311
                    // Update the next tick value
312
                    int tickInterval = channelIntervals.getOrDefault(partPos.getPartPos(), GeneralConfig.ingredientNetworkObserverFrequencyMax);
×
313
                    // Decrease the frequency when changes were detected
314
                    // Increase the frequency when no changes were detected
315
                    // This will make it so that quickly changing storages will be observed
316
                    // more frequently than slowly changing storages
317
                    boolean tickIntervalChanged = false;
×
318
                    if (hasChanges) {
×
319
                        if (tickInterval > GeneralConfig.ingredientNetworkObserverFrequencyMin) {
×
320
                            tickIntervalChanged = true;
×
321
                            tickInterval = Math.max(GeneralConfig.ingredientNetworkObserverFrequencyMin, tickInterval - GeneralConfig.ingredientNetworkObserverFrequencyDecreaseFactor);
×
322
                        }
323
                    } else {
324
                        if (tickInterval < GeneralConfig.ingredientNetworkObserverFrequencyMax) {
×
325
                            tickIntervalChanged = true;
×
326
                            tickInterval = Math.min(GeneralConfig.ingredientNetworkObserverFrequencyMax, tickInterval + GeneralConfig.ingredientNetworkObserverFrequencyIncreaseFactor);
×
327
                        }
328
                    }
329
                    // No need to store the interval if it == 1, as the previous or default value will
330
                    // definitely also cause this part to tick in next tick.
331
                    // This makes these cases slightly faster, as no map updates are needed.
332
                    if (tickInterval != 1) {
×
333
                        channelTargetTicks.put(partPos.getPartPos(), currentTick + tickInterval);
×
334

335
                    }
336
                    // Only update when the interval has changed.
337
                    // In most cases, this will remain the same.
338
                    if (tickIntervalChanged) {
×
339
                        if (tickInterval != GeneralConfig.ingredientNetworkObserverFrequencyMax) {
×
340
                            channelIntervals.put(partPos.getPartPos(), tickInterval);
×
341
                        } else {
342
                            channelIntervals.remove(partPos.getPartPos());
×
343
                        }
344
                    }
345
                }
346
            }
347

348
            // Calculate duration if diagnostics are enabled
349
            if (isBeingDiagnozed) {
×
350
                long duration = System.nanoTime() - startTime;
×
351
                PartPos interfacePos = PartTarget.fromCenter(partPos.getPartPos()).getTarget();
×
352
                Long lastDuration = lastSecondDurations.get(interfacePos);
×
353
                if (lastDuration != null) {
×
354
                    duration = duration + lastDuration;
×
355
                }
356
                lastSecondDurations.put(interfacePos, duration);
×
357
            }
358
        }
×
359

360
        // Emit deletions for all removed positions
361
        List<PrioritizedPartPos> lastRemovedPositions = this.lastRemoved.get(channel);
×
362
        if (lastRemovedPositions != null) {
×
363
            for (PrioritizedPartPos partPos : lastRemovedPositions) {
×
364
                IngredientCollectionDiffManager<T, M> diffManager = diffManagers.get(partPos);
×
365
                if (diffManager != null) {
×
366
                    // Emit event of diff with *empty* iterator
367
                    IngredientCollectionDiff<T, M> diff = diffManager.onChange(Iterators.forArray());
×
368
                    // No additions are possible
369
                    if (diff.hasDeletions()) {
×
370
                        this.emitEvent(new IIngredientComponentStorageObservable.StorageChangeEvent<>(channel, partPos,
×
371
                                IIngredientComponentStorageObservable.Change.DELETION, diff.isCompletelyEmpty(), diff.getDeletions(), this.initialObservation), forceSync);
×
372
                    }
373
                }
374
            }
×
375
            this.lastRemoved.remove(channel);
×
376
        }
377

378
        // Store our new ticking collections
379
        if (!channelTargetTicks.isEmpty()) {
×
380
            observeTargetTicks.put(channel, channelTargetTicks);
×
381
        }
382
        if (!channelIntervals.isEmpty()) {
×
383
            observeTargetTickIntervals.put(channel, channelIntervals);
×
384
        }
385
    }
×
386

387
    public void resetTickInterval(int channel, PartPos targetPos) {
388
        // Reset the world proxy
389
        this.worldProxy.setRead(targetPos);
×
390

391
        // Reset the channel ticks
392
        Map<PartPos, Integer> channelTicks = this.observeTargetTicks.get(channel);
×
393
        if (channelTicks == null) {
×
394
            channelTicks = Maps.newHashMap();
×
395
            this.observeTargetTicks.put(channel, channelTicks);
×
396
        }
397
        channelTicks.put(targetPos, getCurrentTick() + GeneralConfig.ingredientNetworkObserverFrequencyForced);
×
398

399
        // Keep an overview of the pending positions per channel that require tick resets
400
        synchronized (this.pendingTickResets) {
×
401
            Set<PartPos> pendingTickResetsChannel = this.pendingTickResets.get(channel);
×
402
            if (pendingTickResetsChannel == null) {
×
403
                pendingTickResetsChannel = Sets.newHashSet();
×
404
                this.pendingTickResets.put(channel, pendingTickResetsChannel);
×
405
            }
406
            pendingTickResetsChannel.add(targetPos);
×
407
        }
×
408
    }
×
409

410
    public boolean isTickResetPending(int channel) {
411
        synchronized (this.pendingTickResets) {
×
412
            return this.pendingTickResets.containsKey(channel);
×
413
        }
414
    }
415

416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc