• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GoodDollar / GoodServer / 12903998428

14 Jan 2025 12:39PM UTC coverage: 49.574% (+0.2%) from 49.344%
12903998428

push

github

sirpy
fix: remove unused

579 of 1453 branches covered (39.85%)

Branch coverage included in aggregate %.

1863 of 3473 relevant lines covered (53.64%)

8.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.8
/src/server/utils/tx-manager/queueMongo.js
1
import WalletNonce from '../../db/mongo/models/wallet-nonce'
2
import logger from '../../../imports/logger'
3
import conf from '../../server.config'
4
import moment from 'moment'
5
import { remove } from 'lodash'
6

7
/**
 * MongoDB-backed lock and nonce queue for a pool of wallet addresses on one network.
 *
 * Every wallet is a document in the `WalletNonce` collection carrying `isLock`,
 * `lockedAt` and `nonce`. A caller acquires a wallet either by queueing (`lock`)
 * or immediately (`lockOrFail`), uses the returned nonce, and then calls
 * `release()` (stores nonce + 1 and unlocks) or `fail()` (unlocks, nonce untouched).
 * Locks older than `lockExpireSeconds` are treated as free.
 */
export default class queueMongo {
  /**
   * @param {number|string} networkId network the wallets belong to
   * @param {number} [lockExpireSeconds] age in seconds after which a lock is considered
   *   stale; also the interval of the fallback queue re-run timer
   */
  constructor(networkId, lockExpireSeconds = conf.mongoQueueMaxLockTime) {
    this.log = logger.child({ from: 'queueMongo-' + networkId })
    this.lockExpireSeconds = lockExpireSeconds
    this.networkId = networkId
    this.model = WalletNonce
    // pending lock requests: { cb, addresses, id }
    this.queue = []
    this.nonce = null
    // default nonce source; presumably overridden by the owner of this queue (verify against caller)
    this.getTransactionCount = () => 0
    // handle of the fallback timer armed by run()
    this.reRunQueue = null
    // addresses already initialized in mongo by this instance
    this.wallets = {}

    // only react to updates that unlocked a wallet of this network
    const filter = [
      {
        $match: {
          $and: [
            { 'updateDescription.updatedFields.isLock': { $eq: false } },
            { operationType: 'update' },
            { 'fullDocument.networkId': { $eq: String(this.networkId) } }
          ]
        }
      }
    ]

    const options = { fullDocument: 'updateLookup' }

    const changeStream = this.model.watch(filter, options)
    changeStream.on('change', async () => {
      await this.run()
    })
    // an 'error' event without a listener is thrown as an uncaught exception;
    // log it instead, the fallback timer in run() keeps the queue moving
    changeStream.on('error', e => {
      this.log.error('TX queueMongo (watch)', e.message, e)
    })

    this.log.info('queueMongo created')
  }

  /**
   * Atomically lock the least recently used free (or stale-locked) wallet
   * among `addresses` and return its document.
   *
   * @param {array} addresses candidate wallet addresses
   * @param {*} [id] request id, used for logging only
   *
   * @returns {Promise<*>} the locked wallet document, `null` when no wallet is
   *   free, or `false` when the query failed
   */
  async getWalletNonce(addresses, id) {
    try {
      const expired = moment().subtract(this.lockExpireSeconds, 'seconds').toDate()
      const filter = {
        address: { $in: addresses },
        networkId: this.networkId,
        $or: [
          { isLock: false },
          {
            lockedAt: { $lte: expired },
            isLock: true
          }
        ]
      }
      const update = { isLock: true, lockedAt: Date.now() }
      this.log.trace('getting free address', { addresses, expired, id })
      const wallet = await this.model.findOneAndUpdate(filter, update, {
        sort: { lockedAt: 1 }, //get least recently used
        // Mongoose's option is `new`; `returnNewDocument` is the mongo shell name
        new: true
      })
      this.log.trace('got free address', { addresses, expired, wallet, id })

      return wallet
    } catch (e) {
      this.log.error('TX queueMongo (getWalletNonce)', e.message, e, { addresses, id })
      return false
    }
  }

  /**
   * Make sure every address in the list has a wallet document in the db.
   *
   * @param {array} addresses
   *
   * @returns {Promise<void>}
   */
  async createListIfNotExists(addresses) {
    await Promise.all(addresses.map(address => this.createWallet(address)))
  }

  /**
   * Upsert the wallet document for `address`, once per instance, and reset its
   * stored nonce to the value returned by `getTransactionCount`.
   * Errors are logged, not thrown.
   *
   * @param {string} address
   *
   * @returns {Promise<void>}
   */
  async createWallet(address) {
    if (this.wallets[address]) {
      return
    }
    try {
      const nonce = await this.getTransactionCount(address)
      this.log.debug(`init wallet ${address} with nonce ${nonce} in mongo`)
      await this.model.findOneAndUpdate(
        { address, networkId: this.networkId },
        {
          $setOnInsert: {
            address,
            isLock: false,
            networkId: this.networkId
          },
          //make sure we reset nonce on startup
          $set: {
            nonce
          }
        },
        { upsert: true }
      )
      this.log.debug(`wallet initialized ${address} with nonce ${nonce} in mongo`)
      this.wallets[address] = true
    } catch (e) {
      this.log.error('TX queueMongo (create)', e.message, e, { address })
    }
  }

  /**
   * Unlock a wallet, optionally storing its next nonce.
   * Errors are logged, not thrown.
   *
   * @param {string} address
   * @param {number} [nextNonce] stored as the wallet nonce when not null/undefined
   *
   * @returns {Promise<void>}
   */
  async unlock(address, nextNonce) {
    const update = {
      isLock: false
    }
    if (nextNonce != null) {
      update.nonce = nextNonce
    }
    try {
      await this.model.findOneAndUpdate(
        {
          address,
          networkId: this.networkId
        },
        update,
        { new: true }
      )
    } catch (e) {
      this.log.error('errorunlock', e.message, e, { address })
    }
  }

  /**
   * Queue a lock request and wait until one of `addresses` is acquired.
   *
   * @param {array|string} addresses candidate wallet address(es)
   * @param {number} [timeout=2000] milliseconds to wait before rejecting
   * @param {*} [id] request id, used for logging and queue removal
   *
   * @returns {Promise<any>} resolves to `{ address, nonce, release, fail }`;
   *   rejects with 'lock not acquired timeout id:<id>' on timeout
   */
  lock(addresses, timeout = 2000, id) {
    return new Promise((resolve, reject) => {
      let timer
      let released = false
      let timedOut = false

      //this callback is called by the queue to resolve the promise
      const cb = ({ nonce, address }) => {
        if (timedOut) {
          // The request was mid-acquisition (already shifted out of the queue, or
          // not yet pushed) when it timed out, so removeFromQueue could not cancel
          // it. Nobody is waiting for this wallet anymore: free it right away
          // instead of leaving it locked until the lock expires.
          this.log.warn('lock acquired after timeout, releasing', { address, id })
          this.unlock(address)
          return
        }
        timer && clearTimeout(timer)
        resolve({
          address,
          nonce,
          release: async () => {
            released = true
            await this.unlock(address, nonce + 1)
          },
          fail: async () => !released && (await this.unlock(address))
        })
      }

      timer = setTimeout(() => {
        timedOut = true
        //if timer make sure to remove request from queue
        this.removeFromQueue(cb, id)
        reject(new Error('lock not acquired timeout id:' + id))
        this.log.warn('lock timedout,', { addresses, id })
      }, timeout)

      // do not leave the promise floating: surface unexpected queueing failures
      this.addToQueue(addresses, cb, id).catch(e => {
        clearTimeout(timer)
        reject(e)
      })
    })
  }

  /**
   * Try to acquire one of `addresses` immediately, without queueing.
   *
   * @param {array|string} addresses candidate wallet address(es)
   * @param {*} [id] request id, used for logging only
   *
   * @returns {Promise<any>} the wallet fields plus `release` and `fail`
   * @throws {Error} 'lock not acquired' when no wallet is currently free
   */
  async lockOrFail(addresses, id) {
    addresses = Array.isArray(addresses) ? addresses : [addresses]
    await this.createListIfNotExists(addresses)
    const wallet = await this.getWalletNonce(addresses, id)
    if (!wallet) {
      throw new Error('lock not acquired')
    }
    const { address, nonce } = wallet
    return {
      ...wallet,
      // NOTE(review): the wallet is presumably a Mongoose document whose schema
      // fields are prototype getters that object spread does not copy, so
      // expose the two fields callers need explicitly.
      address,
      nonce,
      release: async () => await this.unlock(address, nonce + 1),
      fail: async () => await this.unlock(address)
    }
  }

  /**
   * Drop pending requests from the queue and flag them as removed.
   * A request matches by callback identity, or by `id` when an id was given.
   *
   * @param {function} cb callback of the request to remove
   * @param {*} [id] id of the request(s) to remove
   */
  removeFromQueue(cb, id) {
    this.log.info('removeFromQueue', { cb, id, queue: this.queue.length })
    remove(this.queue, x => {
      // only match on id when one was supplied, otherwise every request
      // queued without an id would be evicted together
      const sameId = id != null && x.id === id
      if (x.cb === cb || sameId) {
        this.log.info('removed from queue', { x, id })
        x.removed = true
        return true
      }
      return false
    })
    this.log.info('removeFromQueue result ', { cb, id, queue: this.queue.length })
  }

  /**
   * Add a new lock request to the queue and trigger a queue run.
   *
   * @param {array|string} addresses candidate wallet address(es)
   * @param {function} cb called with `{ nonce, address }` once a wallet is locked
   * @param {*} [id] request id
   *
   * @returns {Promise<void>}
   */
  async addToQueue(addresses, cb, id) {
    addresses = Array.isArray(addresses) ? addresses : [addresses]
    await this.createListIfNotExists(addresses)

    this.queue.push({ cb, addresses, id })

    // not awaited on purpose: run() handles its own errors
    this.run()
  }

  /**
   * Run the first request from the queue: hand it a wallet if one is free,
   * otherwise move it to the back of the queue. Also re-arms the fallback timer.
   *
   * @returns {Promise<void>}
   */
  async run() {
    let nextTr, walletNonce
    try {
      if (this.queue.length > 0) {
        nextTr = this.queue.shift()
        if (!nextTr.removed) walletNonce = await this.getWalletNonce(nextTr.addresses, nextTr.id)
        if (walletNonce) {
          nextTr.cb({ nonce: walletNonce.nonce, address: walletNonce.address })
        } else {
          !nextTr.removed && this.queue.push(nextTr)
        }

        //make sure we will run again, even though there's a mongodb listener
        if (this.reRunQueue) {
          clearTimeout(this.reRunQueue)
        }
        this.reRunQueue = setTimeout(() => {
          this.run()
        }, this.lockExpireSeconds * 1000)
      }
    } catch (e) {
      this.log.error('TX queueMongo (run)', e.message, e, { nextTr, walletNonce })
    }
  }

  /**
   * Get lock status for address: locked and the lock has not expired yet.
   *
   * @param {string} address
   *
   * @returns {Promise<Boolean>}
   */
  async isLocked(address) {
    const wallet = await this.model.findOne({ address, networkId: this.networkId })
    const expired = moment().subtract(this.lockExpireSeconds, 'seconds')
    const lockNotExpired = wallet && wallet.lockedAt && expired.isBefore(wallet.lockedAt)
    return Boolean(wallet && wallet.isLock && lockNotExpired)
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc