• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.45
/source/dnode/vnode/src/tsdb/tsdbFSetRW.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSetRW.h"
17
#include "meta.h"
18

19
// SFSetWriter ==================================================
20
struct SFSetWriter {
21
  SFSetWriterConfig config[1];
22

23
  SSkmInfo skmTb[1];
24
  SSkmInfo skmRow[1];
25
  SBuffer  buffers[10];
26

27
  struct {
28
    TABLEID tbid[1];
29
  } ctx[1];
30

31
  // writer
32
  SBlockData       blockData[2];
33
  int32_t          blockDataIdx;
34
  SDataFileWriter *dataWriter;
35
  SSttFileWriter  *sttWriter;
36
  SHashObj        *pColCmprObj;
37
};
38

39
static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *tbid) {
12✔
40
  int32_t code = 0;
12✔
41
  int32_t lino = 0;
12✔
42

43
  writer->ctx->tbid->suid = tbid->suid;
12✔
44
  writer->ctx->tbid->uid = tbid->uid;
12✔
45

46
  code = tsdbUpdateSkmTb(writer->config->tsdb, writer->ctx->tbid, writer->skmTb);
12✔
47
  TSDB_CHECK_CODE(code, lino, _exit);
12!
48

49
  if (writer->pColCmprObj != NULL) {
12!
50
    taosHashCleanup(writer->pColCmprObj);
×
51
    writer->pColCmprObj = NULL;
×
52
  }
53
  code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid ? tbid->suid : tbid->uid, &writer->pColCmprObj);
12✔
54

55
  writer->blockDataIdx = 0;
12✔
56
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
36✔
57
    code = tBlockDataInit(&writer->blockData[i], writer->ctx->tbid, writer->skmTb->pTSchema, NULL, 0);
24✔
58
    TSDB_CHECK_CODE(code, lino, _exit);
24!
59
  }
60

61
_exit:
12✔
62
  if (code) {
12!
63
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
64
  }
65
  return code;
12✔
66
}
67

68
static int32_t tsdbFSetWriteTableDataEnd(SFSetWriter *writer) {
17✔
69
  if (writer->ctx->tbid->uid == 0) return 0;
17✔
70

71
  int32_t code = 0;
12✔
72
  int32_t lino = 0;
12✔
73

74
  int32_t cidx = writer->blockDataIdx;
12✔
75
  int32_t pidx = ((cidx + 1) & 1);
12✔
76
  int32_t numRow = ((writer->blockData[pidx].nRow + writer->blockData[cidx].nRow) >> 1);
12✔
77

78
  if (writer->blockData[pidx].nRow > 0 && numRow >= writer->config->minRow) {
12!
UNCOV
79
    SRowInfo row = {
×
UNCOV
80
        .suid = writer->ctx->tbid->suid,
×
UNCOV
81
        .uid = writer->ctx->tbid->uid,
×
UNCOV
82
        .row = tsdbRowFromBlockData(writer->blockData + pidx, 0),
×
83
    };
84

UNCOV
85
    for (int32_t i = 0; i < numRow; i++) {
×
UNCOV
86
      row.row.iRow = i;
×
87

UNCOV
88
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
UNCOV
89
      TSDB_CHECK_CODE(code, lino, _exit);
×
90
    }
91

UNCOV
92
    code = tsdbDataFileFlush(writer->dataWriter);
×
UNCOV
93
    TSDB_CHECK_CODE(code, lino, _exit);
×
94

UNCOV
95
    for (int32_t i = numRow; i < writer->blockData[pidx].nRow; i++) {
×
UNCOV
96
      row.row.iRow = i;
×
UNCOV
97
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
UNCOV
98
      TSDB_CHECK_CODE(code, lino, _exit);
×
99
    }
100

UNCOV
101
    row.row = tsdbRowFromBlockData(writer->blockData + cidx, 0);
×
UNCOV
102
    for (int32_t i = 0; i < writer->blockData[cidx].nRow; i++) {
×
UNCOV
103
      row.row.iRow = i;
×
UNCOV
104
      code = tsdbDataFileWriteRow(writer->dataWriter, &row);
×
UNCOV
105
      TSDB_CHECK_CODE(code, lino, _exit);
×
106
    }
107
  } else {
108
    // pidx
109
    if (writer->blockData[pidx].nRow > 0) {
12!
110
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[pidx]);
×
111
      TSDB_CHECK_CODE(code, lino, _exit);
×
112
    }
113

114
    // cidx
115
    if (writer->blockData[cidx].nRow < writer->config->minRow) {
12✔
116
      code = tsdbSttFileWriteBlockData(writer->sttWriter, &writer->blockData[cidx]);
5✔
117
      TSDB_CHECK_CODE(code, lino, _exit);
5!
118
    } else {
119
      code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[cidx]);
7✔
120
      TSDB_CHECK_CODE(code, lino, _exit);
7!
121
    }
122
  }
123

124
  for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
36✔
125
    tBlockDataReset(&writer->blockData[i]);
24✔
126
  }
127

128
_exit:
12✔
129
  if (code) {
12!
130
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
131
  }
132
  taosHashCleanup(writer->pColCmprObj);
12✔
133
  writer->pColCmprObj = NULL;
12✔
134

135
  return code;
12✔
136
}
137

138
int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
26✔
139
  int32_t code = 0;
26✔
140
  int32_t lino = 0;
26✔
141

142
  writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
26!
143
  if (writer[0] == NULL) {
26!
144
    return terrno;
×
145
  }
146

147
  writer[0]->config[0] = config[0];
26✔
148

149
  // data writer
150
  if (!config->toSttOnly) {
26✔
151
    SDataFileWriterConfig dataWriterConfig = {
5✔
152
        .tsdb = config->tsdb,
5✔
153
        .cmprAlg = config->cmprAlg,
5✔
154
        .maxRow = config->maxRow,
5✔
155
        .szPage = config->szPage,
5✔
156
        .fid = config->fid,
5✔
157
        .cid = config->cid,
5✔
158
        .did = config->did,
159
        .compactVersion = config->compactVersion,
5✔
160
        .skmTb = writer[0]->skmTb,
5✔
161
        .skmRow = writer[0]->skmRow,
5✔
162
        .lcn = config->lcn,
5✔
163
        .buffers = writer[0]->buffers,
5✔
164
    };
165
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
25✔
166
      dataWriterConfig.files[ftype].exist = config->files[ftype].exist;
20✔
167
      dataWriterConfig.files[ftype].file = config->files[ftype].file;
20✔
168
    }
169

170
    code = tsdbDataFileWriterOpen(&dataWriterConfig, &writer[0]->dataWriter);
5✔
171
    TSDB_CHECK_CODE(code, lino, _exit);
5!
172
  }
173

174
  // stt writer
175
  SSttFileWriterConfig sttWriterConfig = {
26✔
176
      .tsdb = config->tsdb,
26✔
177
      .maxRow = config->maxRow,
26✔
178
      .szPage = config->szPage,
26✔
179
      .cmprAlg = config->cmprAlg,
26✔
180
      .compactVersion = config->compactVersion,
26✔
181
      .did = config->did,
182
      .fid = config->fid,
26✔
183
      .cid = config->cid,
26✔
184
      .level = config->level,
26✔
185
      .skmTb = writer[0]->skmTb,
26✔
186
      .skmRow = writer[0]->skmRow,
26✔
187
      .buffers = writer[0]->buffers,
26✔
188

189
  };
190
  code = tsdbSttFileWriterOpen(&sttWriterConfig, &writer[0]->sttWriter);
26✔
191
  TSDB_CHECK_CODE(code, lino, _exit);
26!
192

193
_exit:
26✔
194
  if (code) {
26!
195
    TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
×
196
  }
197
  return code;
26✔
198
}
199

200
int32_t tsdbFSetWriterClose(SFSetWriter **writer, bool abort, TFileOpArray *fopArr) {
26✔
201
  if (writer[0] == NULL) return 0;
26!
202

203
  int32_t code = 0;
26✔
204
  int32_t lino = 0;
26✔
205

206
  STsdb *tsdb = writer[0]->config->tsdb;
26✔
207

208
  // end
209
  if (!writer[0]->config->toSttOnly) {
26✔
210
    code = tsdbFSetWriteTableDataEnd(writer[0]);
5✔
211
    TSDB_CHECK_CODE(code, lino, _exit);
5!
212

213
    code = tsdbDataFileWriterClose(&writer[0]->dataWriter, abort, fopArr);
5✔
214
    TSDB_CHECK_CODE(code, lino, _exit);
5!
215
  }
216

217
  code = tsdbSttFileWriterClose(&writer[0]->sttWriter, abort, fopArr);
26✔
218
  TSDB_CHECK_CODE(code, lino, _exit);
26!
219

220
  // free
221
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->blockData); i++) {
78✔
222
    tBlockDataDestroy(&writer[0]->blockData[i]);
52✔
223
  }
224
  for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); i++) {
286✔
225
    tBufferDestroy(&writer[0]->buffers[i]);
260✔
226
  }
227
  tDestroyTSchema(writer[0]->skmRow->pTSchema);
26!
228
  tDestroyTSchema(writer[0]->skmTb->pTSchema);
26!
229
  taosMemoryFree(writer[0]);
26!
230
  writer[0] = NULL;
26✔
231

232
_exit:
26✔
233
  if (code) {
26!
234
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
×
235
  }
236
  return code;
26✔
237
}
238

239
int32_t tsdbFSetWriteRow(SFSetWriter *writer, SRowInfo *row) {
13,148✔
240
  int32_t code = 0;
13,148✔
241
  int32_t lino = 0;
13,148✔
242

243
  if (writer->config->toSttOnly) {
13,148✔
244
    code = tsdbSttFileWriteRow(writer->sttWriter, row);
11,838✔
245
    TSDB_CHECK_CODE(code, lino, _exit);
11,875!
246
  } else {
247
    if (writer->ctx->tbid->uid != row->uid) {
1,310✔
248
      code = tsdbFSetWriteTableDataEnd(writer);
12✔
249
      TSDB_CHECK_CODE(code, lino, _exit);
12!
250

251
      code = tsdbFSetWriteTableDataBegin(writer, (TABLEID *)row);
12✔
252
      TSDB_CHECK_CODE(code, lino, _exit);
12!
253
    }
254

255
    if (row->row.type == TSDBROW_ROW_FMT) {
1,310!
UNCOV
256
      code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, TSDBROW_SVERSION(&row->row), writer->skmRow);
×
UNCOV
257
      TSDB_CHECK_CODE(code, lino, _exit);
×
258
    }
259

260
    if (TSDBROW_VERSION(&row->row) <= writer->config->compactVersion  //
1,310!
261
        && writer->blockData[writer->blockDataIdx].nRow > 0           //
1,303✔
262
        && tsdbRowCompareWithoutVersion(&row->row,
1,296✔
263
                                        &tsdbRowFromBlockData(&writer->blockData[writer->blockDataIdx],
1,291✔
264
                                                              writer->blockData[writer->blockDataIdx].nRow - 1)) == 0) {
265
      code = tBlockDataUpdateRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema);
49✔
266
      TSDB_CHECK_CODE(code, lino, _exit);
49!
267
    } else {
268
      if (writer->blockData[writer->blockDataIdx].nRow >= writer->config->maxRow) {
1,266!
UNCOV
269
        int32_t idx = ((writer->blockDataIdx + 1) & 1);
×
UNCOV
270
        if (writer->blockData[idx].nRow >= writer->config->maxRow) {
×
UNCOV
271
          code = tsdbDataFileWriteBlockData(writer->dataWriter, &writer->blockData[idx]);
×
UNCOV
272
          TSDB_CHECK_CODE(code, lino, _exit);
×
273

UNCOV
274
          tBlockDataClear(&writer->blockData[idx]);
×
275
        }
UNCOV
276
        writer->blockDataIdx = idx;
×
277
      }
278

279
      code =
280
          tBlockDataAppendRow(&writer->blockData[writer->blockDataIdx], &row->row, writer->skmRow->pTSchema, row->uid);
1,266✔
281
      TSDB_CHECK_CODE(code, lino, _exit);
1,241!
282
    }
283
  }
284

285
_exit:
13,165✔
286
  if (code) {
13,165!
287
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
288
  }
289
  return code;
13,166✔
290
}
291

UNCOV
292
int32_t tsdbFSetWriteTombRecord(SFSetWriter *writer, const STombRecord *tombRecord) {
×
UNCOV
293
  int32_t code = 0;
×
UNCOV
294
  int32_t lino = 0;
×
295

UNCOV
296
  if (writer->config->toSttOnly || tsdbSttFileWriterIsOpened(writer->sttWriter)) {
×
UNCOV
297
    code = tsdbSttFileWriteTombRecord(writer->sttWriter, tombRecord);
×
UNCOV
298
    TSDB_CHECK_CODE(code, lino, _exit);
×
299
  } else {
UNCOV
300
    code = tsdbDataFileWriteTombRecord(writer->dataWriter, tombRecord);
×
UNCOV
301
    TSDB_CHECK_CODE(code, lino, _exit);
×
302
  }
303

UNCOV
304
_exit:
×
UNCOV
305
  if (code) {
×
306
    TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
×
307
  }
UNCOV
308
  return code;
×
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc