• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

panates / postgrejs / 10865207830

14 Sep 2024 08:14PM UTC coverage: 88.93% (+3.2%) from 85.726%
10865207830

push

github

erayhanoglu
chore: Added coveralls support

732 of 1000 branches covered (73.2%)

Branch coverage included in aggregate %.

2642 of 2794 relevant lines covered (94.56%)

224.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.43
/src/protocol/pg-socket.ts
1
import crypto from 'node:crypto';
10✔
2
import net from 'node:net';
10✔
3
import path from 'node:path';
10✔
4
import tls from 'node:tls';
10✔
5
import promisify from 'putil-promisify';
10✔
6
import { ConnectionState } from '../constants.js';
10✔
7
import type { ConnectionConfiguration } from '../interfaces/database-connection-params.js';
8
import { SafeEventEmitter } from '../safe-event-emitter.js';
10✔
9
import type { Callback, Maybe } from '../types.js';
10
import { Backend } from './backend.js';
10✔
11
import { DatabaseError } from './database-error.js';
10✔
12
import { Frontend } from './frontend.js';
10✔
13
import { Protocol } from './protocol.js';
10✔
14
import { SASL } from './sasl.js';
10✔
15

16
/** Port used by `PgSocket.connect()` when `options.port` is not set (or is falsy). */
const DEFAULT_PORT_NUMBER = 5432;
10✔
17
/**
 * Splits a CommandComplete tag into its parts:
 * group 1 is the leading non-digit command text, groups 2 and 3 are up to two
 * optional space-separated integers that follow it
 * (e.g. `SELECT 3` -> "SELECT", "3"; `INSERT 0 5` -> "INSERT", "0", "5").
 */
const COMMAND_RESULT_PATTERN = /^([^\d]+)(?: (\d+)(?: (\d+))?)?$/;
10✔
18

19
/**
 * Handler passed to `PgSocket.capture()`. It is invoked for every 'message'
 * event while the capture is active.
 *
 * @param code - Code of the backend message that was received.
 * @param msg - Payload of the message.
 * @param done - Ends the capture: rejects the capture promise when `err` is
 *   set, otherwise resolves it with `result`.
 * @returns Nothing, or a promise; a rejected promise ends the capture with
 *   that error.
 */
export type CaptureCallback = (
  code: Protocol.BackendMessageCode,
  msg: any,
  done: (err: Maybe<Error>, result?: any) => void,
) => void | Promise<void>;
24

25
/** An `Error` that additionally carries a string `code`, as raised by the underlying socket. */
export interface SocketError extends Error {
  /** Error code string attached to the error. */
  code: string;
}
28

29
/**
 * Low-level connection to a PostgreSQL server.
 *
 * Owns the TCP/TLS socket, serializes outgoing messages through `Frontend`,
 * parses incoming data through `Backend`, and re-publishes what it receives as
 * events: 'connecting', 'authenticate', 'ready', 'message', 'notice',
 * 'notification', 'error', 'close' and (only when listened to) 'debug'.
 */
export class PgSocket extends SafeEventEmitter {
  private _state = ConnectionState.CLOSED;
  private _socket?: net.Socket;
  private _backend = new Backend();
  private _frontend: Frontend;
  private _sessionParameters: Record<string, string> = {};
  private _saslSession?: SASL.Session;
  private _processID?: number;
  private _secretKey?: number;

  /**
   * @param options - Connection configuration; kept as the public `options` property.
   */
  constructor(public options: ConnectionConfiguration) {
    super();
    this._frontend = new Frontend({ buffer: options.buffer });
    // Every pending capture() registers three listeners on this emitter, so
    // the listener cap is raised (presumably to avoid max-listener warnings).
    this.setMaxListeners(99);
  }

  /**
   * Current connection state. Reading it also normalizes the stored state to
   * CLOSED when there is no socket or the socket has been destroyed.
   */
  get state(): ConnectionState {
    if (!this._socket || this._socket.destroyed) this._state = ConnectionState.CLOSED;
    return this._state;
  }

  /** Process ID received in the BackendKeyData message, if any. */
  get processID(): Maybe<number> {
    return this._processID;
  }

  /** Secret key received in the BackendKeyData message, if any. */
  get secretKey(): Maybe<number> {
    return this._secretKey;
  }

  /** Parameters collected from ParameterStatus messages, keyed by name. */
  get sessionParameters(): Record<string, string> {
    return this._sessionParameters;
  }

  /**
   * Opens the socket and negotiates SSL before starting the session.
   *
   * Does nothing if a socket already exists. Emits 'connecting' right away;
   * failures during this phase (socket error, timeout, SSL refusal while
   * `requireSSL` is set, unexpected SSL reply) tear the socket down and are
   * emitted as 'error'.
   */
  connect(): void {
    if (this._socket) return;
    this._state = ConnectionState.CONNECTING;
    const options = this.options;
    const socket = (this._socket = new net.Socket());

    // Shared failure path for the whole connect phase.
    const errorHandler = (err: Error) => {
      this._state = ConnectionState.CLOSED;
      this._removeListeners();
      this._reset();
      socket.destroy();
      this._socket = undefined;
      this.emit('error', err);
    };

    const connectHandler = () => {
      // Connected: disable the connect timeout.
      socket.setTimeout(0);
      // Keep-alive is on unless explicitly disabled.
      if (this.options.keepAlive || this.options.keepAlive == null) socket.setKeepAlive(true);
      socket.write(this._frontend.getSSLRequestMessage());
      socket.once('data', x => {
        this._removeListeners();
        const reply = x.toString();
        if (reply === 'S') {
          // Server accepted SSL: upgrade the existing socket to TLS.
          const tlsOptions = { ...options.ssl, socket };
          // Set servername only when the host is a name, not an IP literal.
          if (options.host && net.isIP(options.host) === 0) tlsOptions.servername = options.host;
          const tlsSocket = (this._socket = tls.connect(tlsOptions));
          tlsSocket.once('error', errorHandler);
          tlsSocket.once('secureConnect', () => {
            this._removeListeners();
            this._handleConnect();
          });
          return;
        }
        if (reply === 'N') {
          // Server refused SSL: continue in plain text unless SSL is required.
          if (options.requireSSL) {
            return errorHandler(new Error('Server does not support SSL connections'));
          }
          this._handleConnect();
          return;
        }
        return errorHandler(new Error('There was an error establishing an SSL connection'));
      });
    };

    socket.setNoDelay(true);
    socket.setTimeout(options.connectTimeoutMs || 30000, () => errorHandler(new Error('Connection timed out')));
    socket.once('error', errorHandler);
    socket.once('connect', connectHandler);

    this.emit('connecting');
    const port = options.port || DEFAULT_PORT_NUMBER;
    if (options.host && options.host.startsWith('/')) {
      // A host starting with '/' is a directory holding the unix domain socket.
      socket.connect(path.join(options.host, '/.s.PGSQL.' + port));
    } else socket.connect(port, options.host || 'localhost');
  }

  /**
   * Destroys the socket. 'close' is emitted once the socket reports it has
   * closed. If there is no live socket the internal state is just reset, and
   * a close that is already in progress is left alone.
   */
  close(): void {
    if (!this._socket || this._socket.destroyed) {
      this._state = ConnectionState.CLOSED;
      this._socket = undefined;
      this._reset();
      return;
    }
    if (this._state === ConnectionState.CLOSING) return;
    const socket = this._socket;
    this._state = ConnectionState.CLOSING;
    this._removeListeners();
    socket.once('close', () => this._handleClose());
    socket.destroy();
  }

  /**
   * Sends a Parse message.
   * @param args - Arguments serialized by the frontend.
   * @param cb - Optional callback handed to the socket write.
   */
  sendParseMessage(args: Frontend.ParseMessageArgs, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendParseMessage', args });

    this._send(this._frontend.getParseMessage(args), cb);
  }

  /**
   * Sends a Bind message.
   * @param args - Arguments serialized by the frontend.
   * @param cb - Optional callback handed to the socket write.
   */
  sendBindMessage(args: Frontend.BindMessageArgs, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendBindMessage', args });

    this._send(this._frontend.getBindMessage(args), cb);
  }

  /**
   * Sends a Describe message.
   * @param args - Arguments serialized by the frontend.
   * @param cb - Optional callback handed to the socket write.
   */
  sendDescribeMessage(args: Frontend.DescribeMessageArgs, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendDescribeMessage', args });

    this._send(this._frontend.getDescribeMessage(args), cb);
  }

  /**
   * Sends an Execute message.
   * @param args - Arguments serialized by the frontend.
   * @param cb - Optional callback handed to the socket write.
   */
  sendExecuteMessage(args: Frontend.ExecuteMessageArgs, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendExecuteMessage', args });

    this._send(this._frontend.getExecuteMessage(args), cb);
  }

  /**
   * Sends a Close message.
   * @param args - Arguments serialized by the frontend.
   * @param cb - Optional callback handed to the socket write.
   */
  sendCloseMessage(args: Frontend.CloseMessageArgs, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendCloseMessage', args });

    this._send(this._frontend.getCloseMessage(args), cb);
  }

  /**
   * Sends a Query message.
   * @param sql - SQL text to send.
   * @param cb - Optional callback handed to the socket write.
   */
  sendQueryMessage(sql: string, cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendQueryMessage', sql });

    this._send(this._frontend.getQueryMessage(sql), cb);
  }

  /**
   * Sends a Flush message.
   * @param cb - Optional callback handed to the socket write.
   */
  sendFlushMessage(cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendFlushMessage' });

    this._send(this._frontend.getFlushMessage(), cb);
  }

  /**
   * Sends a Terminate message.
   * @param cb - Optional callback handed to the socket write.
   */
  sendTerminateMessage(cb?: Callback): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendTerminateMessage' });

    this._send(this._frontend.getTerminateMessage(), cb);
  }

  /** Sends a Sync message. */
  sendSyncMessage(): void {
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendSyncMessage' });

    this._send(this._frontend.getSyncMessage());
  }

  /**
   * Feeds every subsequent 'message' event to `callback` until it calls
   * `done`, or until the socket emits 'error' or 'close'.
   *
   * @param callback - Receives each message plus the `done` function.
   * @returns A promise resolved with the result passed to `done`, or rejected
   *   with the error passed to `done`, an emitted 'error', a rejection of the
   *   promise returned by `callback`, or "Connection closed". It rejects
   *   immediately when the connection is not in the READY state.
   */
  capture(callback: CaptureCallback): Promise<any> {
    if (this._state === ConnectionState.CLOSING || this._state === ConnectionState.CLOSED) {
      return Promise.reject(new Error('Connection closed'));
    }
    if (this._state !== ConnectionState.READY) return Promise.reject(new Error('Connection is not ready'));
    return new Promise((resolve, reject) => {
      const done = (err?: Error, result?: any) => {
        this.removeListener('close', closeHandler);
        this.removeListener('error', errorHandler);
        this.removeListener('message', msgHandler);
        if (err) reject(err);
        else resolve(result);
      };
      // 'error' and 'close' are registered with once(), so each handler only
      // needs to detach the other two listeners.
      const errorHandler = (err: Error) => {
        this.removeListener('close', closeHandler);
        this.removeListener('message', msgHandler);
        reject(err);
      };
      const closeHandler = () => {
        this.removeListener('error', errorHandler);
        this.removeListener('message', msgHandler);
        reject(new Error('Connection closed'));
      };
      const msgHandler = (code: Protocol.BackendMessageCode, msg: any) => {
        const x = callback(code, msg, done);
        // An async callback that rejects ends the capture with that error.
        if (promisify.isPromise(x)) (x as Promise<void>).catch(err => done(err));
      };
      this.once('close', closeHandler);
      this.once('error', errorHandler);
      this.on('message', msgHandler);
    });
  }

  /** Removes all 'error', 'connect', 'data' and 'close' listeners from the current socket. */
  protected _removeListeners(): void {
    if (!this._socket) return;
    this._socket.removeAllListeners('error');
    this._socket.removeAllListeners('connect');
    this._socket.removeAllListeners('data');
    this._socket.removeAllListeners('close');
  }

  /** Clears per-session state: parser state, session parameters, backend key data and SASL session. */
  protected _reset(): void {
    this._backend.reset();
    this._sessionParameters = {};
    this._processID = undefined;
    this._secretKey = undefined;
    this._saslSession = undefined;
  }

  /**
   * Runs once the transport (plain or TLS) is established: switches to
   * AUTHORIZING, attaches the long-lived socket handlers and sends the
   * startup message.
   */
  protected _handleConnect(): void {
    const socket = this._socket;
    if (!socket) return;
    this._state = ConnectionState.AUTHORIZING;
    this._reset();
    socket.on('data', (data: Buffer) => this._handleData(data));
    socket.on('error', (err: SocketError) => this._handleError(err));
    socket.on('close', () => this._handleClose());
    this._send(
      this._frontend.getStartupMessage({
        user: this.options.user || 'postgres',
        database: this.options.database || '',
      }),
    );
  }

  /** Socket 'close' handler: resets session state, drops the socket and emits 'close'. */
  protected _handleClose(): void {
    this._reset();
    this._socket = undefined;
    this._state = ConnectionState.CLOSED;
    this.emit('close');
  }

  /**
   * Emits `err` as an 'error' event. When the connection has not reached the
   * READY state, the socket is ended first.
   */
  protected _handleError(err: unknown): void {
    if (this._state !== ConnectionState.READY) {
      this._socket?.end();
    }
    this.emit('error', err);
  }

  /**
   * Parses an incoming chunk and dispatches each decoded backend message.
   * Exceptions thrown while handling a message are routed to `_handleError`.
   */
  protected _handleData(data: Buffer): void {
    this._backend.parse(data, (code: Protocol.BackendMessageCode, payload?: any) => {
      try {
        switch (code) {
          case Protocol.BackendMessageCode.Authentication:
            this._handleAuthenticationMessage(payload);
            break;
          case Protocol.BackendMessageCode.ErrorResponse:
            this.emit('error', new DatabaseError(payload));
            break;
          case Protocol.BackendMessageCode.NoticeResponse:
            this.emit('notice', payload);
            break;
          case Protocol.BackendMessageCode.NotificationResponse:
            this.emit('notification', payload);
            break;
          case Protocol.BackendMessageCode.ParameterStatus:
            this._handleParameterStatus(payload as Protocol.ParameterStatusMessage);
            break;
          case Protocol.BackendMessageCode.BackendKeyData:
            this._handleBackendKeyData(payload as Protocol.BackendKeyDataMessage);
            break;
          case Protocol.BackendMessageCode.ReadyForQuery:
            // The first ReadyForQuery marks the end of startup; later ones
            // are forwarded like any other message.
            if (this._state !== ConnectionState.READY) {
              this._state = ConnectionState.READY;
              this.emit('ready');
            } else this.emit('message', code, payload);
            break;
          case Protocol.BackendMessageCode.CommandComplete: {
            const msg = this._handleCommandComplete(payload);
            this.emit('message', code, msg);
            break;
          }
          default:
            this.emit('message', code, payload);
        }
      } catch (e) {
        this._handleError(e);
      }
    });
  }

  /**
   * Resolves the configured password (a value or a function returning one,
   * possibly asynchronously) and passes it to `cb`; an unset password becomes
   * the empty string. Failures, including exceptions thrown by `cb`, are
   * routed to `_handleError`.
   */
  protected _resolvePassword(cb: (password: string) => void): void {
    (async (): Promise<void> => {
      const pass = typeof this.options.password === 'function' ? await this.options.password() : this.options.password;
      cb(pass || '');
    })().catch(err => this._handleError(err));
  }

  /**
   * Responds to an Authentication message. A message without payload means
   * authentication finished and 'authenticate' is emitted; otherwise the
   * reply depends on `msg.kind`.
   *
   * @throws Error when the server offers SASL without SCRAM-SHA-256, or when
   *   a SASL continue/final message arrives before a session was started.
   */
  protected _handleAuthenticationMessage(msg?: any): void {
    if (!msg) {
      this.emit('authenticate');
      return;
    }

    switch (msg.kind) {
      case Protocol.AuthenticationMessageKind.CleartextPassword:
        this._resolvePassword(password => {
          this._send(this._frontend.getPasswordMessage(password));
        });
        break;
      case Protocol.AuthenticationMessageKind.MD5Password:
        this._resolvePassword(password => {
          // String input is hashed as UTF-8 (Node's default for hash.update).
          const md5 = (x: crypto.BinaryLike) => crypto.createHash('md5').update(x).digest('hex');
          // Hash with the same effective user name the startup message sent;
          // otherwise an unset user would be hashed as the text "undefined".
          const user = this.options.user || 'postgres';
          const l = md5(password + user);
          const r = md5(Buffer.concat([Buffer.from(l), msg.salt]));
          const pass = 'md5' + r;
          this._send(this._frontend.getPasswordMessage(pass));
        });
        break;
      case Protocol.AuthenticationMessageKind.SASL: {
        if (!msg.mechanisms.includes('SCRAM-SHA-256')) {
          throw new Error('SASL: Only mechanism SCRAM-SHA-256 is currently supported');
        }
        const saslSession = (this._saslSession = SASL.createSession(this.options.user || '', 'SCRAM-SHA-256'));
        this._send(this._frontend.getSASLMessage(saslSession));
        break;
      }
      case Protocol.AuthenticationMessageKind.SASLContinue: {
        const saslSession = this._saslSession;
        if (!saslSession) throw new Error('SASL: Session not started yet');
        this._resolvePassword(password => {
          SASL.continueSession(saslSession, password, msg.data);
          const buf = this._frontend.getSASLFinalMessage(saslSession);
          this._send(buf);
        });
        break;
      }
      case Protocol.AuthenticationMessageKind.SASLFinal: {
        const session = this._saslSession;
        if (!session) throw new Error('SASL: Session not started yet');
        SASL.finalizeSession(session, msg.data);
        this._saslSession = undefined;
        break;
      }
      default:
        break;
    }
  }

  /** Records a session parameter reported by the server. */
  protected _handleParameterStatus(msg: Protocol.ParameterStatusMessage): void {
    this._sessionParameters[msg.name] = msg.value;
  }

  /** Stores the process ID and secret key reported by the server. */
  protected _handleBackendKeyData(msg: Protocol.BackendKeyDataMessage): void {
    this._processID = msg.processID;
    this._secretKey = msg.secretKey;
  }

  /**
   * Parses a CommandComplete payload into command name, optional oid and
   * optional row count.
   *
   * With two trailing numbers the first is the oid and the second the row
   * count; with one it is the row count. A missing or unrecognized tag no
   * longer throws: the raw tag (or '' when absent) is returned as `command`.
   */
  protected _handleCommandComplete(msg: any): Protocol.CommandCompleteMessage {
    const command: string = typeof msg?.command === 'string' ? msg.command : '';
    const m = COMMAND_RESULT_PATTERN.exec(command);
    if (!m) return { command };
    const result: Protocol.CommandCompleteMessage = {
      command: m[1],
    };
    if (m[3] != null) {
      result.oid = parseInt(m[2], 10);
      result.rowCount = parseInt(m[3], 10);
    } else if (m[2]) result.rowCount = parseInt(m[2], 10);
    return result;
  }

  /**
   * Writes `data` to the socket if one exists and is writable; otherwise the
   * data is dropped and `cb` is not invoked.
   */
  protected _send(data: Buffer, cb?: Callback): void {
    if (this._socket && this._socket.writable) {
      this._socket.write(data, cb);
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc