• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbFSetRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSetRW.h"
17
#include "meta.h"
18

19
// SFSetWriter ==================================================
20
struct SFSetWriter {
21
  SFSetWriterConfig config[1];
22

23
  SSkmInfo skmTb[1];
24
  SSkmInfo skmRow[1];
25
  SBuffer  buffers[10];
26

27
  struct {
28
    TABLEID tbid[1];
29
  } ctx[1];
30

31
  // writer
32
  SBlockData       blockData[2];
33
  int32_t          blockDataIdx;
34
  SDataFileWriter *dataWriter;
35
  SSttFileWriter  *sttWriter;
36
  SHashObj        *pColCmprObj;
37
};
38

39
static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *tbid) {
×
40
  int32_t code = 0;
×
41
  int32_t lino = 0;
×
42

43
  writer->ctx->tbid->suid = tbid->suid;
×
44
  writer->ctx->tbid->uid = tbid->uid;
×
45

46
  code = tsdbUpdateSkmTb(writer->config->tsdb, writer->ctx->tbid, writer->skmTb);
×
47
  TSDB_CHECK_CODE(code, lino, _exit);
×
48

49
  if (writer->pColCmprObj != NULL) {
×
50
    taosHashCleanup(writer->pColCmprObj);
×
51
    writer->pColCmprObj = NULL;
×
52
  }
53
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid ? tbid->suid : tbid->uid, &writer->pColCmprObj);
×
54

55
  writer->blockDataIdx = 0;
×
56
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
×
57
    code = tBlockDataInit(&writer->blockData[i], writer->ctx->tbid, writer->skmTb->pTSchema, NULL, 0);
×
58
    TSDB_CHECK_CODE(code, lino, _exit);
×
59
  }
60

61
_exit:
×
62
  if (code) {
×
63
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
64
  }
65
  return code;
×
66
}
67

68
static int32_t tsdbFSetWriteTableDataEnd(SFSetWriter *writer) {
×
69
  if (writer->ctx->tbid->uid == 0) return 0;
×
70

71
  int32_t code = 0;
×
72
  int32_t lino = 0;
×
73

74
  int32_t cidx = writer->blockDataIdx;
×
75
  int32_t pidx = ((cidx + 1) & 1);
×
76
  int32_t numRow = ((writer->blockData[pidx].nRow + writer->blockData[cidx].nRow) >> 1);
×
77

78
  if (writer->blockData[pidx].nRow > 0 && numRow >= writer->config->minRow) {
×
79
    SRowInfo row = {
×
80
        .suid = writer->ctx->tbid->suid,
×
81
        .uid = writer->ctx->tbid->uid,
×
82
        .row = tsdbRowFromBlockData(writer->blockData + pidx, 0),
×
83
    };
84

85
    for (int32_t i = 0; i < numRow; i++) {
×
86
      row.row.iRow = i;
×
87

88
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
89
      TSDB_CHECK_CODE(code, lino, _exit);
×
90
    }
91

92
    code = tsdbDataFileFlush(writer->dataWriter);
×
93
    TSDB_CHECK_CODE(code, lino, _exit);
×
94

95
    for (int32_t i = numRow; i < writer->blockData[pidx].nRow; i++) {
×
96
      row.row.iRow = i;
×
97
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
98
      TSDB_CHECK_CODE(code, lino, _exit);
×
99
    }
100

101
    row.row = tsdbRowFromBlockData(writer->blockData + cidx, 0);
×
102
    for (int32_t i = 0; i < writer->blockData[cidx].nRow; i++) {
×
103
      row.row.iRow = i;
×
104
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
105
      TSDB_CHECK_CODE(code, lino, _exit);
×
106
    }
107
  } else {
108
    // pidx
109
    if (writer->blockData[pidx].nRow > 0) {
×
110
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[pidx]);
×
111
      TSDB_CHECK_CODE(code, lino, _exit);
×
112
    }
113

114
    // cidx
115
    if (writer->blockData[cidx].nRow < writer->config->minRow) {
×
116
      code = tsdbSttFileWriteBlockData(writer->sttWriter, &writer->blockData[cidx]);
×
117
      TSDB_CHECK_CODE(code, lino, _exit);
×
118
    } else {
119
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[cidx]);
×
120
      TSDB_CHECK_CODE(code, lino, _exit);
×
121
    }
122
  }
123

124
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
×
125
    tBlockDataReset(&writer->blockData[i]);
×
126
  }
127

128
_exit:
×
129
  if (code) {
×
130
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
131
  }
132
  taosHashCleanup(writer->pColCmprObj);
×
133
  writer->pColCmprObj = NULL;
×
134

135
  return code;
×
136
}
137

138
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
×
139
  int32_t code = 0;
×
140
  int32_t lino = 0;
×
141

142
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
×
143
  if (writer[0] == NULL) {
×
144
    return terrno;
×
145
  }
146

147
  writer[0]->config[0] = config[0];
×
148

149
  // data writer
150
  if (!config->toSttOnly) {
×
151
    SDataFileWriterConfig dataWriterConfig = {
×
152
        .tsdb = config->tsdb,
×
153
        .cmprAlg = config->cmprAlg,
×
154
        .maxRow = config->maxRow,
×
155
        .szPage = config->szPage,
×
156
        .fid = config->fid,
×
157
        .cid = config->cid,
×
158
        .expLevel = config->expLevel,
×
159
        .compactVersion = config->compactVersion,
×
160
        .skmTb = writer[0]->skmTb,
×
161
        .skmRow = writer[0]->skmRow,
×
162
        .lcn = config->lcn,
×
163
        .buffers = writer[0]->buffers,
×
164
    };
165
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
×
166
      dataWriterConfig.files[ftype].exist = config->files[ftype].exist;
×
167
      dataWriterConfig.files[ftype].file = config->files[ftype].file;
×
168
    }
169

170
    code = tsdbDataFileWriterOpen(&dataWriterConfig, &writer[0]->dataWriter);
×
171
    TSDB_CHECK_CODE(code, lino, _exit);
×
172
  }
173

174
  // stt writer
175
  SSttFileWriterConfig sttWriterConfig = {
×
176
      .tsdb = config->tsdb,
×
177
      .maxRow = config->maxRow,
×
178
      .szPage = config->szPage,
×
179
      .cmprAlg = config->cmprAlg,
×
180
      .compactVersion = config->compactVersion,
×
181
      .expLevel = config->expLevel,
×
182
      .fid = config->fid,
×
183
      .cid = config->cid,
×
184
      .level = config->level,
×
185
      .skmTb = writer[0]->skmTb,
×
186
      .skmRow = writer[0]->skmRow,
×
187
      .buffers = writer[0]->buffers,
×
188

189
  };
190
  code = tsdbSttFileWriterOpen(&sttWriterConfig, &writer[0]->sttWriter);
×
191
  TSDB_CHECK_CODE(code, lino, _exit);
×
192

193
_exit:
×
194
  if (code) {
×
195
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
×
196
  }
197
  return code;
×
198
}
199

200
int32_t tsdbFSetWriterClose(SFSetWriter **writer, bool abort, TFileOpArray *fopArr) {
×
201
  if (writer[0] == NULL) return 0;
×
202

203
  int32_t code = 0;
×
204
  int32_t lino = 0;
×
205

206
  STsdb *tsdb = writer[0]->config->tsdb;
×
207

208
  // end
209
  if (!writer[0]->config->toSttOnly) {
×
210
    code = tsdbFSetWriteTableDataEnd(writer[0]);
×
211
    TSDB_CHECK_CODE(code, lino, _exit);
×
212

213
    code = tsdbDataFileWriterClose(&writer[0]->dataWriter, abort, fopArr);
×
214
    TSDB_CHECK_CODE(code, lino, _exit);
×
215
  }
216

217
  code = tsdbSttFileWriterClose(&writer[0]->sttWriter, abort, fopArr);
×
218
  TSDB_CHECK_CODE(code, lino, _exit);
×
219

220
  // free
221
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->blockData); i++) {
×
222
    tBlockDataDestroy(&writer[0]->blockData[i]);
×
223
  }
224
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); i++) {
×
225
    tBufferDestroy(&writer[0]->buffers[i]);
×
226
  }
227
  tDestroyTSchema(writer[0]->skmRow->pTSchema);
×
228
  tDestroyTSchema(writer[0]->skmTb->pTSchema);
×
229
  taosMemoryFree(writer[0]);
×
230
  writer[0] = NULL;
×
231

232
_exit:
×
233
  if (code) {
×
234
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
×
235
  }
236
  return code;
×
237
}
238

239
int32_t tsdbFSetWriteRow(SFSetWriter *writer, SRowInfo *row) {
×
240
  int32_t code = 0;
×
241
  int32_t lino = 0;
×
242

243
  if (writer->config->toSttOnly) {
×
244
    code = tsdbSttFileWriteRow(writer->sttWriter, row);
×
245
    TSDB_CHECK_CODE(code, lino, _exit);
×
246
  } else {
247
    if (writer->ctx->tbid->uid != row->uid) {
×
248
      code = tsdbFSetWriteTableDataEnd(writer);
×
249
      TSDB_CHECK_CODE(code, lino, _exit);
×
250

251
      code = tsdbFSetWriteTableDataBegin(writer, (TABLEID *)row);
×
252
      TSDB_CHECK_CODE(code, lino, _exit);
×
253
    }
254

255
    if (row->row.type == TSDBROW_ROW_FMT) {
×
256
      code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(&row->row), writer->skmRow);
×
257
      TSDB_CHECK_CODE(code, lino, _exit);
×
258
    }
259

260
    if (TSDBROW_VERSION(&row->row) <= writer->config->compactVersion  //
×
261
        && writer->blockData[writer->blockDataIdx].nRow > 0           //
×
262
        && tsdbRowCompareWithoutVersion(&row->row,
×
263
                                        &tsdbRowFromBlockData(&writer->blockData[writer->blockDataIdx],
×
264
                                                              writer->blockData[writer->blockDataIdx].nRow - 1)) == 0) {
265
      code = tBlockDataUpdateRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema);
×
266
      TSDB_CHECK_CODE(code, lino, _exit);
×
267
    } else {
268
      if (writer->blockData[writer->blockDataIdx].nRow >= writer->config->maxRow) {
×
269
        int32_t idx = ((writer->blockDataIdx + 1) & 1);
×
270
        if (writer->blockData[idx].nRow >= writer->config->maxRow) {
×
271
          code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[idx]);
×
272
          TSDB_CHECK_CODE(code, lino, _exit);
×
273

274
          tBlockDataClear(&writer->blockData[idx]);
×
275
        }
276
        writer->blockDataIdx = idx;
×
277
      }
278

279
      code =
280
          tBlockDataAppendRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema, row->uid);
×
281
      TSDB_CHECK_CODE(code, lino, _exit);
×
282
    }
283
  }
284

285
_exit:
×
286
  if (code) {
×
287
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
288
  }
289
  return code;
×
290
}
291

292
int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombRecord) {
×
293
  int32_t code = 0;
×
294
  int32_t lino = 0;
×
295

296
  if (writer->config->toSttOnly || tsdbSttFileWriterIsOpened(writer->sttWriter)) {
×
297
    code = tsdbSttFileWriteTombRecord(writer->sttWriter, tombRecord);
×
298
    TSDB_CHECK_CODE(code, lino, _exit);
×
299
  } else {
300
    code = tsdbDataFileWriteTombRecord(writer->dataWriter, tombRecord);
×
301
    TSDB_CHECK_CODE(code, lino, _exit);
×
302
  }
303

304
_exit:
×
305
  if (code) {
×
306
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
307
  }
308
  return code;
×
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc