• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3565

25 Dec 2024 05:34AM UTC coverage: 51.098% (-11.1%) from 62.21%
#3565

push

travis-ci

web-flow
Merge pull request #29316 from taosdata/enh/3.0/TD-33266

enh(ut):Add wal & config UT.

111558 of 284773 branches covered (39.17%)

Branch coverage included in aggregate %.

1 of 2 new or added lines in 2 files covered. (50.0%)

39015 existing lines in 102 files now uncovered.

177882 of 281666 relevant lines covered (63.15%)

15090998.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbFSetRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSetRW.h"
17
#include "meta.h"
18

19
// SFSetWriter ==================================================
20
struct SFSetWriter {
21
  SFSetWriterConfig config[1];
22

23
  SSkmInfo skmTb[1];
24
  SSkmInfo skmRow[1];
25
  SBuffer  buffers[10];
26

27
  struct {
28
    TABLEID tbid[1];
29
  } ctx[1];
30

31
  // writer
32
  SBlockData       blockData[2];
33
  int32_t          blockDataIdx;
34
  SDataFileWriter *dataWriter;
35
  SSttFileWriter  *sttWriter;
36
  SHashObj        *pColCmprObj;
37
};
38

UNCOV
39
/*
 * Make `tbid` the table the writer is currently buffering rows for.
 *
 * Stores the table id in the writer context, refreshes the table schema
 * (skmTb), releases any previous pColCmprObj and asks metaGetColCmpr for a
 * new one, then resets blockDataIdx to 0 and initializes both block buffers
 * with the refreshed table schema.
 *
 * Returns 0 on success; otherwise the failing code from tsdbUpdateSkmTb or
 * tBlockDataInit, which is logged before returning.
 */
static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *tbid) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  slot = 0;
  TABLEID *current = writer->ctx->tbid;

  current->suid = tbid->suid;
  current->uid = tbid->uid;

  code = tsdbUpdateSkmTb(writer->config->tsdb, current, writer->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Release the map left over from the previous table, if any.
  if (writer->pColCmprObj != NULL) {
    taosHashCleanup(writer->pColCmprObj);
    writer->pColCmprObj = NULL;
  }

  // Looked up by suid when it is non-zero, otherwise by uid.
  // NOTE(review): this result is never checked; it is overwritten by the
  // tBlockDataInit loop below. Presumably intentional best-effort - confirm.
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid != 0 ? tbid->suid : tbid->uid,
                        &writer->pColCmprObj);

  writer->blockDataIdx = 0;
  while (slot < ARRAY_SIZE(writer->blockData)) {
    code = tBlockDataInit(&writer->blockData[slot], current, writer->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
    slot++;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
67

UNCOV
68
/*
 * Write out whatever is still buffered for the current table.
 *
 * Does nothing (returns 0) when no table has been started (uid == 0).
 *
 * Let "cur" be the buffer receiving rows and "prev" the other one:
 *  - If prev holds rows and the average row count of the two buffers is at
 *    least config->minRow, all rows are written row by row to the data
 *    writer, with a flush after the first `half` rows of prev.
 *  - Otherwise prev (if non-empty) is written as a block to the data writer,
 *    and cur is written as a block to the stt writer when it has fewer than
 *    config->minRow rows, or to the data writer when it has at least that
 *    many.
 *
 * On success both buffers are reset. pColCmprObj is released on every path
 * that reaches _exit. Returns 0 or the first failing code (logged).
 */
static int32_t tsdbFSetWriteTableDataEnd(SFSetWriter *writer) {
  if (writer->ctx->tbid->uid == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  int32_t iCur = writer->blockDataIdx;
  int32_t iPrev = ((iCur + 1) & 1);
  int32_t half = ((writer->blockData[iPrev].nRow + writer->blockData[iCur].nRow) >> 1);

  if (writer->blockData[iPrev].nRow <= 0 || half < writer->config->minRow) {
    // prev buffer: written as a whole block when it holds anything
    if (writer->blockData[iPrev].nRow > 0) {
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[iPrev]);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // cur buffer: at least minRow rows go to the data file, fewer go to the stt file
    if (writer->blockData[iCur].nRow >= writer->config->minRow) {
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[iCur]);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbSttFileWriteBlockData(writer->sttWriter, &writer->blockData[iCur]);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    int32_t  iRow;
    SRowInfo info = {
        .suid = writer->ctx->tbid->suid,
        .uid = writer->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(writer->blockData + iPrev, 0),
    };

    // first `half` rows of prev, then flush
    for (iRow = 0; iRow < half; iRow++) {
      info.row.iRow = iRow;

      code = tsdbDataFileWriteRow(writer->dataWriter, &info);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileFlush(writer->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // remaining rows of prev
    for (iRow = half; iRow < writer->blockData[iPrev].nRow; iRow++) {
      info.row.iRow = iRow;

      code = tsdbDataFileWriteRow(writer->dataWriter, &info);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // all rows of cur
    info.row = tsdbRowFromBlockData(writer->blockData + iCur, 0);
    for (iRow = 0; iRow < writer->blockData[iCur].nRow; iRow++) {
      info.row.iRow = iRow;

      code = tsdbDataFileWriteRow(writer->dataWriter, &info);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t slot = 0; slot < ARRAY_SIZE(writer->blockData); slot++) {
    tBlockDataReset(&writer->blockData[slot]);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  taosHashCleanup(writer->pColCmprObj);
  writer->pColCmprObj = NULL;

  return code;
}
137

UNCOV
138
/*
 * Allocate a file-set writer and open its underlying writers.
 *
 * config: configuration; copied by value into the new writer.
 * writer: out parameter; writer[0] receives the zero-initialized object.
 *
 * The data-file writer is opened only when config->toSttOnly is false; the
 * stt-file writer is always opened. Both share the new writer's skmTb,
 * skmRow and buffers.
 *
 * Returns 0 on success, terrno when allocation fails, or the failing code
 * from tsdbDataFileWriterOpen / tsdbSttFileWriterOpen (logged).
 *
 * NOTE(review): when one of the open calls fails, writer[0] still points to
 * the allocated object and nothing is released here. Presumably the caller
 * cleans up via tsdbFSetWriterClose - confirm.
 */
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
  int32_t code = 0;
  int32_t lino = 0;

  SFSetWriter *self = taosMemoryCalloc(1, sizeof(*self));
  writer[0] = self;
  if (self == NULL) {
    return terrno;
  }

  self->config[0] = config[0];

  // data writer
  if (!config->toSttOnly) {
    SDataFileWriterConfig dataCfg = {
        .tsdb = config->tsdb,
        .cmprAlg = config->cmprAlg,
        .maxRow = config->maxRow,
        .szPage = config->szPage,
        .fid = config->fid,
        .cid = config->cid,
        .did = config->did,
        .compactVersion = config->compactVersion,
        .skmTb = self->skmTb,
        .skmRow = self->skmRow,
        .lcn = config->lcn,
        .buffers = self->buffers,
    };

    // carry over the per-file-type entries from the configuration
    int32_t ftype = 0;
    while (ftype < TSDB_FTYPE_MAX) {
      dataCfg.files[ftype].exist = config->files[ftype].exist;
      dataCfg.files[ftype].file = config->files[ftype].file;
      ++ftype;
    }

    code = tsdbDataFileWriterOpen(&dataCfg, &self->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt writer
  SSttFileWriterConfig sttCfg = {
      .tsdb = config->tsdb,
      .maxRow = config->maxRow,
      .szPage = config->szPage,
      .cmprAlg = config->cmprAlg,
      .compactVersion = config->compactVersion,
      .did = config->did,
      .fid = config->fid,
      .cid = config->cid,
      .level = config->level,
      .skmTb = self->skmTb,
      .skmRow = self->skmRow,
      .buffers = self->buffers,
  };
  code = tsdbSttFileWriterOpen(&sttCfg, &self->sttWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
  }
  return code;
}
199

UNCOV
200
/*
 * Finish writing and destroy a file-set writer.
 *
 * writer: in/out; a NULL writer[0] is a no-op returning 0. On success
 *         writer[0] is freed and set to NULL.
 * abort:  forwarded to the data and stt writer close calls.
 * fopArr: forwarded to the data and stt writer close calls.
 *
 * Unless config->toSttOnly is set, the rows still buffered for the current
 * table are written out and the data writer is closed. The stt writer is
 * then closed, and the block buffers, scratch buffers, both schemas and the
 * writer object itself are released.
 *
 * Returns 0 on success or the first failing code (logged).
 *
 * NOTE(review): on any failure the release section is skipped and writer[0]
 * is left as is - confirm how callers are expected to recover.
 */
int32_t tsdbFSetWriterClose(SFSetWriter **writer, bool abort, TFileOpArray *fopArr) {
  SFSetWriter *self = writer[0];
  if (self == NULL) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t slot;

  // captured up front: still needed for logging after the writer is freed
  STsdb *tsdb = self->config->tsdb;

  // end
  if (!self->config->toSttOnly) {
    code = tsdbFSetWriteTableDataEnd(self);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbDataFileWriterClose(&self->dataWriter, abort, fopArr);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbSttFileWriterClose(&self->sttWriter, abort, fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  // free
  for (slot = 0; slot < ARRAY_SIZE(self->blockData); slot++) {
    tBlockDataDestroy(&self->blockData[slot]);
  }
  for (slot = 0; slot < ARRAY_SIZE(self->buffers); slot++) {
    tBufferDestroy(&self->buffers[slot]);
  }
  tDestroyTSchema(self->skmRow->pTSchema);
  tDestroyTSchema(self->skmTb->pTSchema);
  taosMemoryFree(self);
  writer[0] = NULL;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  }
  return code;
}
238

UNCOV
239
/*
 * Write one row through the file-set writer.
 *
 * With config->toSttOnly set, the row goes straight to the stt writer.
 *
 * Otherwise:
 *  1. If the row belongs to a different table (uid differs from the current
 *     one), the previous table is finished and the new one is begun.
 *  2. For rows in TSDBROW_ROW_FMT the row schema is refreshed to the row's
 *     schema version.
 *  3. If the row's version is <= config->compactVersion, the active buffer is
 *     non-empty and tsdbRowCompareWithoutVersion reports the row equal to the
 *     buffer's last row, the last row is updated in place.
 *  4. Otherwise the row is appended. When the active buffer already holds
 *     config->maxRow rows, writing switches to the other buffer first; if
 *     that one is full as well, it is written to the data file and cleared.
 *
 * Returns 0 on success or the first failing code (logged).
 */
int32_t tsdbFSetWriteRow(SFSetWriter *writer, SRowInfo *row) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t cur = 0;
  int32_t matchesLast = 0;

  if (writer->config->toSttOnly) {
    code = tsdbSttFileWriteRow(writer->sttWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  if (writer->ctx->tbid->uid != row->uid) {
    code = tsdbFSetWriteTableDataEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    // the cast treats the start of *row as a TABLEID
    code = tsdbFSetWriteTableDataBegin(writer, (TABLEID *)row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->row.type == TSDBROW_ROW_FMT) {
    code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(&row->row), writer->skmRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // read only now: beginning a new table resets blockDataIdx
  cur = writer->blockDataIdx;

  if (TSDBROW_VERSION(&row->row) <= writer->config->compactVersion && writer->blockData[cur].nRow > 0) {
    matchesLast =
        (tsdbRowCompareWithoutVersion(
             &row->row, &tsdbRowFromBlockData(&writer->blockData[cur], writer->blockData[cur].nRow - 1)) == 0);
  }

  if (matchesLast) {
    code = tBlockDataUpdateRow(&writer->blockData[cur], &row->row, writer->skmRow->pTSchema);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  if (writer->blockData[cur].nRow >= writer->config->maxRow) {
    int32_t other = ((cur + 1) & 1);

    if (writer->blockData[other].nRow >= writer->config->maxRow) {
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[other]);
      TSDB_CHECK_CODE(code, lino, _exit);

      tBlockDataClear(&writer->blockData[other]);
    }
    writer->blockDataIdx = other;
    cur = other;
  }

  code = tBlockDataAppendRow(&writer->blockData[cur], &row->row, writer->skmRow->pTSchema, row->uid);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
291

UNCOV
292
/*
 * Write one tomb record through the file-set writer.
 *
 * The record goes to the stt writer when config->toSttOnly is set or when
 * tsdbSttFileWriterIsOpened reports true for the stt writer; otherwise it
 * goes to the data writer.
 *
 * Returns 0 on success or the failing code from the chosen writer (logged).
 */
int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombRecord) {
  int32_t code = 0;
  int32_t lino = 0;

  // `||` short-circuits: the stt writer is not queried when toSttOnly is set
  code = (writer->config->toSttOnly || tsdbSttFileWriterIsOpened(writer->sttWriter))
             ? tsdbSttFileWriteTombRecord(writer->sttWriter, tombRecord)
             : tsdbDataFileWriteTombRecord(writer->dataWriter, tombRecord);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc