• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3631

07 Mar 2025 03:18PM UTC coverage: 60.671% (-3.0%) from 63.629%
#3631

push

travis-ci

web-flow
Merge pull request #30074 from taosdata/ciup30

ci: update ci workflow to fix path issue

141481 of 300084 branches covered (47.15%)

Branch coverage included in aggregate %.

223132 of 300884 relevant lines covered (74.16%)

7878557.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.7
/source/dnode/vnode/src/tsdb/tsdbFSetRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSetRW.h"
17
#include "meta.h"
18

19
// SFSetWriter ==================================================
20
struct SFSetWriter {
21
  SFSetWriterConfig config[1];
22

23
  SSkmInfo skmTb[1];
24
  SSkmInfo skmRow[1];
25
  SBuffer  buffers[10];
26

27
  struct {
28
    TABLEID tbid[1];
29
  } ctx[1];
30

31
  // writer
32
  SBlockData       blockData[2];
33
  int32_t          blockDataIdx;
34
  SDataFileWriter *dataWriter;
35
  SSttFileWriter  *sttWriter;
36
  SHashObj        *pColCmprObj;
37
};
38

39
static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *tbid) {
87,528✔
40
  int32_t code = 0;
87,528✔
41
  int32_t lino = 0;
87,528✔
42

43
  writer->ctx->tbid->suid = tbid->suid;
87,528✔
44
  writer->ctx->tbid->uid = tbid->uid;
87,528✔
45

46
  code = tsdbUpdateSkmTb(writer->config->tsdb, writer->ctx->tbid, writer->skmTb);
87,528✔
47
  TSDB_CHECK_CODE(code, lino, _exit);
87,528!
48

49
  if (writer->pColCmprObj != NULL) {
87,528!
50
    taosHashCleanup(writer->pColCmprObj);
×
51
    writer->pColCmprObj = NULL;
×
52
  }
53
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid ? tbid->suid : tbid->uid, &writer->pColCmprObj);
87,528✔
54

55
  writer->blockDataIdx = 0;
87,525✔
56
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
262,549✔
57
    code = tBlockDataInit(&writer->blockData[i], writer->ctx->tbid, writer->skmTb->pTSchema, NULL, 0);
175,050✔
58
    TSDB_CHECK_CODE(code, lino, _exit);
175,024!
59
  }
60

61
_exit:
87,499✔
62
  if (code) {
87,499!
63
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
64
  }
65
  return code;
87,518✔
66
}
67

68
static int32_t tsdbFSetWriteTableDataEnd(SFSetWriter *writer) {
93,387✔
69
  if (writer->ctx->tbid->uid == 0) return 0;
93,387✔
70

71
  int32_t code = 0;
87,527✔
72
  int32_t lino = 0;
87,527✔
73

74
  int32_t cidx = writer->blockDataIdx;
87,527✔
75
  int32_t pidx = ((cidx + 1) & 1);
87,527✔
76
  int32_t numRow = ((writer->blockData[pidx].nRow + writer->blockData[cidx].nRow) >> 1);
87,527✔
77

78
  if (writer->blockData[pidx].nRow > 0 && numRow >= writer->config->minRow) {
116,175!
79
    SRowInfo row = {
28,681✔
80
        .suid = writer->ctx->tbid->suid,
28,681✔
81
        .uid = writer->ctx->tbid->uid,
28,681✔
82
        .row = tsdbRowFromBlockData(writer->blockData + pidx, 0),
28,681✔
83
    };
84

85
    for (int32_t i = 0; i < numRow; i++) {
77,673,177✔
86
      row.row.iRow = i;
77,644,603✔
87

88
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
77,644,603✔
89
      TSDB_CHECK_CODE(code, lino, _exit);
77,644,496!
90
    }
91

92
    code = tsdbDataFileFlush(writer->dataWriter);
28,574✔
93
    TSDB_CHECK_CODE(code, lino, _exit);
28,681!
94

95
    for (int32_t i = numRow; i < writer->blockData[pidx].nRow; i++) {
39,838,662✔
96
      row.row.iRow = i;
39,809,985✔
97
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
39,809,985✔
98
      TSDB_CHECK_CODE(code, lino, _exit);
39,809,981!
99
    }
100

101
    row.row = tsdbRowFromBlockData(writer->blockData + cidx, 0);
28,677✔
102
    for (int32_t i = 0; i < writer->blockData[cidx].nRow; i++) {
37,866,504✔
103
      row.row.iRow = i;
37,837,856✔
104
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
37,837,856✔
105
      TSDB_CHECK_CODE(code, lino, _exit);
37,837,827!
106
    }
107
  } else {
108
    // pidx
109
    if (writer->blockData[pidx].nRow > 0) {
58,846!
110
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[pidx]);
×
111
      TSDB_CHECK_CODE(code, lino, _exit);
×
112
    }
113

114
    // cidx
115
    if (writer->blockData[cidx].nRow < writer->config->minRow) {
58,846✔
116
      code = tsdbSttFileWriteBlockData(writer->sttWriter, &writer->blockData[cidx]);
26,501✔
117
      TSDB_CHECK_CODE(code, lino, _exit);
26,484!
118
    } else {
119
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[cidx]);
32,345✔
120
      TSDB_CHECK_CODE(code, lino, _exit);
32,346!
121
    }
122
  }
123

124
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
262,486✔
125
    tBlockDataReset(&writer->blockData[i]);
175,017✔
126
  }
127

128
_exit:
87,469✔
129
  if (code) {
87,469!
130
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
131
  }
132
  taosHashCleanup(writer->pColCmprObj);
87,469✔
133
  writer->pColCmprObj = NULL;
87,529✔
134

135
  return code;
87,529✔
136
}
137

138
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
384,903✔
139
  int32_t code = 0;
384,903✔
140
  int32_t lino = 0;
384,903✔
141

142
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
384,903!
143
  if (writer[0] == NULL) {
384,930!
144
    return terrno;
×
145
  }
146

147
  writer[0]->config[0] = config[0];
384,930✔
148

149
  // data writer
150
  if (!config->toSttOnly) {
384,930✔
151
    SDataFileWriterConfig dataWriterConfig = {
5,860✔
152
        .tsdb = config->tsdb,
5,860✔
153
        .cmprAlg = config->cmprAlg,
5,860✔
154
        .maxRow = config->maxRow,
5,860✔
155
        .szPage = config->szPage,
5,860✔
156
        .fid = config->fid,
5,860✔
157
        .cid = config->cid,
5,860✔
158
        .expLevel = config->expLevel,
5,860✔
159
        .compactVersion = config->compactVersion,
5,860✔
160
        .skmTb = writer[0]->skmTb,
5,860✔
161
        .skmRow = writer[0]->skmRow,
5,860✔
162
        .lcn = config->lcn,
5,860✔
163
        .buffers = writer[0]->buffers,
5,860✔
164
    };
165
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
29,296✔
166
      dataWriterConfig.files[ftype].exist = config->files[ftype].exist;
23,436✔
167
      dataWriterConfig.files[ftype].file = config->files[ftype].file;
23,436✔
168
    }
169

170
    code = tsdbDataFileWriterOpen(&dataWriterConfig, &writer[0]->dataWriter);
5,860✔
171
    TSDB_CHECK_CODE(code, lino, _exit);
5,860!
172
  }
173

174
  // stt writer
175
  SSttFileWriterConfig sttWriterConfig = {
384,930✔
176
      .tsdb = config->tsdb,
384,930✔
177
      .maxRow = config->maxRow,
384,930✔
178
      .szPage = config->szPage,
384,930✔
179
      .cmprAlg = config->cmprAlg,
384,930✔
180
      .compactVersion = config->compactVersion,
384,930✔
181
      .expLevel = config->expLevel,
384,930✔
182
      .fid = config->fid,
384,930✔
183
      .cid = config->cid,
384,930✔
184
      .level = config->level,
384,930✔
185
      .skmTb = writer[0]->skmTb,
384,930✔
186
      .skmRow = writer[0]->skmRow,
384,930✔
187
      .buffers = writer[0]->buffers,
384,930✔
188

189
  };
190
  code = tsdbSttFileWriterOpen(&sttWriterConfig, &writer[0]->sttWriter);
384,930✔
191
  TSDB_CHECK_CODE(code, lino, _exit);
384,931!
192

193
_exit:
384,931✔
194
  if (code) {
384,931!
195
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
×
196
  }
197
  return code;
384,922✔
198
}
199

200
int32_t tsdbFSetWriterClose(SFSetWriter **writer, bool abort, TFileOpArray *fopArr) {
385,020✔
201
  if (writer[0] == NULL) return 0;
385,020✔
202

203
  int32_t code = 0;
384,914✔
204
  int32_t lino = 0;
384,914✔
205

206
  STsdb *tsdb = writer[0]->config->tsdb;
384,914✔
207

208
  // end
209
  if (!writer[0]->config->toSttOnly) {
384,914✔
210
    code = tsdbFSetWriteTableDataEnd(writer[0]);
5,860✔
211
    TSDB_CHECK_CODE(code, lino, _exit);
5,858!
212

213
    code = tsdbDataFileWriterClose(&writer[0]->dataWriter, abort, fopArr);
5,858✔
214
    TSDB_CHECK_CODE(code, lino, _exit);
5,859!
215
  }
216

217
  code = tsdbSttFileWriterClose(&writer[0]->sttWriter, abort, fopArr);
384,913✔
218
  TSDB_CHECK_CODE(code, lino, _exit);
384,939!
219

220
  // free
221
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->blockData); i++) {
1,154,824✔
222
    tBlockDataDestroy(&writer[0]->blockData[i]);
769,879✔
223
  }
224
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); i++) {
4,233,990✔
225
    tBufferDestroy(&writer[0]->buffers[i]);
3,849,050✔
226
  }
227
  tDestroyTSchema(writer[0]->skmRow->pTSchema);
384,940!
228
  tDestroyTSchema(writer[0]->skmTb->pTSchema);
384,943!
229
  taosMemoryFree(writer[0]);
384,944!
230
  writer[0] = NULL;
384,951✔
231

232
_exit:
384,951✔
233
  if (code) {
384,951!
234
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
×
235
  }
236
  return code;
384,951✔
237
}
238

239
int32_t tsdbFSetWriteRow(SFSetWriter *writer, SRowInfo *row) {
622,168,325✔
240
  int32_t code = 0;
622,168,325✔
241
  int32_t lino = 0;
622,168,325✔
242

243
  if (writer->config->toSttOnly) {
622,168,325✔
244
    code = tsdbSttFileWriteRow(writer->sttWriter, row);
348,165,129✔
245
    TSDB_CHECK_CODE(code, lino, _exit);
349,071,033!
246
  } else {
247
    if (writer->ctx->tbid->uid != row->uid) {
274,003,196✔
248
      code = tsdbFSetWriteTableDataEnd(writer);
87,529✔
249
      TSDB_CHECK_CODE(code, lino, _exit);
87,529!
250

251
      code = tsdbFSetWriteTableDataBegin(writer, (TABLEID *)row);
87,529✔
252
      TSDB_CHECK_CODE(code, lino, _exit);
87,518!
253
    }
254

255
    if (row->row.type == TSDBROW_ROW_FMT) {
274,003,185✔
256
      code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(&row->row), writer->skmRow);
1,503,581!
257
      TSDB_CHECK_CODE(code, lino, _exit);
1,503,662!
258
    }
259

260
    if (TSDBROW_VERSION(&row->row) <= writer->config->compactVersion  //
274,003,266!
261
        && writer->blockData[writer->blockDataIdx].nRow > 0           //
274,466,221✔
262
        && tsdbRowCompareWithoutVersion(&row->row,
274,576,742✔
263
                                        &tsdbRowFromBlockData(&writer->blockData[writer->blockDataIdx],
274,385,569✔
264
                                                              writer->blockData[writer->blockDataIdx].nRow - 1)) == 0) {
265
      code = tBlockDataUpdateRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema);
212,709✔
266
      TSDB_CHECK_CODE(code, lino, _exit);
215,319!
267
    } else {
268
      if (writer->blockData[writer->blockDataIdx].nRow >= writer->config->maxRow) {
273,981,730✔
269
        int32_t idx = ((writer->blockDataIdx + 1) & 1);
42,855✔
270
        if (writer->blockData[idx].nRow >= writer->config->maxRow) {
42,855✔
271
          code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[idx]);
14,174✔
272
          TSDB_CHECK_CODE(code, lino, _exit);
14,174!
273

274
          tBlockDataClear(&writer->blockData[idx]);
14,174✔
275
        }
276
        writer->blockDataIdx = idx;
42,855✔
277
      }
278

279
      code =
280
          tBlockDataAppendRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema, row->uid);
273,981,730✔
281
      TSDB_CHECK_CODE(code, lino, _exit);
274,806,900!
282
    }
283
  }
284

285
_exit:
624,093,252✔
286
  if (code) {
624,093,252!
287
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
288
  }
289
  return code;
623,751,581✔
290
}
291

292
int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombRecord) {
85,629✔
293
  int32_t code = 0;
85,629✔
294
  int32_t lino = 0;
85,629✔
295

296
  if (writer->config->toSttOnly || tsdbSttFileWriterIsOpened(writer->sttWriter)) {
85,629✔
297
    code = tsdbSttFileWriteTombRecord(writer->sttWriter, tombRecord);
71,095✔
298
    TSDB_CHECK_CODE(code, lino, _exit);
71,095!
299
  } else {
300
    code = tsdbDataFileWriteTombRecord(writer->dataWriter, tombRecord);
14,534✔
301
    TSDB_CHECK_CODE(code, lino, _exit);
14,534!
302
  }
303

304
_exit:
14,534✔
305
  if (code) {
85,629!
306
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
307
  }
308
  return code;
85,629✔
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc