• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

decentraland / archipelago-workers / 25576800384

08 May 2026 08:03PM UTC coverage: 79.551% (-1.1%) from 80.63%
25576800384

Pull #118

github

pentreathm
fix: logging
Pull Request #118: fix: add ban check

165 of 248 branches covered (66.53%)

Branch coverage included in aggregate %.

44 of 45 new or added lines in 2 files covered. (97.78%)

14 existing lines in 3 files now uncovered.

792 of 955 relevant lines covered (82.93%)

335.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.98
/core/src/adapters/engine.ts
1
import {
2
  PeerData,
3
  Island,
4
  PeerPositionChange,
5
  Transport,
6
  BaseComponents,
7
  ChangeToIslandUpdate,
8
  LeaveIslandUpdate,
9
  IslandUpdates,
10
  Engine
11
} from '../types'
12

13
import { intersectPeerGroup, popMax } from '../logic/islands'
8✔
14
import { sequentialIdGenerator } from '../logic/idGenerator'
8✔
15
import { intersectIslands, islandGeometryCalculator } from '../logic/islands'
8✔
16

17
export type Options = {
18
  components: Pick<BaseComponents, 'logs' | 'metrics'>
19
  roomPrefix?: string
20
  joinDistance: number
21
  leaveDistance: number
22
  transport: Transport
23
}
24

25
function recalculateGeometryIfNeeded(island: Island) {
26
  if (
10,141✔
27
    island.peers.length > 0 &&
45,254✔
28
    (island._geometryDirty || island._radius === undefined || island._radius === null || !island._center)
29
  ) {
30
    const [center, radius] = islandGeometryCalculator(island.peers)
1,817✔
31
    island._center = center
1,817✔
32
    island._radius = radius
1,817✔
33
    island._geometryDirty = false
1,817✔
34
  }
35
}
36

37
export function createArchipelagoEngine({
8✔
38
  components: { logs, metrics },
39
  joinDistance,
40
  leaveDistance,
41
  transport,
42
  roomPrefix
43
}: Options): Engine {
44
  const logger = logs.getLogger('Archipelago')
130✔
45
  const peers = new Map<string, PeerData>()
130✔
46
  const islands = new Map<string, Island>()
130✔
47
  const pendingNewPeers = new Map<string, PeerData>()
130✔
48
  const pendingUpdates = new Map<string, ChangeToIslandUpdate | LeaveIslandUpdate>()
130✔
49
  const islandIdGenerator = sequentialIdGenerator(roomPrefix || 'I')
130✔
50
  let currentSequence = 0
130✔
51

52
  function onPeerPositionsUpdate(changes: PeerPositionChange[]): void {
53
    for (const change of changes) {
236✔
54
      const { id, position, preferedIslandId } = change
2,471✔
55
      if (!peers.has(id)) {
2,471✔
56
        pendingNewPeers.set(id, change)
1,704✔
57
      } else {
58
        const peer = peers.get(id)!
767✔
59
        peer.position = position
767✔
60

61
        // We can set the prefered island to undefined by explicitly providing the key but no value.
62
        // If we don't provide the key, we leave it as it is
63
        if ('preferedIslandId' in change) {
767✔
64
          peer.preferedIslandId = preferedIslandId
3✔
65
        }
66

67
        if (peer.islandId) {
767✔
68
          const island = islands.get(peer.islandId)
767✔
69
          if (island) {
767✔
70
            island._geometryDirty = true
767✔
71
          }
72
        }
73
      }
74
    }
75
  }
76

77
  function getIslands(): Island[] {
78
    return Array.from(islands.values())
252✔
79
  }
80

81
  function getIsland(id: string): Island | undefined {
82
    return islands.get(id)
54✔
83
  }
84

85
  function getPeerData(id: string): PeerData | undefined {
86
    return peers.get(id)
45✔
87
  }
88

89
  function getPeerCount(): number {
90
    return peers.size
6✔
91
  }
92

93
  function onPeerDisconnected(id: string): void {
94
    const peer = peers.get(id)
58✔
95
    pendingNewPeers.delete(id)
58✔
96

97
    if (peer) {
58✔
98
      peers.delete(id)
51✔
99
      if (peer.islandId) {
51✔
100
        const island = islands.get(peer.islandId)
51✔
101

102
        if (island) {
51✔
103
          const idx = island.peers.findIndex((it) => it.id === id)
67✔
104
          if (idx >= 0) {
50✔
105
            island.peers.splice(idx, 1)
48✔
106
          }
107

108
          island._geometryDirty = true
50✔
109

110
          if (island.peers.length === 0) {
50✔
111
            islands.delete(island.id)
15✔
112
          }
113
        }
114

115
        pendingUpdates.set(peer.id, { action: 'leave', islandId: peer.islandId })
51✔
116
      }
117
    }
118
  }
119

120
  async function flush(): Promise<IslandUpdates> {
121
    for (const [id, change] of pendingNewPeers) {
260✔
122
      peers.set(id, change)
1,699✔
123
      try {
1,699✔
124
        await createIsland([change])
1,699✔
125
      } catch (err: any) {
126
        // Remove from peers so the peer is not orphaned without an island.
127
        // The next heartbeat will re-add it to pendingNewPeers for retry.
128
        peers.delete(id)
4✔
129
        logger.error(err)
4✔
130
      }
131
    }
132
    pendingNewPeers.clear()
260✔
133

134
    const affectedIslands = new Set<string>()
260✔
135
    for (const island of islands.values()) {
260✔
136
      if (island._geometryDirty) {
1,912✔
137
        affectedIslands.add(island.id)
1,816✔
138
      }
139
    }
140

141
    for (const islandId of affectedIslands) {
260✔
142
      const island = islands.get(islandId)
1,848✔
143
      if (island) {
1,848✔
144
        await checkSplitIsland(island, affectedIslands)
1,845✔
145
      }
146
    }
147

148
    // NOTE: check if islands can be merged
149
    const processedIslands: Record<string, boolean> = {}
260✔
150

151
    for (const islandId of affectedIslands) {
260✔
152
      if (!processedIslands[islandId] && islands.has(islandId)) {
1,848✔
153
        const island = islands.get(islandId)!
317✔
154
        const islandsIntersected: Island[] = []
317✔
155
        for (const [, otherIsland] of islands) {
317✔
156
          if (islandId !== otherIsland.id && intersectIslands(island, otherIsland, joinDistance)) {
2,804✔
157
            islandsIntersected.push(otherIsland)
1,598✔
158
            processedIslands[islandId] = true
1,598✔
159
          }
160
        }
161
        if (islandsIntersected.length > 0) {
317✔
162
          await mergeIslands(island, ...islandsIntersected)
128✔
163
        }
164
      }
165
    }
166

167
    metrics.observe('dcl_archipelago_islands_count', {}, islands.size)
260✔
168
    metrics.observe('dcl_archipelago_peers_count', {}, peers.size)
260✔
169

170
    const updates = new Map(pendingUpdates)
260✔
171
    pendingUpdates.clear()
260✔
172
    return updates
260✔
173
  }
174

175
  async function checkSplitIsland(island: Island, affectedIslands: Set<string>) {
176
    const peerGroups: PeerData[][] = []
1,845✔
177

178
    for (const peer of island.peers) {
1,845✔
179
      const groupsIntersected = peerGroups.filter((it) => intersectPeerGroup(peer, it, leaveDistance))
4,420✔
180
      if (groupsIntersected.length === 0) {
4,420✔
181
        peerGroups.push([peer])
1,879✔
182
      } else {
183
        // We merge all the groups into one
184
        const [finalGroup, ...rest] = groupsIntersected
2,541✔
185
        finalGroup.push(peer)
2,541✔
186

187
        for (const group of rest) {
2,541✔
188
          // We remove each group
189
          peerGroups.splice(peerGroups.indexOf(group), 1)
×
190

191
          //We add the members of each group to the final group
192
          finalGroup.push(...group)
×
193
        }
194
      }
195
    }
196

197
    if (peerGroups.length > 1) {
1,845✔
198
      const biggestGroup = popMax(peerGroups, (group) => group.length)!
68✔
199
      island.peers = biggestGroup
31✔
200
      island._geometryDirty = true
31✔
201

202
      for (const group of peerGroups) {
31✔
203
        try {
34✔
204
          affectedIslands.add(await createIsland(group))
34✔
205
        } catch (err: any) {
206
          // createIsland failed before setPeersIsland, so these peers were never
207
          // assigned to a new island. Put them back in the original island to
208
          // avoid orphaning them.
209
          island.peers.push(...group)
2✔
210
          logger.error(err)
2✔
211
        }
212
      }
213
    }
214
  }
215

216
  async function createIsland(group: PeerData[]): Promise<string> {
217
    const newIslandId = islandIdGenerator.generateId()
1,733✔
218
    const peerIds = group.map((p) => p.id)
1,735✔
219
    const connStrs = await transport.getConnectionStrings(peerIds, newIslandId)
1,733✔
220

221
    // Omitted from connStrs = transport rejected the peer (today: banned).
222
    // Evict so they don't get retried every flush; a fresh heartbeat re-enters them.
223
    for (const p of group) {
1,727✔
224
      if (!(p.id in connStrs) && peers.has(p.id)) {
1,729✔
225
        peers.delete(p.id)
2✔
226
      }
227
    }
228

229
    // After the await, filter out peers that disconnected during the transport call.
230
    // A NATS disconnect callback can fire during the await and remove peers from the
231
    // peers Map. If we include them, they become ghost peers in the island.
232
    const activePeers = group.filter((p) => peers.has(p.id) && p.id in connStrs)
1,729✔
233
    if (activePeers.length === 0) {
1,727✔
234
      return newIslandId
5✔
235
    }
236

237
    const island: Island = {
1,722✔
238
      id: newIslandId,
239
      peers: activePeers,
240
      maxPeers: transport.maxIslandSize,
241
      sequenceId: ++currentSequence,
242
      _geometryDirty: true,
243
      get center() {
244
        recalculateGeometryIfNeeded(this)
5,115✔
245
        return this._center!
5,115✔
246
      },
247
      get radius() {
248
        recalculateGeometryIfNeeded(this)
5,026✔
249
        return this._radius!
5,026✔
250
      }
251
    }
252

253
    islands.set(newIslandId, island)
1,722✔
254

255
    setPeersIsland(island, activePeers, connStrs)
1,722✔
256

257
    return newIslandId
1,722✔
258
  }
259

260
  async function mergeIntoIfPossible(islandToMergeInto: Island, anIsland: Island) {
261
    const canMerge = islandToMergeInto.peers.length + anIsland.peers.length <= islandToMergeInto.maxPeers
1,607✔
262
    if (!canMerge) {
1,607✔
263
      return false
33✔
264
    }
265

266
    try {
1,574✔
267
      const connStrs = await transport.getConnectionStrings(
1,574✔
268
        anIsland.peers.map((p) => p.id),
1,605✔
269
        islandToMergeInto.id
270
      )
271

272
      // After the await, re-validate that both islands still exist and the merge
273
      // is still valid. A NATS disconnect callback can fire during the await and
274
      // delete islands or change peer counts.
275
      if (!islands.has(islandToMergeInto.id) || !islands.has(anIsland.id)) {
1,574✔
276
        return false
3✔
277
      }
278
      if (islandToMergeInto.peers.length + anIsland.peers.length > islandToMergeInto.maxPeers) {
1,571!
279
        return false
×
280
      }
281

282
      // Evict peers rejected by the transport (today: banned). See createIsland.
283
      for (const p of anIsland.peers) {
1,571✔
284
        if (!(p.id in connStrs) && peers.has(p.id)) {
1,602!
NEW
285
          peers.delete(p.id)
×
286
        }
287
      }
288

289
      // Filter out peers that disconnected during the await.
290
      const activePeers = anIsland.peers.filter((p) => peers.has(p.id) && p.id in connStrs)
1,602✔
291
      if (activePeers.length === 0) {
1,571!
292
        islands.delete(anIsland.id)
×
293
        return true
×
294
      }
295

296
      islandToMergeInto.peers.push(...activePeers)
1,571✔
297
      setPeersIsland(islandToMergeInto, activePeers, connStrs)
1,571✔
298
      islands.delete(anIsland.id)
1,571✔
299
      islandToMergeInto._geometryDirty = true
1,571✔
300

301
      return true
1,571✔
302
    } catch (err: any) {
303
      logger.warn(err)
×
304
      return false
×
305
    }
306
  }
307

308
  async function mergeIslands(...islandsToMerge: Island[]) {
309
    const sortedIslands = islandsToMerge.sort((i1, i2) =>
128✔
310
      i1.peers.length === i2.peers.length
1,627✔
311
        ? Math.sign(i1.sequenceId - i2.sequenceId)
312
        : Math.sign(i2.peers.length - i1.peers.length)
313
    )
314

315
    const biggestIslands: Island[] = [sortedIslands.shift()!]
128✔
316

317
    let anIsland: Island | undefined
318

319
    while ((anIsland = sortedIslands.shift())) {
128✔
320
      // Skip islands that were deleted by a concurrent disconnect during a previous await
321
      if (!islands.has(anIsland.id)) {
1,598!
322
        continue
×
323
      }
324

325
      let merged = false
1,598✔
326

327
      const preferedIslandId = getPreferedIslandFor(anIsland)
1,598✔
328

329
      // We only support prefered islands for islands bigger and/or older than the one we are currently processing.
330
      // It would be very unlikely that there is a valid use case for the other possibilities
331
      const preferedIsland = preferedIslandId ? biggestIslands.find((it) => it.id === preferedIslandId) : undefined
1,598✔
332

333
      if (preferedIsland) {
1,598✔
334
        merged = await mergeIntoIfPossible(preferedIsland, anIsland)
4✔
335
      }
336

337
      for (let i = 0; !merged && i < biggestIslands.length; i++) {
1,598✔
338
        if (biggestIslands[i] === preferedIsland) continue
1,604✔
339
        merged = await mergeIntoIfPossible(biggestIslands[i], anIsland)
1,603✔
340
      }
341

342
      if (!merged) {
1,598✔
343
        biggestIslands.push(anIsland)
27✔
344
      }
345
    }
346
  }
347

348
  function setPeersIsland(island: Island, peers: PeerData[], connStrs: Record<string, string>) {
349
    for (const peer of peers) {
3,293✔
350
      const previousIslandId = peer.islandId
3,326✔
351
      peer.islandId = island.id
3,326✔
352
      pendingUpdates.set(peer.id, {
3,326✔
353
        action: 'changeTo',
354
        islandId: island.id,
355
        fromIslandId: previousIslandId,
356
        connStr: connStrs[peer.id]!
357
      })
358
    }
359
  }
360

361
  function getPreferedIslandFor(anIsland: Island) {
362
    const votes: Record<string, number> = {}
1,598✔
363
    let mostVoted: string | undefined
364

365
    for (const peer of anIsland.peers) {
1,598✔
366
      if (peer.preferedIslandId) {
4,229✔
367
        votes[peer.preferedIslandId] = peer.preferedIslandId in votes ? votes[peer.preferedIslandId] + 1 : 1
6!
368

369
        if (!mostVoted || votes[mostVoted] < votes[peer.preferedIslandId]) {
6!
370
          mostVoted = peer.preferedIslandId
6✔
371
        }
372
      }
373
    }
374

375
    return mostVoted
1,598✔
376
  }
377

378
  return {
130✔
379
    flush,
380
    onPeerPositionsUpdate,
381
    getPeerCount,
382
    onPeerDisconnected,
383
    getIslands,
384
    getIsland,
385
    getPeerData
386
  }
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc