• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3520

06 Nov 2024 11:19AM UTC coverage: 56.943% (-0.8%) from 57.706%
#3520

push

travis-ci

web-flow
Merge pull request #28535 from taosdata/fix/TD-30837

feat(stream):stream interp && twa

108576 of 248375 branches covered (43.71%)

Branch coverage included in aggregate %.

2353 of 3792 new or added lines in 40 files covered. (62.05%)

8730 existing lines in 213 files now uncovered.

188369 of 273100 relevant lines covered (68.97%)

2375614.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.59
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
#define MEM_MIN_HASH 1024  // bucket count a freshly created memtable starts with
#define SL_MAX_LEVEL 5     // length of the on-stack pos[] arrays used while walking a skip list

// Bytes occupied by a skip-list node with l levels: the node header followed by
// l forward pointers and l backward pointers, all stored in forwards[].
// Expressed with sizeof so the result follows the platform's pointer width
// instead of assuming 8-byte pointers.
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2)
// forwards[0 .. level) hold the forward links, forwards[level .. 2*level) the backward links.
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
// Atomic accessors for the links above.
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)

// Flags accepted by tbDataMovePosTo().
#define SL_MOVE_BACKWARD 0x1  // search from the tail towards the head
#define SL_MOVE_FROM_POS 0x2  // resume from the positions already stored in pos[]
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
676,911✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
676,911✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
676,911✔
44
  if (tbData1->suid < tbData2->suid) return -1;
676,911✔
45
  if (tbData1->suid > tbData2->suid) return 1;
667,729✔
46
  if (tbData1->uid < tbData2->uid) return -1;
660,684✔
47
  if (tbData1->uid > tbData2->uid) return 1;
591,972!
48
  return 0;
×
49
}
50

51
/**
 * Allocate and initialize an empty memtable for pTsdb.
 *
 * @param pTsdb      owning tsdb; its pVnode->inUse buffer pool is recorded in the memtable and referenced
 * @param ppMemTable out: the new memtable on success, NULL on failure
 * @return 0 on success, otherwise the terrno value read right after the failed allocation
 */
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    int32_t code = terrno;
    *ppMemTable = NULL;
    return code;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // ranges start inverted so that the first TMIN/TMAX update replaces them
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    int32_t code = terrno;  // capture before the free below
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return code;
  }

  vnodeBufPoolRef(pNew->pPool);
  tRBTreeCreate(pNew->tbDataTree, tTbDataCmprFn);

  *ppMemTable = pNew;
  return 0;
}
88

89
/**
 * Release a memtable: drop its reference on the buffer pool, then free the
 * bucket array and the memtable itself. A NULL pMemTable is ignored.
 *
 * @param proactive forwarded unchanged to vnodeBufPoolUnRef()
 */
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}
8,760✔
96

97
// Look up the STbData for uid by walking the hash bucket chain selected by
// TABS(uid) % nBucket. Takes no lock itself. suid is accepted but not used in
// the lookup. Returns NULL when the table has no entry.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pCur;

  for (pCur = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->uid == uid) {
      return pCur;
    }
  }

  return NULL;
}
107

108
// Locked variant of the table lookup: holds the memtable read latch while the
// bucket chain is searched. Returns the matching STbData or NULL.
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
117

118
/**
 * Insert one table's submitted data into the tsdb's current memtable.
 *
 * The per-table entry is fetched or created, the rows are inserted through the
 * column-format or row-format path depending on SUBMIT_REQ_COLUMN_DATA_FORMAT,
 * and the memtable version range is widened to include `version`.
 *
 * @param affectedRows forwarded to the format-specific insert routine
 * @return 0 on success; on failure the error code, which is also stored in terrno
 */
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTable = NULL;

  int32_t code = tsdbGetOrCreateTbData(pMem, pSubmitTbData->suid, pSubmitTbData->uid, &pTable);
  if (code == 0) {
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      code = tsdbInsertColDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    } else {
      code = tsdbInsertRowDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    }
  }

  if (code != 0) {
    terrno = code;
    return code;
  }

  // widen the version range covered by this memtable
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);
  return code;
}
149

150
/**
 * Record a delete of the key range [sKey, eKey] for table uid at `version`.
 *
 * The table must exist in meta and belong to `suid`. A SDelData record is
 * allocated from the vnode's in-use buffer pool and appended to the table's
 * delete list under the table's write latch. The memtable's delete counter and
 * version range are then updated and the cache is asked to drop the same range;
 * a cache failure is only logged.
 *
 * @return 0 on success; TSDB_CODE_TDB_TABLE_NOT_EXIST if the meta lookup fails,
 *         TSDB_CODE_INVALID_MSG if the table's suid differs from `suid`, or the
 *         error from table-entry creation / pool allocation
 */
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  SMemTable *pMem = pTsdb->mem;
  SVBufPool *pBufPool = pTsdb->pVnode->inUse;
  STbData   *pTable = NULL;
  SDelData  *pDel = NULL;
  SMetaInfo  metaInfo;

  // the table has to exist and belong to the given super table
  if (metaGetInfo(pTsdb->pVnode->pMeta, uid, &metaInfo, NULL) != 0) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (metaInfo.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTable);
  if (code != 0) {
    goto _err;
  }

  // build the delete record
  pDel = (SDelData *)vnodeBufPoolMalloc(pBufPool, sizeof(*pDel));
  if (pDel == NULL) {
    code = terrno;
    goto _err;
  }
  pDel->version = version;
  pDel->sKey = sKey;
  pDel->eKey = eKey;
  pDel->pNext = NULL;

  // append it to the table's delete list
  taosWLockLatch(&pTable->lock);
  if (pTable->pHead != NULL) {
    pTable->pTail->pNext = pDel;
  } else {
    pTable->pHead = pDel;
  }
  pTable->pTail = pDel;
  taosWUnLockLatch(&pTable->lock);

  pMem->nDel++;
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  // best effort: a cache failure is logged but does not fail the delete
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
              " eKey:%" PRId64 " at version %" PRId64,
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  }

  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64,
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;
}
213

214
/**
 * Heap-allocate an iterator over pTbData and position it with tsdbTbDataIterOpen().
 *
 * @param ppIter out: the new iterator, or NULL when the allocation fails
 * @return 0 on success, terrno on allocation failure
 */
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNew = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNew);
  return 0;
}
228

229
// Free an iterator obtained from tsdbTbDataIterCreate(); NULL is accepted.
// Always returns NULL.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter != NULL) {
    taosMemoryFree(pIter);
  }

  return NULL;
}
235

236
/**
 * Initialize pIter over pTbData's skip list.
 *
 * With pFrom == NULL the iterator starts at the level-0 neighbour of the tail
 * sentinel (backward) or of the head sentinel (forward). Otherwise the skip
 * list is searched for pFrom and the iterator starts at the level-0 neighbour
 * of the position found, in the direction of travel.
 */
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *pos[SL_MAX_LEVEL];

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom != NULL) {
    // start next to the given key
    if (backward) {
      tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
      pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
    } else {
      tbDataMovePosTo(pTbData, pos, pFrom, 0);
      pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
    }
    return;
  }

  // no key: start from one end of the list
  if (backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
  }
}
297,963✔
264

265
/**
 * Advance the iterator one node along level 0 in its direction of travel.
 *
 * The cached row pointer is cleared first. Returns false without moving when
 * the iterator already sits on the terminating sentinel (head when going
 * backward, tail when going forward), and false after moving when the new
 * node is that sentinel; true otherwise.
 */
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  pIter->pRow = NULL;

  // sentinel that ends iteration in the current direction
  SMemSkipListNode *pStop = pIter->backward ? pIter->pTbData->sl.pHead : pIter->pTbData->sl.pTail;

  if (pIter->pNode == pStop) {
    return false;
  }

  if (pIter->backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  }

  return pIter->pNode != pStop;
}
289

290
// Count the nodes between the head and tail sentinels by following level-0
// forward links from the head until the tail is reached.
int64_t tsdbCountTbDataRows(STbData *pTbData) {
  int64_t nRows = 0;

  // the increment only runs for nodes that are not the tail sentinel
  for (SMemSkipListNode *pCur = pTbData->sl.pHead; pCur != NULL; nRows++) {
    pCur = SL_GET_NODE_FORWARD(pCur, 0);
    if (pCur == pTbData->sl.pTail) {
      break;
    }
  }

  return nRows;
}
305

306
/**
 * Add to *rowsNum the row count of every table in the memtable whose uid is a
 * key of pTableMap. The memtable read latch is held for the whole scan.
 */
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
  taosRLockLatch(&pMemTable->latch);

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pCur = pMemTable->aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
      // only tables listed in the map are counted
      if (tSimpleHashGet(pTableMap, &pCur->uid, sizeof(pCur->uid)) != NULL) {
        *rowsNum += tsdbCountTbDataRows(pCur);
      }
    }
  }

  taosRUnLockLatch(&pMemTable->latch);
}
×
323

324
/**
 * Double the number of hash buckets and relink every STbData into the new
 * array (each entry is pushed at the front of its new chain). The old array
 * is freed only after all entries have been moved.
 *
 * @return 0 on success, terrno if the new array cannot be allocated (the
 *         memtable is left untouched in that case)
 */
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nGrown = pMemTable->nBucket * 2;
  STbData **aGrown = (STbData **)taosMemoryCalloc(nGrown, sizeof(STbData *));
  if (aGrown == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pMemTable->nBucket; i++) {
    STbData *pCur = pMemTable->aBucket[i];

    while (pCur != NULL) {
      STbData *pRest = pCur->next;  // remember the tail of the old chain before relinking
      int32_t  slot = TABS(pCur->uid) % nGrown;

      pCur->next = aGrown[slot];
      aGrown[slot] = pCur;

      pCur = pRest;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->nBucket = nGrown;
  pMemTable->aBucket = aGrown;

  return 0;
}
355

356
/**
 * Return the memtable entry for table (suid, uid), creating it if absent.
 *
 * A new entry is carved from the vnode's in-use buffer pool together with its
 * two skip-list sentinel nodes (head and tail, each sized for maxLevel levels),
 * which are laid out directly behind the STbData struct. Under the memtable
 * write latch the entry is then registered in the red-black tree and in the
 * hash buckets (rehashing first when nTbData has reached nBucket).
 *
 * The tree insertion is done before the bucket insertion so that a failing
 * tRBTreePut() leaves the hash table and nTbData untouched; otherwise the
 * caller would receive an error while the entry stayed reachable by lookups.
 *
 * @param ppTbData out: the entry on success, NULL on failure
 * @return 0 on success; terrno on pool allocation failure, the rehash error,
 *         or TSDB_CODE_INTERNAL_ERROR when the tree insertion fails
 */
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  int32_t code = 0;

  // get
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pTbData) goto _exit;

  // create
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
  if (pTbData == NULL) {
    code = terrno;
    goto _exit;
  }
  pTbData->suid = suid;
  pTbData->uid = uid;
  pTbData->minKey = TSKEY_MAX;
  pTbData->maxKey = TSKEY_MIN;
  pTbData->pHead = NULL;
  pTbData->pTail = NULL;
  pTbData->sl.seed = taosRand();
  pTbData->sl.size = 0;
  pTbData->sl.maxLevel = maxLevel;
  pTbData->sl.level = 0;
  // the sentinels live in the same allocation, right after the STbData
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
  pTbData->sl.pHead->level = maxLevel;
  pTbData->sl.pTail->level = maxLevel;
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
    // empty list: head and tail point at each other on every level
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;

    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
  }
  taosInitRWLatch(&pTbData->lock);

  taosWLockLatch(&pMemTable->latch);

  if (pMemTable->nTbData >= pMemTable->nBucket) {
    code = tsdbMemTableRehash(pMemTable);
    if (code) {
      taosWUnLockLatch(&pMemTable->latch);
      goto _exit;
    }
  }

  // register in the ordered tree first: if this fails nothing has to be undone
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
    taosWUnLockLatch(&pMemTable->latch);
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _exit;
  }

  // then publish through the hash table
  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pTbData->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pTbData;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

_exit:
  if (code) {
    *ppTbData = NULL;
  } else {
    *ppTbData = pTbData;
  }
  return code;
}
426

427
/**
 * Compute, for every skip-list level, the node next to which a row with key
 * pKey belongs, and store it in pos[level].
 *
 * Forward mode (default): the search starts at the head sentinel and pos[i]
 * becomes the last node on level i whose key compares less than pKey.
 * Backward mode (SL_MOVE_BACKWARD): the search starts at the tail sentinel and
 * pos[i] becomes the last node reached, walking backward, whose key compares
 * greater than pKey.
 *
 * Without SL_MOVE_FROM_POS, levels in [sl.level, sl.maxLevel) are filled with
 * the starting sentinel. With SL_MOVE_FROM_POS those entries are left alone
 * and the walk resumes from pos[sl.level - 1] instead of the sentinel.
 */
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
  const bool   isBackward = (flags & SL_MOVE_BACKWARD) != 0;
  const bool   resume = (flags & SL_MOVE_FROM_POS) != 0;
  const int8_t topLevel = pTbData->sl.level;

  SMemSkipListNode *const pHead = pTbData->sl.pHead;
  SMemSkipListNode *const pTail = pTbData->sl.pTail;
  SMemSkipListNode       *pCur = isBackward ? pTail : pHead;
  SMemSkipListNode       *pCand;
  STsdbRowKey             rowKey;

  if (!resume) {
    // levels that hold no nodes yet all point at the starting sentinel
    for (int8_t iLevel = topLevel; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = pCur;
    }
  }

  if (topLevel == 0) {
    return;  // empty list: nothing to search
  }

  if (resume) {
    pCur = pos[topLevel - 1];
  }

  for (int8_t iLevel = topLevel - 1; iLevel >= 0; iLevel--) {
    if (isBackward) {
      // step towards the head while the previous node's key is still greater than pKey
      for (pCand = SL_GET_NODE_BACKWARD(pCur, iLevel); pCand != pHead; pCand = SL_GET_NODE_BACKWARD(pCur, iLevel)) {
        tsdbRowGetKey(&pCand->row, &rowKey);
        if (tsdbRowKeyCmpr(&rowKey, pKey) <= 0) {
          break;
        }
        pCur = pCand;
      }
    } else {
      // step towards the tail while the next node's key is still less than pKey
      for (pCand = SL_GET_NODE_FORWARD(pCur, iLevel); pCand != pTail; pCand = SL_GET_NODE_FORWARD(pCur, iLevel)) {
        tsdbRowGetKey(&pCand->row, &rowKey);
        if (tsdbRowKeyCmpr(&rowKey, pKey) >= 0) {
          break;
        }
        pCur = pCand;
      }
    }

    pos[iLevel] = pCur;
  }
}
1,161,383✔
494

495
// Draw the level for a new skip-list node. Starting at 1, the level grows by
// one each time the low two bits of a fresh random number are zero, and is
// capped at min(maxLevel, current list level + 1). A random number is drawn
// before every cap check, so the seed always advances at least once.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  const int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t       level;

  for (level = 1;; level++) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) {
      break;
    }
    if (level >= cap) {
      break;
    }
  }

  return level;
}
505
/**
 * Allocate a skip-list node for pRow from the vnode's in-use buffer pool and
 * link it into pTbData's skip list at the positions given by pos[].
 *
 * For row-format rows the SRow payload is copied into the same allocation,
 * directly behind the node's link array. For column-format rows only the
 * TSDBROW descriptor is copied.
 *
 * @param pos     per-level neighbours as produced by tbDataMovePosTo(); on
 *                return pos[0 .. level) point at the new node
 * @param forward non-zero: insert after pos[i]; zero: insert before pos[i]
 * @return 0 on success, terrno when no node could be allocated
 */
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  SVBufPool        *pBufPool = pMemTable->pTsdb->pVnode->inUse;
  SMemSkipListNode *pNew = NULL;

  // create node
  const int8_t  level = tsdbMemSkipListRandLevel(&pTbData->sl);
  const int64_t nodeSize = SL_NODE_SIZE(level);
  const bool    isRowFmt = (pRow->type == TSDBROW_ROW_FMT);

  if (isRowFmt) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, nodeSize + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, nodeSize);
  }
  if (pNew == NULL) {
    return terrno;
  }

  pNew->level = level;
  pNew->row = *pRow;
  if (isRowFmt) {
    // the row payload lives right behind the node's link array
    pNew->row.pTSRow = (SRow *)((char *)pNew + nodeSize);
    memcpy(pNew->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
  }

  if (forward) {
    // fill in the new node's own links first ...
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNew, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
      SL_NODE_BACKWARD(pNew, iLevel) = pos[iLevel];
    }
    // ... then splice it into the list, top level first
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pAfter = SL_NODE_FORWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNew);
      SL_SET_NODE_BACKWARD(pAfter, iLevel, pNew);

      pos[iLevel] = pNew;
    }
  } else {
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNew, iLevel) = pos[iLevel];
      SL_NODE_BACKWARD(pNew, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pBefore = SL_NODE_BACKWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pBefore, iLevel, pNew);
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNew);

      pos[iLevel] = pNew;
    }
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNew->level) {
    pTbData->sl.level = pNew->level;
  }

  return 0;
}
575

UNCOV
576
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
×
577
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
UNCOV
578
  int32_t code = 0;
×
579

UNCOV
580
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
×
UNCOV
581
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
×
UNCOV
582
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
×
583

584
  // copy and construct block data
UNCOV
585
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
×
UNCOV
586
  if (pBlockData == NULL) {
×
587
    code = terrno;
×
588
    goto _exit;
×
589
  }
590

UNCOV
591
  pBlockData->suid = pTbData->suid;
×
UNCOV
592
  pBlockData->uid = pTbData->uid;
×
UNCOV
593
  pBlockData->nRow = aColData[0].nVal;
×
UNCOV
594
  pBlockData->aUid = NULL;
×
UNCOV
595
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
×
UNCOV
596
  if (pBlockData->aVersion == NULL) {
×
597
    code = terrno;
×
598
    goto _exit;
×
599
  }
UNCOV
600
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
×
UNCOV
601
    pBlockData->aVersion[i] = version;
×
602
  }
603

UNCOV
604
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
×
UNCOV
605
  if (pBlockData->aTSKEY == NULL) {
×
606
    code = terrno;
×
607
    goto _exit;
×
608
  }
UNCOV
609
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
×
610

UNCOV
611
  pBlockData->nColData = nColData - 1;
×
UNCOV
612
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
×
UNCOV
613
  if (pBlockData->aColData == NULL) {
×
614
    code = terrno;
×
615
    goto _exit;
×
616
  }
617

UNCOV
618
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
×
UNCOV
619
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
×
UNCOV
620
    if (code) goto _exit;
×
621
  }
622

623
  // loop to add each row to the skiplist
624
  SMemSkipListNode *pos[SL_MAX_LEVEL];
UNCOV
625
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
×
626
  STsdbRowKey       key;
627

628
  // first row
UNCOV
629
  tsdbRowGetKey(&tRow, &key);
×
UNCOV
630
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
×
UNCOV
631
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
×
UNCOV
632
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
×
633

634
  // remain row
UNCOV
635
  ++tRow.iRow;
×
UNCOV
636
  if (tRow.iRow < pBlockData->nRow) {
×
637
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
×
638
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
×
639
    }
640

641
    while (tRow.iRow < pBlockData->nRow) {
×
642
      tsdbRowGetKey(&tRow, &key);
×
643

644
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
×
645
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
646
      }
647

648
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
×
649

650
      ++tRow.iRow;
×
651
    }
652
  }
653

UNCOV
654
  if (key.key.ts >= pTbData->maxKey) {
×
UNCOV
655
    pTbData->maxKey = key.key.ts;
×
656
  }
657

UNCOV
658
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
×
659
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
660
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
661
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
662
    }
663
  }
664

665
  // SMemTable
UNCOV
666
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
×
UNCOV
667
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
×
UNCOV
668
  pMemTable->nRow += pBlockData->nRow;
×
669

UNCOV
670
  if (affectedRows) *affectedRows = pBlockData->nRow;
×
671

672
_exit:
×
UNCOV
673
  return code;
×
674
}
675

676
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
846,555✔
677
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
678
  int32_t code = 0;
846,555✔
679

680
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
846,555✔
681
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
846,555✔
682
  STsdbRowKey       key;
683
  SMemSkipListNode *pos[SL_MAX_LEVEL];
684
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
846,555✔
685
  int32_t           iRow = 0;
846,555✔
686

687
  // backward put first data
688
  tRow.pTSRow = aRow[iRow++];
846,555✔
689
  tsdbRowGetKey(&tRow, &key);
846,555✔
690
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
846,545✔
691
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
846,400✔
692
  if (code) goto _exit;
846,581!
693

694
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
846,581✔
695

696
  // forward put rest data
697
  if (iRow < nRow) {
846,581✔
698
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
276,014✔
699
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
217,632✔
700
    }
701

702
    while (iRow < nRow) {
12,583,166✔
703
      tRow.pTSRow = aRow[iRow];
12,524,775✔
704
      tsdbRowGetKey(&tRow, &key);
12,524,775✔
705

706
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
12,524,344✔
707
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
15,692✔
708
      }
709

710
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
12,524,344✔
711
      if (code) goto _exit;
12,524,784!
712

713
      iRow++;
12,524,784✔
714
    }
715
  }
716

717
  if (key.key.ts >= pTbData->maxKey) {
846,590✔
718
    pTbData->maxKey = key.key.ts;
837,295✔
719
  }
720
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
846,590✔
721
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
8,834✔
722
  }
723

724
  // SMemTable
725
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
846,570✔
726
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
846,570✔
727
  pMemTable->nRow += nRow;
846,570✔
728

729
  if (affectedRows) *affectedRows = nRow;
846,570!
730

731
_exit:
×
732
  return code;
846,570✔
733
}
734

735
// Number of nodes currently recorded in the table's skip list (sl.size).
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  return pTbData->sl.size;
}
2✔
736

737
/**
 * Take a reference on the memtable and register pQNode with its buffer pool.
 *
 * A reference count that was not positive before the increment is logged as
 * an error, but the call proceeds regardless.
 *
 * @return always 0
 */
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t prevRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
  if (prevRef <= 0) {
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), prevRef);
  }

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}
750

751
/**
 * Drop a reference on the memtable. When pNode is non-NULL it is first
 * deregistered from the buffer pool. The memtable is destroyed when the
 * reference count reaches zero.
 *
 * @param proactive forwarded to the deregistration and destroy calls
 */
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t remaining = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (remaining == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }
}
54,617✔
760

761
// Comparator over pointers to STbData pointers (array-of-pointers sort order):
// ascending by suid, ties broken by ascending uid. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  STbData *pLeft = *(STbData **)p1;
  STbData *pRight = *(STbData **)p2;

  if (pLeft->suid < pRight->suid) return -1;
  if (pLeft->suid > pRight->suid) return 1;

  if (pLeft->uid < pRight->uid) return -1;
  if (pLeft->uid > pRight->uid) return 1;

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc