• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

janis-commerce / log / 19508099670

19 Nov 2025 04:10PM UTC coverage: 97.81% (-0.7%) from 98.491%
19508099670

push

github

Juan Hapes
5.1.4

78 of 81 branches covered (96.3%)

Branch coverage included in aggregate %.

190 of 193 relevant lines covered (98.45%)

29.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
/lib/firehose-instance.js
1
/* eslint-disable max-len */
2

3
'use strict';
4

5
const { STS, Firehose } = require('./aws-wrappers');
1✔
6
const LogError = require('./log-error');
1✔
7

8
const { internalLog, internalLogError } = require('./logging');
1✔
9

10
const sts = new STS();
1✔
11

12
const ARN_DURATION = 1800; // 30 min
1✔
13
const MAX_TIMEOUT = 500;
1✔
14

15
let firehoseInstance;
16
let credentialsExpiration;
17

18
module.exports = class FirehoseInstance {
1✔
19

20
        /**
21
         * @private
22
         */
23
        get batchSize() {
24
                return {
32✔
25
                        1: 500,
26
                        2: 100,
27
                        3: 50,
28
                        4: 10,
29
                        5: 1
30
                };
31
        }
32

33
        /**
34
         * @private
35
         */
36
        get logRoleArn() {
37
                return process.env.TRACE_LOG_ROLE_ARN;
23✔
38
        }
39

40
        /**
41
         * @private
42
         */
43
        get deliveryStreamName() {
44
                return process.env.TRACE_FIREHOSE_DELIVERY_STREAM;
29✔
45
        }
46

47
        /**
48
         * @private
49
         */
50
        get roleSessionName() {
51
                return `role-session-${process.env.JANIS_SERVICE_NAME}`;
11✔
52
        }
53

54
        /**
55
         * Set Instance of Firehose Wrapper when is not set
56
         * @private
57
         */
58
        async ensureFirehoseInstance() {
59

60
                if(this.validCredentials())
13✔
61
                        return;
1✔
62

63
                const firehoseParams = {
12✔
64
                        region: process.env.AWS_DEFAULT_REGION,
65
                        httpOptions: { timeout: MAX_TIMEOUT }
66
                };
67

68
                if(this.logRoleArn) {
12✔
69
                        firehoseParams.credentials = await this.getCredentials();
11✔
70
                        credentialsExpiration = new Date(firehoseParams.credentials.expiration);
9✔
71
                }
72

73
                firehoseInstance = new Firehose(firehoseParams);
10✔
74
        }
75

76
        /**
77
         * @private
78
         */
79
        validCredentials() {
80
                return firehoseInstance
13✔
81
                        && credentialsExpiration
82
                        && credentialsExpiration >= new Date();
83
        }
84

85
        /**
86
         * @private
87
         */
88
        async getCredentials() {
89

90
                const assumedRole = await sts.assumeRole({
11✔
91
                        RoleArn: this.logRoleArn,
92
                        RoleSessionName: this.roleSessionName,
93
                        DurationSeconds: ARN_DURATION
94
                });
95

96
                if(!assumedRole)
10✔
97
                        throw new LogError('Failed to assume role, invalid response.', LogError.codes.ASSUME_ROLE_ERROR);
1✔
98

99
                const { Credentials, Expiration } = assumedRole;
9✔
100

101
                return {
9✔
102
                        accessKeyId: Credentials.AccessKeyId,
103
                        secretAccessKey: Credentials.SecretAccessKey,
104
                        sessionToken: Credentials.SessionToken,
105
                        expiration: Expiration
106
                };
107
        }
108

109
        /**
110
         * @param {LogData[]} logs
111
         * @returns
112
         */
113
        async putRecords(logs) {
114

115
                try {
13✔
116
                        await this.ensureFirehoseInstance();
13✔
117
                } catch(err) {
118
                        internalLogError(`Error ensuring Firehose instance - ${err.message}`);
2✔
119
                }
120

121
                internalLog(`Putting ${logs.length} logs`);
13✔
122

123
                const initialBatchSize = this.batchSize[1];
13✔
124

125
                const promises = [];
13✔
126

127
                for(let offset = 0; offset < logs.length; offset += initialBatchSize)
13✔
128
                        promises.push(this.putBatch(logs.slice(offset, offset + initialBatchSize)));
13✔
129

130
                await Promise.allSettled(promises);
13✔
131
        }
132

133
        async putBatch(batch, attemptNumber = 1) {
13✔
134

135
                const response = await this.putRecordBatch(batch);
29✔
136

137
                if(!response?.FailedPutCount)
29✔
138
                        return internalLog(`Successfully put ${batch.length} log(s)`);
10✔
139

140
                const failedRecords = response.RequestResponses ?
19✔
141
                        batch.filter((_, index) => response.RequestResponses[index]?.ErrorCode) :
2✔
142
                        batch;
143

144
                const nextBatchSize = this.batchSize[attemptNumber + 1];
19✔
145

146
                if(!nextBatchSize)
19✔
147
                        return internalLogError(`Failed to put ${failedRecords.length} log(s) after all retries`);
4✔
148

149
                if(!failedRecords.length)
15!
150
                        return internalLog(`Successfully put ${batch.length} log(s)`);
×
151

152
                const promises = [];
15✔
153

154
                for(let offset = 0; offset < failedRecords.length; offset += nextBatchSize)
15✔
155
                        promises.push(this.putBatch(failedRecords.slice(offset, offset + nextBatchSize), attemptNumber + 1));
16✔
156

157
                await Promise.allSettled(promises);
15✔
158
        }
159

160
        /**
161
         * @param {LogData[]} records
162
         * @private
163
         */
164
        async putRecordBatch(records) {
165

166
                try {
29✔
167

168
                        const response = await firehoseInstance.putRecordBatch({
29✔
169
                                DeliveryStreamName: this.deliveryStreamName,
170
                                Records: records.map(record => ({ Data: Buffer.from(JSON.stringify(record)) }))
34✔
171
                        });
172

173
                        internalLog(`putRecordBatch() response for ${records.length} logs - ${JSON.stringify(response)}`);
23✔
174

175
                        return response;
23✔
176

177
                } catch(err) {
178

179
                        internalLogError(`Error putRecordBatch() for ${records.length} logs - ${err.message}`);
6✔
180

181
                        return {
6✔
182
                                FailedPutCount: records.length,
183
                                Message: err.message
184
                        };
185
                }
186
        }
187
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc