• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3528

13 Nov 2024 02:14AM UTC coverage: 60.905% (+0.09%) from 60.819%
#3528

push

travis-ci

web-flow
Merge pull request #28748 from taosdata/test/chr-3.0-TD14758

test:add docs ci in jenkinsfile2

118800 of 249004 branches covered (47.71%)

Branch coverage included in aggregate %.

199361 of 273386 relevant lines covered (72.92%)

14738389.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.65
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
#define MEM_MIN_HASH 1024
20
#define SL_MAX_LEVEL 5
21

22
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
23
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
24
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
25
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
26
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
27
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
28
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
29
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
30

31
#define SL_MOVE_BACKWARD 0x1
32
#define SL_MOVE_FROM_POS 0x2
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
2,375,175✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
2,375,175✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
2,375,175✔
44
  if (tbData1->suid < tbData2->suid) return -1;
2,375,175✔
45
  if (tbData1->suid > tbData2->suid) return 1;
2,153,723✔
46
  if (tbData1->uid < tbData2->uid) return -1;
2,093,361✔
47
  if (tbData1->uid > tbData2->uid) return 1;
1,706,524!
48
  return 0;
×
49
}
50

51
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
44,534✔
52
  int32_t    code = 0;
44,534✔
53
  SMemTable *pMemTable = NULL;
44,534✔
54

55
  pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
44,534✔
56
  if (pMemTable == NULL) {
44,551!
57
    code = terrno;
×
58
    goto _err;
×
59
  }
60
  taosInitRWLatch(&pMemTable->latch);
44,551✔
61
  pMemTable->pTsdb = pTsdb;
44,550✔
62
  pMemTable->pPool = pTsdb->pVnode->inUse;
44,550✔
63
  pMemTable->nRef = 1;
44,550✔
64
  pMemTable->minVer = VERSION_MAX;
44,550✔
65
  pMemTable->maxVer = VERSION_MIN;
44,550✔
66
  pMemTable->minKey = TSKEY_MAX;
44,550✔
67
  pMemTable->maxKey = TSKEY_MIN;
44,550✔
68
  pMemTable->nRow = 0;
44,550✔
69
  pMemTable->nDel = 0;
44,550✔
70
  pMemTable->nTbData = 0;
44,550✔
71
  pMemTable->nBucket = MEM_MIN_HASH;
44,550✔
72
  pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *));
44,550✔
73
  if (pMemTable->aBucket == NULL) {
44,549!
74
    code = terrno;
×
75
    taosMemoryFree(pMemTable);
×
76
    goto _err;
×
77
  }
78
  vnodeBufPoolRef(pMemTable->pPool);
44,549✔
79
  tRBTreeCreate(pMemTable->tbDataTree, tTbDataCmprFn);
44,549✔
80

81
  *ppMemTable = pMemTable;
44,550✔
82
  return code;
44,550✔
83

84
_err:
×
85
  *ppMemTable = NULL;
×
86
  return code;
×
87
}
88

89
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
44,552✔
90
  if (pMemTable) {
44,552!
91
    vnodeBufPoolUnRef(pMemTable->pPool, proactive);
44,552✔
92
    taosMemoryFree(pMemTable->aBucket);
44,551✔
93
    taosMemoryFree(pMemTable);
44,550✔
94
  }
95
}
44,550✔
96

97
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
98
  STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
22,483,279✔
99

100
  while (pTbData) {
22,515,292✔
101
    if (pTbData->uid == uid) break;
12,111,583✔
102
    pTbData = pTbData->next;
32,013✔
103
  }
104

105
  return pTbData;
22,483,279✔
106
}
107

108
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
16,389,248✔
109
  STbData *pTbData;
110

111
  taosRLockLatch(&pMemTable->latch);
16,389,248✔
112
  pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
16,403,587✔
113
  taosRUnLockLatch(&pMemTable->latch);
16,403,587✔
114

115
  return pTbData;
16,404,303✔
116
}
117

118
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
6,019,805✔
119
  int32_t    code = 0;
6,019,805✔
120
  SMemTable *pMemTable = pTsdb->mem;
6,019,805✔
121
  STbData   *pTbData = NULL;
6,019,805✔
122
  tb_uid_t   suid = pSubmitTbData->suid;
6,019,805✔
123
  tb_uid_t   uid = pSubmitTbData->uid;
6,019,805✔
124

125
  // create/get STbData to op
126
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
6,019,805✔
127
  if (code) {
6,020,242!
128
    goto _err;
×
129
  }
130

131
  // do insert impl
132
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
6,020,242✔
133
    code = tsdbInsertColDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
149✔
134
  } else {
135
    code = tsdbInsertRowDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
6,020,093✔
136
  }
137
  if (code) goto _err;
6,021,737!
138

139
  // update
140
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
6,021,737✔
141
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
6,021,737✔
142

143
  return code;
6,021,737✔
144

145
_err:
×
146
  terrno = code;
×
147
  return code;
×
148
}
149

150
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
59,989✔
151
  int32_t    code = 0;
59,989✔
152
  SMemTable *pMemTable = pTsdb->mem;
59,989✔
153
  STbData   *pTbData = NULL;
59,989✔
154
  SVBufPool *pPool = pTsdb->pVnode->inUse;
59,989✔
155

156
  // check if table exists
157
  SMetaInfo info;
158
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
59,989✔
159
  if (code) {
59,990!
160
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
161
    goto _err;
×
162
  }
163
  if (info.suid != suid) {
59,990!
164
    code = TSDB_CODE_INVALID_MSG;
×
165
    goto _err;
×
166
  }
167

168
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
59,990✔
169
  if (code) {
59,991!
170
    goto _err;
×
171
  }
172

173
  // do delete
174
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
59,991✔
175
  if (pDelData == NULL) {
59,991!
176
    code = terrno;
×
177
    goto _err;
×
178
  }
179
  pDelData->version = version;
59,991✔
180
  pDelData->sKey = sKey;
59,991✔
181
  pDelData->eKey = eKey;
59,991✔
182
  pDelData->pNext = NULL;
59,991✔
183
  taosWLockLatch(&pTbData->lock);
59,991✔
184
  if (pTbData->pHead == NULL) {
59,991✔
185
    pTbData->pHead = pTbData->pTail = pDelData;
16,329✔
186
  } else {
187
    pTbData->pTail->pNext = pDelData;
43,662✔
188
    pTbData->pTail = pDelData;
43,662✔
189
  }
190
  taosWUnLockLatch(&pTbData->lock);
59,991✔
191

192
  pMemTable->nDel++;
59,991✔
193
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
59,991✔
194
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
59,991✔
195

196
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
59,991!
197
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
×
198
              " eKey:%" PRId64 " at version %" PRId64,
199
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
200
  }
201

202
  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
59,991✔
203
            " at version %" PRId64,
204
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
205
  return code;
59,989✔
206

207
_err:
×
208
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
×
209
            " at version %" PRId64 " since %s",
210
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
211
  return code;
×
212
}
213

214
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
4,476,513✔
215
  int32_t code = 0;
4,476,513✔
216

217
  (*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));
4,476,513✔
218
  if ((*ppIter) == NULL) {
4,478,136!
219
    code = terrno;
×
220
    goto _exit;
×
221
  }
222

223
  tsdbTbDataIterOpen(pTbData, pFrom, backward, *ppIter);
4,478,136✔
224

225
_exit:
4,475,049✔
226
  return code;
4,475,049✔
227
}
228

229
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
4,474,582✔
230
  if (pIter) {
4,474,582!
231
    taosMemoryFree(pIter);
4,474,654✔
232
  }
233
  return NULL;
4,477,996✔
234
}
235

236
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
5,666,814✔
237
  SMemSkipListNode *pos[SL_MAX_LEVEL];
238
  SMemSkipListNode *pHead;
239
  SMemSkipListNode *pTail;
240

241
  pHead = pTbData->sl.pHead;
5,666,814✔
242
  pTail = pTbData->sl.pTail;
5,666,814✔
243
  pIter->pTbData = pTbData;
5,666,814✔
244
  pIter->backward = backward;
5,666,814✔
245
  pIter->pRow = NULL;
5,666,814✔
246
  if (pFrom == NULL) {
5,666,814✔
247
    // create from head or tail
248
    if (backward) {
703!
249
      pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
703✔
250
    } else {
251
      pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
×
252
    }
253
  } else {
254
    // create from a key
255
    if (backward) {
5,666,111✔
256
      tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
749,592✔
257
      pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
749,498✔
258
    } else {
259
      tbDataMovePosTo(pTbData, pos, pFrom, 0);
4,916,519✔
260
      pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
4,910,148✔
261
    }
262
  }
263
}
5,659,518✔
264

265
bool tsdbTbDataIterNext(STbDataIter *pIter) {
1,679,111,115✔
266
  pIter->pRow = NULL;
1,679,111,115✔
267
  if (pIter->backward) {
1,679,111,115✔
268
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
134,723,058!
269
      return false;
×
270
    }
271

272
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
134,723,058✔
273
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
134,478,516✔
274
      return false;
341,653✔
275
    }
276
  } else {
277
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
1,544,388,057!
278
      return false;
×
279
    }
280

281
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
1,544,388,057✔
282
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
1,543,610,375✔
283
      return false;
2,773,124✔
284
    }
285
  }
286

287
  return true;
1,674,974,114✔
288
}
289

290
int64_t tsdbCountTbDataRows(STbData *pTbData) {
×
291
  SMemSkipListNode *pNode = pTbData->sl.pHead;
×
292
  int64_t           rowsNum = 0;
×
293

294
  while (NULL != pNode) {
×
295
    pNode = SL_GET_NODE_FORWARD(pNode, 0);
×
296
    if (pNode == pTbData->sl.pTail) {
×
297
      return rowsNum;
×
298
    }
299

300
    rowsNum++;
×
301
  }
302

303
  return rowsNum;
×
304
}
305

306
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
×
307
  taosRLockLatch(&pMemTable->latch);
×
308
  for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
×
309
    STbData *pTbData = pMemTable->aBucket[i];
×
310
    while (pTbData) {
×
311
      void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
×
312
      if (p == NULL) {
×
313
        pTbData = pTbData->next;
×
314
        continue;
×
315
      }
316

317
      *rowsNum += tsdbCountTbDataRows(pTbData);
×
318
      pTbData = pTbData->next;
×
319
    }
320
  }
321
  taosRUnLockLatch(&pMemTable->latch);
×
322
}
×
323

324
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
30✔
325
  int32_t code = 0;
30✔
326

327
  int32_t   nBucket = pMemTable->nBucket * 2;
30✔
328
  STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *));
30✔
329
  if (aBucket == NULL) {
30!
330
    code = terrno;
×
331
    goto _exit;
×
332
  }
333

334
  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
72,734✔
335
    STbData *pTbData = pMemTable->aBucket[iBucket];
72,704✔
336

337
    while (pTbData) {
145,408✔
338
      STbData *pNext = pTbData->next;
72,704✔
339

340
      int32_t idx = TABS(pTbData->uid) % nBucket;
72,704✔
341
      pTbData->next = aBucket[idx];
72,704✔
342
      aBucket[idx] = pTbData;
72,704✔
343

344
      pTbData = pNext;
72,704✔
345
    }
346
  }
347

348
  taosMemoryFree(pMemTable->aBucket);
30✔
349
  pMemTable->nBucket = nBucket;
30✔
350
  pMemTable->aBucket = aBucket;
30✔
351

352
_exit:
30✔
353
  return code;
30✔
354
}
355

356
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
6,079,692✔
357
  int32_t code = 0;
6,079,692✔
358

359
  // get
360
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
6,079,692✔
361
  if (pTbData) goto _exit;
6,079,692✔
362

363
  // create
364
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
182,959✔
365
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
182,959✔
366

367
  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
182,959✔
368
  if (pTbData == NULL) {
184,017!
369
    code = terrno;
×
370
    goto _exit;
×
371
  }
372
  pTbData->suid = suid;
184,017✔
373
  pTbData->uid = uid;
184,017✔
374
  pTbData->minKey = TSKEY_MAX;
184,017✔
375
  pTbData->maxKey = TSKEY_MIN;
184,017✔
376
  pTbData->pHead = NULL;
184,017✔
377
  pTbData->pTail = NULL;
184,017✔
378
  pTbData->sl.seed = taosRand();
184,017✔
379
  pTbData->sl.size = 0;
184,030✔
380
  pTbData->sl.maxLevel = maxLevel;
184,030✔
381
  pTbData->sl.level = 0;
184,030✔
382
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
184,030✔
383
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
184,030✔
384
  pTbData->sl.pHead->level = maxLevel;
184,030✔
385
  pTbData->sl.pTail->level = maxLevel;
184,030✔
386
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
1,104,112✔
387
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
920,082✔
388
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;
920,082✔
389

390
    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
920,082✔
391
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
920,082✔
392
  }
393
  taosInitRWLatch(&pTbData->lock);
184,030✔
394

395
  taosWLockLatch(&pMemTable->latch);
184,020✔
396

397
  if (pMemTable->nTbData >= pMemTable->nBucket) {
184,026✔
398
    code = tsdbMemTableRehash(pMemTable);
30✔
399
    if (code) {
30!
400
      taosWUnLockLatch(&pMemTable->latch);
×
401
      goto _exit;
×
402
    }
403
  }
404

405
  int32_t idx = TABS(uid) % pMemTable->nBucket;
184,026✔
406
  pTbData->next = pMemTable->aBucket[idx];
184,026✔
407
  pMemTable->aBucket[idx] = pTbData;
184,026✔
408
  pMemTable->nTbData++;
184,026✔
409

410
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
184,026!
411
    taosWUnLockLatch(&pMemTable->latch);
×
412
    code = TSDB_CODE_INTERNAL_ERROR;
×
413
    goto _exit;
×
414
  }
415

416
  taosWUnLockLatch(&pMemTable->latch);
184,013✔
417

418
_exit:
6,080,636✔
419
  if (code) {
6,080,636!
420
    *ppTbData = NULL;
×
421
  } else {
422
    *ppTbData = pTbData;
6,080,636✔
423
  }
424
  return code;
6,080,636✔
425
}
426

427
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
48,937,261✔
428
  SMemSkipListNode *px;
429
  SMemSkipListNode *pn;
430
  STsdbRowKey       tKey;
431
  int32_t           backward = flags & SL_MOVE_BACKWARD;
48,937,261✔
432
  int32_t           fromPos = flags & SL_MOVE_FROM_POS;
48,937,261✔
433

434
  if (backward) {
48,937,261✔
435
    px = pTbData->sl.pTail;
6,769,970✔
436

437
    if (!fromPos) {
6,769,970!
438
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
11,357,150✔
439
        pos[iLevel] = px;
4,586,984✔
440
      }
441
    }
442

443
    if (pTbData->sl.level) {
6,769,970✔
444
      if (fromPos) px = pos[pTbData->sl.level - 1];
6,588,129!
445

446
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
35,731,486✔
447
        pn = SL_GET_NODE_BACKWARD(px, iLevel);
29,157,635✔
448
        while (pn != pTbData->sl.pHead) {
37,846,720✔
449
          tsdbRowGetKey(&pn->row, &tKey);
36,903,159✔
450

451
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
36,900,306✔
452
          if (c <= 0) {
36,897,930✔
453
            break;
28,199,796✔
454
          } else {
455
            px = pn;
8,698,134✔
456
            pn = SL_GET_NODE_BACKWARD(px, iLevel);
8,698,134✔
457
          }
458
        }
459

460
        pos[iLevel] = px;
29,143,357✔
461
      }
462
    }
463
  } else {
464
    px = pTbData->sl.pHead;
42,167,291✔
465

466
    if (!fromPos) {
42,167,291✔
467
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
10,067,821✔
468
        pos[iLevel] = px;
5,150,214✔
469
      }
470
    }
471

472
    if (pTbData->sl.level) {
42,167,291✔
473
      if (fromPos) px = pos[pTbData->sl.level - 1];
42,165,718✔
474

475
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
246,732,717✔
476
        pn = SL_GET_NODE_FORWARD(px, iLevel);
204,584,150✔
477
        while (pn != pTbData->sl.pTail) {
669,418,180✔
478
          tsdbRowGetKey(&pn->row, &tKey);
665,229,444✔
479

480
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
665,420,698✔
481
          if (c >= 0) {
665,869,963✔
482
            break;
200,378,263✔
483
          } else {
484
            px = pn;
465,491,700✔
485
            pn = SL_GET_NODE_FORWARD(px, iLevel);
465,491,700✔
486
          }
487
        }
488

489
        pos[iLevel] = px;
204,566,999✔
490
      }
491
    }
492
  }
493
}
48,905,832✔
494

495
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
496
  int8_t level = 1;
335,481,225✔
497
  int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
335,481,225✔
498

499
  while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) {
446,720,318✔
500
    level++;
111,239,093✔
501
  }
502

503
  return level;
335,326,785✔
504
}
505
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
335,481,225✔
506
                           int8_t forward) {
507
  int32_t           code = 0;
335,481,225✔
508
  int8_t            level;
509
  SMemSkipListNode *pNode = NULL;
335,481,225✔
510
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
335,481,225✔
511
  int64_t           nSize;
512

513
  // create node
514
  level = tsdbMemSkipListRandLevel(&pTbData->sl);
335,481,225✔
515
  nSize = SL_NODE_SIZE(level);
335,326,785✔
516
  if (pRow->type == TSDBROW_ROW_FMT) {
335,326,785!
517
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->pTSRow->len);
335,415,396✔
518
  } else if (pRow->type == TSDBROW_COL_FMT) {
×
519
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize);
254✔
520
  }
521
  if (pNode == NULL) {
335,357,775!
522
    code = terrno;
×
523
    goto _exit;
×
524
  }
525

526
  pNode->level = level;
335,357,775✔
527
  pNode->row = *pRow;
335,357,775✔
528
  if (pRow->type == TSDBROW_ROW_FMT) {
335,357,775!
529
    pNode->row.pTSRow = (SRow *)((char *)pNode + nSize);
335,368,196✔
530
    memcpy(pNode->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
335,368,196✔
531
  }
532

533
  // set node
534
  if (forward) {
335,357,775✔
535
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
767,949,332✔
536
      SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
438,582,972✔
537
      SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
438,582,972✔
538
    }
539
  } else {
540
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
13,927,557✔
541
      SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
7,936,142✔
542
      SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
7,936,142✔
543
    }
544
  }
545

546
  // set forward and backward
547
  if (forward) {
335,357,775✔
548
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
768,193,462✔
549
      SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
438,642,957✔
550

551
      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
438,642,957✔
552
      SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
438,811,581✔
553

554
      pos[iLevel] = pNode;
438,786,116✔
555
    }
556
  } else {
557
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
13,969,262✔
558
      SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
7,937,443✔
559

560
      SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
7,937,443✔
561
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
7,946,072✔
562

563
      pos[iLevel] = pNode;
8,018,833✔
564
    }
565
  }
566

567
  pTbData->sl.size++;
335,582,324✔
568
  if (pTbData->sl.level < pNode->level) {
335,582,324✔
569
    pTbData->sl.level = pNode->level;
587,768✔
570
  }
571

572
_exit:
334,994,556✔
573
  return code;
335,582,324✔
574
}
575

576
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
149✔
577
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
578
  int32_t code = 0;
149✔
579

580
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
149✔
581
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
149✔
582
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
149✔
583

584
  // copy and construct block data
585
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
149✔
586
  if (pBlockData == NULL) {
149!
587
    code = terrno;
×
588
    goto _exit;
×
589
  }
590

591
  pBlockData->suid = pTbData->suid;
149✔
592
  pBlockData->uid = pTbData->uid;
149✔
593
  pBlockData->nRow = aColData[0].nVal;
149✔
594
  pBlockData->aUid = NULL;
149✔
595
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
149✔
596
  if (pBlockData->aVersion == NULL) {
149!
597
    code = terrno;
×
598
    goto _exit;
×
599
  }
600
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
403✔
601
    pBlockData->aVersion[i] = version;
254✔
602
  }
603

604
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
149✔
605
  if (pBlockData->aTSKEY == NULL) {
149!
606
    code = terrno;
×
607
    goto _exit;
×
608
  }
609
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
149✔
610

611
  pBlockData->nColData = nColData - 1;
149✔
612
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
149✔
613
  if (pBlockData->aColData == NULL) {
149!
614
    code = terrno;
×
615
    goto _exit;
×
616
  }
617

618
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
667✔
619
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
518✔
620
    if (code) goto _exit;
518!
621
  }
622

623
  // loop to add each row to the skiplist
624
  SMemSkipListNode *pos[SL_MAX_LEVEL];
625
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
149✔
626
  STsdbRowKey       key;
627

628
  // first row
629
  tsdbRowGetKey(&tRow, &key);
149✔
630
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
149✔
631
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
149!
632
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
149✔
633

634
  // remain row
635
  ++tRow.iRow;
149✔
636
  if (tRow.iRow < pBlockData->nRow) {
149✔
637
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
190✔
638
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
152✔
639
    }
640

641
    while (tRow.iRow < pBlockData->nRow) {
143✔
642
      tsdbRowGetKey(&tRow, &key);
105✔
643

644
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
105!
645
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
646
      }
647

648
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
105!
649

650
      ++tRow.iRow;
105✔
651
    }
652
  }
653

654
  if (key.key.ts >= pTbData->maxKey) {
149✔
655
    pTbData->maxKey = key.key.ts;
148✔
656
  }
657

658
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
149!
659
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
660
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
661
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
662
    }
663
  }
664

665
  // SMemTable
666
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
149✔
667
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
149✔
668
  pMemTable->nRow += pBlockData->nRow;
149✔
669

670
  if (affectedRows) *affectedRows = pBlockData->nRow;
149!
671

672
_exit:
×
673
  return code;
149✔
674
}
675

676
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
6,019,847✔
677
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
678
  int32_t code = 0;
6,019,847✔
679

680
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
6,019,847✔
681
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
6,019,847✔
682
  STsdbRowKey       key;
683
  SMemSkipListNode *pos[SL_MAX_LEVEL];
684
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
6,019,847✔
685
  int32_t           iRow = 0;
6,019,847✔
686

687
  // backward put first data
688
  tRow.pTSRow = aRow[iRow++];
6,019,847✔
689
  tsdbRowGetKey(&tRow, &key);
6,019,847✔
690
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
6,020,164✔
691
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
6,006,289✔
692
  if (code) goto _exit;
6,023,123!
693

694
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
6,023,123✔
695

696
  // forward put rest data
697
  if (iRow < nRow) {
6,023,123✔
698
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
3,477,462✔
699
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
2,739,828✔
700
    }
701

702
    while (iRow < nRow) {
330,280,525✔
703
      tRow.pTSRow = aRow[iRow];
329,553,589✔
704
      tsdbRowGetKey(&tRow, &key);
329,553,589✔
705

706
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
329,514,975✔
707
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
37,252,029✔
708
      }
709

710
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
329,512,531✔
711
      if (code) goto _exit;
329,542,891!
712

713
      iRow++;
329,542,891✔
714
    }
715
  }
716

717
  if (key.key.ts >= pTbData->maxKey) {
6,012,425✔
718
    pTbData->maxKey = key.key.ts;
5,865,930✔
719
  }
720
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
6,012,425✔
721
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
238,825✔
722
  }
723

724
  // SMemTable
725
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
6,021,687✔
726
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
6,021,687✔
727
  pMemTable->nRow += nRow;
6,021,687✔
728

729
  if (affectedRows) *affectedRows = nRow;
6,021,687!
730

731
_exit:
×
732
  return code;
6,021,687✔
733
}
734

735
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
1,624✔
736

737
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
4,390,996✔
738
  int32_t code = 0;
4,390,996✔
739

740
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
4,390,996✔
741
  if (nRef <= 0) {
4,396,411!
742
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
×
743
  }
744

745
  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
4,396,411✔
746

747
_exit:
4,396,469✔
748
  return code;
4,396,469✔
749
}
750

751
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
4,427,472✔
752
  if (pNode) {
4,427,472✔
753
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
4,396,486✔
754
  }
755

756
  if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
4,427,475✔
757
    tsdbMemTableDestroy(pMemTable, proactive);
31,045✔
758
  }
759
}
4,427,780✔
760

761
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
762
  STbData *pTbData1 = *(STbData **)p1;
763
  STbData *pTbData2 = *(STbData **)p2;
764

765
  if (pTbData1->suid < pTbData2->suid) {
766
    return -1;
767
  } else if (pTbData1->suid > pTbData2->suid) {
768
    return 1;
769
  }
770

771
  if (pTbData1->uid < pTbData2->uid) {
772
    return -1;
773
  } else if (pTbData1->uid > pTbData2->uid) {
774
    return 1;
775
  }
776

777
  return 0;
778
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc