• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3844

10 Apr 2025 08:48AM UTC coverage: 43.852% (-19.2%) from 63.077%
#3844

push

travis-ci

web-flow
fix: the REPLICA parameter supports plural forms when used to create and alter a database (#30732)

104394 of 310659 branches covered (33.6%)

Branch coverage included in aggregate %.

167442 of 309240 relevant lines covered (54.15%)

3866624.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.0
/source/dnode/vnode/src/tsdb/tsdbFSetRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSetRW.h"
17
#include "meta.h"
18

19
// SFSetWriter ==================================================
20
// State of one file-set writer. Allocated zero-filled by tsdbFSetWriterOpen and
// released by tsdbFSetWriterClose.
struct SFSetWriter {
21
  SFSetWriterConfig config[1];  // by-value copy of the config handed to tsdbFSetWriterOpen
22

23
  SSkmInfo skmTb[1];     // table schema; refreshed by tsdbUpdateSkmTb when a new table starts; shared with both sub-writers
24
  SSkmInfo skmRow[1];    // row schema; refreshed by tsdbUpdateSkmRow for TSDBROW_ROW_FMT rows; shared with both sub-writers
25
  SBuffer  buffers[10];  // scratch buffers handed to both sub-writers; destroyed on close
26

27
  struct {
28
    TABLEID tbid[1];  // table currently being buffered on the data-file path; uid == 0 means no table started yet
29
  } ctx[1];
30

31
  // writer state: two row blocks filled alternately, plus the underlying file writers
32
  SBlockData       blockData[2];  // row blocks for the current table
33
  int32_t          blockDataIdx;  // index (0 or 1) of the block currently receiving rows
34
  SDataFileWriter *dataWriter;    // opened only when config->toSttOnly is false
35
  SSttFileWriter  *sttWriter;     // always opened by tsdbFSetWriterOpen
36
  SHashObj        *pColCmprObj;   // filled by metaGetColCmpr per table; cleaned up when the table ends
37
};
38

39
/*
 * Start buffering rows for the table identified by `tbid`.
 *
 * Stores the table id in the writer context, refreshes the table schema,
 * replaces the column-compression hash left over from a previous table, and
 * (re)initializes both row blocks for the new table.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *tbid) {
  int32_t  code = 0;
  int32_t  lino = 0;
  STsdb   *pTsdb = writer->config->tsdb;
  TABLEID *curTb = writer->ctx->tbid;

  // remember which table the buffered rows belong to
  curTb->suid = tbid->suid;
  curTb->uid = tbid->uid;

  code = tsdbUpdateSkmTb(pTsdb, curTb, writer->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // discard the hash that belonged to the previous table, if any
  if (writer->pColCmprObj != NULL) {
    taosHashCleanup(writer->pColCmprObj);
    writer->pColCmprObj = NULL;
  }

  // Looked up by super-table uid when there is one, else by the table's own uid.
  // NOTE(review): the result is stored in `code` but never checked; the loop
  // below overwrites it. Looks like deliberate best-effort -- confirm.
  code = metaGetColCmpr(pTsdb->pVnode->pMeta, (tbid->suid != 0) ? tbid->suid : tbid->uid, &writer->pColCmprObj);

  writer->blockDataIdx = 0;
  for (SBlockData *pBlock = writer->blockData; pBlock < writer->blockData + ARRAY_SIZE(writer->blockData); ++pBlock) {
    code = tBlockDataInit(pBlock, curTb, writer->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
  }
  return code;
}
67

68
static int32_t tsdbFSetWriteTableDataEnd(SFSetWriter *writer) {
1,868✔
69
  if (writer->ctx->tbid->uid == 0) return 0;
1,868✔
70

71
  int32_t code = 0;
1,860✔
72
  int32_t lino = 0;
1,860✔
73

74
  int32_t cidx = writer->blockDataIdx;
1,860✔
75
  int32_t pidx = ((cidx + 1) & 1);
1,860✔
76
  int32_t numRow = ((writer->blockData[pidx].nRow + writer->blockData[cidx].nRow) >> 1);
1,860✔
77

78
  if (writer->blockData[pidx].nRow > 0 && numRow >= writer->config->minRow) {
3,346!
79
    SRowInfo row = {
1,856✔
80
        .suid = writer->ctx->tbid->suid,
1,856✔
81
        .uid = writer->ctx->tbid->uid,
1,856✔
82
        .row = tsdbRowFromBlockData(writer->blockData + pidx, 0),
1,856✔
83
    };
84

85
    for (int32_t i = 0; i < numRow; i++) {
5,458,548✔
86
      row.row.iRow = i;
5,457,082✔
87

88
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
5,457,082✔
89
      TSDB_CHECK_CODE(code, lino, _exit);
5,456,692!
90
    }
91

92
    code = tsdbDataFileFlush(writer->dataWriter);
1,466✔
93
    TSDB_CHECK_CODE(code, lino, _exit);
1,856!
94

95
    for (int32_t i = numRow; i < writer->blockData[pidx].nRow; i++) {
2,120,730✔
96
      row.row.iRow = i;
2,119,012✔
97
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
2,119,012✔
98
      TSDB_CHECK_CODE(code, lino, _exit);
2,118,874!
99
    }
100

101
    row.row = tsdbRowFromBlockData(writer->blockData + cidx, 0);
1,718✔
102
    for (int32_t i = 0; i < writer->blockData[cidx].nRow; i++) {
3,348,154✔
103
      row.row.iRow = i;
3,346,668✔
104
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
3,346,668✔
105
      TSDB_CHECK_CODE(code, lino, _exit);
3,346,436!
106
    }
107
  } else {
108
    // pidx
109
    if (writer->blockData[pidx].nRow > 0) {
4!
110
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[pidx]);
×
111
      TSDB_CHECK_CODE(code, lino, _exit);
×
112
    }
113

114
    // cidx
115
    if (writer->blockData[cidx].nRow < writer->config->minRow) {
4!
116
      code = tsdbSttFileWriteBlockData(writer->sttWriter, &writer->blockData[cidx]);
×
117
      TSDB_CHECK_CODE(code, lino, _exit);
×
118
    } else {
119
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[cidx]);
4✔
120
      TSDB_CHECK_CODE(code, lino, _exit);
4!
121
    }
122
  }
123

124
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
5,210✔
125
    tBlockDataReset(&writer->blockData[i]);
3,720✔
126
  }
127

128
_exit:
1,490✔
129
  if (code) {
1,490!
130
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
131
  }
132
  taosHashCleanup(writer->pColCmprObj);
1,490✔
133
  writer->pColCmprObj = NULL;
1,860✔
134

135
  return code;
1,860✔
136
}
137

138
/*
 * Allocate a file-set writer and open its underlying file writers.
 *
 * `config` is copied by value into the new writer. The data file writer is
 * opened only when config->toSttOnly is false; the stt file writer is always
 * opened. Both sub-writers receive pointers to the schema caches and scratch
 * buffers owned by the new writer.
 *
 * Returns 0 on success, terrno if the allocation fails (writer[0] is then
 * NULL), or the error code of the failing sub-writer open.
 *
 * NOTE(review): when a sub-writer open fails, writer[0] is left allocated;
 * presumably the caller releases it through tsdbFSetWriterClose -- confirm.
 */
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
  int32_t code = 0;
  int32_t lino = 0;

  SFSetWriter *pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
  writer[0] = pWriter;
  if (pWriter == NULL) {
    return terrno;
  }

  *pWriter->config = *config;

  // data file writer: skipped when everything goes to stt files
  if (!config->toSttOnly) {
    SDataFileWriterConfig dataCfg = {
        .tsdb = config->tsdb,
        .fid = config->fid,
        .cid = config->cid,
        .lcn = config->lcn,
        .expLevel = config->expLevel,
        .compactVersion = config->compactVersion,
        .cmprAlg = config->cmprAlg,
        .maxRow = config->maxRow,
        .szPage = config->szPage,
        .skmTb = pWriter->skmTb,
        .skmRow = pWriter->skmRow,
        .buffers = pWriter->buffers,
    };
    for (int32_t iType = 0; iType < TSDB_FTYPE_MAX; ++iType) {
      dataCfg.files[iType].exist = config->files[iType].exist;
      dataCfg.files[iType].file = config->files[iType].file;
    }

    code = tsdbDataFileWriterOpen(&dataCfg, &pWriter->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt file writer: always present
  SSttFileWriterConfig sttCfg = {
      .tsdb = config->tsdb,
      .fid = config->fid,
      .cid = config->cid,
      .level = config->level,
      .expLevel = config->expLevel,
      .compactVersion = config->compactVersion,
      .cmprAlg = config->cmprAlg,
      .maxRow = config->maxRow,
      .szPage = config->szPage,
      .skmTb = pWriter->skmTb,
      .skmRow = pWriter->skmRow,
      .buffers = pWriter->buffers,
  };
  code = tsdbSttFileWriterOpen(&sttCfg, &pWriter->sttWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
  }
  return code;
}
199

200
/*
 * Finish a file-set writer and release everything it owns.
 *
 * A NULL writer[0] is a no-op that returns 0. Otherwise:
 *   1. unless the writer is stt-only, the rows still buffered for the current
 *      table are written out and the data file writer is closed;
 *   2. the stt file writer is closed;
 *   3. row blocks, scratch buffers, both schemas and the writer itself are
 *      freed, and writer[0] is set to NULL.
 *
 * `abort` and `fopArr` are forwarded to the sub-writer close calls.
 *
 * The teardown always runs to completion. Bailing out at the first failing
 * step would leak the remaining sub-writers and all memory owned by the
 * writer. Once a step has failed, the remaining sub-writers are closed in
 * abort mode so that a partially written file set is not committed.
 *
 * Returns 0 on success, otherwise the code of the first step that failed.
 */
int32_t tsdbFSetWriterClose(SFSetWriter **writer, bool abort, TFileOpArray *fopArr) {
  if (writer[0] == NULL) return 0;

  int32_t code = 0;  // first error seen; this is what gets returned
  int32_t lino = 0;
  int32_t rc = 0;

  SFSetWriter *pWriter = writer[0];
  STsdb       *tsdb = pWriter->config->tsdb;  // captured before pWriter is freed; needed for logging

  // end
  if (!pWriter->config->toSttOnly) {
    rc = tsdbFSetWriteTableDataEnd(pWriter);
    if (rc != 0 && code == 0) {
      code = rc;
      lino = __LINE__;
    }

    rc = tsdbDataFileWriterClose(&pWriter->dataWriter, abort || code != 0, fopArr);
    if (rc != 0 && code == 0) {
      code = rc;
      lino = __LINE__;
    }
  }

  rc = tsdbSttFileWriterClose(&pWriter->sttWriter, abort || code != 0, fopArr);
  if (rc != 0 && code == 0) {
    code = rc;
    lino = __LINE__;
  }

  // free -- unconditionally, also after a failure above
  for (int32_t i = 0; i < ARRAY_SIZE(pWriter->blockData); i++) {
    tBlockDataDestroy(&pWriter->blockData[i]);
  }
  for (int32_t i = 0; i < ARRAY_SIZE(pWriter->buffers); i++) {
    tBufferDestroy(&pWriter->buffers[i]);
  }
  tDestroyTSchema(pWriter->skmRow->pTSchema);
  tDestroyTSchema(pWriter->skmTb->pTSchema);
  taosMemoryFree(pWriter);
  writer[0] = NULL;

  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  }
  return code;
}
238

239
/*
 * Write one row through the file-set writer.
 *
 * Stt-only writers forward the row straight to the stt file writer.
 * Otherwise the row is buffered per table:
 *   - a row whose uid differs from the current table ends that table's data
 *     and begins the new one;
 *   - for TSDBROW_ROW_FMT rows the row schema is refreshed first;
 *   - a row with version <= config->compactVersion that compares equal
 *     (ignoring version) to the last buffered row updates that row in place;
 *   - any other row is appended. When the active block already holds maxRow
 *     rows the writer switches to the other block, first writing that block
 *     to the data file and clearing it if it is full as well.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tsdbFSetWriteRow(SFSetWriter *writer, SRowInfo *row) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SBlockData *target = NULL;
  bool        mergeWithLast = false;

  // stt-only: no per-table buffering at all
  if (writer->config->toSttOnly) {
    code = tsdbSttFileWriteRow(writer->sttWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // table switch: finish the old table, then start the new one
  if (row->uid != writer->ctx->tbid->uid) {
    code = tsdbFSetWriteTableDataEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    // the cast relies on SRowInfo being readable as a TABLEID
    code = tsdbFSetWriteTableDataBegin(writer, (TABLEID *)row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->row.type == TSDBROW_ROW_FMT) {
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(&row->row), writer->skmRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  target = &writer->blockData[writer->blockDataIdx];

  // compared against the most recently buffered row only
  mergeWithLast = TSDBROW_VERSION(&row->row) <= writer->config->compactVersion  //
                  && target->nRow > 0                                           //
                  && tsdbRowCompareWithoutVersion(&row->row, &tsdbRowFromBlockData(target, target->nRow - 1)) == 0;

  if (mergeWithLast) {
    code = tBlockDataUpdateRow(target, &row->row, writer->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  if (target->nRow >= writer->config->maxRow) {
    // active block is full: move over to the other one
    int32_t     otherIdx = ((writer->blockDataIdx + 1) & 1);
    SBlockData *other = &writer->blockData[otherIdx];

    if (other->nRow >= writer->config->maxRow) {
      // the other block is full too; write it out before reusing it
      code = tsdbDataFileWriteBlockData(writer->dataWriter, other);
      TSDB_CHECK_CODE(code, lino, _exit);

      tBlockDataClear(other);
    }

    writer->blockDataIdx = otherIdx;
    target = other;
  }

  code = tBlockDataAppendRow(target, &row->row, writer->skmRow->pTSchema, row->uid);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
291

292
/*
 * Write one tomb record through the file-set writer.
 *
 * The record goes to the data file writer only when the writer is not
 * stt-only and tsdbSttFileWriterIsOpened reports false for the stt writer;
 * in every other case it goes to the stt file writer.
 *
 * Returns 0 on success, otherwise the error code of the underlying write.
 */
int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombRecord) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->config->toSttOnly && !tsdbSttFileWriterIsOpened(writer->sttWriter)) {
    code = tsdbDataFileWriteTombRecord(writer->dataWriter, tombRecord);
  } else {
    code = tsdbSttFileWriteTombRecord(writer->sttWriter, tombRecord);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc