• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hirosystems / stacks-blockchain-api / 4117314041

pending completion
4117314041

push

github

GitHub
Merge pull request #1539 from hirosystems/beta

2038 of 3151 branches covered (64.68%)

555 of 978 new or added lines in 38 files covered. (56.75%)

11 existing lines in 7 files now uncovered.

7158 of 9352 relevant lines covered (76.54%)

1172.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.54
/src/token-metadata/tokens-processor-queue.ts
1
import { FoundOrNot, logError, logger } from '../helpers';
2✔
2
import { Evt } from 'evt';
2✔
3
import PQueue from 'p-queue';
2✔
4
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
5
import { ChainID, ClarityAbi } from '@stacks/transactions';
6
import { TokensContractHandler } from './tokens-contract-handler';
2✔
7
import { PgWriteStore } from '../datastore/pg-write-store';
8

9
/**
10
 * The maximum number of token metadata parsing operations that can be ran concurrently before
11
 * being added to a FIFO queue.
12
 */
13
const TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT = 5;
2✔
14

15
export class TokensProcessorQueue {
2✔
16
  readonly queue: PQueue;
17
  readonly db: PgWriteStore;
18
  readonly chainId: ChainID;
19

20
  readonly processStartedEvent: Evt<{
2✔
21
    contractId: string;
22
    txId: string;
23
  }> = new Evt();
24

25
  readonly processEndEvent: Evt<{
2✔
26
    contractId: string;
27
    txId: string;
28
  }> = new Evt();
29

30
  /** The entries currently queued for processing in memory, keyed by the queue entry db id. */
31
  readonly queuedEntries: Map<number, TokenMetadataUpdateInfo> = new Map();
2✔
32

33
  readonly onTokenMetadataUpdateQueued: (queueId: number) => void;
34
  readonly onBlockUpdate: (blockHash: string) => void;
35

36
  constructor(db: PgWriteStore, chainId: ChainID) {
37
    this.db = db;
2✔
38
    this.chainId = chainId;
2✔
39
    this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
2✔
40
    this.onTokenMetadataUpdateQueued = entry => this.queueNotificationHandler(entry);
5✔
41
    this.db.eventEmitter.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
2✔
42
    this.onBlockUpdate = blockHash => this.blockNotificationHandler(blockHash);
13✔
43
    this.db.eventEmitter.on('blockUpdate', this.onBlockUpdate);
2✔
44
  }
45

46
  close() {
47
    this.db.eventEmitter.off('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
×
48
    this.db.eventEmitter.off('blockUpdate', this.onBlockUpdate);
×
49
    this.queue.pause();
×
50
    this.queue.clear();
×
51
  }
52

53
  async drainDbQueue(): Promise<void> {
54
    let entries: DbTokenMetadataQueueEntry[] = [];
1✔
55
    do {
1✔
56
      if (this.queue.isPaused) {
1!
57
        return;
×
58
      }
59
      const queuedEntries = [...this.queuedEntries.keys()];
1✔
60
      try {
1✔
61
        entries = await this.db.getTokenMetadataQueue(
1✔
62
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
63
          queuedEntries
64
        );
65
      } catch (error) {
66
        logger.error(error);
1✔
67
      }
68
      for (const entry of entries) {
1✔
UNCOV
69
        await this.queueHandler(entry);
×
70
      }
71
      await this.queue.onEmpty();
1✔
72
    } while (entries.length > 0 || this.queuedEntries.size > 0);
2✔
73
  }
74

75
  async checkDbQueue(): Promise<void> {
76
    if (this.queue.isPaused) {
20!
77
      return;
×
78
    }
79
    const queuedEntries = [...this.queuedEntries.keys()];
20✔
80
    const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
20✔
81
    if (limit > 0) {
20✔
82
      let entries: DbTokenMetadataQueueEntry[];
83
      try {
20✔
84
        entries = await this.db.getTokenMetadataQueue(
20✔
85
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
86
          queuedEntries
87
        );
88
      } catch (error) {
89
        logger.error(error);
1✔
90
        return;
1✔
91
      }
92
      for (const entry of entries) {
19✔
93
        await this.queueHandler(entry);
6✔
94
      }
95
    }
96
  }
97

98
  async queueNotificationHandler(queueId: number) {
99
    let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
100
    try {
6✔
101
      queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
6✔
102
    } catch (error) {
103
      logger.error(error);
1✔
104
      return;
1✔
105
    }
106
    if (queueEntry.found) {
5✔
107
      await this.queueHandler(queueEntry.result);
5✔
108
    }
109
  }
110

111
  async blockNotificationHandler(_: string) {
112
    await this.checkDbQueue();
13✔
113
  }
114

115
  async queueHandler(queueEntry: TokenMetadataUpdateInfo) {
116
    if (
12✔
117
      this.queuedEntries.has(queueEntry.queueId) ||
19✔
118
      this.queuedEntries.size >= this.queue.concurrency
119
    ) {
120
      return;
5✔
121
    }
122
    let abi: string;
123
    try {
7✔
124
      const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
7✔
125
      if (!contractQuery.found || !contractQuery.result.abi) {
6!
126
        return;
×
127
      }
128
      abi = contractQuery.result.abi;
6✔
129
    } catch (error) {
130
      logger.error(error);
1✔
131
      return;
1✔
132
    }
133
    logger.info(
6✔
134
      `[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`
135
    );
136
    this.queuedEntries.set(queueEntry.queueId, queueEntry);
6✔
137

138
    const contractAbi: ClarityAbi = JSON.parse(abi);
6✔
139

140
    const tokenContractHandler = new TokensContractHandler({
6✔
141
      contractId: queueEntry.contractId,
142
      smartContractAbi: contractAbi,
143
      datastore: this.db,
144
      chainId: this.chainId,
145
      txId: queueEntry.txId,
146
      dbQueueId: queueEntry.queueId,
147
    });
148

149
    void this.queue
6✔
150
      .add(async () => {
151
        this.processStartedEvent.post({
6✔
152
          contractId: queueEntry.contractId,
153
          txId: queueEntry.txId,
154
        });
155
        await tokenContractHandler.start();
6✔
156
      })
157
      .catch(error => {
158
        logError(
×
159
          `[token-metadata] error processing token contract: ${tokenContractHandler.contractAddress} ${tokenContractHandler.contractName} from tx ${tokenContractHandler.txId}`,
160
          error
161
        );
162
      })
163
      .finally(() => {
164
        this.queuedEntries.delete(queueEntry.queueId);
6✔
165
        this.processEndEvent.post({
6✔
166
          contractId: queueEntry.contractId,
167
          txId: queueEntry.txId,
168
        });
169
        if (this.queuedEntries.size < this.queue.concurrency) {
6✔
170
          void this.checkDbQueue();
6✔
171
        }
172
      });
173
  }
174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc