• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

panates / postgresql-client / 50d5a8f1-a1ba-4c39-947d-fb4f2c7685e1

16 Apr 2024 07:12AM UTC coverage: 85.802% (+0.3%) from 85.476%
50d5a8f1-a1ba-4c39-947d-fb4f2c7685e1

push

circleci

erayhanoglu
Added prettier formatting

696 of 980 branches covered (71.02%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

52 existing lines in 10 files now uncovered.

2513 of 2760 relevant lines covered (91.05%)

799.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.44
/src/protocol/pg-socket.ts
1
import crypto from 'crypto';
30✔
2
import net from 'net';
30✔
3
import promisify from 'putil-promisify';
30✔
4
import tls from 'tls';
30✔
5
import { ConnectionState } from '../constants.js';
30✔
6
import { ConnectionConfiguration } from '../interfaces/database-connection-params.js';
7
import { SafeEventEmitter } from '../safe-event-emitter.js';
30✔
8
import { Callback, Maybe } from '../types.js';
9
import { Backend } from './backend.js';
30✔
10
import { DatabaseError } from './database-error.js';
30✔
11
import { Frontend } from './frontend.js';
30✔
12
import { Protocol } from './protocol.js';
30✔
13
import { SASL } from './sasl.js';
30✔
14

15
const DEFAULT_PORT_NUMBER = 5432;
30✔
16
const COMMAND_RESULT_PATTERN = /^([^\d]+)(?: (\d+)(?: (\d+))?)?$/;
30✔
17

18
export type CaptureCallback = (
19
  code: Protocol.BackendMessageCode,
20
  msg: any,
21
  done: (err: Maybe<Error>, result?: any) => void,
22
) => void | Promise<void>;
23

24
export interface SocketError extends Error {
25
  code: string;
26
}
27

28
export class PgSocket extends SafeEventEmitter {
30✔
29
  private _state = ConnectionState.CLOSED;
81✔
30
  private _socket?: net.Socket;
31
  private _backend = new Backend();
81✔
32
  private _frontend: Frontend;
33
  private _sessionParameters: Record<string, string> = {};
81✔
34
  private _saslSession?: SASL.Session;
35
  private _processID?: number;
36
  private _secretKey?: number;
37

38
  constructor(public options: ConnectionConfiguration) {
81✔
39
    super();
81✔
40
    this._frontend = new Frontend({ buffer: options.buffer });
81✔
41
    this.setMaxListeners(99);
81✔
42
  }
43

44
  get state(): ConnectionState {
45
    if (!this._socket || this._socket.destroyed) this._state = ConnectionState.CLOSED;
3,765✔
46
    return this._state;
3,765✔
47
  }
48

49
  get processID(): Maybe<number> {
50
    return this._processID;
3✔
51
  }
52

53
  get secretKey(): Maybe<number> {
54
    return this._secretKey;
3✔
55
  }
56

57
  get sessionParameters(): Record<string, string> {
58
    return this._sessionParameters;
×
59
  }
60

61
  connect() {
62
    if (this._socket) return;
81!
63
    this._state = ConnectionState.CONNECTING;
81✔
64
    const options = this.options;
81✔
65
    const socket = (this._socket = new net.Socket());
81✔
66

67
    const errorHandler = err => {
81✔
68
      this._state = ConnectionState.CLOSED;
×
69
      this._removeListeners();
×
70
      this._reset();
×
71
      socket.destroy();
×
72
      this._socket = undefined;
×
73
      this.emit('error', err);
×
74
    };
75

76
    const connectHandler = () => {
81✔
77
      socket.setTimeout(0);
81✔
78
      if (this.options.keepAlive || this.options.keepAlive == null) socket.setKeepAlive(true);
81✔
79
      if (options.ssl) {
81!
80
        socket.write(this._frontend.getSSLRequestMessage());
×
81
        socket.once('data', x => {
×
82
          this._removeListeners();
×
83
          if (x.toString() === 'S') {
×
84
            const tslOptions = { ...options.ssl, socket };
×
85
            if (options.host && net.isIP(options.host) === 0) tslOptions.servername = options.host;
×
86
            const tlsSocket = (this._socket = tls.connect(tslOptions));
×
87
            tlsSocket.once('error', errorHandler);
×
88
            tlsSocket.once('secureConnect', () => {
×
89
              this._removeListeners();
×
90
              this._handleConnect();
×
91
            });
92
            return;
×
93
          }
94
          if (x.toString() === 'N') return errorHandler(new Error('Server does not support SSL connections'));
×
95
          return errorHandler(new Error('There was an error establishing an SSL connection'));
×
96
        });
97
      } else {
98
        this._handleConnect();
81✔
99
      }
100
    };
101

102
    socket.setNoDelay(true);
81✔
103
    socket.setTimeout(options.connectTimeoutMs || 30000, () => errorHandler(new Error('Connection timed out')));
81✔
104
    socket.once('error', errorHandler);
81✔
105
    socket.once('connect', connectHandler);
81✔
106

107
    this.emit('connecting');
81✔
108
    if (options.host && options.host.startsWith('/')) socket.connect(options.host);
81!
109
    else socket.connect(options.port || DEFAULT_PORT_NUMBER, options.host || 'localhost');
81!
110
  }
111

112
  close(): void {
113
    if (!this._socket || this._socket.destroyed) {
81!
114
      this._state = ConnectionState.CLOSED;
×
115
      this._socket = undefined;
×
116
      this._reset();
×
117
      return;
×
118
    }
119
    if (this._state === ConnectionState.CLOSING) return;
81!
120
    const socket = this._socket;
81✔
121
    this._state = ConnectionState.CLOSING;
81✔
122
    this._removeListeners();
81✔
123
    socket.once('close', () => this._handleClose());
81✔
124
    socket.destroy();
81✔
125
  }
126

127
  sendParseMessage(args: Frontend.ParseMessageArgs, cb?: Callback): void {
128
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendParseMessage', args });
684!
129

130
    this._send(this._frontend.getParseMessage(args), cb);
684✔
131
  }
132

133
  sendBindMessage(args: Frontend.BindMessageArgs, cb?: Callback): void {
134
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendBindMessage', args });
681!
135

136
    this._send(this._frontend.getBindMessage(args), cb);
681✔
137
  }
138

139
  sendDescribeMessage(args: Frontend.DescribeMessageArgs, cb?: Callback): void {
140
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendDescribeMessage', args });
681!
141

142
    this._send(this._frontend.getDescribeMessage(args), cb);
681✔
143
  }
144

145
  sendExecuteMessage(args: Frontend.ExecuteMessageArgs, cb?: Callback): void {
146
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendDescribeMessage', args });
684!
147

148
    this._send(this._frontend.getExecuteMessage(args), cb);
684✔
149
  }
150

151
  sendCloseMessage(args: Frontend.CloseMessageArgs, cb?: Callback): void {
152
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendCloseMessage', args });
1,356!
153

154
    this._send(this._frontend.getCloseMessage(args), cb);
1,356✔
155
  }
156

157
  sendQueryMessage(sql: string, cb?: Callback): void {
158
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendQueryMessage', sql });
309!
159

160
    this._send(this._frontend.getQueryMessage(sql), cb);
309✔
161
  }
162

163
  sendFlushMessage(cb?: Callback): void {
164
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendFlushMessage' });
2,724!
165

166
    this._send(this._frontend.getFlushMessage(), cb);
2,724✔
167
  }
168

169
  sendTerminateMessage(cb?: Callback): void {
170
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendTerminateMessage' });
81!
171

172
    this._send(this._frontend.getTerminateMessage(), cb);
81✔
173
  }
174

175
  sendSyncMessage(): void {
176
    if (this.listenerCount('debug')) this.emit('debug', { location: 'PgSocket.sendSyncMessage' });
2,040!
177

178
    this._send(this._frontend.getSyncMessage());
2,040✔
179
  }
180

181
  capture(callback: CaptureCallback): Promise<any> {
182
    return new Promise((resolve, reject) => {
5,073✔
183
      const done = (err?: Error, result?: any) => {
5,073✔
184
        this.removeListener('error', errorHandler);
5,067✔
185
        this.removeListener('message', msgHandler);
5,067✔
186
        if (err) reject(err);
5,067!
187
        else resolve(result);
5,067✔
188
      };
189
      const errorHandler = (err: Error) => {
5,073✔
190
        this.removeListener('message', msgHandler);
6✔
191
        reject(err);
6✔
192
      };
193
      const msgHandler = (code: Protocol.BackendMessageCode, msg: any) => {
5,073✔
194
        const x = callback(code, msg, done);
10,038✔
195
        if (promisify.isPromise(x)) (x as Promise<void>).catch(err => done(err));
10,038✔
196
      };
197
      this.once('error', errorHandler);
5,073✔
198
      this.on('message', msgHandler);
5,073✔
199
    });
200
  }
201

202
  protected _removeListeners(): void {
203
    if (!this._socket) return;
81!
204
    this._socket.removeAllListeners('error');
81✔
205
    this._socket.removeAllListeners('connect');
81✔
206
    this._socket.removeAllListeners('data');
81✔
207
    this._socket.removeAllListeners('close');
81✔
208
  }
209

210
  protected _reset(): void {
211
    this._backend.reset();
156✔
212
    this._sessionParameters = {};
156✔
213
    this._processID = undefined;
156✔
214
    this._secretKey = undefined;
156✔
215
    this._saslSession = undefined;
156✔
216
  }
217

218
  protected _handleConnect(): void {
219
    const socket = this._socket;
81✔
220
    if (!socket) return;
81!
221
    this._state = ConnectionState.AUTHORIZING;
81✔
222
    this._reset();
81✔
223
    socket.on('data', (data: Buffer) => this._handleData(data));
5,185✔
224
    socket.on('error', (err: SocketError) => this._handleError(err));
81✔
225
    socket.on('close', () => this._handleClose());
81✔
226
    this._send(
81✔
227
      this._frontend.getStartupMessage({
228
        user: this.options.user || 'postgres',
81!
229
        database: this.options.database || '',
81!
230
      }),
231
    );
232
  }
233

234
  protected _handleClose(): void {
235
    this._reset();
75✔
236
    this._socket = undefined;
75✔
237
    this._state = ConnectionState.CLOSED;
75✔
238
    this.emit('close');
75✔
239
  }
240

241
  protected _handleError(err: unknown): void {
UNCOV
242
    if (this._state !== ConnectionState.READY) {
×
UNCOV
243
      this._socket?.end();
×
244
    }
UNCOV
245
    this.emit('error', err);
×
246
  }
247

248
  protected _handleData(data: Buffer): void {
249
    this._backend.parse(data, (code: Protocol.BackendMessageCode, payload?: any) => {
5,185✔
250
      try {
11,406✔
251
        switch (code) {
11,406✔
252
          case Protocol.BackendMessageCode.Authentication:
253
            this._handleAuthenticationMessage(payload);
81✔
254
            break;
81✔
255
          case Protocol.BackendMessageCode.ErrorResponse:
256
            this.emit('error', new DatabaseError(payload));
6✔
257
            break;
6✔
258
          case Protocol.BackendMessageCode.NoticeResponse:
259
            this.emit('notice', payload);
36✔
260
            break;
36✔
261
          case Protocol.BackendMessageCode.NotificationResponse:
262
            this.emit('notification', payload);
18✔
263
            break;
18✔
264
          case Protocol.BackendMessageCode.ParameterStatus:
265
            this._handleParameterStatus(payload as Protocol.ParameterStatusMessage);
1,059✔
266
            break;
1,059✔
267
          case Protocol.BackendMessageCode.BackendKeyData:
268
            this._handleBackendKeyData(payload as Protocol.BackendKeyDataMessage);
81✔
269
            break;
81✔
270
          case Protocol.BackendMessageCode.ReadyForQuery:
271
            if (this._state !== ConnectionState.READY) {
2,430✔
272
              this._state = ConnectionState.READY;
81✔
273
              this.emit('ready');
81✔
274
            } else this.emit('message', code, payload);
2,349✔
275
            break;
2,430✔
276
          case Protocol.BackendMessageCode.CommandComplete: {
277
            const msg = this._handleCommandComplete(payload);
2,331✔
278
            this.emit('message', code, msg);
2,331✔
279
            break;
2,331✔
280
          }
281
          default:
282
            this.emit('message', code, payload);
5,364✔
283
        }
284
      } catch (e) {
UNCOV
285
        this._handleError(e);
×
286
      }
287
    });
288
  }
289

290
  protected _resolvePassword(cb: (password: string) => void): void {
UNCOV
291
    (async (): Promise<void> => {
×
UNCOV
292
      const pass = typeof this.options.password === 'function' ? await this.options.password() : this.options.password;
×
UNCOV
293
      cb(pass || '');
×
294
    })().catch(err => this._handleError(err));
×
295
  }
296

297
  protected _handleAuthenticationMessage(msg?: any): void {
298
    if (!msg) {
81✔
299
      this.emit('authenticate');
81✔
300
      return;
81✔
301
    }
302

303
    switch (msg.kind) {
×
304
      case Protocol.AuthenticationMessageKind.CleartextPassword:
UNCOV
305
        this._resolvePassword(password => {
×
UNCOV
306
          this._send(this._frontend.getPasswordMessage(password));
×
307
        });
UNCOV
308
        break;
×
309
      case Protocol.AuthenticationMessageKind.MD5Password:
UNCOV
310
        this._resolvePassword(password => {
×
UNCOV
311
          const md5 = (x: any) => crypto.createHash('md5').update(x, 'utf8').digest('hex');
×
312
          const l = md5(password + this.options.user);
×
UNCOV
313
          const r = md5(Buffer.concat([Buffer.from(l), msg.salt]));
×
314
          const pass = 'md5' + r;
×
315
          this._send(this._frontend.getPasswordMessage(pass));
×
316
        });
317
        break;
×
318
      case Protocol.AuthenticationMessageKind.SASL: {
319
        if (!msg.mechanisms.includes('SCRAM-SHA-256'))
×
320
          throw new Error('SASL: Only mechanism SCRAM-SHA-256 is currently supported');
×
321
        const saslSession = (this._saslSession = SASL.createSession(this.options.user || '', 'SCRAM-SHA-256'));
×
322
        this._send(this._frontend.getSASLMessage(saslSession));
×
323
        break;
×
324
      }
325
      case Protocol.AuthenticationMessageKind.SASLContinue: {
326
        const saslSession = this._saslSession;
×
UNCOV
327
        if (!saslSession) throw new Error('SASL: Session not started yet');
×
328
        this._resolvePassword(password => {
×
329
          SASL.continueSession(saslSession, password, msg.data);
×
330
          const buf = this._frontend.getSASLFinalMessage(saslSession);
×
331
          this._send(buf);
×
332
        });
UNCOV
333
        break;
×
334
      }
335
      case Protocol.AuthenticationMessageKind.SASLFinal: {
336
        const session = this._saslSession;
×
337
        if (!session) throw new Error('SASL: Session not started yet');
×
338
        SASL.finalizeSession(session, msg.data);
×
339
        this._saslSession = undefined;
×
340
        break;
×
341
      }
342
    }
343
  }
344

345
  protected _handleParameterStatus(msg: Protocol.ParameterStatusMessage): void {
346
    this._sessionParameters[msg.name] = msg.value;
1,059✔
347
  }
348

349
  protected _handleBackendKeyData(msg: Protocol.BackendKeyDataMessage): void {
350
    this._processID = msg.processID;
81✔
351
    this._secretKey = msg.secretKey;
81✔
352
  }
353

354
  protected _handleCommandComplete(msg: any): Protocol.CommandCompleteMessage {
355
    const m = msg.command && msg.command.match(COMMAND_RESULT_PATTERN);
2,331✔
356
    const result: Protocol.CommandCompleteMessage = {
2,331✔
357
      command: m[1],
358
    };
359
    if (m[3] != null) {
2,331✔
360
      result.oid = parseInt(m[2], 10);
1,311✔
361
      result.rowCount = parseInt(m[3], 10);
1,311✔
362
    } else if (m[2]) result.rowCount = parseInt(m[2], 10);
1,020✔
363
    return result;
2,331✔
364
  }
365

366
  protected _send(data: Buffer, cb?: Callback): void {
367
    if (this._socket && this._socket.writable) {
9,315✔
368
      this._socket.write(data, cb);
9,315✔
369
    }
370
  }
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc