• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 18643955603

20 Oct 2025 06:19AM UTC coverage: 89.252% (-0.01%) from 89.265%
18643955603

push

gh-ci

ostafen
chore(embedded/sql): add COALESCE function

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

15 of 21 new or added lines in 1 file covered. (71.43%)

2 existing lines in 1 file now uncovered.

37956 of 42527 relevant lines covered (89.25%)

150326.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.17
/pkg/server/stream_replication.go
1
/*
2
Copyright 2025 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package server
18

19
import (
20
        "bytes"
21
        "encoding/binary"
22
        "strconv"
23

24
        "github.com/codenotary/immudb/pkg/api/schema"
25
        "github.com/codenotary/immudb/pkg/stream"
26
        "google.golang.org/grpc/metadata"
27
)
28

29
func (s *ImmuServer) ExportTx(req *schema.ExportTxRequest, txsServer schema.ImmuService_ExportTxServer) error {
6✔
30
        return s.exportTx(req, txsServer, true, make([]byte, s.Options.StreamChunkSize))
6✔
31
}
6✔
32

33
// StreamExportTx implements the bidirectional streaming endpoint used to export transactions
34
func (s *ImmuServer) StreamExportTx(stream schema.ImmuService_StreamExportTxServer) error {
42✔
35
        buf := make([]byte, s.Options.StreamChunkSize)
42✔
36

42✔
37
        for {
8,229✔
38
                req, err := stream.Recv()
8,187✔
39
                if err != nil {
8,187✔
UNCOV
40
                        return err
×
UNCOV
41
                }
×
42

43
                err = s.exportTx(req, stream, false, buf)
8,187✔
44
                if err != nil {
8,229✔
45
                        return err
42✔
46
                }
42✔
47
        }
48
}
49

50
func (s *ImmuServer) exportTx(req *schema.ExportTxRequest, txsServer schema.ImmuService_ExportTxServer, setTrailer bool, buf []byte) error {
8,193✔
51
        if req == nil || req.Tx == 0 || txsServer == nil {
8,195✔
52
                return ErrIllegalArguments
2✔
53
        }
2✔
54

55
        db, err := s.getDBFromCtx(txsServer.Context(), "ExportTx")
8,191✔
56
        if err != nil {
8,193✔
57
                return err
2✔
58
        }
2✔
59

60
        txbs, mayCommitUpToTxID, mayCommitUpToAlh, err := db.ExportTxByID(txsServer.Context(), req)
8,189✔
61
        if err != nil {
8,203✔
62
                return err
14✔
63
        }
14✔
64

65
        var bCommittedTxID [8]byte
8,175✔
66
        state, err := db.CurrentState()
8,175✔
67
        if err == nil {
16,350✔
68
                binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
8,175✔
69
        }
8,175✔
70

71
        // In asynchronous replication, the last committed transaction value is sent to the replica
72
        // to enable updating its replication lag.
73
        streamMetadata := map[string][]byte{
8,175✔
74
                "committed-txid-bin": bCommittedTxID[:],
8,175✔
75
        }
8,175✔
76

8,175✔
77
        if req.ReplicaState != nil {
14,704✔
78
                var bMayCommitUpToTxID [8]byte
6,529✔
79
                binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID)
6,529✔
80

6,529✔
81
                streamMetadata["may-commit-up-to-txid-bin"] = bMayCommitUpToTxID[:]
6,529✔
82
                streamMetadata["may-commit-up-to-alh-bin"] = mayCommitUpToAlh[:]
6,529✔
83

6,529✔
84
                if setTrailer {
6,529✔
85
                        // trailer metadata is kept for backward compatibility
×
86
                        // it should not be sent when replication is done with bidirectional streaming
×
87
                        // otherwise metadata will get accumulated over time
×
88
                        md := metadata.Pairs(
×
89
                                "may-commit-up-to-txid-bin", string(bMayCommitUpToTxID[:]),
×
90
                                "may-commit-up-to-alh-bin", string(mayCommitUpToAlh[:]),
×
91
                                "committed-txid-bin", string(bCommittedTxID[:]),
×
92
                        )
×
93
                        txsServer.SetTrailer(md)
×
94
                }
×
95
        }
96

97
        sender := stream.NewMsgSender(txsServer, buf)
8,175✔
98

8,175✔
99
        return sender.Send(bytes.NewReader(txbs), len(txbs), streamMetadata)
8,175✔
100
}
101

102
func (s *ImmuServer) ReplicateTx(replicateTxServer schema.ImmuService_ReplicateTxServer) error {
4✔
103
        if replicateTxServer == nil {
5✔
104
                return ErrIllegalArguments
1✔
105
        }
1✔
106

107
        ctx := replicateTxServer.Context()
3✔
108

3✔
109
        db, err := s.getDBFromCtx(ctx, "ReplicateTx")
3✔
110
        if err != nil {
4✔
111
                return err
1✔
112
        }
1✔
113

114
        if s.replicationInProgressFor(db.GetName()) {
2✔
115
                return ErrReplicationInProgress
×
116
        }
×
117

118
        var skipIntegrityCheck bool
2✔
119
        var waitForIndexing bool
2✔
120

2✔
121
        md, ok := metadata.FromIncomingContext(ctx)
2✔
122
        if ok {
4✔
123
                if len(md.Get("skip-integrity-check")) > 0 {
3✔
124
                        skipIntegrityCheck, err = strconv.ParseBool(md.Get("skip-integrity-check")[0])
1✔
125
                        if err != nil {
1✔
126
                                return err
×
127
                        }
×
128
                }
129

130
                if len(md.Get("wait-for-indexing")) > 0 {
3✔
131
                        waitForIndexing, err = strconv.ParseBool(md.Get("wait-for-indexing")[0])
1✔
132
                        if err != nil {
1✔
133
                                return err
×
134
                        }
×
135
                }
136
        }
137

138
        receiver := s.StreamServiceFactory.NewMsgReceiver(replicateTxServer)
2✔
139

2✔
140
        bs, _, err := receiver.ReadFully()
2✔
141
        if err != nil {
3✔
142
                return err
1✔
143
        }
1✔
144

145
        hdr, err := db.ReplicateTx(replicateTxServer.Context(), bs, skipIntegrityCheck, waitForIndexing)
1✔
146
        if err != nil {
1✔
147
                return err
×
148
        }
×
149

150
        return replicateTxServer.SendAndClose(hdr)
1✔
151
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc