• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hirosystems / stacks-blockchain-api / 4863643345

pending completion
4863643345

Pull #1630

github

GitHub
Merge b9b14018f into a6c8f16b6
Pull Request #1630: Logging migration to Pino library

2140 of 3228 branches covered (66.29%)

131 of 231 new or added lines in 41 files covered. (56.71%)

38 existing lines in 2 files now uncovered.

7490 of 9606 relevant lines covered (77.97%)

1789.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.54
/src/token-metadata/tokens-processor-queue.ts
1
import { FoundOrNot } from '../helpers';
2
import { Evt } from 'evt';
2✔
3
import PQueue from 'p-queue';
2✔
4
import { DbTokenMetadataQueueEntry, TokenMetadataUpdateInfo } from '../datastore/common';
5
import { ChainID, ClarityAbi } from '@stacks/transactions';
6
import { TokensContractHandler } from './tokens-contract-handler';
2✔
7
import { PgWriteStore } from '../datastore/pg-write-store';
8
import { logger } from '../logger';
2✔
9

10
/**
11
 * The maximum number of token metadata parsing operations that can be ran concurrently before
12
 * being added to a FIFO queue.
13
 */
14
const TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT = 5;
2✔
15

16
export class TokensProcessorQueue {
2✔
17
  readonly queue: PQueue;
18
  readonly db: PgWriteStore;
19
  readonly chainId: ChainID;
20

21
  readonly processStartedEvent: Evt<{
2✔
22
    contractId: string;
23
    txId: string;
24
  }> = new Evt();
25

26
  readonly processEndEvent: Evt<{
2✔
27
    contractId: string;
28
    txId: string;
29
  }> = new Evt();
30

31
  /** The entries currently queued for processing in memory, keyed by the queue entry db id. */
32
  readonly queuedEntries: Map<number, TokenMetadataUpdateInfo> = new Map();
2✔
33

34
  readonly onTokenMetadataUpdateQueued: (queueId: number) => void;
35
  readonly onBlockUpdate: (blockHash: string) => void;
36

37
  constructor(db: PgWriteStore, chainId: ChainID) {
38
    this.db = db;
2✔
39
    this.chainId = chainId;
2✔
40
    this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
2✔
41
    this.onTokenMetadataUpdateQueued = entry => this.queueNotificationHandler(entry);
5✔
42
    this.db.eventEmitter.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
2✔
43
    this.onBlockUpdate = blockHash => this.blockNotificationHandler(blockHash);
12✔
44
    this.db.eventEmitter.on('blockUpdate', this.onBlockUpdate);
2✔
45
  }
46

47
  close() {
48
    this.db.eventEmitter.off('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
×
49
    this.db.eventEmitter.off('blockUpdate', this.onBlockUpdate);
×
50
    this.queue.pause();
×
51
    this.queue.clear();
×
52
  }
53

54
  async drainDbQueue(): Promise<void> {
55
    let entries: DbTokenMetadataQueueEntry[] = [];
1✔
56
    do {
1✔
57
      if (this.queue.isPaused) {
1!
58
        return;
×
59
      }
60
      const queuedEntries = [...this.queuedEntries.keys()];
1✔
61
      try {
1✔
62
        entries = await this.db.getTokenMetadataQueue(
1✔
63
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
64
          queuedEntries
65
        );
66
      } catch (error) {
67
        logger.error(error);
1✔
68
      }
69
      for (const entry of entries) {
1✔
70
        await this.queueHandler(entry);
×
71
      }
72
      await this.queue.onEmpty();
1✔
73
    } while (entries.length > 0 || this.queuedEntries.size > 0);
2✔
74
  }
75

76
  async checkDbQueue(): Promise<void> {
77
    if (this.queue.isPaused) {
19!
78
      return;
×
79
    }
80
    const queuedEntries = [...this.queuedEntries.keys()];
19✔
81
    const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
19✔
82
    if (limit > 0) {
19✔
83
      let entries: DbTokenMetadataQueueEntry[];
84
      try {
19✔
85
        entries = await this.db.getTokenMetadataQueue(
19✔
86
          TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
87
          queuedEntries
88
        );
89
      } catch (error) {
90
        logger.error(error);
1✔
91
        return;
1✔
92
      }
93
      for (const entry of entries) {
18✔
94
        await this.queueHandler(entry);
6✔
95
      }
96
    }
97
  }
98

99
  async queueNotificationHandler(queueId: number) {
100
    let queueEntry: FoundOrNot<DbTokenMetadataQueueEntry>;
101
    try {
6✔
102
      queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
6✔
103
    } catch (error) {
104
      logger.error(error);
1✔
105
      return;
1✔
106
    }
107
    if (queueEntry.found) {
5✔
108
      await this.queueHandler(queueEntry.result);
5✔
109
    }
110
  }
111

112
  async blockNotificationHandler(_: string) {
113
    await this.checkDbQueue();
12✔
114
  }
115

116
  async queueHandler(queueEntry: TokenMetadataUpdateInfo) {
117
    if (
12✔
118
      this.queuedEntries.has(queueEntry.queueId) ||
19✔
119
      this.queuedEntries.size >= this.queue.concurrency
120
    ) {
121
      return;
5✔
122
    }
123
    let abi: string;
124
    try {
7✔
125
      const contractQuery = await this.db.getSmartContract(queueEntry.contractId);
7✔
126
      if (!contractQuery.found || !contractQuery.result.abi) {
6!
127
        return;
×
128
      }
129
      abi = contractQuery.result.abi;
6✔
130
    } catch (error) {
131
      logger.error(error);
1✔
132
      return;
1✔
133
    }
134
    logger.info(
6✔
135
      `[token-metadata] queueing token contract for processing: ${queueEntry.contractId} from tx ${queueEntry.txId}`
136
    );
137
    this.queuedEntries.set(queueEntry.queueId, queueEntry);
6✔
138

139
    const contractAbi: ClarityAbi = JSON.parse(abi);
6✔
140

141
    const tokenContractHandler = new TokensContractHandler({
6✔
142
      contractId: queueEntry.contractId,
143
      smartContractAbi: contractAbi,
144
      datastore: this.db,
145
      chainId: this.chainId,
146
      txId: queueEntry.txId,
147
      dbQueueId: queueEntry.queueId,
148
    });
149

150
    void this.queue
6✔
151
      .add(async () => {
152
        this.processStartedEvent.post({
6✔
153
          contractId: queueEntry.contractId,
154
          txId: queueEntry.txId,
155
        });
156
        await tokenContractHandler.start();
6✔
157
      })
158
      .catch(error => {
NEW
159
        logger.error(
×
160
          error,
161
          `[token-metadata] error processing token contract: ${tokenContractHandler.contractAddress} ${tokenContractHandler.contractName} from tx ${tokenContractHandler.txId}`
162
        );
163
      })
164
      .finally(() => {
165
        this.queuedEntries.delete(queueEntry.queueId);
6✔
166
        this.processEndEvent.post({
6✔
167
          contractId: queueEntry.contractId,
168
          txId: queueEntry.txId,
169
        });
170
        if (this.queuedEntries.size < this.queue.concurrency) {
6✔
171
          void this.checkDbQueue();
6✔
172
        }
173
      });
174
  }
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc