hirosystems / stacks-blockchain-api / build 4874244011 (pending completion)
Triggered by a GitHub push: Logging migration to Pino library (#1630)

2071 of 3228 branches covered (64.16%)

130 of 231 new or added lines in 41 files covered (56.28%)

71 existing lines in 2 files now uncovered.

7294 of 9606 relevant lines covered (75.93%)

1764.88 hits per line
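
The percentages above are plain covered-over-total ratios. The snippet below simply recomputes them from the counts in this report; the helper name is illustrative, not part of the codebase.

// Sketch: coverage percentage = covered / total, rounded to two decimals.
const coveragePct = (covered: number, total: number): string =>
  `${((covered / total) * 100).toFixed(2)}%`;

coveragePct(2071, 3228); // '64.16%' -- branch coverage
coveragePct(130, 231);   // '56.28%' -- new or added lines
coveragePct(7294, 9606); // '75.93%' -- relevant lines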

Source File

/src/api/routes/core-node-rpc-proxy.ts (76.3% of lines covered)
import * as express from 'express';
import * as cors from 'cors';
import { createProxyMiddleware, Options, responseInterceptor } from 'http-proxy-middleware';
import { parsePort, pipelineAsync, REPO_DIR } from '../../helpers';
import { Agent } from 'http';
import * as fs from 'fs';
import * as path from 'path';
import { asyncHandler } from '../async-handler';
import * as chokidar from 'chokidar';
import * as jsoncParser from 'jsonc-parser';
import fetch, { RequestInit } from 'node-fetch';
import { PgStore } from '../../datastore/pg-store';
import { logger } from '../../logger';

function GetStacksNodeProxyEndpoint() {
  // Use STACKS_CORE_PROXY env vars if available, otherwise fall back to `STACKS_CORE_RPC`
  const proxyHost =
    process.env['STACKS_CORE_PROXY_HOST'] ?? process.env['STACKS_CORE_RPC_HOST'] ?? '';
  const proxyPort =
    parsePort(process.env['STACKS_CORE_PROXY_PORT'] ?? process.env['STACKS_CORE_RPC_PORT']) ?? 0;
  return `${proxyHost}:${proxyPort}`;
}

export function createCoreNodeRpcProxyRouter(db: PgStore): express.Router {
  const router = express.Router();
  router.use(cors());

  const stacksNodeRpcEndpoint = GetStacksNodeProxyEndpoint();

  logger.info(`/v2/* proxying to: ${stacksNodeRpcEndpoint}`);

  // Note: while keep-alive may result in some performance improvements with the stacks-node http server,
  // it can also cause request distribution issues when proxying to a pool of stacks-nodes. See:
  // https://github.com/hirosystems/stacks-blockchain-api/issues/756
  const httpAgent = new Agent({
    // keepAlive: true,
    keepAlive: false, // `false` is the default -- set it explicitly for readability anyway.
    // keepAliveMsecs: 60000,
    maxSockets: 200,
    maxTotalSockets: 400,
  });

  const PROXY_CACHE_CONTROL_FILE_ENV_VAR = 'STACKS_API_PROXY_CACHE_CONTROL_FILE';
  let proxyCacheControlFile = '.proxy-cache-control.json';
  if (process.env[PROXY_CACHE_CONTROL_FILE_ENV_VAR]) {
    proxyCacheControlFile = process.env[PROXY_CACHE_CONTROL_FILE_ENV_VAR] as string;
    logger.info(`Using ${proxyCacheControlFile}`);
  }
  const cacheControlFileWatcher = chokidar.watch(proxyCacheControlFile, {
    persistent: false,
    useFsEvents: false,
    ignoreInitial: true,
  });
  let pathCacheOptions = new Map<RegExp, string | null>();

  const updatePathCacheOptions = () => {
    try {
      const configContent: { paths: Record<string, string> } = jsoncParser.parse(
        fs.readFileSync(proxyCacheControlFile, 'utf8')
      );
      pathCacheOptions = new Map(
        Object.entries(configContent.paths).map(([k, v]) => [RegExp(k), v])
      );
    } catch (error) {
      pathCacheOptions.clear();
      logger.error(error, `Error reading changes from ${proxyCacheControlFile}`);
    }
  };
  updatePathCacheOptions();
  cacheControlFileWatcher.on('all', (eventName, path, stats) => {
    updatePathCacheOptions();
  });

  const getCacheControlHeader = (statusCode: number, url: string): string | null => {
    if (statusCode < 200 || statusCode > 299) {
      return null;
    }
    for (const [regexp, cacheControl] of pathCacheOptions.entries()) {
      if (cacheControl && regexp.test(url)) {
        return cacheControl;
      }
    }
    return null;
  };

  /**
   * Check for any extra endpoints that have been configured for performing a "multicast" for a tx submission.
   */
  async function getExtraTxPostEndpoints(): Promise<string[] | false> {
    const STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR = 'STACKS_API_EXTRA_TX_ENDPOINTS_FILE';
    const extraEndpointsEnvVar = process.env[STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR];
    if (!extraEndpointsEnvVar) {
      return false;
    }
    const filePath = path.resolve(REPO_DIR, extraEndpointsEnvVar);
    let fileContents: string;
    try {
      fileContents = await fs.promises.readFile(filePath, { encoding: 'utf8' });
    } catch (error) {
      logger.error(error, `Error reading ${STACKS_API_EXTRA_TX_ENDPOINTS_FILE_ENV_VAR}`);
      return false;
    }
    const endpoints = fileContents
      .split(/\r?\n/)
      .map(r => r.trim())
      .filter(r => !r.startsWith('#') && r.length !== 0);
    if (endpoints.length === 0) {
      return false;
    }
    return endpoints;
  }

  /**
   * Reads an http request stream into a Buffer.
   */
  async function readRequestBody(req: express.Request, maxSizeBytes = Infinity): Promise<Buffer> {
    return new Promise((resolve, reject) => {
      let resultBuffer: Buffer = Buffer.alloc(0);
      req.on('data', chunk => {
        if (!Buffer.isBuffer(chunk)) {
          reject(
            new Error(
              `Expected request body chunks to be Buffer, received ${chunk.constructor.name}`
            )
          );
          req.destroy();
          return;
        }
        resultBuffer = resultBuffer.length === 0 ? chunk : Buffer.concat([resultBuffer, chunk]);
        if (resultBuffer.byteLength >= maxSizeBytes) {
          reject(new Error(`Request body exceeded max byte size`));
          req.destroy();
          return;
        }
      });
      req.on('end', () => {
        if (!req.complete) {
          return reject(
            new Error('The connection was terminated while the message was still being sent')
          );
        }
        resolve(resultBuffer);
      });
      req.on('error', error => reject(error));
    });
  }

  /**
   * Logs a transaction broadcast event alongside the current block height.
   */
  async function logTxBroadcast(response: string): Promise<void> {
    try {
      const blockHeightQuery = await db.getCurrentBlockHeight();
      if (!blockHeightQuery.found) {
        return;
      }
      const blockHeight = blockHeightQuery.result;
      // Strip wrapping double quotes (if any)
      const txId = response.replace(/^"(.*)"$/, '$1');
      logger.info('Transaction broadcasted', {
        txid: `0x${txId}`,
        first_broadcast_at_stacks_height: blockHeight,
      });
    } catch (error) {
      logger.error(error, 'Error logging tx broadcast');
    }
  }

  router.post(
    '/transactions',
    asyncHandler(async (req, res, next) => {
      const extraEndpoints = await getExtraTxPostEndpoints();
      if (!extraEndpoints) {
        next();
        return;
      }
      const endpoints = [
        // The primary proxy endpoint (the http response from this one will be returned to the client)
        `http://${stacksNodeRpcEndpoint}/v2/transactions`,
      ];
      endpoints.push(...extraEndpoints);
      logger.info(`Overriding POST /v2/transactions to multicast to ${endpoints.join(',')}`);
      const maxBodySize = 10_000_000; // 10 MB max POST body size
      const reqBody = await readRequestBody(req, maxBodySize);
      const reqHeaders: string[][] = [];
      for (let i = 0; i < req.rawHeaders.length; i += 2) {
        reqHeaders.push([req.rawHeaders[i], req.rawHeaders[i + 1]]);
      }
      const postFn = async (endpoint: string) => {
        const reqOpts: RequestInit = {
          method: 'POST',
          agent: httpAgent,
          body: reqBody,
          headers: reqHeaders,
        };
        const proxyResult = await fetch(endpoint, reqOpts);
        return proxyResult;
      };

      // Here's where we "multicast" the `/v2/transaction` POST, by concurrently sending the http request to all configured endpoints.
      const results = await Promise.allSettled(endpoints.map(endpoint => postFn(endpoint)));

      // Only the first (non-extra) endpoint http response is proxied back through to the client, so ensure any errors from requests
      // to the extra endpoints are logged.
      results.slice(1).forEach(p => {
        if (p.status === 'rejected') {
          logger.error(
            p.reason,
            `Error during POST /v2/transaction to extra endpoint: ${p.reason}`
          );
        } else {
          if (!p.value.ok) {
            logger.warn(
              `Response ${p.value.status} during POST /v2/transaction to extra endpoint ${p.value.url}`
            );
          }
        }
      });

      // Proxy the result of the (non-extra) http response back to the client.
      const mainResult = results[0];
      if (mainResult.status === 'rejected') {
        logger.error(
          mainResult.reason,
          `Error in primary POST /v2/transaction proxy: ${mainResult.reason}`
        );
        res.status(500).json({ error: mainResult.reason });
      } else {
        const proxyResp = mainResult.value;
        res.status(proxyResp.status);
        proxyResp.headers.forEach((value, name) => {
          res.setHeader(name, value);
        });
        if (proxyResp.status === 200) {
          // Log the transaction id broadcast, but clone the `Response` first before parsing its body
          // so we don't mess up the original response's `ReadableStream` pointers.
          const parsedTxId: string = await proxyResp.clone().text();
          await logTxBroadcast(parsedTxId);
        }
        await pipelineAsync(proxyResp.body, res);
      }
    })
  );

  const proxyOptions: Options = {
    agent: httpAgent,
    target: `http://${stacksNodeRpcEndpoint}`,
    changeOrigin: true,
    selfHandleResponse: true,
    onProxyRes: responseInterceptor(async (responseBuffer, proxyRes, req, res) => {
      if (req.url !== undefined) {
        const header = getCacheControlHeader(res.statusCode, req.url);
        if (header) {
          res.setHeader('Cache-Control', header);
        }
        const url = new URL(req.url, `http://${req.headers.host}`);
        if (url.pathname === '/v2/transactions' && res.statusCode === 200) {
          await logTxBroadcast(responseBuffer.toString());
        }
      }
      return responseBuffer;
    }),
    onError: (error, req, res) => {
      const msg =
        (error as any).code === 'ECONNREFUSED'
          ? 'core node unresponsive'
          : 'cannot connect to core node';
      res
        .writeHead(502, { 'Content-Type': 'application/json' })
        .end(JSON.stringify({ message: msg, error: error }));
    },
  };

  const stacksNodeRpcProxy = createProxyMiddleware(proxyOptions);

  router.use(stacksNodeRpcProxy);

  return router;
}
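
A minimal wiring sketch for the exported router follows; it is an illustration, not code from this build. The mount path, port, and function name are assumptions, and it presumes an already-connected PgStore plus STACKS_CORE_PROXY_HOST/PORT (or STACKS_CORE_RPC_HOST/PORT) set in the environment.

// Hypothetical usage sketch -- not part of the file above.
import * as express from 'express';
import { PgStore } from '../../datastore/pg-store';
import { createCoreNodeRpcProxyRouter } from './core-node-rpc-proxy';

export function startProxyExample(db: PgStore): void {
  const app = express();
  // The router registers its own POST /transactions handler and proxies every other
  // request to the core node, so it is mounted under the /v2 prefix seen in the log line above.
  app.use('/v2', createCoreNodeRpcProxyRouter(db));
  app.listen(3999, () => {
    console.log('core-node RPC proxy example listening on port 3999');
  });
}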
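
The updatePathCacheOptions helper above parses a JSONC file whose "paths" object maps URL regular expressions to Cache-Control values applied to matching 2xx proxied responses. A sketch of such a .proxy-cache-control.json file follows; the specific paths and cache directives are illustrative assumptions, not taken from the repository.

{
  // Keys are regular expressions tested against the proxied request URL (illustrative examples).
  // Values are the Cache-Control header set on matching 2xx responses.
  "paths": {
    "^/v2/pox$": "public, max-age=30, stale-while-revalidate=60",
    "^/v2/info$": "public, max-age=5"
  }
}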
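
Likewise, getExtraTxPostEndpoints reads the file named by STACKS_API_EXTRA_TX_ENDPOINTS_FILE as a newline-delimited URL list, ignoring blank lines and lines starting with '#'. A hypothetical file is sketched below; the hostnames and port are invented for illustration.

# Extra endpoints that also receive every POST /v2/transactions body (multicast).
# One URL per line; '#' comments and blank lines are skipped.
http://stacks-node-2.example.internal:20443/v2/transactions
http://stacks-node-3.example.internal:20443/v2/transactions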