• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

denizzzka / dpq2 / #921

18 Jun 2026 02:10PM UTC coverage: 85.754% (-0.2%) from 85.959%
#921

push

coveralls-ruby

denizzzka
Merge remote-tracking branch 'origin/master'

1553 of 1811 relevant lines covered (85.75%)

31.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.94
/src/dpq2/connection.d
1
/**
2
 * Represents connection to the PostgreSQL server
3
 *
4
 * Most functions is correspond to those in the documentation of Postgres:
5
 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6
 */
7
module dpq2.connection;
8

9
import dpq2.query;
10
import dpq2.args: QueryParams;
11
import dpq2.cancellation;
12
import dpq2.result;
13
import dpq2.exception;
14

15
import derelict.pq.pq;
16
import std.conv: to;
17
import std.string: toStringz, fromStringz;
18
import std.exception: enforce;
19
import std.range;
20
import std.stdio: File;
21
import std.socket;
22
import core.exception;
23
import core.time: Duration;
24

25
/*
26
 * Bugs: On Unix connection is not thread safe.
27
 *
28
 * On Unix, forking a process with open libpq connections can lead
29
 * to unpredictable results because the parent and child processes share
30
 * the same sockets and operating system resources. For this reason,
31
 * such usage is not recommended, though doing an exec from the child
32
 * process to load a new executable is safe.
33

34

35

36
int PQisthreadsafe();
37
Returns 1 if the libpq is thread-safe and 0 if it is not.
38
*/
39

40
private mixin template ConnectionCtors()
41
{
42

43
    /// Makes a new connection to the database server
44
    this(string connString)
16✔
45
    {
46
        conn = PQconnectdb(toStringz(connString));
8✔
47
        version(Dpq2_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
8✔
48
        checkCreatedConnection();
8✔
49
    }
50

51
    /// ditto
52
    this(in string[string] keyValueParams)
×
53
    {
54
        auto a = keyValueParams.keyValToPQparamsArrays;
1✔
55

56
        conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0);
1✔
57
        version(Dpq2_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
1✔
58
        checkCreatedConnection();
1✔
59
    }
60

61
        /// Starts creation of a connection to the database server in a nonblocking manner
62
    this(ConnectionStart, string connString)
×
63
    {
64
        conn = PQconnectStart(toStringz(connString));
1✔
65
        version(Dpq2_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
1✔
66
        checkCreatedConnection();
1✔
67
    }
68

69
        /// ditto
70
    this(ConnectionStart, in string[string] keyValueParams)
×
71
    {
72
        auto a = keyValueParams.keyValToPQparamsArrays;
1✔
73

74
        conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0);
1✔
75
        version(Dpq2_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
1✔
76
        checkCreatedConnection();
1✔
77
    }
78
}
79

80
/// dumb flag for Connection ctor parametrization
81
struct ConnectionStart {};
82

83
/// Connection
84
class Connection
85
{
86
    package PGconn* conn;
87

88
    invariant
89
    {
90
        assert(conn !is null);
677✔
91
    }
92

93
    version(Dpq2_Static)
94
        mixin ConnectionCtors;
95
    else
96
    {
97
        import dpq2.dynloader: ReferenceCounter;
98

99
        private immutable ReferenceCounter dynLoaderRefCnt;
100

101
        package mixin ConnectionCtors;
102
    }
103

104
    private void checkCreatedConnection()
105
    {
106
        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
11✔
107

108
        if( status == CONNECTION_BAD )
11✔
109
            throw new ConnectionException(this, __FILE__, __LINE__);
3✔
110
    }
111

112
    ~this()
113
    {
114
        PQfinish( conn );
1✔
115

116
        version(Dpq2_Dynamic) dynLoaderRefCnt.__custom_dtor();
1✔
117
    }
118

119
    mixin Queries;
120

121
    /// Returns the blocking status of the database connection
122
    bool isNonBlocking()
123
    {
124
        return PQisnonblocking(conn) == 1;
4✔
125
    }
126

127
    /// Sets the nonblocking status of the connection
128
    private void setNonBlocking(bool state)
129
    {
130
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
×
131
            throw new ConnectionException(this, __FILE__, __LINE__);
×
132
    }
133

134
    /// Begin reset the communication channel to the server, in a nonblocking manner
135
    ///
136
    /// Useful only for non-blocking operations.
137
    void resetStart()
138
    {
139
        if(PQresetStart(conn) == 0)
×
140
            throw new ConnectionException(this, __FILE__, __LINE__);
×
141
    }
142

143
    /// Useful only for non-blocking operations.
144
    PostgresPollingStatusType poll() nothrow
145
    {
146
        assert(conn);
3✔
147

148
        return PQconnectPoll(conn);
6✔
149
    }
150

151
    /// Useful only for non-blocking operations.
152
    PostgresPollingStatusType resetPoll() nothrow
153
    {
154
        assert(conn);
×
155

156
        return PQresetPoll(conn);
×
157
    }
158

159
    /// Returns the status of the connection
160
    ConnStatusType status() nothrow
161
    {
162
        return PQstatus(conn);
28✔
163
    }
164

165
    /**
166
        Returns the current in-transaction status of the server.
167
        The status can be:
168
            * PQTRANS_IDLE    - currently idle
169
            * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
170
            * PQTRANS_INTRANS - idle, in a valid transaction block
171
            * PQTRANS_INERROR - idle, in a failed transaction block
172
            * PQTRANS_UNKNOWN - reported if the connection is bad
173
     */
174
    PGTransactionStatusType transactionStatus() nothrow
175
    {
176
        return PQtransactionStatus(conn);
8✔
177
    }
178

179
    /// If input is available from the server, consume it
180
    ///
181
    /// Useful only for non-blocking operations.
182
    void consumeInput()
183
    {
184
        assert(conn);
4✔
185

186
        const size_t r = PQconsumeInput( conn );
4✔
187
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
4✔
188
    }
189

190
    /// Attempts to flush any queued output data to the server.
191
    ///
192
    /// Returns: true if successful (or if the send queue is empty), or 1
193
    /// if it was unable to send all the data in the send queue yet (this
194
    /// case can only occur if the connection is nonblocking).
195
    bool flush()
196
    {
197
        assert(conn);
×
198

199
        auto r = PQflush(conn);
×
200
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
×
201
        return r == 0;
×
202
    }
203

204
    /// Obtains the file descriptor number of the connection socket to the server
205
    int posixSocket()
206
    {
207
        int r = PQsocket(conn);
4✔
208

209
        if(r == -1)
4✔
210
            throw new ConnectionException(this, __FILE__, __LINE__);
×
211

212
        return r;
8✔
213
    }
214

215
    /// Obtains std.socket.Socket of the connection to the server
216
    SocketNotBorrowed socket()
217
    {
218
        return new SocketNotBorrowed(cast(socket_t) posixSocket, AddressFamily.UNSPEC);
8✔
219
    }
220

221
    /// Returns the error message most recently generated by an operation on the connection
222
    string errorMessage() const nothrow
223
    {
224
        return PQerrorMessage(conn).to!string;
6✔
225
    }
226

227
    /**
228
     * Sets or examines the current notice processor
229
     *
230
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
231
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
232
     */
233
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
234
    {
235
        assert(conn);
×
236

237
        return PQsetNoticeProcessor(conn, proc, arg);
×
238
    }
239

240
    /// Get next result after sending a non-blocking commands. Can return null.
241
    ///
242
    /// Useful only for non-blocking operations.
243
    immutable(Result) getResult()
244
    {
245
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
246
        auto r = cast(immutable) PQgetResult(conn);
14✔
247

248
        if(r)
14✔
249
        {
250
            auto container = new immutable ResultContainer(r);
7✔
251
            return new immutable Result(container);
14✔
252
        }
253

254
        return null;
14✔
255
    }
256

257
    /// Get result after PQexec* functions or throw exception if pull is empty
258
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
259
    {
260
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
262✔
261

262
        return new immutable ResultContainer(r);
262✔
263
    }
264

265
    /// Select single-row mode for the currently-executing query
266
    bool setSingleRowMode()
267
    {
268
        return PQsetSingleRowMode(conn) == 1;
×
269
    }
270

271
    /// Causes a connection to enter pipeline mode if it is currently idle or already in pipeline mode.
272
    void enterPipelineMode()
273
    {
274
        if(PQenterPipelineMode(conn) == 0)
×
275
            throw new ConnectionException(this);
×
276
    }
277

278
    /// Causes a connection to exit pipeline mode if it is currently in pipeline mode with an empty queue and no pending results.
279
    void exitPipelineMode()
280
    {
281
        if(PQexitPipelineMode(conn) == 0)
×
282
            throw new ConnectionException(this);
×
283
    }
284

285
    /// Sends a request for the server to flush its output buffer.
286
    void sendFlushRequest()
287
    {
288
        if(PQsendFlushRequest(conn) == 0)
×
289
            throw new ConnectionException(this);
×
290
    }
291

292
    /// Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer.
293
    void pipelineSync()
294
    {
295
        if(PQpipelineSync(conn) != 1)
×
296
            throw new ConnectionException(this);
×
297
    }
298

299
    ///
300
    PGpipelineStatus pipelineStatus()
301
    {
302
        return PQpipelineStatus(conn);
×
303
    }
304

305
    /**
306
     Try to cancel query in a blocking manner
307

308
     If the cancellation is effective, the current command will
309
     terminate early and return an error result or exception. If the
310
     cancellation will fails (say, because the server was already done
311
     processing the command) there will be no visible result at all.
312
    */
313
    void cancel()
314
    {
315
        auto c = new Cancellation(this);
×
316
        c.doCancelBlocking;
×
317
    }
318

319
    ///
320
    bool isBusy() nothrow
321
    {
322
        assert(conn);
×
323

324
        return PQisBusy(conn) == 1;
×
325
    }
326

327
    ///
328
    string parameterStatus(string paramName)
329
    {
330
        assert(conn);
×
331

332
        auto res = PQparameterStatus(conn, toStringz(paramName));
×
333

334
        if(res is null)
×
335
            throw new ConnectionException(this, __FILE__, __LINE__);
×
336

337
        return to!string(fromStringz(res));
×
338
    }
339

340
    ///
341
    string escapeLiteral(string msg)
342
    {
343
        assert(conn);
1✔
344

345
        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
1✔
346

347
        if(buf is null)
1✔
348
            throw new ConnectionException(this, __FILE__, __LINE__);
×
349

350
        string res = buf.fromStringz.to!string;
1✔
351

352
        PQfreemem(buf);
1✔
353

354
        return res;
2✔
355
    }
356

357
    ///
358
    string escapeIdentifier(string msg)
359
    {
360
        assert(conn);
2✔
361

362
        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
2✔
363

364
        if(buf is null)
2✔
365
            throw new ConnectionException(this, __FILE__, __LINE__);
×
366

367
        string res = buf.fromStringz.to!string;
2✔
368

369
        PQfreemem(buf);
2✔
370

371
        return res;
4✔
372
    }
373

374
    ///
375
    string dbName() const nothrow
376
    {
377
        assert(conn);
1✔
378

379
        return PQdb(conn).fromStringz.to!string;
2✔
380
    }
381

382
    ///
383
    string host() const nothrow
384
    {
385
        assert(conn);
×
386

387
        return PQhost(conn).fromStringz.to!string;
×
388
    }
389

390
    ///
391
    int protocolVersion() const nothrow
392
    {
393
        assert(conn);
1✔
394

395
        return PQprotocolVersion(conn);
2✔
396
    }
397

398
    ///
399
    int serverVersion() const nothrow
400
    {
401
        assert(conn);
1✔
402

403
        return PQserverVersion(conn);
2✔
404
    }
405

406
    ///
407
    void trace(ref File stream)
408
    {
409
        PQtrace(conn, stream.getFP);
×
410
    }
411

412
    ///
413
    void untrace()
414
    {
415
        PQuntrace(conn);
×
416
    }
417

418
    ///
419
    void setClientEncoding(string encoding)
420
    {
421
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
1✔
422
            throw new ConnectionException(this, __FILE__, __LINE__);
×
423
    }
424
}
425

426
/**
427
Same as std.socket.Socket, but socket acquired from the outside doesn't
428
close in the destructor. This allows to use a system socket borrowed
429
from somewhere and simply leave, so that others can continue working
430
with it.
431
*/
432
class SocketNotBorrowed : Socket
433
{
434
    /// Use an existing socket handle.
435
    this(socket_t sock, AddressFamily af) pure nothrow @nogc
4✔
436
    {
437
        super(sock, af);
4✔
438
    }
439

440
    ~this()
441
    {
442
        release();
×
443
    }
444
}
445

446
private auto keyValToPQparamsArrays(in string[string] keyValueParams)
447
{
448
    static struct PQparamsArrays
449
    {
450
        immutable(char)*[] keys;
451
        immutable(char)*[] vals;
452
    }
453

454
    PQparamsArrays a;
2✔
455
    a.keys.length = keyValueParams.length + 1;
2✔
456
    a.vals.length = keyValueParams.length + 1;
2✔
457

458
    size_t i;
2✔
459
    foreach(e; keyValueParams.byKeyValue)
16✔
460
    {
461
        a.keys[i] = e.key.toStringz;
4✔
462
        a.vals[i] = e.value.toStringz;
4✔
463

464
        i++;
4✔
465
    }
466

467
    assert(i == keyValueParams.length);
2✔
468

469
    return a;
2✔
470
}
471

472
/// Check connection options in the provided connection string
473
///
474
/// Throws exception if connection string isn't passes check.
475
version(Dpq2_Static)
476
void connStringCheck(string connString)
477
{
478
    _connStringCheck(connString);
479
}
480

481
/// ditto
482
package void _connStringCheck(string connString)
483
{
484
    char* errmsg = null;
2✔
485
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
2✔
486

487
    if(r is null)
2✔
488
    {
489
        enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
1✔
490
    }
491
    else
492
    {
493
        PQconninfoFree(r);
1✔
494
    }
495

496
    if(errmsg !is null)
2✔
497
    {
498
        string s = errmsg.fromStringz.to!string;
1✔
499
        PQfreemem(cast(void*) errmsg);
1✔
500

501
        throw new ConnectionException(s, __FILE__, __LINE__);
1✔
502
    }
503
}
504

505
/// Connection exception
506
class ConnectionException : Dpq2Exception
507
{
508
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
3✔
509
    {
510
        super(c.errorMessage(), file, line);
3✔
511
    }
512

513
    this(string msg, string file = __FILE__, size_t line = __LINE__)
1✔
514
    {
515
        super(msg, file, line);
1✔
516
    }
517
}
518

519
version (integration_tests)
520
Connection createTestConn(T...)(T params)
521
{
522
    version(Dpq2_Static)
523
        auto c = new Connection(params);
524
    else
525
    {
526
        import dpq2.dynloader: connFactory;
527

528
        Connection c = connFactory.createConnection(params);
11✔
529
    }
530

531
    return c;
8✔
532
}
533

534
version (integration_tests)
535
void _integration_test( string connParam )
536
{
537
    {
538
        debug import std.experimental.logger;
539

540
        auto c = createTestConn(connParam);
1✔
541

542
        assert( PQlibVersion() >= 9_0100 );
1✔
543

544
        auto dbname = c.dbName();
1✔
545
        auto pver = c.protocolVersion();
1✔
546
        auto sver = c.serverVersion();
1✔
547

548
        debug
549
        {
550
            trace("DB name: ", dbname);
551
            trace("Protocol version: ", pver);
552
            trace("Server version: ", sver);
553
        }
554

555
        destroy(c);
1✔
556
    }
557

558
    {
559
        version(Dpq2_Dynamic)
560
        {
561
            void csc(string s)
562
            {
563
                import dpq2.dynloader: connFactory;
564

565
                connFactory.connStringCheck(s);
2✔
566
            }
567
        }
568
        else
569
            void csc(string s){ connStringCheck(s); }
570

571
        csc("dbname=postgres user=postgres");
1✔
572

573
        {
574
            bool raised = false;
1✔
575

576
            try
577
                csc("wrong conninfo string");
1✔
578
            catch(ConnectionException e)
579
                raised = true;
1✔
580

581
            assert(raised);
1✔
582
        }
583
    }
584

585
    {
586
        bool exceptionFlag = false;
1✔
587

588
        try
589
            auto c = createTestConn(ConnectionStart(), "!!!some incorrect connection string!!!");
1✔
590
        catch(ConnectionException e)
591
        {
592
            exceptionFlag = true;
1✔
593
            assert(e.msg.length > 40); // error message check
1✔
594
        }
595
        finally
596
            assert(exceptionFlag);
1✔
597
    }
598

599
    {
600
        auto c = createTestConn(connParam);
1✔
601

602
        assert(c.escapeLiteral("abc'def") == "'abc''def'");
1✔
603
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
1✔
604

605
        c.setClientEncoding("WIN866");
1✔
606
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
1✔
607
    }
608

609
    {
610
        auto c = createTestConn(connParam);
1✔
611

612
        assert(c.transactionStatus == PQTRANS_IDLE);
1✔
613

614
        c.exec("BEGIN");
1✔
615
        assert(c.transactionStatus == PQTRANS_INTRANS);
1✔
616

617
        try c.exec("DISCARD ALL");
1✔
618
        catch (Exception) {}
619
        assert(c.transactionStatus == PQTRANS_INERROR);
1✔
620

621
        c.exec("ROLLBACK");
1✔
622
        assert(c.transactionStatus == PQTRANS_IDLE);
1✔
623
    }
624

625
    {
626
        import std.exception: assertThrown;
627

628
        string[string] kv;
1✔
629
        kv["host"] = "wrong-host";
1✔
630
        kv["dbname"] = "wrong-db-name";
1✔
631

632
        assertThrown!ConnectionException(createTestConn(kv));
2✔
633
        assertThrown!ConnectionException(createTestConn(ConnectionStart(), kv));
2✔
634
    }
635
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc