• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.12
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
#define MEM_MIN_HASH 1024
20
#define SL_MAX_LEVEL 5
21

22
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
23
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
24
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
25
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
26
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
27
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
28
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
29
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
30

31
#define SL_MOVE_BACKWARD 0x1
32
#define SL_MOVE_FROM_POS 0x2
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
115,099✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
115,099✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
115,099✔
44
  if (tbData1->suid < tbData2->suid) return -1;
115,099✔
45
  if (tbData1->suid > tbData2->suid) return 1;
106,577✔
46
  if (tbData1->uid < tbData2->uid) return -1;
95,697✔
47
  if (tbData1->uid > tbData2->uid) return 1;
70,995!
48
  return 0;
×
49
}
50

51
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
7,221✔
52
  int32_t    code = 0;
7,221✔
53
  SMemTable *pMemTable = NULL;
7,221✔
54

55
  pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
7,221✔
56
  if (pMemTable == NULL) {
7,223!
57
    code = terrno;
×
58
    goto _err;
×
59
  }
60
  taosInitRWLatch(&pMemTable->latch);
7,223✔
61
  pMemTable->pTsdb = pTsdb;
7,223✔
62
  pMemTable->pPool = pTsdb->pVnode->inUse;
7,223✔
63
  pMemTable->nRef = 1;
7,223✔
64
  pMemTable->minVer = VERSION_MAX;
7,223✔
65
  pMemTable->maxVer = VERSION_MIN;
7,223✔
66
  pMemTable->minKey = TSKEY_MAX;
7,223✔
67
  pMemTable->maxKey = TSKEY_MIN;
7,223✔
68
  pMemTable->nRow = 0;
7,223✔
69
  pMemTable->nDel = 0;
7,223✔
70
  pMemTable->nTbData = 0;
7,223✔
71
  pMemTable->nBucket = MEM_MIN_HASH;
7,223✔
72
  pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *));
7,223✔
73
  if (pMemTable->aBucket == NULL) {
7,224!
74
    code = terrno;
×
75
    taosMemoryFree(pMemTable);
×
76
    goto _err;
×
77
  }
78
  vnodeBufPoolRef(pMemTable->pPool);
7,224✔
79
  tRBTreeCreate(pMemTable->tbDataTree, tTbDataCmprFn);
7,223✔
80

81
  *ppMemTable = pMemTable;
7,223✔
82
  return code;
7,223✔
83

84
_err:
×
85
  *ppMemTable = NULL;
×
86
  return code;
×
87
}
88

89
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
7,224✔
90
  if (pMemTable) {
7,224!
91
    vnodeBufPoolUnRef(pMemTable->pPool, proactive);
7,224✔
92
    taosMemoryFree(pMemTable->aBucket);
7,224✔
93
    taosMemoryFree(pMemTable);
7,224✔
94
  }
95
}
7,224✔
96

97
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
98
  STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
4,720,575✔
99

100
  while (pTbData) {
4,728,836✔
101
    if (pTbData->uid == uid) break;
4,453,280✔
102
    pTbData = pTbData->next;
8,261✔
103
  }
104

105
  return pTbData;
4,720,575✔
106
}
107

108
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
445,649✔
109
  STbData *pTbData;
110

111
  taosRLockLatch(&pMemTable->latch);
445,649✔
112
  pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
445,822✔
113
  taosRUnLockLatch(&pMemTable->latch);
445,822✔
114

115
  return pTbData;
445,826✔
116
}
117

118
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
4,272,719✔
119
  int32_t    code = 0;
4,272,719✔
120
  SMemTable *pMemTable = pTsdb->mem;
4,272,719✔
121
  STbData   *pTbData = NULL;
4,272,719✔
122
  tb_uid_t   suid = pSubmitTbData->suid;
4,272,719✔
123
  tb_uid_t   uid = pSubmitTbData->uid;
4,272,719✔
124

125
  // create/get STbData to op
126
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
4,272,719✔
127
  if (code) {
4,272,857!
128
    goto _err;
×
129
  }
130

131
  // do insert impl
132
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
4,272,857✔
133
    code = tsdbInsertColDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
7✔
134
  } else {
135
    code = tsdbInsertRowDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
4,272,850✔
136
  }
137
  if (code) goto _err;
4,272,862!
138

139
  // update
140
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
4,272,862✔
141
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
4,272,862✔
142

143
  return code;
4,272,862✔
144

145
_err:
×
146
  terrno = code;
×
147
  return code;
×
148
}
149

150
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
2,031✔
151
  int32_t    code = 0;
2,031✔
152
  SMemTable *pMemTable = pTsdb->mem;
2,031✔
153
  STbData   *pTbData = NULL;
2,031✔
154
  SVBufPool *pPool = pTsdb->pVnode->inUse;
2,031✔
155

156
  // check if table exists
157
  SMetaInfo info;
158
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
2,031✔
159
  if (code) {
2,033!
160
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
161
    goto _err;
×
162
  }
163
  if (info.suid != suid) {
2,033!
164
    code = TSDB_CODE_INVALID_MSG;
×
165
    goto _err;
×
166
  }
167

168
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
2,033✔
169
  if (code) {
2,033!
170
    goto _err;
×
171
  }
172

173
  // do delete
174
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
2,033✔
175
  if (pDelData == NULL) {
2,031!
176
    code = terrno;
×
177
    goto _err;
×
178
  }
179
  pDelData->version = version;
2,031✔
180
  pDelData->sKey = sKey;
2,031✔
181
  pDelData->eKey = eKey;
2,031✔
182
  pDelData->pNext = NULL;
2,031✔
183
  taosWLockLatch(&pTbData->lock);
2,031✔
184
  if (pTbData->pHead == NULL) {
2,030✔
185
    pTbData->pHead = pTbData->pTail = pDelData;
720✔
186
  } else {
187
    pTbData->pTail->pNext = pDelData;
1,310✔
188
    pTbData->pTail = pDelData;
1,310✔
189
  }
190
  taosWUnLockLatch(&pTbData->lock);
2,030✔
191

192
  pMemTable->nDel++;
2,033✔
193
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
2,033✔
194
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
2,033✔
195

196
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
2,033!
197
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
×
198
              " eKey:%" PRId64 " at version %" PRId64,
199
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
200
  }
201

202
  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
2,033!
203
            " at version %" PRId64,
204
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
205
  return code;
2,033✔
206

207
_err:
×
208
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
×
209
            " at version %" PRId64 " since %s",
210
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
211
  return code;
×
212
}
213

214
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
161,199✔
215
  int32_t code = 0;
161,199✔
216

217
  (*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));
161,199✔
218
  if ((*ppIter) == NULL) {
161,229✔
219
    code = terrno;
6✔
220
    goto _exit;
6✔
221
  }
222

223
  tsdbTbDataIterOpen(pTbData, pFrom, backward, *ppIter);
161,223✔
224

225
_exit:
161,197✔
226
  return code;
161,197✔
227
}
228

229
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
161,079✔
230
  if (pIter) {
161,079!
231
    taosMemoryFree(pIter);
161,084✔
232
  }
233
  return NULL;
161,203✔
234
}
235

236
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
200,652✔
237
  SMemSkipListNode *pos[SL_MAX_LEVEL];
238
  SMemSkipListNode *pHead;
239
  SMemSkipListNode *pTail;
240

241
  pHead = pTbData->sl.pHead;
200,652✔
242
  pTail = pTbData->sl.pTail;
200,652✔
243
  pIter->pTbData = pTbData;
200,652✔
244
  pIter->backward = backward;
200,652✔
245
  pIter->pRow = NULL;
200,652✔
246
  if (pFrom == NULL) {
200,652✔
247
    // create from head or tail
248
    if (backward) {
48!
249
      pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
48✔
250
    } else {
251
      pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
×
252
    }
253
  } else {
254
    // create from a key
255
    if (backward) {
200,604✔
256
      tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
12,071✔
257
      pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
12,070✔
258
    } else {
259
      tbDataMovePosTo(pTbData, pos, pFrom, 0);
188,533✔
260
      pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
188,276✔
261
    }
262
  }
263
}
200,302✔
264

265
bool tsdbTbDataIterNext(STbDataIter *pIter) {
74,477,553✔
266
  pIter->pRow = NULL;
74,477,553✔
267
  if (pIter->backward) {
74,477,553✔
268
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
1,887,331!
269
      return false;
×
270
    }
271

272
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
1,887,331✔
273
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
1,886,578✔
274
      return false;
8,617✔
275
    }
276
  } else {
277
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
72,590,222!
278
      return false;
×
279
    }
280

281
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
72,590,222✔
282
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
72,579,300✔
283
      return false;
140,908✔
284
    }
285
  }
286

287
  return true;
74,316,353✔
288
}
289

290
int64_t tsdbCountTbDataRows(STbData *pTbData) {
×
291
  SMemSkipListNode *pNode = pTbData->sl.pHead;
×
292
  int64_t           rowsNum = 0;
×
293

294
  while (NULL != pNode) {
×
295
    pNode = SL_GET_NODE_FORWARD(pNode, 0);
×
296
    if (pNode == pTbData->sl.pTail) {
×
297
      return rowsNum;
×
298
    }
299

300
    rowsNum++;
×
301
  }
302

303
  return rowsNum;
×
304
}
305

306
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
×
307
  taosRLockLatch(&pMemTable->latch);
×
308
  for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
×
309
    STbData *pTbData = pMemTable->aBucket[i];
×
310
    while (pTbData) {
×
311
      void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
×
312
      if (p == NULL) {
×
313
        pTbData = pTbData->next;
×
314
        continue;
×
315
      }
316

317
      *rowsNum += tsdbCountTbDataRows(pTbData);
×
318
      pTbData = pTbData->next;
×
319
    }
320
  }
321
  taosRUnLockLatch(&pMemTable->latch);
×
322
}
×
323

UNCOV
324
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
×
UNCOV
325
  int32_t code = 0;
×
326

UNCOV
327
  int32_t   nBucket = pMemTable->nBucket * 2;
×
UNCOV
328
  STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *));
×
UNCOV
329
  if (aBucket == NULL) {
×
330
    code = terrno;
×
331
    goto _exit;
×
332
  }
333

UNCOV
334
  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
×
UNCOV
335
    STbData *pTbData = pMemTable->aBucket[iBucket];
×
336

UNCOV
337
    while (pTbData) {
×
UNCOV
338
      STbData *pNext = pTbData->next;
×
339

UNCOV
340
      int32_t idx = TABS(pTbData->uid) % nBucket;
×
UNCOV
341
      pTbData->next = aBucket[idx];
×
UNCOV
342
      aBucket[idx] = pTbData;
×
343

UNCOV
344
      pTbData = pNext;
×
345
    }
346
  }
347

UNCOV
348
  taosMemoryFree(pMemTable->aBucket);
×
UNCOV
349
  pMemTable->nBucket = nBucket;
×
UNCOV
350
  pMemTable->aBucket = aBucket;
×
351

UNCOV
352
_exit:
×
UNCOV
353
  return code;
×
354
}
355

356
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
4,274,753✔
357
  int32_t code = 0;
4,274,753✔
358

359
  // get
360
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
4,274,753✔
361
  if (pTbData) goto _exit;
4,274,753✔
362

363
  // create
364
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
15,143✔
365
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
15,143✔
366

367
  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
15,143✔
368
  if (pTbData == NULL) {
15,319!
369
    code = terrno;
×
370
    goto _exit;
×
371
  }
372
  pTbData->suid = suid;
15,319✔
373
  pTbData->uid = uid;
15,319✔
374
  pTbData->minKey = TSKEY_MAX;
15,319✔
375
  pTbData->maxKey = TSKEY_MIN;
15,319✔
376
  pTbData->pHead = NULL;
15,319✔
377
  pTbData->pTail = NULL;
15,319✔
378
  pTbData->sl.seed = taosRand();
15,319✔
379
  pTbData->sl.size = 0;
15,322✔
380
  pTbData->sl.maxLevel = maxLevel;
15,322✔
381
  pTbData->sl.level = 0;
15,322✔
382
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
15,322✔
383
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
15,322✔
384
  pTbData->sl.pHead->level = maxLevel;
15,322✔
385
  pTbData->sl.pTail->level = maxLevel;
15,322✔
386
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
91,923✔
387
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
76,601✔
388
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;
76,601✔
389

390
    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
76,601✔
391
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
76,601✔
392
  }
393
  taosInitRWLatch(&pTbData->lock);
15,322✔
394

395
  taosWLockLatch(&pMemTable->latch);
15,321✔
396

397
  if (pMemTable->nTbData >= pMemTable->nBucket) {
15,322!
UNCOV
398
    code = tsdbMemTableRehash(pMemTable);
×
UNCOV
399
    if (code) {
×
400
      taosWUnLockLatch(&pMemTable->latch);
×
401
      goto _exit;
×
402
    }
403
  }
404

405
  int32_t idx = TABS(uid) % pMemTable->nBucket;
15,322✔
406
  pTbData->next = pMemTable->aBucket[idx];
15,322✔
407
  pMemTable->aBucket[idx] = pTbData;
15,322✔
408
  pMemTable->nTbData++;
15,322✔
409

410
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
15,322!
411
    taosWUnLockLatch(&pMemTable->latch);
×
412
    code = TSDB_CODE_INTERNAL_ERROR;
×
413
    goto _exit;
×
414
  }
415

416
  taosWUnLockLatch(&pMemTable->latch);
15,322✔
417

418
_exit:
4,274,900✔
419
  if (code) {
4,274,900!
420
    *ppTbData = NULL;
×
421
  } else {
422
    *ppTbData = pTbData;
4,274,900✔
423
  }
424
  return code;
4,274,900✔
425
}
426

427
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
4,526,644✔
428
  SMemSkipListNode *px;
429
  SMemSkipListNode *pn;
430
  STsdbRowKey       tKey;
431
  int32_t           backward = flags & SL_MOVE_BACKWARD;
4,526,644✔
432
  int32_t           fromPos = flags & SL_MOVE_FROM_POS;
4,526,644✔
433

434
  if (backward) {
4,526,644✔
435
    px = pTbData->sl.pTail;
4,284,883✔
436

437
    if (!fromPos) {
4,284,883!
438
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
5,412,357✔
439
        pos[iLevel] = px;
1,127,464✔
440
      }
441
    }
442

443
    if (pTbData->sl.level) {
4,284,883✔
444
      if (fromPos) px = pos[pTbData->sl.level - 1];
4,269,756!
445

446
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
24,565,954✔
447
        pn = SL_GET_NODE_BACKWARD(px, iLevel);
20,296,434✔
448
        while (pn != pTbData->sl.pHead) {
21,517,824✔
449
          tsdbRowGetKey(&pn->row, &tKey);
21,481,467✔
450

451
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
21,481,488✔
452
          if (c <= 0) {
21,482,128✔
453
            break;
20,259,841✔
454
          } else {
455
            px = pn;
1,222,287✔
456
            pn = SL_GET_NODE_BACKWARD(px, iLevel);
1,222,287✔
457
          }
458
        }
459

460
        pos[iLevel] = px;
20,296,198✔
461
      }
462
    }
463
  } else {
464
    px = pTbData->sl.pHead;
241,761✔
465

466
    if (!fromPos) {
241,761✔
467
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
584,396✔
468
        pos[iLevel] = px;
395,874✔
469
      }
470
    }
471

472
    if (pTbData->sl.level) {
241,761✔
473
      if (fromPos) px = pos[pTbData->sl.level - 1];
241,247✔
474

475
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
1,046,593✔
476
        pn = SL_GET_NODE_FORWARD(px, iLevel);
805,583✔
477
        while (pn != pTbData->sl.pTail) {
1,597,916✔
478
          tsdbRowGetKey(&pn->row, &tKey);
1,554,519✔
479

480
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
1,554,474✔
481
          if (c >= 0) {
1,555,580✔
482
            break;
761,949✔
483
          } else {
484
            px = pn;
793,631✔
485
            pn = SL_GET_NODE_FORWARD(px, iLevel);
793,631✔
486
          }
487
        }
488

489
        pos[iLevel] = px;
805,346✔
490
      }
491
    }
492
  }
493
}
4,526,171✔
494

495
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
496
  int8_t level = 1;
21,356,296✔
497
  int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
21,356,296✔
498

499
  while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) {
28,449,680✔
500
    level++;
7,093,384✔
501
  }
502

503
  return level;
21,354,221✔
504
}
505
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
21,356,296✔
506
                           int8_t forward) {
507
  int32_t           code = 0;
21,356,296✔
508
  int8_t            level;
509
  SMemSkipListNode *pNode = NULL;
21,356,296✔
510
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
21,356,296✔
511
  int64_t           nSize;
512

513
  // create node
514
  level = tsdbMemSkipListRandLevel(&pTbData->sl);
21,356,296✔
515
  nSize = SL_NODE_SIZE(level);
21,354,221✔
516
  if (pRow->type == TSDBROW_ROW_FMT) {
21,354,221!
517
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->pTSRow->len);
21,375,204✔
518
  } else if (pRow->type == TSDBROW_COL_FMT) {
×
519
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize);
17✔
520
  }
521
  if (pNode == NULL) {
21,351,360!
522
    code = terrno;
×
523
    goto _exit;
×
524
  }
525

526
  pNode->level = level;
21,351,360✔
527
  pNode->row = *pRow;
21,351,360✔
528
  if (pRow->type == TSDBROW_ROW_FMT) {
21,351,360!
529
    pNode->row.pTSRow = (SRow *)((char *)pNode + nSize);
21,367,095✔
530
    memcpy(pNode->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
21,367,095✔
531
  }
532

533
  // set node
534
  if (forward) {
21,351,360✔
535
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
39,876,774✔
536
      SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
22,776,356✔
537
      SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
22,776,356✔
538
    }
539
  } else {
540
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
9,935,253✔
541
      SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
5,684,311✔
542
      SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
5,684,311✔
543
    }
544
  }
545

546
  // set forward and backward
547
  if (forward) {
21,351,360✔
548
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
39,897,527✔
549
      SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
22,775,020✔
550

551
      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
22,775,020✔
552
      SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
22,776,167✔
553

554
      pos[iLevel] = pNode;
22,787,443✔
555
    }
556
  } else {
557
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
9,956,182✔
558
      SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
5,684,354✔
559

560
      SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
5,684,354✔
561
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
5,684,577✔
562

563
      pos[iLevel] = pNode;
5,714,906✔
564
    }
565
  }
566

567
  pTbData->sl.size++;
21,394,335✔
568
  if (pTbData->sl.level < pNode->level) {
21,394,335✔
569
    pTbData->sl.level = pNode->level;
47,088✔
570
  }
571

572
_exit:
21,347,247✔
573
  return code;
21,394,335✔
574
}
575

576
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
7✔
577
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
578
  int32_t code = 0;
7✔
579

580
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
7✔
581
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
7✔
582
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
7✔
583

584
  // copy and construct block data
585
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
7✔
586
  if (pBlockData == NULL) {
7!
587
    code = terrno;
×
588
    goto _exit;
×
589
  }
590

591
  pBlockData->suid = pTbData->suid;
7✔
592
  pBlockData->uid = pTbData->uid;
7✔
593
  pBlockData->nRow = aColData[0].nVal;
7✔
594
  pBlockData->aUid = NULL;
7✔
595
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
7✔
596
  if (pBlockData->aVersion == NULL) {
7!
597
    code = terrno;
×
598
    goto _exit;
×
599
  }
600
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
24✔
601
    pBlockData->aVersion[i] = version;
17✔
602
  }
603

604
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
7✔
605
  if (pBlockData->aTSKEY == NULL) {
7!
606
    code = terrno;
×
607
    goto _exit;
×
608
  }
609
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
7✔
610

611
  pBlockData->nColData = nColData - 1;
7✔
612
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
7✔
613
  if (pBlockData->aColData == NULL) {
7!
614
    code = terrno;
×
615
    goto _exit;
×
616
  }
617

618
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
28✔
619
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
21✔
620
    if (code) goto _exit;
21!
621
  }
622

623
  // loop to add each row to the skiplist
624
  SMemSkipListNode *pos[SL_MAX_LEVEL];
625
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
7✔
626
  STsdbRowKey       key;
627

628
  // first row
629
  tsdbRowGetKey(&tRow, &key);
7✔
630
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
7✔
631
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
7!
632
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
7✔
633

634
  // remain row
635
  ++tRow.iRow;
7✔
636
  if (tRow.iRow < pBlockData->nRow) {
7✔
637
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
25✔
638
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
20✔
639
    }
640

641
    while (tRow.iRow < pBlockData->nRow) {
15✔
642
      tsdbRowGetKey(&tRow, &key);
10✔
643

644
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
10✔
645
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
3✔
646
      }
647

648
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
10!
649

650
      ++tRow.iRow;
10✔
651
    }
652
  }
653

654
  if (key.key.ts >= pTbData->maxKey) {
7✔
655
    pTbData->maxKey = key.key.ts;
6✔
656
  }
657

658
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
7!
659
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
660
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
661
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
662
    }
663
  }
664

665
  // SMemTable
666
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
7✔
667
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
7✔
668
  pMemTable->nRow += pBlockData->nRow;
7✔
669

670
  if (affectedRows) *affectedRows = pBlockData->nRow;
7!
671

672
_exit:
×
673
  return code;
7✔
674
}
675

676
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
4,272,782✔
677
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
678
  int32_t code = 0;
4,272,782✔
679

680
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
4,272,782✔
681
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
4,272,782✔
682
  STsdbRowKey       key;
683
  SMemSkipListNode *pos[SL_MAX_LEVEL];
684
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
4,272,782✔
685
  int32_t           iRow = 0;
4,272,782✔
686

687
  // backward put first data
688
  tRow.pTSRow = aRow[iRow++];
4,272,782✔
689
  tsdbRowGetKey(&tRow, &key);
4,272,782✔
690
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
4,272,755✔
691
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
4,272,800✔
692
  if (code) goto _exit;
4,272,893!
693

694
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
4,272,893✔
695

696
  // forward put rest data
697
  if (iRow < nRow) {
4,272,893✔
698
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
181,153✔
699
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
142,782✔
700
    }
701

702
    while (iRow < nRow) {
17,144,622✔
703
      tRow.pTSRow = aRow[iRow];
17,116,838✔
704
      tsdbRowGetKey(&tRow, &key);
17,116,838✔
705

706
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
17,100,151✔
707
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
53,307✔
708
      }
709

710
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
17,100,151✔
711
      if (code) goto _exit;
17,106,251!
712

713
      iRow++;
17,106,251✔
714
    }
715
  }
716

717
  if (key.key.ts >= pTbData->maxKey) {
4,262,306✔
718
    pTbData->maxKey = key.key.ts;
4,186,692✔
719
  }
720
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
4,262,306✔
721
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
194,568✔
722
  }
723

724
  // SMemTable
725
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
4,272,857✔
726
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
4,272,857✔
727
  pMemTable->nRow += nRow;
4,272,857✔
728

729
  if (affectedRows) *affectedRows = nRow;
4,272,857!
730

731
_exit:
×
732
  return code;
4,272,857✔
733
}
734

735
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
1,481✔
736

737
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
111,091✔
738
  int32_t code = 0;
111,091✔
739

740
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
111,091✔
741
  if (nRef <= 0) {
111,139!
742
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
×
743
  }
744

745
  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
111,139✔
746

747
_exit:
111,131✔
748
  return code;
111,131✔
749
}
750

751
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
112,903✔
752
  if (pNode) {
112,903✔
753
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
111,120✔
754
  }
755

756
  if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
112,914✔
757
    tsdbMemTableDestroy(pMemTable, proactive);
1,784✔
758
  }
759
}
112,920✔
760

761
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
762
  STbData *pTbData1 = *(STbData **)p1;
763
  STbData *pTbData2 = *(STbData **)p2;
764

765
  if (pTbData1->suid < pTbData2->suid) {
766
    return -1;
767
  } else if (pTbData1->suid > pTbData2->suid) {
768
    return 1;
769
  }
770

771
  if (pTbData1->uid < pTbData2->uid) {
772
    return -1;
773
  } else if (pTbData1->uid > pTbData2->uid) {
774
    return 1;
775
  }
776

777
  return 0;
778
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc