• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hirosystems / stacks-blockchain-api / 3885774965

pending completion
3885774965

Pull #1514

github

GitHub
Merge c6934e6a3 into 91d79bc02
Pull Request #1514: fix: prevent token metadata processor from blocking api launch

1926 of 2930 branches covered (65.73%)

1 of 2 new or added lines in 2 files covered. (50.0%)

71 existing lines in 5 files now uncovered.

6969 of 8983 relevant lines covered (77.58%)

356.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.54
/src/token-metadata/tokens-processor-queue.ts
1
import { FoundOrNot, logError, logger } from '../helpers';
8✔
2
import { Evt } from 'evt';
8✔
3
import PQueue from 'p-queue';
8✔
4
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
5
import { ChainID, ClarityAbi } from '@stacks/transactions';
6
import { TokensContractHandler } from './tokens-contract-handler';
8✔
7
import { PgWriteStore } from '../datastore/pg-write-store';
8

9
/**
10
 * The maximum number of token metadata parsing operations that can be ran concurrently before
11
 * being added to a FIFO queue.
12
 */
13
const TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT = 5;
8✔
14

15
export class TokensProcessorQueue {
8✔
16
  readonly queue: PQueue;
17
  readonly db: PgWriteStore;
18
  readonly chainId: ChainID;
19

20
  readonly processStartedEvent: Evt<{
8✔
21
    contractId: string;
22
    txId: string;
23
  }> = new Evt();
24

25
  readonly processEndEvent: Evt<{
8✔
26
    contractId: string;
27
    txId: string;
28
  }> = new Evt();
29

30
  /** The entries currently queued for processing in memory, keyed by the queue entry db id. */
31
  readonly queuedEntries: Map<number, TokenMetadataUpdateInfo> = new Map();
8✔
32

33
  readonly onTokenMetadataUpdateQueued: (queueId: number) => void;
34
  readonly onBlockUpdate: (blockHash: string) => void;
35

36
  constructor(db: PgWriteStore, chainId: ChainID) {
37
    this.db = db;
8✔
38
    this.chainId = chainId;
8✔
39
    this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
8✔
40
    this.onTokenMetadataUpdateQueued = entry => this.queueNotificationHandler(entry);
20✔
41
    this.db.eventEmitter.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
8✔
42
    this.onBlockUpdate = blockHash => this.blockNotificationHandler(blockHash);
45✔
43
    this.db.eventEmitter.on('blockUpdate', this.onBlockUpdate);
8✔
44
  }
45

46
  close() {
47
    this.db.eventEmitter.off('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
×
48
    this.db.eventEmitter.off('blockUpdate', this.onBlockUpdate);
×
49
    this.queue.pause();
×
50
    this.queue.clear();
×
51
  }
52

53
  async drainDbQueue(): Promise<void> {
54
    let entries: DbTokenMetadataQueueEntry[] = [];
4✔
55
    do {
4✔
56
      if (this.queue.isPaused) {
4!
57
        return;
×
58
      }
59
      const queuedEntries = [...this.queuedEntries.keys()];
4✔
60
      try {
4✔
61
        entries = await this.db.getTokenMetadataQueue(
4✔
62
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
63
          queuedEntries
64
        );
65
      } catch (error) {
66
        logger.error(error);
4✔
67
      }
68
      for (const entry of entries) {
4✔
UNCOV
69
        await this.queueHandler(entry);
×
70
      }
71
      await this.queue.onEmpty();
4✔
72
    } while (entries.length > 0 || this.queuedEntries.size > 0);
8✔
73
  }
74

75
  async checkDbQueue(): Promise<void> {
76
    if (this.queue.isPaused) {
77!
77
      return;
×
78
    }
79
    const queuedEntries = [...this.queuedEntries.keys()];
77✔
80
    const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
77✔
81
    if (limit > 0) {
77✔
82
      let entries: DbTokenMetadataQueueEntry[];
83
      try {
77✔
84
        entries = await this.db.getTokenMetadataQueue(
77✔
85
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
86
          queuedEntries
87
        );
88
      } catch (error) {
89
        logger.error(error);
4✔
90
        return;
4✔
91
      }
92
      for (const entry of entries) {
73✔
93
        await this.queueHandler(entry);
23✔
94
      }
95
    }
96
  }
97

98
  async queueNotificationHandler(queueId: number) {
99
    let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
100
    try {
24✔
101
      queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
24✔
102
    } catch (error) {
103
      logger.error(error);
4✔
104
      return;
4✔
105
    }
106
    if (queueEntry.found) {
20✔
107
      await this.queueHandler(queueEntry.result);
20✔
108
    }
109
  }
110

111
  async blockNotificationHandler(_: string) {
112
    await this.checkDbQueue();
45✔
113
  }
114

115
  async queueHandler(queueEntry: TokenMetadataUpdateInfo) {
116
    if (
47✔
117
      this.queuedEntries.has(queueEntry.queueId) ||
79✔
118
      this.queuedEntries.size >= this.queue.concurrency
119
    ) {
120
      return;
15✔
121
    }
122
    let abi: string;
123
    try {
32✔
124
      const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
32✔
125
      if (!contractQuery.found || !contractQuery.result.abi) {
28!
126
        return;
×
127
      }
128
      abi = contractQuery.result.abi;
28✔
129
    } catch (error) {
130
      logger.error(error);
4✔
131
      return;
4✔
132
    }
133
    logger.info(
28✔
134
      `[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`
135
    );
136
    this.queuedEntries.set(queueEntry.queueId, queueEntry);
28✔
137

138
    const contractAbi: ClarityAbi = JSON.parse(abi);
28✔
139

140
    const tokenContractHandler = new TokensContractHandler({
28✔
141
      contractId: queueEntry.contractId,
142
      smartContractAbi: contractAbi,
143
      datastore: this.db,
144
      chainId: this.chainId,
145
      txId: queueEntry.txId,
146
      dbQueueId: queueEntry.queueId,
147
    });
148

149
    void this.queue
28✔
150
      .add(async () => {
151
        this.processStartedEvent.post({
28✔
152
          contractId: queueEntry.contractId,
153
          txId: queueEntry.txId,
154
        });
155
        await tokenContractHandler.start();
28✔
156
      })
157
      .catch(error => {
158
        logError(
×
159
          `[token-metadata] error processing token contract: ${tokenContractHandler.contractAddress} ${tokenContractHandler.contractName} from tx ${tokenContractHandler.txId}`,
160
          error
161
        );
162
      })
163
      .finally(() => {
164
        this.queuedEntries.delete(queueEntry.queueId);
28✔
165
        this.processEndEvent.post({
28✔
166
          contractId: queueEntry.contractId,
167
          txId: queueEntry.txId,
168
        });
169
        if (this.queuedEntries.size < this.queue.concurrency) {
28✔
170
          void this.checkDbQueue();
28✔
171
        }
172
      });
173
  }
174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc