• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hirosystems / stacks-blockchain-api / 3749978634

pending completion
3749978634

push

github

GitHub
feat: [Stacks 2.1] Support new "block 0" boot events (#1476)

2014 of 3089 branches covered (65.2%)

13 of 13 new or added lines in 1 file covered. (100.0%)

886 existing lines in 37 files now uncovered.

7070 of 9217 relevant lines covered (76.71%)

970.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.54
/src/token-metadata/tokens-processor-queue.ts
1
import { FoundOrNot, logError, logger } from '../helpers';
2✔
2
import { Evt } from 'evt';
2✔
3
import PQueue from 'p-queue';
2✔
4
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
5
import { ChainID, ClarityAbi } from '@stacks/transactions';
6
import { TokensContractHandler } from './tokens-contract-handler';
2✔
7
import { PgWriteStore } from '../datastore/pg-write-store';
8

9
/**
10
 * The maximum number of token metadata parsing operations that can be run concurrently before
11
 * being added to a FIFO queue.
12
 */
13
const TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT = 5;
2✔
14

15
/**
 * Schedules token metadata processing for contracts listed in the database token metadata queue.
 *
 * Entries are picked up either from `tokenMetadataUpdateQueued` notifications or by polling the
 * database on every `blockUpdate`. Every accepted entry is tracked in `queuedEntries` until its
 * task settles, and no more than `queue.concurrency` entries are tracked at the same time.
 */
export class TokensProcessorQueue {
  readonly queue: PQueue;
  readonly db: PgWriteStore;
  readonly chainId: ChainID;

  /** Posted right before the contract handler of a scheduled entry is started. */
  readonly processStartedEvent: Evt<{
    contractId: string;
    txId: string;
  }> = new Evt();

  /** Posted once the task of a scheduled entry has settled, whether it succeeded or failed. */
  readonly processEndEvent: Evt<{
    contractId: string;
    txId: string;
  }> = new Evt();

  /** The entries currently queued for processing in memory, keyed by the queue entry db id. */
  readonly queuedEntries: Map<number, TokenMetadataUpdateInfo> = new Map();

  /** Listener registered for `tokenMetadataUpdateQueued`; kept so `close()` can remove it. */
  readonly onTokenMetadataUpdateQueued: (queueId: number) => void;
  /** Listener registered for `blockUpdate`; kept so `close()` can remove it. */
  readonly onBlockUpdate: (blockHash: string) => void;

  /**
   * Creates the in-memory queue and subscribes to the datastore events that feed it.
   * @param db - Datastore used to read queue entries and smart contracts.
   * @param chainId - Chain id forwarded to every `TokensContractHandler`.
   */
  constructor(db: PgWriteStore, chainId: ChainID) {
    this.db = db;
    this.chainId = chainId;
    this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
    this.onTokenMetadataUpdateQueued = entry => this.queueNotificationHandler(entry);
    this.db.eventEmitter.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
    this.onBlockUpdate = blockHash => this.blockNotificationHandler(blockHash);
    this.db.eventEmitter.on('blockUpdate', this.onBlockUpdate);
  }

  /** Removes the datastore listeners, then pauses and clears the in-memory queue. */
  close() {
    this.db.eventEmitter.off('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
    this.db.eventEmitter.off('blockUpdate', this.onBlockUpdate);
    this.queue.pause();
    this.queue.clear();
  }

  /**
   * Repeatedly fetches pending entries from the database queue and schedules them, until a fetch
   * returns nothing new and no entry is tracked in memory anymore. Returns early when the queue
   * is paused.
   */
  async drainDbQueue(): Promise<void> {
    // Ids that could not be scheduled during this run (contract or ABI unavailable). They are
    // added to the id list sent with the next fetch so the loop cannot spin on them forever.
    // NOTE(review): assumes the second argument of `getTokenMetadataQueue` is an exclusion list,
    // which is how the in-flight ids are already being passed — confirm against the datastore.
    const unschedulableIds = new Set<number>();
    let entries: DbTokenMetadataQueueEntry[] = [];
    do {
      if (this.queue.isPaused) {
        return;
      }
      // Start every pass from an empty batch so a failed fetch cannot replay the previous one.
      entries = [];
      const excludedIds = [...this.queuedEntries.keys(), ...unschedulableIds];
      try {
        entries = await this.db.getTokenMetadataQueue(
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
          excludedIds
        );
      } catch (error) {
        logger.error(error);
      }
      for (const entry of entries) {
        const outcome = await this.tryQueueEntry(entry);
        if (outcome === 'failed') {
          unschedulableIds.add(entry.queueId);
        }
      }
      await this.queue.onEmpty();
    } while (entries.length > 0 || this.queuedEntries.size > 0);
  }

  /**
   * Fetches pending entries from the database queue once and tries to schedule them, provided
   * the queue is not paused and fewer entries than the concurrency limit are tracked in memory.
   */
  async checkDbQueue(): Promise<void> {
    if (this.queue.isPaused) {
      return;
    }
    const queuedEntries = [...this.queuedEntries.keys()];
    const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
    if (limit > 0) {
      let entries: DbTokenMetadataQueueEntry[];
      try {
        // NOTE(review): `limit` (the number of free slots) looks like the intended first
        // argument here; surplus rows are rejected by `queueHandler` anyway — confirm and
        // tighten once the `getTokenMetadataQueue` signature has been checked.
        entries = await this.db.getTokenMetadataQueue(
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
          queuedEntries
        );
      } catch (error) {
        logger.error(error);
        return;
      }
      for (const entry of entries) {
        await this.queueHandler(entry);
      }
    }
  }

  /**
   * Handles a `tokenMetadataUpdateQueued` notification: loads the queue entry and, when it
   * exists, tries to schedule it. Lookup errors are logged and the notification is dropped.
   * @param queueId - Database id of the queue entry that was announced.
   */
  async queueNotificationHandler(queueId: number) {
    let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
    try {
      queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
    } catch (error) {
      logger.error(error);
      return;
    }
    if (queueEntry.found) {
      await this.queueHandler(queueEntry.result);
    }
  }

  /** Handles a `blockUpdate` notification; the block hash itself is not used. */
  async blockNotificationHandler(_: string) {
    // Every block, check if we have entries we still need to process.
    await this.drainDbQueue();
  }

  /**
   * Tries to schedule a queue entry for processing. The entry is ignored when it is already
   * tracked, when the concurrency limit is reached, or when its contract ABI cannot be loaded
   * and parsed. Never rejects: failures are logged.
   * @param queueEntry - The queue entry to schedule.
   */
  async queueHandler(queueEntry: TokenMetadataUpdateInfo) {
    await this.tryQueueEntry(queueEntry);
  }

  /** Tells whether `queueId` is not tracked yet and a processing slot is still free. */
  private canSchedule(queueId: number): boolean {
    return !this.queuedEntries.has(queueId) && this.queuedEntries.size < this.queue.concurrency;
  }

  /**
   * Loads and parses the contract ABI of an entry and builds its contract handler.
   * @returns The handler, or `undefined` when the contract or its ABI is missing or unusable.
   */
  private async createContractHandler(
    queueEntry: TokenMetadataUpdateInfo
  ): Promise<TokensContractHandler | undefined> {
    try {
      const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
      if (!contractQuery.found || !contractQuery.result.abi) {
        return undefined;
      }
      const abi: string = contractQuery.result.abi;
      // `JSON.parse` throws on malformed input, so it must stay inside this try block.
      const contractAbi: ClarityAbi = JSON.parse(abi);
      return new TokensContractHandler({
        contractId: queueEntry.contractId,
        smartContractAbi: contractAbi,
        datastore: this.db,
        chainId: this.chainId,
        txId: queueEntry.txId,
        dbQueueId: queueEntry.queueId,
      });
    } catch (error) {
      logError(
        `[token-metadata] error preparing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`,
        error
      );
      return undefined;
    }
  }

  /**
   * Schedules an entry and reports what happened.
   * @returns `'queued'` when a task was added, `'skipped'` when the entry is already tracked or
   * no slot is free, `'failed'` when its contract handler could not be created.
   */
  private async tryQueueEntry(
    queueEntry: TokenMetadataUpdateInfo
  ): Promise<'queued' | 'skipped' | 'failed'> {
    if (!this.canSchedule(queueEntry.queueId)) {
      return 'skipped';
    }
    const tokenContractHandler = await this.createContractHandler(queueEntry);
    if (!tokenContractHandler) {
      return 'failed';
    }
    // Another caller may have taken the slot (or this very entry) while the lookup above was
    // pending, so the guard has to be evaluated again right before reserving.
    if (!this.canSchedule(queueEntry.queueId)) {
      return 'skipped';
    }
    logger.info(
      `[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`
    );
    // Nothing between this reservation and `queue.add` can throw, and the slot is released in
    // the `finally` handler below, so a reserved slot is always given back.
    this.queuedEntries.set(queueEntry.queueId, queueEntry);

    void this.queue
      .add(async () => {
        this.processStartedEvent.post({
          contractId: queueEntry.contractId,
          txId: queueEntry.txId,
        });
        await tokenContractHandler.start();
      })
      .catch(error => {
        logError(
          `[token-metadata] error processing token contract: ${tokenContractHandler.contractAddress} ${tokenContractHandler.contractName} from tx ${tokenContractHandler.txId}`,
          error
        );
      })
      .finally(() => {
        this.queuedEntries.delete(queueEntry.queueId);
        this.processEndEvent.post({
          contractId: queueEntry.contractId,
          txId: queueEntry.txId,
        });
        // A slot was just released: look for more work right away.
        if (this.queuedEntries.size < this.queue.concurrency) {
          void this.checkDbQueue();
        }
      });
    return 'queued';
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc