• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rokucommunity / roku-debug / #2015

pending completion
#2015

push

web-flow
Merge 4961675eb into f30a7deaa

1876 of 2742 branches covered (68.42%)

Branch coverage included in aggregate %.

1876 of 1876 new or added lines in 41 files covered. (100.0%)

3422 of 4578 relevant lines covered (74.75%)

27.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.8
/src/debugProtocol/DebugProtocolClientReplaySession.ts
1
import { DebugProtocolClient } from './client/DebugProtocolClient';
1✔
2
import { defer, util } from '../util';
1✔
3
import type { ProtocolRequest, ProtocolResponse, ProtocolUpdate } from './events/ProtocolEvent';
4
import { DebugProtocolServer } from './server/DebugProtocolServer';
1✔
5
import * as Net from 'net';
1✔
6
import { ActionQueue } from '../managers/ActionQueue';
1✔
7
import { IOPortOpenedUpdate, isIOPortOpenedUpdate } from './events/updates/IOPortOpenedUpdate';
1✔
8

9
/**
 * Replays a recorded debug protocol session against a real `DebugProtocolClient`.
 *
 * The recording (`bufferLog`) is newline-delimited JSON, one `BufferLogEntry` per line.
 * During `run()`, a local mock control server is started and a client is connected to it:
 *  - `client-to-server` entries are parsed into requests and sent through the client
 *  - `server-to-client` entries are written back by the mock server once the mock server
 *    has received exactly the bytes the recorded requests are expected to produce
 *  - `io` entries are written to the mock IO socket
 *
 * Every request, response and update observed along the way is appended to `result`.
 */
export class DebugProtocolClientReplaySession {
    /**
     * @param options.bufferLog the recorded session: newline-delimited JSON, one entry per line
     */
    constructor(options: {
        bufferLog: string;
    }) {
        this.parseBufferLog(options?.bufferLog);
    }

    /**
     * Cleanup callbacks, invoked in registration order by `destroy()`
     */
    private disposables: Array<() => void> = [];

    /**
     * The socket accepted by the mock control server. Recorded `server-to-client` buffers are written to it.
     */
    private server: Net.Socket;

    /**
     * The socket accepted by the mock IO server. Recorded `io` buffers are written to it.
     */
    private ioSocket: Net.Socket;

    private client: DebugProtocolClient;

    /**
     * Index of the next entry in `entries` that has not been replayed yet
     */
    private entryIndex = 0;
    private entries: Array<BufferLogEntry>;

    /**
     * Get the next non-io entry without consuming it (any pending `io` entries are flushed first)
     */
    private peekEntry() {
        this.flushIO();
        return this.entries[this.entryIndex];
    }

    /**
     * Get the next non-io entry and move past it (any pending `io` entries are flushed first)
     */
    private advanceEntry() {
        this.flushIO();
        return this.entries[this.entryIndex++];
    }

    /**
     * Write every consecutive `io` entry at the current position to the mock IO socket
     */
    private flushIO() {
        while (this.entries[this.entryIndex]?.type === 'io') {
            const entry = this.entries[this.entryIndex++];
            this.ioSocket.write(entry.buffer);
        }
    }

    /**
     * Parse the newline-delimited JSON log into `entries`.
     * Blank lines are skipped. A missing log yields an empty entry list instead of throwing.
     */
    private parseBufferLog(bufferLog: string) {
        this.entries = (bufferLog ?? '')
            .split(/\r?\n/g)
            .map(x => x.trim())
            .filter(x => !!x)
            .map(line => {
                const entry = JSON.parse(line);
                //the log stores the timestamp as a string; revive it so gaps between entries can be computed
                entry.timestamp = new Date(entry.timestamp as string);
                // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
                entry.buffer = Buffer.from(entry.buffer);
                return entry;
            });
    }

    /**
     * Every request sent and every response/update received during the replay, in the order they were observed
     */
    public result: Array<ProtocolRequest | ProtocolResponse | ProtocolUpdate> = [];
    /**
     * Resolved by `finalizeIfDone()` once all entries were replayed and both directions are in sync
     */
    private finished = defer();
    private controlPort: number;
    private ioPort: number;

    /**
     * Run the replay. Resolves once every entry has been replayed and both the client and the
     * mock server have received exactly the bytes they were expected to receive.
     */
    public async run() {
        this.controlPort = await util.getPort();
        this.ioPort = await util.getPort();

        await this.createServer(this.controlPort);

        this.createClient(this.controlPort);

        //connect, but don't send the handshake. That'll be sent through our first server-to-client entry (hopefully)
        await this.client.connect(false);

        void this.clientProcess();
        await this.finished.promise;
    }

    /**
     * Create the client under test and wire up its events so that the replay advances
     * whenever the client receives or emits something.
     */
    private createClient(controlPort: number) {
        this.client = new DebugProtocolClient({
            controlPort: controlPort,
            host: 'localhost'
        });

        //store the responses in the result
        this.client.on('response', (response) => {
            this.result.push(response);
            void this.clientProcess();
        });
        this.client.on('update', (update) => {
            this.result.push(update);
            void this.clientProcess();
        });

        this.client.on('io-output', (data) => {
            console.log(data);
            void this.clientProcess();
        });

        //anytime the client receives buffer data, we should try and process it
        this.client.on('data', (data) => {
            this.clientSync.pushActual(data);
            void this.clientProcess();
        });

        this.client.plugins.add({
            beforeHandleUpdate: async (event) => {
                if (isIOPortOpenedUpdate(event.update)) {
                    //spin up an IO port before finishing this update
                    await this.openIOPort();

                    //if we get an IO update, change the port to the local mock IO port (for testing purposes)
                    const update = IOPortOpenedUpdate.fromJson(event.update.data);
                    update.data.port = this.ioPort;
                    event.update = update;
                }
            }
        });

        //stuff to run when the session is disposed
        this.disposables.push(() => {
            this.client.destroy();
        });
    }

    /**
     * Start the mock IO server on `ioPort`. Resolves once the server is listening.
     */
    private openIOPort() {
        console.log(`Spinning up mock IO socket on port ${this.ioPort}`);
        const server = new Net.Server({});

        //whenever a client makes a connection
        // eslint-disable-next-line @typescript-eslint/no-misused-promises
        server.on('connection', (client: Net.Socket) => {
            this.ioSocket = client;
            //anytime we receive incoming data from the client
            client.on('data', (data) => {
                //TODO send IO data
            });
        });

        //stuff to run when the session is disposed
        this.disposables.push(() => {
            server.close();
        });
        this.disposables.push(() => {
            this.ioSocket?.destroy();
        });

        return this.listenOnLocalhost(server, this.ioPort);
    }

    /**
     * Start listening on the given port, bound to localhost.
     * Resolves once the server is listening; rejects if the server emits an error before that
     * (for example when the port is already in use) so callers do not wait forever.
     */
    private listenOnLocalhost(server: Net.Server, port: number) {
        return new Promise<void>((resolve, reject) => {
            server.once('error', reject);
            server.listen({
                port: port,
                //the option is named `host`; an unknown key such as `hostName` is ignored and the server binds to all interfaces
                host: 'localhost'
            }, () => {
                //only startup errors belong to this promise
                server.off('error', reject);
                resolve();
            });
        });
    }

    /**
     * Tracks the bytes the client is expected to receive (recorded server-to-client buffers) vs. what it actually received
     */
    private clientSync = new BufferSync();

    private clientActionQueue = new ActionQueue();

    /**
     * Send every consecutive `client-to-server` entry at the current position through the client,
     * then check whether the replay is complete. Calls are serialized through `clientActionQueue`.
     */
    private async clientProcess() {
        await this.clientActionQueue.run(async () => {
            //send each pending client request
            while (this.peekEntry()?.type === 'client-to-server') {
                //make sure it's been enough time since the last entry
                await this.sleepForEntryGap();
                const entry = this.advanceEntry();
                const request = DebugProtocolServer.getRequest(entry.buffer, true);

                //store this client data for our mock server to recognize and track
                this.serverSync.pushExpected(request.toBuffer());

                //store the request in the result
                this.result.push(request);

                //send the request
                void this.client.processRequest(request);

            }
            this.finalizeIfDone();
            //NOTE(review): `true` presumably tells ActionQueue the action is complete; confirm against ActionQueue
            return true;
        });
    }

    /**
     * Resolve `finished` once every entry was consumed and both directions received exactly what was expected
     */
    private finalizeIfDone() {
        if (this.clientSync.areInSync && this.serverSync.areInSync && this.entryIndex >= this.entries.length) {
            this.finished.resolve();
        }
    }

    /**
     * Start the mock control server on the given port. Resolves once the server is listening.
     */
    private createServer(controlPort: number) {
        const server = new Net.Server({});
        //Roku only allows 1 connection, so we should too.
        server.maxConnections = 1;

        //whenever a client makes a connection
        // eslint-disable-next-line @typescript-eslint/no-misused-promises
        server.on('connection', (client: Net.Socket) => {
            this.server = client;
            //anytime we receive incoming data from the client
            client.on('data', (data) => {
                console.log('server got:', JSON.stringify(data.toJSON().data));
                void this.serverProcess(data);
            });
        });

        //stuff to run when the session is disposed
        this.disposables.push(() => {
            server.close();
        });
        this.disposables.push(() => {
            //destroy the accepted control socket (the client itself is destroyed by the disposable registered in createClient)
            this.server?.destroy();
        });

        return this.listenOnLocalhost(server, controlPort);
    }

    private serverActionQueue = new ActionQueue();

    /**
     * Tracks the bytes the mock server is expected to receive (recorded client requests) vs. what it actually received
     */
    private serverSync = new BufferSync();
    private serverProcessIdx = 0;

    /**
     * Handle data received by the mock control server. Once the received bytes match the expected
     * request bytes, every consecutive `server-to-client` entry is written back to the client.
     * Calls are serialized through `serverActionQueue`.
     */
    private async serverProcess(data: Buffer) {
        const processIdx = this.serverProcessIdx++;
        await this.serverActionQueue.run(async () => {
            try {
                console.log(processIdx);
                this.serverSync.pushActual(data);
                if (this.serverSync.areInSync) {
                    this.serverSync.clear();
                    //send all the server messages, each delayed slightly to simulate the chunked buffer flushing that roku causes
                    while (this.peekEntry()?.type === 'server-to-client') {
                        //make sure enough time has passed since the last entry
                        await this.sleepForEntryGap();
                        const entry = this.advanceEntry();
                        this.server.write(entry.buffer);
                        this.clientSync.pushExpected(entry.buffer);
                    }
                }
                this.finalizeIfDone();
            } catch (e) {
                console.error('serverProcess failed to handle buffer', e);
            }
            //NOTE(review): `true` presumably tells ActionQueue the action is complete; confirm against ActionQueue
            return true;
        });
    }

    /**
     * Sleep for the recorded amount of time between the previous entry and the current one (capped at 100ms)
     */
    private async sleepForEntryGap() {
        const currentEntry = this.entries[this.entryIndex];
        const previousEntry = this.entries[this.entryIndex - 1];
        let gap = 0;
        if (currentEntry && previousEntry) {
            gap = currentEntry.timestamp.getTime() - previousEntry.timestamp.getTime();
            //if the gap is negative, then the time has already passed. Just timeout at zero
            gap = gap > 0 ? gap : 0;
        }
        //longer delays make the test run slower, but don't really make the test any more accurate,
        //so cap the delay at 100ms
        if (gap > 100) {
            gap = 100;
        }
        await util.sleep(gap);
    }

    /**
     * Run every registered cleanup callback. This is best-effort: a failing callback does not
     * prevent the remaining callbacks from running.
     */
    public destroy() {
        for (const dispose of this.disposables) {
            try {
                dispose();
            } catch { }
        }
    }
}
283

284
/**
 * Accumulates two byte streams, the bytes that are expected and the bytes that were
 * actually received, and reports whether they are currently identical.
 */
class BufferSync {
    private expected = Buffer.alloc(0);
    /**
     * Append bytes to the expected stream
     */
    public pushExpected(buffer: Buffer) {
        this.expected = Buffer.concat([this.expected, buffer]);
    }

    private actual = Buffer.alloc(0);
    /**
     * Append bytes to the actual stream
     */
    public pushActual(buffer: Buffer) {
        this.actual = Buffer.concat([this.actual, buffer]);
    }

    /**
     * Are the two buffers in sync? (same length and same bytes; two empty buffers are in sync)
     */
    public get areInSync(): boolean {
        //byte-wise comparison; avoids serializing both buffers to JSON on every check
        return this.expected.equals(this.actual);
    }

    /**
     * Reset both streams to empty
     */
    public clear() {
        this.expected = Buffer.alloc(0);
        this.actual = Buffer.alloc(0);
    }
}
307

308
/**
 * Does `subject` begin with the exact bytes of `search`?
 * An empty `search` always matches; a `search` longer than `subject` never matches.
 */
function bufferStartsWith(subject: Buffer, search: Buffer): boolean {
    if (search.length > subject.length) {
        return false;
    }
    //compare the leading slice directly instead of copying both buffers into arrays
    return subject.subarray(0, search.length).equals(search);
}
319

320
/**
 * A single recorded chunk of traffic from a debug protocol session log
 */
export interface BufferLogEntry {
    /**
     * Which stream the chunk belongs to:
     *  - `client-to-server`: a request the client sent over the control connection
     *  - `server-to-client`: data the server sent to the client over the control connection
     *  - `io`: data written to the IO socket
     */
    type: 'client-to-server' | 'server-to-client' | 'io';
    /**
     * When the chunk was recorded. Used to compute the delay between consecutive entries during replay.
     */
    timestamp: Date;
    /**
     * The raw bytes of the chunk
     */
    buffer: Buffer;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc