• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.65
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
#define MEM_MIN_HASH 1024
20
#define SL_MAX_LEVEL 5
21

22
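// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2, written below as (l) << 4: 16 bytes per level, i.e. one forward and one backward pointer assuming 8-byte pointers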
23
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
24
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
25
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
26
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
27
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
28
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
29
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
30

31
#define SL_MOVE_BACKWARD 0x1
32
#define SL_MOVE_FROM_POS 0x2
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
3,177,282✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
3,177,282✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
3,177,282✔
44
  if (tbData1->suid < tbData2->suid) return -1;
3,177,282✔
45
  if (tbData1->suid > tbData2->suid) return 1;
3,051,111✔
46
  if (tbData1->uid < tbData2->uid) return -1;
2,832,988✔
47
  if (tbData1->uid > tbData2->uid) return 1;
2,248,676!
48
  return 0;
×
49
}
50

51
// Allocates and initializes an empty memtable for pTsdb.
//
// On success the new table is stored in *ppMemTable and 0 is returned. The table starts with nRef = 1,
// MEM_MIN_HASH hash buckets, an empty table tree, and vnodeBufPoolRef() has been called on the vnode's
// in-use buffer pool, which the table records in pPool.
// On allocation failure *ppMemTable is set to NULL and terrno is returned.
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  int32_t    code = 0;
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew == NULL) {
    code = terrno;
    *ppMemTable = NULL;
    return code;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // version/key ranges start out inverted so the first insert narrows them
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    code = terrno;  // read terrno before the free below
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return code;
  }

  vnodeBufPoolRef(pNew->pPool);
  tRBTreeCreate(pNew->tbDataTree, tTbDataCmprFn);

  *ppMemTable = pNew;
  return code;
}
88

89
// Releases a memtable: calls vnodeBufPoolUnRef() on its buffer pool (forwarding `proactive`),
// then frees the bucket array and the table object itself. A NULL pMemTable is ignored.
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}
45,097✔
96

97
// Looks up the per-table data for `uid` in the memtable's hash buckets.
// Walks the chain of bucket TABS(uid) % nBucket and returns the first entry whose uid matches,
// or NULL when there is none. `suid` is accepted but not consulted by the lookup.
// This function takes no latch itself.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pCur;

  for (pCur = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->uid == uid) {
      return pCur;
    }
  }

  return NULL;
}
107

108
// Latched variant of the table lookup: holds the memtable's read latch for the duration of
// tsdbGetTbDataFromMemTableImpl() and returns its result (NULL when the table has no entry).
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
117

118
// Inserts one submitted table block into the current memtable (pTsdb->mem).
//
// Finds or creates the STbData for (suid, uid) taken from pSubmitTbData, then dispatches to the
// column-format or row-format insert depending on the SUBMIT_REQ_COLUMN_DATA_FORMAT flag.
// On success the memtable's [minVer, maxVer] range is widened to include `version` and 0 is returned.
// On failure the error code is stored in terrno and returned.
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTable = NULL;

  // create/get STbData to operate on
  int32_t code = tsdbGetOrCreateTbData(pMem, pSubmitTbData->suid, pSubmitTbData->uid, &pTable);

  // do the insert in the format the request carries
  if (code == 0) {
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
      code = tsdbInsertColDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    } else {
      code = tsdbInsertRowDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
    }
  }

  if (code != 0) {
    terrno = code;
    return code;
  }

  // update the version range covered by this memtable
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  return code;
}
149

150
// Records a delete of the time range [sKey, eKey] at `version` for table (suid, uid) in the current memtable.
//
// Steps: verify through metaGetInfo() that the table exists and belongs to `suid`, find or create its
// STbData, allocate an SDelData from the vnode's in-use buffer pool, append it to the table's delete
// list under the table's write latch, bump the memtable's nDel and version range, and finally call
// tsdbCacheDel() (a failure there is only logged).
//
// Returns 0 on success. Error codes: TSDB_CODE_TDB_TABLE_NOT_EXIST when the meta lookup fails,
// TSDB_CODE_INVALID_MSG when the stored suid differs, otherwise the code from tsdbGetOrCreateTbData()
// or terrno after a failed pool allocation.
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  bool       failed = true;
  SMemTable *pMem = pTsdb->mem;
  SVBufPool *pBufPool = pTsdb->pVnode->inUse;
  STbData   *pTable = NULL;
  SDelData  *pDel = NULL;
  SMetaInfo  metaInfo;

  // Each step runs only if the previous one succeeded. A separate flag (instead of testing `code`)
  // keeps the failure path taken even if terrno happens to be 0 after a failed allocation.
  if (metaGetInfo(pTsdb->pVnode->pMeta, uid, &metaInfo, NULL) != 0) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
  } else if (metaInfo.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
  } else if ((code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTable)) != 0) {
    // code already carries the reason
  } else if ((pDel = (SDelData *)vnodeBufPoolMalloc(pBufPool, sizeof(*pDel))) == NULL) {
    code = terrno;
  } else {
    failed = false;
  }

  if (failed) {
    tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
              " at version %" PRId64 " since %s",
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
    return code;
  }

  // fill in the delete record
  pDel->version = version;
  pDel->sKey = sKey;
  pDel->eKey = eKey;
  pDel->pNext = NULL;

  // append to the table's singly linked delete list
  taosWLockLatch(&pTable->lock);
  if (pTable->pHead == NULL) {
    pTable->pHead = pDel;
  } else {
    pTable->pTail->pNext = pDel;
  }
  pTable->pTail = pDel;
  taosWUnLockLatch(&pTable->lock);

  pMem->nDel++;
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
              " eKey:%" PRId64 " at version %" PRId64,
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  }

  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64,
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  return code;
}
213

214
// Heap-allocates an STbDataIter and opens it on pTbData via tsdbTbDataIterOpen().
// *ppIter receives the new iterator and 0 is returned; if the allocation fails *ppIter is NULL
// and terrno is returned.
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNewIter = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pNewIter;
  if (pNewIter == NULL) {
    return terrno;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNewIter);
  return 0;
}
228

229
// Frees an iterator (a NULL pIter is skipped).
// Always returns NULL, so a caller can write `pIter = tsdbTbDataIterDestroy(pIter);`.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter == NULL) {
    return NULL;
  }

  taosMemoryFree(pIter);
  return NULL;
}
235

236
// Initializes a caller-provided iterator over pTbData's skip list.
//
//   pFrom == NULL : start at the list end for the direction - the node before the tail sentinel when
//                   `backward` is set, otherwise the node after the head sentinel.
//   pFrom != NULL : position by key using tbDataMovePosTo(); forward iteration starts at the first node
//                   whose key is >= *pFrom, backward iteration at the last node whose key is <= *pFrom.
//
// In every case pIter->pNode may end up being a sentinel when no such row exists. pIter->pRow is reset to NULL.
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *pos[SL_MAX_LEVEL];

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom == NULL) {
    // create from head or tail
    if (backward) {
      pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
    } else {
      pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
    }
  } else if (backward) {
    // create from a key, walking from the tail; only the level-0 position is needed
    tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
    pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
  } else {
    // create from a key, walking from the head; only the level-0 position is needed
    tbDataMovePosTo(pTbData, pos, pFrom, 0);
    pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
  }
}
5,968,138✔
264

265
// Advances the iterator by one node in its direction and clears the cached pRow.
// Returns false, without moving, when the iterator already sits on the end sentinel for its direction
// (head when iterating backward, tail when iterating forward); returns false after moving when the
// step lands on that sentinel; returns true otherwise.
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  pIter->pRow = NULL;

  if (pIter->backward) {
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
      return false;
    }
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
    return pIter->pNode != pIter->pTbData->sl.pHead;
  }

  if (pIter->pNode == pIter->pTbData->sl.pTail) {
    return false;
  }
  pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  return pIter->pNode != pIter->pTbData->sl.pTail;
}
289

290
// Counts the nodes on level 0 of pTbData's skip list by walking forward from the head sentinel
// until the tail sentinel is reached. The walk also stops if a NULL link is followed.
int64_t tsdbCountTbDataRows(STbData *pTbData) {
  int64_t nRows = 0;

  // `break` skips the ++nRows step, so the tail sentinel itself is never counted
  for (SMemSkipListNode *pCur = pTbData->sl.pHead; pCur != NULL; ++nRows) {
    pCur = SL_GET_NODE_FORWARD(pCur, 0);
    if (pCur == pTbData->sl.pTail) {
      break;
    }
  }

  return nRows;
}
305

306
// Adds to *rowsNum the row count of every table in the memtable whose uid is a key of pTableMap.
// The memtable's read latch is held for the whole scan of the hash buckets. *rowsNum is accumulated
// into, not reset.
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
  taosRLockLatch(&pMemTable->latch);

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pTable = pMemTable->aBucket[iBucket]; pTable != NULL; pTable = pTable->next) {
      // only tables present in the map are counted
      if (tSimpleHashGet(pTableMap, &pTable->uid, sizeof(pTable->uid)) != NULL) {
        *rowsNum += tsdbCountTbDataRows(pTable);
      }
    }
  }

  taosRUnLockLatch(&pMemTable->latch);
}
×
323

324
// Doubles the number of hash buckets and redistributes every STbData into the new array by
// TABS(uid) % newBucketCount, pushing each entry at the front of its new chain.
// Returns 0 on success; if the new array cannot be allocated the table is left untouched and terrno is returned.
// Takes no latch itself.
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  const int32_t nNew = pMemTable->nBucket * 2;
  STbData     **aNew = (STbData **)taosMemoryCalloc(nNew, sizeof(STbData *));

  if (aNew == NULL) {
    return terrno;
  }

  for (int32_t iOld = 0; iOld < pMemTable->nBucket; iOld++) {
    STbData *pCur = pMemTable->aBucket[iOld];

    while (pCur != NULL) {
      STbData *pRest = pCur->next;  // remember the old chain before relinking pCur
      int32_t  iNew = TABS(pCur->uid) % nNew;

      pCur->next = aNew[iNew];
      aNew[iNew] = pCur;
      pCur = pRest;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->aBucket = aNew;
  pMemTable->nBucket = nNew;

  return 0;
}
355

356
// Returns, through *ppTbData, the STbData for table `uid` in pMemTable, creating it if it does not exist.
//
// The existing-entry lookup is done without taking the memtable latch. A new entry is allocated from
// the vnode's in-use buffer pool as one block: the STbData followed by the skip list's head and tail
// sentinel nodes, each sized for the configured maximum level. The sentinels are linked to each other
// on every level, and the entry is then published under the memtable's write latch, growing the hash
// table first when nTbData has reached nBucket.
//
// Returns 0 on success. On failure *ppTbData is NULL and the result is terrno (pool allocation),
// the code from tsdbMemTableRehash(), or TSDB_CODE_INTERNAL_ERROR when tRBTreePut() returns NULL.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  int32_t code = 0;

  // get
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pTbData) goto _exit;

  // create
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
  if (pTbData == NULL) {
    code = terrno;
    goto _exit;
  }
  pTbData->suid = suid;
  pTbData->uid = uid;
  pTbData->minKey = TSKEY_MAX;
  pTbData->maxKey = TSKEY_MIN;
  pTbData->pHead = NULL;
  pTbData->pTail = NULL;
  pTbData->sl.seed = taosRand();
  pTbData->sl.size = 0;
  pTbData->sl.maxLevel = maxLevel;
  pTbData->sl.level = 0;
  // the two sentinel nodes live directly behind the STbData in the same allocation
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
  pTbData->sl.pHead->level = maxLevel;
  pTbData->sl.pTail->level = maxLevel;
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;

    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
  }
  taosInitRWLatch(&pTbData->lock);

  taosWLockLatch(&pMemTable->latch);

  if (pMemTable->nTbData >= pMemTable->nBucket) {
    code = tsdbMemTableRehash(pMemTable);
    if (code) {
      taosWUnLockLatch(&pMemTable->latch);
      goto _exit;
    }
  }

  // Insert into the ordered tree BEFORE touching the hash bucket: if the tree insert fails, the hash
  // table and nTbData must not already reference an entry that the caller is told does not exist.
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
    taosWUnLockLatch(&pMemTable->latch);
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _exit;
  }

  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pTbData->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pTbData;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

_exit:
  if (code) {
    *ppTbData = NULL;
  } else {
    *ppTbData = pTbData;
  }
  return code;
}
426

427
// Computes, for every skip-list level, the node next to which pKey belongs and stores it in pos[level].
//
//   forward  (default)          : pos[l] = last node on level l whose key is <  *pKey, or the head sentinel.
//   backward (SL_MOVE_BACKWARD) : pos[l] = node closest to the head on level l whose key is > *pKey,
//                                 or the tail sentinel.
//
// Levels at or above the list's current level get the starting sentinel, unless SL_MOVE_FROM_POS is set.
// With SL_MOVE_FROM_POS those entries are left untouched and the search resumes from the caller's
// pos[sl.level - 1] instead of the sentinel.
// pos must have room for sl.maxLevel entries.
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
  SMemSkipListNode *pCur;
  SMemSkipListNode *pCand;
  STsdbRowKey       candKey;
  const int32_t     isBackward = flags & SL_MOVE_BACKWARD;
  const int32_t     resume = flags & SL_MOVE_FROM_POS;

  if (isBackward) {
    pCur = pTbData->sl.pTail;

    if (!resume) {
      for (int8_t lv = pTbData->sl.level; lv < pTbData->sl.maxLevel; lv++) {
        pos[lv] = pCur;
      }
    }

    if (pTbData->sl.level == 0) {
      return;
    }
    if (resume) {
      pCur = pos[pTbData->sl.level - 1];
    }

    // descend from the top populated level; pCur carries over from one level to the next
    for (int8_t lv = pTbData->sl.level - 1; lv >= 0; lv--) {
      for (pCand = SL_GET_NODE_BACKWARD(pCur, lv); pCand != pTbData->sl.pHead;
           pCand = SL_GET_NODE_BACKWARD(pCur, lv)) {
        tsdbRowGetKey(&pCand->row, &candKey);
        if (tsdbRowKeyCmpr(&candKey, pKey) <= 0) {
          break;  // candidate is at or before the key: stop on this level
        }
        pCur = pCand;
      }

      pos[lv] = pCur;
    }
    return;
  }

  pCur = pTbData->sl.pHead;

  if (!resume) {
    for (int8_t lv = pTbData->sl.level; lv < pTbData->sl.maxLevel; lv++) {
      pos[lv] = pCur;
    }
  }

  if (pTbData->sl.level == 0) {
    return;
  }
  if (resume) {
    pCur = pos[pTbData->sl.level - 1];
  }

  // descend from the top populated level; pCur carries over from one level to the next
  for (int8_t lv = pTbData->sl.level - 1; lv >= 0; lv--) {
    for (pCand = SL_GET_NODE_FORWARD(pCur, lv); pCand != pTbData->sl.pTail; pCand = SL_GET_NODE_FORWARD(pCur, lv)) {
      tsdbRowGetKey(&pCand->row, &candKey);
      if (tsdbRowKeyCmpr(&candKey, pKey) >= 0) {
        break;  // candidate is at or after the key: stop on this level
      }
      pCur = pCand;
    }

    pos[lv] = pCur;
  }
}
56,656,640✔
494

495
// Draws a random level for a new skip-list node.
// Starts at 1 and moves up one level each time the low two bits of taosRandR() are both zero,
// never exceeding min(maxLevel, current list level + 1).
// The random draw is made before the cap is checked, so one draw is consumed even when the cap stops the climb.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t lv = 1;
  int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);

  for (;;) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) {
      break;
    }
    if (lv >= cap) {
      break;
    }
    lv++;
  }

  return lv;
}
505
// Allocates a skip-list node for *pRow from the vnode's in-use buffer pool and links it into pTbData's
// list at the position described by pos[] (as produced by tbDataMovePosTo()).
//
//   forward != 0 : the node goes right AFTER pos[l] on each of its levels.
//   forward == 0 : the node goes right BEFORE pos[l] on each of its levels.
//
// For row-format rows the SRow payload is copied directly behind the node header; for column-format
// rows only the TSDBROW descriptor is copied. On return pos[l] points at the new node for every level
// the node occupies, sl.size is incremented and sl.level is raised if the node is taller than the list.
// Returns 0 on success, or terrno when no node could be allocated.
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  SVBufPool        *pBufPool = pMemTable->pTsdb->pVnode->inUse;
  SMemSkipListNode *pNew = NULL;

  // create node
  const int8_t  nodeLevel = tsdbMemSkipListRandLevel(&pTbData->sl);
  const int64_t hdrSize = SL_NODE_SIZE(nodeLevel);

  if (pRow->type == TSDBROW_ROW_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, hdrSize + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, hdrSize);
  }
  if (pNew == NULL) {
    return terrno;
  }

  pNew->level = nodeLevel;
  pNew->row = *pRow;
  if (pRow->type == TSDBROW_ROW_FMT) {
    // the row payload lives in the same allocation, right behind the link array
    pNew->row.pTSRow = (SRow *)((char *)pNew + hdrSize);
    memcpy(pNew->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
  }

  // Two phases per direction, in this order:
  //   1. fill in the new node's own links with plain stores (bottom level up);
  //   2. make the neighbours point at it with atomic stores (top level down).
  // NOTE(review): presumably so that a concurrent reader never reaches a half-linked node - confirm.
  if (forward) {
    for (int8_t lv = 0; lv < nodeLevel; lv++) {
      SL_NODE_FORWARD(pNew, lv) = SL_NODE_FORWARD(pos[lv], lv);
      SL_NODE_BACKWARD(pNew, lv) = pos[lv];
    }

    for (int8_t lv = nodeLevel - 1; lv >= 0; lv--) {
      SMemSkipListNode *pSucc = SL_NODE_FORWARD(pos[lv], lv);

      SL_SET_NODE_FORWARD(pos[lv], lv, pNew);
      SL_SET_NODE_BACKWARD(pSucc, lv, pNew);

      pos[lv] = pNew;
    }
  } else {
    for (int8_t lv = 0; lv < nodeLevel; lv++) {
      SL_NODE_FORWARD(pNew, lv) = pos[lv];
      SL_NODE_BACKWARD(pNew, lv) = SL_NODE_BACKWARD(pos[lv], lv);
    }

    for (int8_t lv = nodeLevel - 1; lv >= 0; lv--) {
      SMemSkipListNode *pPred = SL_NODE_BACKWARD(pos[lv], lv);

      SL_SET_NODE_FORWARD(pPred, lv, pNew);
      SL_SET_NODE_BACKWARD(pos[lv], lv, pNew);

      pos[lv] = pNew;
    }
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNew->level) {
    pTbData->sl.level = pNew->level;
  }

  return 0;
}
575

576
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
149✔
577
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
578
  int32_t code = 0;
149✔
579

580
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
149✔
581
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
149✔
582
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
149✔
583

584
  // copy and construct block data
585
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
149✔
586
  if (pBlockData == NULL) {
149!
587
    code = terrno;
×
588
    goto _exit;
×
589
  }
590

591
  pBlockData->suid = pTbData->suid;
149✔
592
  pBlockData->uid = pTbData->uid;
149✔
593
  pBlockData->nRow = aColData[0].nVal;
149✔
594
  pBlockData->aUid = NULL;
149✔
595
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
149✔
596
  if (pBlockData->aVersion == NULL) {
149!
597
    code = terrno;
×
598
    goto _exit;
×
599
  }
600
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
403✔
601
    pBlockData->aVersion[i] = version;
254✔
602
  }
603

604
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
149✔
605
  if (pBlockData->aTSKEY == NULL) {
149!
606
    code = terrno;
×
607
    goto _exit;
×
608
  }
609
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
149✔
610

611
  pBlockData->nColData = nColData - 1;
149✔
612
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
149✔
613
  if (pBlockData->aColData == NULL) {
149!
614
    code = terrno;
×
615
    goto _exit;
×
616
  }
617

618
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
669✔
619
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
520✔
620
    if (code) goto _exit;
520!
621
  }
622

623
  // loop to add each row to the skiplist
624
  SMemSkipListNode *pos[SL_MAX_LEVEL];
625
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
149✔
626
  STsdbRowKey       key;
627

628
  // first row
629
  tsdbRowGetKey(&tRow, &key);
149✔
630
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
149✔
631
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
149!
632
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
149✔
633

634
  // remain row
635
  ++tRow.iRow;
149✔
636
  if (tRow.iRow < pBlockData->nRow) {
149✔
637
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
187✔
638
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
149✔
639
    }
640

641
    while (tRow.iRow < pBlockData->nRow) {
143✔
642
      tsdbRowGetKey(&tRow, &key);
105✔
643

644
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
105!
UNCOV
645
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
646
      }
647

648
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
105!
649

650
      ++tRow.iRow;
105✔
651
    }
652
  }
653

654
  if (key.key.ts >= pTbData->maxKey) {
149✔
655
    pTbData->maxKey = key.key.ts;
148✔
656
  }
657

658
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
149!
659
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
660
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
661
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
662
    }
663
  }
664

665
  // SMemTable
666
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
149✔
667
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
149✔
668
  pMemTable->nRow += pBlockData->nRow;
149✔
669

670
  if (affectedRows) *affectedRows = pBlockData->nRow;
149!
671

672
_exit:
×
673
  return code;
149✔
674
}
675

676
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
13,760,632✔
677
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
678
  int32_t code = 0;
13,760,632✔
679

680
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
13,760,632✔
681
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
13,760,632✔
682
  STsdbRowKey       key;
683
  SMemSkipListNode *pos[SL_MAX_LEVEL];
684
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
13,760,632✔
685
  int32_t           iRow = 0;
13,760,632✔
686

687
  // backward put first data
688
  tRow.pTSRow = aRow[iRow++];
13,760,632✔
689
  tsdbRowGetKey(&tRow, &key);
13,760,632✔
690
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
13,758,808✔
691
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
13,730,386✔
692
  if (code) goto _exit;
13,766,521!
693

694
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
13,766,521✔
695

696
  // forward put rest data
697
  if (iRow < nRow) {
13,766,521✔
698
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
5,222,092✔
699
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
4,114,403✔
700
    }
701

702
    while (iRow < nRow) {
380,669,559✔
703
      tRow.pTSRow = aRow[iRow];
379,575,671✔
704
      tsdbRowGetKey(&tRow, &key);
379,575,671✔
705

706
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
379,523,762✔
707
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
36,953,616✔
708
      }
709

710
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
379,521,293✔
711
      if (code) goto _exit;
379,561,870!
712

713
      iRow++;
379,561,870✔
714
    }
715
  }
716

717
  if (key.key.ts >= pTbData->maxKey) {
13,752,720✔
718
    pTbData->maxKey = key.key.ts;
13,624,486✔
719
  }
720
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
13,752,720✔
721
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
226,220✔
722
  }
723

724
  // SMemTable
725
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
13,765,282✔
726
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
13,765,282✔
727
  pMemTable->nRow += nRow;
13,765,282✔
728

729
  if (affectedRows) *affectedRows = nRow;
13,765,282!
730

731
_exit:
×
732
  return code;
13,765,282✔
733
}
734

735
// Returns the skip list's size counter for this table, i.e. the number of nodes inserted so far.
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  return pTbData->sl.size;
}
1,622✔
736

737
// Takes one more reference on the memtable and registers pQNode with the memtable's buffer pool
// through vnodeBufPoolRegisterQuery().
// If the value returned by atomic_fetch_add_32() is not positive, an error is logged and the call
// still proceeds. Always returns 0.
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
  if (nRef <= 0) {
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
  }

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}
750

751
// Drops one reference on the memtable.
// When pNode is non-NULL it is first deregistered from the memtable's buffer pool. When the
// reference count reaches zero the memtable is destroyed. `proactive` is forwarded to both calls.
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t remaining = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (remaining == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }
}
4,311,417✔
760

761
// Comparator over pointers to STbData pointers (p1 and p2 each point at an `STbData *`).
// Orders by suid first and by uid second; returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  STbData *lhs = *(STbData **)p1;
  STbData *rhs = *(STbData **)p2;

  if (lhs->suid != rhs->suid) {
    return (lhs->suid < rhs->suid) ? -1 : 1;
  }
  if (lhs->uid != rhs->uid) {
    return (lhs->uid < rhs->uid) ? -1 : 1;
  }
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc