• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4800

16 Oct 2025 09:19AM UTC coverage: 53.935% (-7.1%) from 61.083%
#4800

push

travis-ci

web-flow
Merge b32e3a393 into a190048d5

134724 of 323629 branches covered (41.63%)

Branch coverage included in aggregate %.

184803 of 268802 relevant lines covered (68.75%)

69058627.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.6
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
// Number of hash buckets a freshly created memtable starts with.
#define MEM_MIN_HASH 1024
// Capacity of the on-stack `pos[]` cursor arrays used when walking a skiplist.
#define SL_MAX_LEVEL 5

// Byte size of a skiplist node with `l` levels: the node header followed by
// `l` forward and `l` backward links, budgeted at 16 bytes per level.
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) * 16))

// Link slots: forwards[0 .. level-1] are the forward links,
// forwards[level .. 2*level-1] are the backward links.
#define SL_NODE_FORWARD(n, l)  ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])

// Atomic accessors for the link slots above.
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)

// Flag bits accepted by tbDataMovePosTo().
#define SL_MOVE_BACKWARD 0x1  // search from the tail sentinel towards the head
#define SL_MOVE_FROM_POS 0x2  // resume from the cursor already stored in pos[]
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
280,976,994✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
280,976,994✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
280,977,309✔
44
  if (tbData1->suid < tbData2->suid) return -1;
280,976,793✔
45
  if (tbData1->suid > tbData2->suid) return 1;
275,240,028✔
46
  if (tbData1->uid < tbData2->uid) return -1;
241,834,094✔
47
  if (tbData1->uid > tbData2->uid) return 1;
210,444,221!
48
  return 0;
×
49
}
50

51
// Allocates and initialises an empty memtable bound to `pTsdb`.
//
// On success `*ppMemTable` receives the new object and 0 is returned. The new
// memtable starts with nRef == 1, MEM_MIN_HASH empty buckets, and holds a
// reference on the vnode buffer pool currently in use (vnodeBufPoolRef).
// On allocation failure `*ppMemTable` is set to NULL and `terrno` is returned.
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    int32_t code = terrno;
    *ppMemTable = NULL;
    return code;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // min fields start at the *_MAX constants and max fields at *_MIN, so the
  // first TMIN/TMAX update replaces them
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    int32_t code = terrno;  // capture before the free below
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return code;
  }

  vnodeBufPoolRef(pNew->pPool);
  tRBTreeCreate(pNew->tbDataTree, tTbDataCmprFn);

  *ppMemTable = pNew;
  return 0;
}
88

89
// Releases a memtable: drops its buffer-pool reference (forwarding `proactive`
// to vnodeBufPoolUnRef), then frees the bucket array and the object itself.
// A NULL `pMemTable` is ignored.
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}
3,414,149✔
96

97
// Hash lookup without any locking: walks the bucket chain selected by
// TABS(uid) % nBucket and returns the first entry whose uid matches, or NULL.
// `suid` is accepted for interface symmetry but is not consulted here.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  for (STbData *pCur = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->uid == uid) {
      return pCur;
    }
  }

  return NULL;
}
107

108
// Looks up the per-table data for `uid` while holding the memtable latch in
// read mode (taosRLockLatch / taosRUnLockLatch). Returns NULL when the table
// has no entry in this memtable.
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pFound;

  taosRLockLatch(&pMemTable->latch);
  pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
117

118
// Inserts one table's submit payload into the active memtable (`pTsdb->mem`).
//
// The payload is dispatched to the column-format or row-format inserter
// depending on SUBMIT_REQ_COLUMN_DATA_FORMAT in `pSubmitTbData->flags`. On
// success the memtable's [minVer, maxVer] range is widened to include `version`.
//
// When TSDB_BYPASS_RB_TSDB_WRITE_MEM is set in `tsBypassFlag`, nothing is
// written and 0 is returned (terrno is set to 0 on that path). On failure the
// error code is stored in terrno and returned.
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t    code = 0;
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTb = NULL;
  tb_uid_t   suid = pSubmitTbData->suid;
  tb_uid_t   uid = pSubmitTbData->uid;
  bool       bypass = (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) != 0;

  if (bypass) {
    goto _err;  // code is still 0 here
  }

  // locate (or lazily create) the per-table container
  code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTb);
  if (code != 0) {
    goto _err;
  }

  // hand the payload to the matching inserter
  code = (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)
             ? tsdbInsertColDataToTable(pMem, pTb, version, pSubmitTbData, affectedRows)
             : tsdbInsertRowDataToTable(pMem, pTb, version, pSubmitTbData, affectedRows);
  if (code != 0) {
    goto _err;
  }

  // widen the version range covered by this memtable
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  return code;

_err:
  terrno = code;
  return code;
}
153

154
// Records a delete of the key range [sKey, eKey] for table (suid, uid) at `version`.
//
// Steps:
//   1. metaGetInfo must succeed for `uid` (else TSDB_CODE_TDB_TABLE_NOT_EXIST)
//      and report the same `suid` (else TSDB_CODE_INVALID_MSG).
//   2. An SDelData record is allocated from the vnode buffer pool and appended
//      to the table's delete list under the table's write latch.
//   3. Memtable counters and the version range are updated.
//   4. tsdbCacheDel is invoked; a failure there is only logged.
//
// Returns 0 on success, otherwise the error code (also logged).
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
  int32_t    code = 0;
  SMemTable *pMem = pTsdb->mem;
  SVBufPool *pPool = pTsdb->pVnode->inUse;
  STbData   *pTbData = NULL;
  SDelData  *pDel = NULL;
  SMetaInfo  info;

  // the table must exist and belong to the given super table
  if (metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL) != 0) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (info.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTbData);
  if (code != 0) {
    goto _err;
  }

  // build the delete record
  pDel = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDel));
  if (pDel == NULL) {
    code = terrno;
    goto _err;
  }
  pDel->version = version;
  pDel->sKey = sKey;
  pDel->eKey = eKey;
  pDel->pNext = NULL;

  // append to the table's singly linked delete list
  taosWLockLatch(&pTbData->lock);
  if (pTbData->pHead != NULL) {
    pTbData->pTail->pNext = pDel;
    pTbData->pTail = pDel;
  } else {
    pTbData->pHead = pTbData->pTail = pDel;
  }
  taosWUnLockLatch(&pTbData->lock);

  pMem->nDel++;
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  // best effort: a cache failure does not fail the delete
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
              " eKey:%" PRId64 " at version %" PRId64,
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  }

  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64,
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;
}
217

218
// Heap-allocates an iterator over `pTbData` and opens it via tsdbTbDataIterOpen.
// `*ppIter` receives the iterator (NULL when the allocation fails).
// Returns 0 on success or `terrno` on allocation failure.
// Release the iterator with tsdbTbDataIterDestroy().
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNew = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  (*ppIter) = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNew);
  return 0;
}
232

233
// Frees an iterator obtained from tsdbTbDataIterCreate (NULL is accepted).
// Always returns NULL so callers can write `p = tsdbTbDataIterDestroy(p);`.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter != NULL) {
    taosMemoryFree(pIter);
  }

  return NULL;
}
239

240
// Initialises a caller-provided iterator over the skiplist of `pTbData`.
//
//   pFrom == NULL : start at the first node (forward) or the last node (backward).
//   pFrom != NULL : position with tbDataMovePosTo() relative to `*pFrom`, then
//                   step once from the level-0 cursor in the iteration direction.
//
// `pIter->pRow` is reset to NULL.
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
  SMemSkipListNode *pos[SL_MAX_LEVEL];

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom == NULL) {
    // no start key: begin at one end of the list
    pIter->pNode = backward ? SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0) : SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
    return;
  }

  // start key given: seek first, then take one step at level 0
  if (backward) {
    tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
    pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
  } else {
    tbDataMovePosTo(pTbData, pos, pFrom, 0);
    pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
  }
}
163,019,537✔
268

269
// Advances the iterator by one node in its direction and clears `pIter->pRow`.
// Returns false when the iterator already sits on, or has just reached, the
// terminating sentinel (head for backward iteration, tail for forward);
// returns true otherwise.
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  SMemSkipListNode *pStop = pIter->backward ? pIter->pTbData->sl.pHead : pIter->pTbData->sl.pTail;

  pIter->pRow = NULL;

  if (pIter->pNode == pStop) {
    return false;  // already exhausted; do not move
  }

  if (pIter->backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  }

  return pIter->pNode != pStop;
}
293

294
// Counts the nodes between the head and tail sentinels by following level-0
// forward links from the head until the tail is reached.
int64_t tsdbCountTbDataRows(STbData *pTbData) {
  int64_t count = 0;

  // the increment is skipped on `break`, so the tail sentinel is never counted
  for (SMemSkipListNode *pCur = pTbData->sl.pHead; NULL != pCur; count++) {
    pCur = SL_GET_NODE_FORWARD(pCur, 0);
    if (pCur == pTbData->sl.pTail) {
      break;
    }
  }

  return count;
}
309

310
// Adds to `*rowsNum` the row count of every table in the memtable whose uid is
// present as a key in `pTableMap`. The whole scan runs under the memtable's
// read latch. `*rowsNum` is accumulated into, not reset.
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
  taosRLockLatch(&pMemTable->latch);

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pTb = pMemTable->aBucket[iBucket]; pTb != NULL; pTb = pTb->next) {
      if (tSimpleHashGet(pTableMap, &pTb->uid, sizeof(pTb->uid)) != NULL) {
        *rowsNum += tsdbCountTbDataRows(pTb);
      }
    }
  }

  taosRUnLockLatch(&pMemTable->latch);
}
×
327

328
// Callback signature expected by tsdbMemTableSaveToCache().
typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid);

// Invokes `func` (cast to __tsdb_cache_update) once for every table held in the
// memtable, passing the memtable and the table's suid/uid. Stops at the first
// non-zero result and returns it through TAOS_RETURN; returns 0 when every
// call succeeds.
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) {
  __tsdb_cache_update update = (__tsdb_cache_update)func;
  int32_t             code = 0;

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pTb = pMemTable->aBucket[iBucket]; pTb != NULL; pTb = pTb->next) {
      code = update(pMemTable, pTb->suid, pTb->uid);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    }
  }

  return code;
}
348

349
// Doubles the bucket array of the memtable's uid hash and relinks every entry
// into its new bucket (TABS(uid) % new size). The old array is freed only after
// all entries have been moved. Returns 0, or `terrno` if the new array cannot
// be allocated (the table is left untouched in that case).
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nNew = pMemTable->nBucket * 2;
  STbData **aNew = (STbData **)taosMemoryCalloc(nNew, sizeof(STbData *));
  if (aNew == NULL) {
    return terrno;
  }

  for (int32_t iOld = 0; iOld < pMemTable->nBucket; iOld++) {
    STbData *pEntry = pMemTable->aBucket[iOld];

    while (pEntry != NULL) {
      STbData *pFollowing = pEntry->next;  // saved: `next` is overwritten below
      int32_t  idx = TABS(pEntry->uid) % nNew;

      // push onto the front of the destination chain
      pEntry->next = aNew[idx];
      aNew[idx] = pEntry;

      pEntry = pFollowing;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->nBucket = nNew;
  pMemTable->aBucket = aNew;

  return 0;
}
380

381
// Returns the STbData for (suid, uid) in `pMemTable`, creating and registering
// a new one when the uid is not yet present.
//
// Lookup is done through tsdbGetTbDataFromMemTableImpl without taking the
// memtable latch here; registration of a new entry is done under the write latch.
//
// A new STbData is a single buffer-pool allocation laid out as
//   [STbData][head sentinel node][tail sentinel node]
// where each sentinel has `maxLevel` levels (config.tsdbCfg.slLevel).
// NOTE(review): skiplist walkers in this file use `pos[SL_MAX_LEVEL]` arrays, so
// slLevel is presumably validated to be <= SL_MAX_LEVEL elsewhere — confirm.
//
// Registration inserts into the red-black tree first and only then links the
// entry into the hash bucket. If tRBTreePut fails, nothing has been published:
// the hash chain and nTbData are untouched, so a failed call leaves no
// half-registered entry behind.
//
// On success `*ppTbData` receives the entry and 0 is returned. On failure
// `*ppTbData` is NULL and the error code is returned.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  int32_t code = 0;

  // get
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pTbData) goto _exit;

  // create
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
  if (pTbData == NULL) {
    code = terrno;
    goto _exit;
  }
  pTbData->suid = suid;
  pTbData->uid = uid;
  pTbData->minKey = TSKEY_MAX;
  pTbData->maxKey = TSKEY_MIN;
  pTbData->pHead = NULL;  // delete list starts empty
  pTbData->pTail = NULL;
  pTbData->sl.seed = taosRand();
  pTbData->sl.size = 0;
  pTbData->sl.maxLevel = maxLevel;
  pTbData->sl.level = 0;
  // the two sentinels live directly behind the STbData header
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
  pTbData->sl.pHead->level = maxLevel;
  pTbData->sl.pTail->level = maxLevel;
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
    // empty list: head <-> tail on every level
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;

    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
  }
  taosInitRWLatch(&pTbData->lock);

  taosWLockLatch(&pMemTable->latch);

  // grow the hash once there is on average one entry per bucket
  if (pMemTable->nTbData >= pMemTable->nBucket) {
    code = tsdbMemTableRehash(pMemTable);
    if (code) {
      taosWUnLockLatch(&pMemTable->latch);
      goto _exit;
    }
  }

  // tree first: a failure here must not leave the entry reachable via the hash
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
    taosWUnLockLatch(&pMemTable->latch);
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _exit;
  }

  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pTbData->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pTbData;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

_exit:
  if (code) {
    *ppTbData = NULL;
  } else {
    *ppTbData = pTbData;
  }
  return code;
}
451

452
// Computes, for every level of the skiplist in `pTbData`, the cursor node next
// to which a row with key `*pKey` belongs, and stores it in `pos[level]`.
//
// flags:
//   SL_MOVE_BACKWARD  search from the tail towards the head; pos[l] ends up on
//                     the last node visited whose key compares > *pKey (or on
//                     the tail). Without it the search runs from the head
//                     towards the tail and pos[l] ends up on the last node
//                     visited whose key compares < *pKey (or on the head).
//   SL_MOVE_FROM_POS  resume from the cursor already in `pos` instead of the
//                     sentinel; levels at or above sl.level are then left as
//                     they are.
//
// `pos` must have room for sl.maxLevel entries.
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
  SMemSkipListNode *pCur;
  SMemSkipListNode *pProbe;
  STsdbRowKey       probeKey;
  int32_t           isBackward = flags & SL_MOVE_BACKWARD;
  int32_t           resume = flags & SL_MOVE_FROM_POS;

  // start at the sentinel on the side we are searching from
  pCur = isBackward ? pTbData->sl.pTail : pTbData->sl.pHead;

  // levels not yet in use simply point at that sentinel
  if (!resume) {
    for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
      pos[iLevel] = pCur;
    }
  }

  if (pTbData->sl.level == 0) {
    return;  // empty list: nothing to descend through
  }

  if (resume) {
    pCur = pos[pTbData->sl.level - 1];
  }

  if (isBackward) {
    for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
      for (pProbe = SL_GET_NODE_BACKWARD(pCur, iLevel); pProbe != pTbData->sl.pHead;
           pProbe = SL_GET_NODE_BACKWARD(pCur, iLevel)) {
        tsdbRowGetKey(&pProbe->row, &probeKey);
        if (tsdbRowKeyCmpr(&probeKey, pKey) <= 0) {
          break;  // pProbe is at or before the key: stop on this level
        }
        pCur = pProbe;
      }

      pos[iLevel] = pCur;
    }
  } else {
    for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
      for (pProbe = SL_GET_NODE_FORWARD(pCur, iLevel); pProbe != pTbData->sl.pTail;
           pProbe = SL_GET_NODE_FORWARD(pCur, iLevel)) {
        tsdbRowGetKey(&pProbe->row, &probeKey);
        if (tsdbRowKeyCmpr(&probeKey, pKey) >= 0) {
          break;  // pProbe is at or after the key: stop on this level
        }
        pCur = pProbe;
      }

      pos[iLevel] = pCur;
    }
  }
}
2,147,483,647✔
519

520
// Draws the level for a new skiplist node. Starting at 1, the level is raised
// while the low two bits of taosRandR() are zero, capped at
// min(maxLevel, current list level + 1). The random draw is made before the
// cap is checked, so the seed advances once more even when the cap stops the loop.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t level = 1;

  for (;;) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) {
      break;
    }
    if (level >= cap) {
      break;
    }
    level++;
  }

  return level;
}
530
// Allocates a skiplist node for `pRow` and links it into the skiplist of
// `pTbData` next to the cursor `pos` (as produced by tbDataMovePosTo).
//
//   forward != 0 : pos[l] is the predecessor on level l; the node is linked
//                  right after it.
//   forward == 0 : pos[l] is the successor on level l; the node is linked
//                  right before it.
//
// For TSDBROW_ROW_FMT rows the SRow payload is copied into the same allocation,
// directly behind the node's link array. For TSDBROW_COL_FMT rows only the
// TSDBROW value is copied.
//
// On return, pos[l] == new node for every level l below the node's level, so a
// following forward insert can continue from it.
//
// Returns 0 on success, `terrno` when the node cannot be allocated, and
// TSDB_CODE_INTERNAL_ERROR for a row type that is neither of the two formats.
// The last case used to fall through to the NULL-node check and return
// whatever terrno happened to hold, possibly 0.
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  int32_t           code = 0;
  int8_t            level;
  SMemSkipListNode *pNode = NULL;
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
  int64_t           nSize;

  // create node
  level = tsdbMemSkipListRandLevel(&pTbData->sl);
  nSize = SL_NODE_SIZE(level);
  if (pRow->type == TSDBROW_ROW_FMT) {
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize);
  } else {
    // unknown row format: fail explicitly instead of relying on a stale terrno
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _exit;
  }
  if (pNode == NULL) {
    code = terrno;
    goto _exit;
  }

  pNode->level = level;
  pNode->row = *pRow;
  if (pRow->type == TSDBROW_ROW_FMT) {
    // the row bytes live right behind the node's link array
    pNode->row.pTSRow = (SRow *)((char *)pNode + nSize);
    memcpy(pNode->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
  }

  // set node: fill in the new node's own links with plain stores first
  if (forward) {
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
      SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
    }
  } else {
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
      SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
      SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }
  }

  // set forward and backward: redirect the neighbours to the new node through
  // atomic stores, from the top level down to level 0
  if (forward) {
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];

      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
      SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);

      pos[iLevel] = pNode;
    }
  } else {
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
      SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];

      SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);

      pos[iLevel] = pNode;
    }
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNode->level) {
    pTbData->sl.level = pNode->level;
  }

_exit:
  return code;
}
600

601
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
55,944✔
602
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
603
  int32_t code = 0;
55,944✔
604

605
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
55,944✔
606
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
55,944✔
607
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
55,939✔
608

609
  // copy and construct block data
610
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
55,944✔
611
  if (pBlockData == NULL) {
55,944!
612
    code = terrno;
×
613
    goto _exit;
×
614
  }
615

616
  pBlockData->suid = pTbData->suid;
55,944✔
617
  pBlockData->uid = pTbData->uid;
55,944✔
618
  pBlockData->nRow = aColData[0].nVal;
55,944✔
619
  pBlockData->aUid = NULL;
55,939✔
620
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
55,939✔
621
  if (pBlockData->aVersion == NULL) {
55,939!
622
    code = terrno;
×
623
    goto _exit;
×
624
  }
625
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
4,057,713✔
626
    pBlockData->aVersion[i] = version;
4,001,764✔
627
  }
628

629
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
55,939✔
630
  if (pBlockData->aTSKEY == NULL) {
55,944!
631
    code = terrno;
×
632
    goto _exit;
×
633
  }
634
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
55,944!
635

636
  pBlockData->nColData = nColData - 1;
55,944✔
637
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
55,944✔
638
  if (pBlockData->aColData == NULL) {
55,944!
639
    code = terrno;
×
640
    goto _exit;
×
641
  }
642

643
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
424,647✔
644
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
368,703✔
645
    if (code) goto _exit;
368,703!
646
  }
647

648
  // loop to add each row to the skiplist
649
  SMemSkipListNode *pos[SL_MAX_LEVEL];
55,944✔
650
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
55,944✔
651
  STsdbRowKey       key;
55,944✔
652

653
  // first row
654
  tsdbRowGetKey(&tRow, &key);
55,944✔
655
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
55,944✔
656
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
55,944!
657
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
55,944✔
658

659
  // remain row
660
  ++tRow.iRow;
55,944✔
661
  if (tRow.iRow < pBlockData->nRow) {
55,944✔
662
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
254,595✔
663
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
203,676✔
664
    }
665

666
    while (tRow.iRow < pBlockData->nRow) {
3,993,272✔
667
      tsdbRowGetKey(&tRow, &key);
3,942,206✔
668

669
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
3,944,957!
670
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
671
      }
672

673
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
3,944,957!
674

675
      ++tRow.iRow;
3,942,353✔
676
    }
677
  }
678

679
  if (key.key.ts >= pTbData->maxKey) {
55,944!
680
    pTbData->maxKey = key.key.ts;
55,944✔
681
  }
682

683
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
55,944!
684
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
685
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
686
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
687
    }
688
  }
689

690
  // SMemTable
691
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
55,939✔
692
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
55,944✔
693
  pMemTable->nRow += pBlockData->nRow;
55,944✔
694

695
  if (affectedRows) *affectedRows = pBlockData->nRow;
55,944!
696

697
_exit:
55,944✔
698
  return code;
55,944✔
699
}
700

701
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
211,622,134✔
702
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
703
  int32_t code = 0;
211,622,134✔
704

705
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
211,622,134✔
706
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
211,622,273✔
707
  STsdbRowKey       key;
211,620,967✔
708
  SMemSkipListNode *pos[SL_MAX_LEVEL];
211,619,994✔
709
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
211,621,609✔
710
  int32_t           iRow = 0;
211,621,609✔
711

712
  // backward put first data
713
  tRow.pTSRow = aRow[iRow++];
211,621,609✔
714
  tsdbRowGetKey(&tRow, &key);
211,622,225✔
715
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
211,620,970✔
716
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
211,621,577✔
717
  if (code) goto _exit;
211,615,316!
718

719
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
211,615,316✔
720

721
  // forward put rest data
722
  if (iRow < nRow) {
211,611,536✔
723
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
26,127,285✔
724
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
20,611,544✔
725
    }
726

727
    while (iRow < nRow) {
2,147,483,647✔
728
      tRow.pTSRow = aRow[iRow];
2,147,483,647✔
729
      tsdbRowGetKey(&tRow, &key);
2,147,483,647✔
730

731
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
2,147,483,647✔
732
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
2,147,483,647✔
733
      }
734

735
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
2,147,483,647✔
736
      if (code) goto _exit;
2,147,483,647!
737

738
      iRow++;
2,147,483,647✔
739
    }
740
  }
741

742
  if (key.key.ts >= pTbData->maxKey) {
211,409,938✔
743
    pTbData->maxKey = key.key.ts;
208,947,161✔
744
  }
745
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
211,613,711!
746
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
×
747
  }
748

749
  // SMemTable
750
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
211,613,609✔
751
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
211,611,251✔
752
  pMemTable->nRow += nRow;
211,616,207✔
753

754
  if (affectedRows) *affectedRows = nRow;
211,607,797!
755

756
_exit:
211,612,495✔
757
  return code;
211,617,385✔
758
}
759

760
// Returns the skiplist's node counter (`sl.size`) for this table, i.e. the
// number of successful tbDataDoPut insertions, converted to the int32_t return type.
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  return pTbData->sl.size;
}
1,573✔
761

762
// Takes a reference on the memtable for a query: atomically increments nRef
// and registers `pQNode` with the memtable's buffer pool. If the count observed
// before the increment was not positive, an error is logged but the call still
// proceeds. Always returns 0.
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);

  if (nRef <= 0) {
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
  }

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}
775

776
// Drops one reference on the memtable. When `pNode` is non-NULL it is first
// deregistered from the memtable's buffer pool. The memtable is destroyed when
// the decremented reference count reaches zero.
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t nLeft = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (nLeft == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }
}
49,540,146✔
785

786
// qsort-style comparator over elements of type `STbData *`: `p1` and `p2`
// point at the array slots. Orders by suid, then by uid. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  STbData *lhs = *(STbData **)p1;
  STbData *rhs = *(STbData **)p2;

  if (lhs->suid != rhs->suid) {
    return (lhs->suid < rhs->suid) ? -1 : 1;
  }

  if (lhs->uid != rhs->uid) {
    return (lhs->uid < rhs->uid) ? -1 : 1;
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc