• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.9
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tglobal.h"
17
#include "tsdb.h"
18
#include "util/tsimplehash.h"
19

20
#define MEM_MIN_HASH 1024
21
#define SL_MAX_LEVEL 5
22

23
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
24
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
25
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
26
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
27
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
28
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
29
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
30
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
31

32
#define SL_MOVE_BACKWARD 0x1
33
#define SL_MOVE_FROM_POS 0x2
34

35
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
36
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
37
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
38
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
39
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
40
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
41

42
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
997,354,415✔
43
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
997,354,415✔
44
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
997,411,204✔
45
  if (tbData1->suid < tbData2->suid) return -1;
997,410,147✔
46
  if (tbData1->suid > tbData2->suid) return 1;
990,050,356✔
47
  if (tbData1->uid < tbData2->uid) return -1;
977,084,320✔
48
  if (tbData1->uid > tbData2->uid) return 1;
840,602,677✔
49
  return 0;
×
50
}
51

52
// Allocate and initialise an empty memtable for pTsdb.
//
// On success *ppMemTable receives the new table (reference count 1, a reference
// taken on the vnode's in-use buffer pool) and 0 is returned. On allocation
// failure *ppMemTable is set to NULL and the terrno value is returned.
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
  int32_t    code = 0;
  SMemTable *pNew = (SMemTable *)taosMemoryCalloc(1, sizeof(*pNew));

  if (pNew == NULL) {
    code = terrno;
    *ppMemTable = NULL;
    return code;
  }

  taosInitRWLatch(&pNew->latch);
  pNew->pTsdb = pTsdb;
  pNew->pPool = pTsdb->pVnode->inUse;
  pNew->nRef = 1;

  // version/key ranges start out inverted so the first insert narrows them
  pNew->minVer = VERSION_MAX;
  pNew->maxVer = VERSION_MIN;
  pNew->minKey = TSKEY_MAX;
  pNew->maxKey = TSKEY_MIN;

  pNew->nRow = 0;
  pNew->nDel = 0;
  pNew->nTbData = 0;

  // uid -> STbData hash buckets
  pNew->nBucket = MEM_MIN_HASH;
  pNew->aBucket = (STbData **)taosMemoryCalloc(pNew->nBucket, sizeof(STbData *));
  if (pNew->aBucket == NULL) {
    code = terrno;  // capture before releasing the half-built table
    taosMemoryFree(pNew);
    *ppMemTable = NULL;
    return code;
  }

  vnodeBufPoolRef(pNew->pPool);
  tRBTreeCreate(pNew->tbDataTree, tTbDataCmprFn);

  *ppMemTable = pNew;
  return code;
}
89

90
// Release a memtable: drop its buffer-pool reference, then free the bucket
// array and the table object itself. A NULL pMemTable is ignored.
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
  if (pMemTable == NULL) {
    return;
  }

  vnodeBufPoolUnRef(pMemTable->pPool, proactive);
  taosMemoryFree(pMemTable->aBucket);
  taosMemoryFree(pMemTable);
}
9,767,107✔
97

98
// Look up the STbData for `uid` in the memtable hash (no locking is done here).
// Only `uid` takes part in the lookup; `suid` is not consulted.
// Returns the matching entry, or NULL when the bucket chain has none.
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  STbData *pEntry = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];

  for (; pEntry != NULL; pEntry = pEntry->next) {
    if (pEntry->uid == uid) {
      return pEntry;
    }
  }

  return NULL;
}
108

109
// Locked variant of the hash lookup: holds the memtable read latch for the
// duration of the search and returns the entry for `uid`, or NULL.
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
  taosRLockLatch(&pMemTable->latch);
  STbData *pFound = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  taosRUnLockLatch(&pMemTable->latch);

  return pFound;
}
118

119
// Insert one submit block into the active memtable of pTsdb.
//
// Dispatches to the column-format or row-format insert depending on
// pSubmitTbData->flags, then widens the memtable version range with `version`.
// When the TSDB_BYPASS_RB_TSDB_WRITE_MEM bypass flag is set nothing is written
// and 0 is returned. Every non-success exit (and the bypass exit) stores the
// returned code in terrno.
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
  int32_t    code = 0;
  SMemTable *pMem = pTsdb->mem;
  STbData   *pTable = NULL;

  if (!(tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM)) {
    // locate (or lazily create) the per-table container
    code = tsdbGetOrCreateTbData(pMem, pSubmitTbData->suid, pSubmitTbData->uid, &pTable);

    if (code == 0) {
      if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        code = tsdbInsertColDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
      } else {
        code = tsdbInsertRowDataToTable(pMem, pTable, version, pSubmitTbData, affectedRows);
      }
    }

    if (code == 0) {
      // success: track the versions covered by this memtable
      pMem->minVer = TMIN(pMem->minVer, version);
      pMem->maxVer = TMAX(pMem->maxVer, version);
      return code;
    }
  }

  terrno = code;
  return code;
}
154

155
// Record a delete of the key range [sKey, eKey] at `version` for table
// (suid, uid) in the active memtable.
//
// Steps: verify through the meta store that `uid` exists and belongs to `suid`,
// get or create the table's STbData, optionally overwrite the on-disk range
// (secureDelete), append an SDelData marker to the table's delete list, update
// the memtable counters/version range and finally purge the cache.
//
// Returns 0 on success; TSDB_CODE_TDB_TABLE_NOT_EXIST when the meta lookup
// fails, TSDB_CODE_INVALID_MSG on a suid mismatch, or the code/terrno of the
// failing allocation step.
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey,
                            int8_t secureDelete) {
  int32_t    code = 0;
  SMemTable *pMem = pTsdb->mem;
  SVBufPool *pBufPool = pTsdb->pVnode->inUse;
  STbData   *pTable = NULL;
  SDelData  *pDel = NULL;
  SMetaInfo  metaInfo;

  // check if table exists and is a child of the given super table
  if (metaGetInfo(pTsdb->pVnode->pMeta, uid, &metaInfo, NULL) != 0) {
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
    goto _err;
  }
  if (metaInfo.suid != suid) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  code = tsdbGetOrCreateTbData(pMem, suid, uid, &pTable);
  if (code != 0) {
    goto _err;
  }

  // secureDelete is merged by planner and vnode runtime config.
  if (secureDelete) {
    // Phase 2: overwrite on-disk (data file + STT file) blocks.
    // Errors are logged but not fatal: delete markers guarantee correctness
    // even if the physical overwrite fails.
    int32_t eraseCode = tsdbSecureEraseFileRange(pTsdb, suid, uid, sKey, eKey);
    if (eraseCode != 0) {
      tsdbWarn("vgId:%d, secure erase file range failed for suid:%" PRId64 " uid:%" PRId64
               " skey:%" PRId64 " eKey:%" PRId64 " since %s",
               TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(eraseCode));
    }
  }

  // build the delete marker
  pDel = (SDelData *)vnodeBufPoolMalloc(pBufPool, sizeof(*pDel));
  if (pDel == NULL) {
    code = terrno;
    goto _err;
  }
  pDel->version = version;
  pDel->sKey = sKey;
  pDel->eKey = eKey;
  pDel->pNext = NULL;

  // append it to the table's delete list under the table write latch
  taosWLockLatch(&pTable->lock);
  if (pTable->pHead != NULL) {
    pTable->pTail->pNext = pDel;
    pTable->pTail = pDel;
  } else {
    pTable->pTail = pDel;
    pTable->pHead = pDel;
  }
  taosWUnLockLatch(&pTable->lock);

  pMem->nDel++;
  pMem->minVer = TMIN(pMem->minVer, version);
  pMem->maxVer = TMAX(pMem->maxVer, version);

  // a cache purge failure is logged only; the delete itself has been recorded
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
              " eKey:%" PRId64 " at version %" PRId64,
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
  }

  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " secureDelete:%d",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, (int)secureDelete);
  return code;

_err:
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
            " at version %" PRId64 " since %s",
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
  return code;
}
234

235
// Heap-allocate an iterator over pTbData and open it at pFrom (see
// tsdbTbDataIterOpen). *ppIter receives the iterator, or NULL when the
// allocation fails, in which case terrno is returned. Returns 0 on success.
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
  STbDataIter *pNewIter = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));

  *ppIter = pNewIter;
  if (pNewIter == NULL) {
    return terrno;
  }

  tsdbTbDataIterOpen(pTbData, pFrom, backward, pNewIter);
  return 0;
}
249

250
// Free an iterator obtained from tsdbTbDataIterCreate (NULL is accepted).
// Always returns NULL so callers can write `pIter = tsdbTbDataIterDestroy(pIter)`.
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
  if (pIter != NULL) {
    taosMemoryFree(pIter);
  }

  return NULL;
}
256

257
// Initialise a caller-provided iterator over pTbData's skip list.
//
//   pFrom    - key to start from, or NULL to start from the list end that
//              matches the direction (tail for backward, head for forward)
//   backward - non-zero to iterate towards the head
//   pIter    - iterator to fill in; pRow is reset to NULL
//
// pIter->pNode is set to the level-0 neighbour (in iteration direction) of the
// position found by tbDataMovePosTo, or of the head/tail sentinel when pFrom
// is NULL.
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
  // NOTE(review): pos has SL_MAX_LEVEL slots while tbDataMovePosTo fills entries
  // up to sl.maxLevel; assumes the configured level never exceeds SL_MAX_LEVEL.
  SMemSkipListNode *pos[SL_MAX_LEVEL];

  pIter->pTbData = pTbData;
  pIter->backward = backward;
  pIter->pRow = NULL;

  if (pFrom == NULL) {
    // create from head or tail
    if (backward) {
      pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
    } else {
      pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
    }
    return;
  }

  // create from a key
  if (backward) {
    tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
    pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
  } else {
    tbDataMovePosTo(pTbData, pos, pFrom, 0);
    pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
  }
}
725,053,723✔
285

286
// Advance the iterator one level-0 node in its direction and clear the cached
// row. Returns false when the iterator already sits on, or has just reached,
// the terminating sentinel (head when iterating backward, tail otherwise);
// returns true when it now points at a data node.
bool tsdbTbDataIterNext(STbDataIter *pIter) {
  pIter->pRow = NULL;

  SMemSkipListNode *pSentinel = pIter->backward ? pIter->pTbData->sl.pHead : pIter->pTbData->sl.pTail;
  if (pIter->pNode == pSentinel) {
    return false;
  }

  if (pIter->backward) {
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
  } else {
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
  }

  return pIter->pNode != pSentinel;
}
310

311
// Count the nodes between the head and tail sentinels of pTbData's skip list by
// walking the level-0 forward links. The walk also stops if a NULL link is met.
int64_t tsdbCountTbDataRows(STbData *pTbData) {
  int64_t nRows = 0;

  for (SMemSkipListNode *pCur = pTbData->sl.pHead; pCur != NULL; nRows++) {
    pCur = SL_GET_NODE_FORWARD(pCur, 0);
    if (pCur == pTbData->sl.pTail) {
      break;  // tail sentinel reached: it is not counted
    }
  }

  return nRows;
}
326

327
// Add to *rowsNum the row count of every table in the memtable whose uid is a
// key of pTableMap. The memtable read latch is held for the whole scan.
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
  taosRLockLatch(&pMemTable->latch);

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pCur = pMemTable->aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
      // only tables the caller asked about contribute
      if (tSimpleHashGet(pTableMap, &pCur->uid, sizeof(pCur->uid)) != NULL) {
        *rowsNum += tsdbCountTbDataRows(pCur);
      }
    }
  }

  taosRUnLockLatch(&pMemTable->latch);
}
×
344

345
typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid);
346

347
// Invoke `func` (a __tsdb_cache_update callback) once for every table held in
// the memtable, passing the memtable and the table's suid/uid. Stops at the
// first non-zero callback result and returns it via TAOS_RETURN; returns 0 when
// every callback succeeded.
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) {
  __tsdb_cache_update updateFn = (__tsdb_cache_update)func;

  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; ++iBucket) {
    for (STbData *pCur = pMemTable->aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
      int32_t code = (*updateFn)(pMemTable, pCur->suid, pCur->uid);
      if (code != 0) {
        TAOS_RETURN(code);
      }
    }
  }

  return 0;
}
365

366
// Double the number of hash buckets and redistribute every STbData into the new
// array. On allocation failure the memtable is left untouched and terrno is
// returned; otherwise the old bucket array is freed and 0 is returned.
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
  int32_t   nNewBucket = pMemTable->nBucket * 2;
  STbData **aNewBucket = (STbData **)taosMemoryCalloc(nNewBucket, sizeof(STbData *));

  if (aNewBucket == NULL) {
    return terrno;
  }

  for (int32_t iOld = 0; iOld < pMemTable->nBucket; iOld++) {
    STbData *pCur = pMemTable->aBucket[iOld];

    while (pCur != NULL) {
      STbData *pRest = pCur->next;  // remember the remainder before relinking
      int32_t  slot = TABS(pCur->uid) % nNewBucket;

      // push onto the front of the destination chain
      pCur->next = aNewBucket[slot];
      aNewBucket[slot] = pCur;

      pCur = pRest;
    }
  }

  taosMemoryFree(pMemTable->aBucket);
  pMemTable->nBucket = nNewBucket;
  pMemTable->aBucket = aNewBucket;

  return 0;
}
397

398
// Return the STbData for (suid, uid), creating and registering it when the
// memtable has none yet.
//
// A new entry is allocated from the vnode's in-use buffer pool together with
// the head and tail sentinel nodes of its skip list (both laid out directly
// after the STbData), then published under the memtable write latch in the
// ordered tree and in the uid hash.
//
// On success *ppTbData receives the entry and 0 is returned. On failure
// *ppTbData is NULL and the return value is terrno (allocation), the rehash
// error, or TSDB_CODE_INTERNAL_ERROR when the tree insert is rejected.
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
  int32_t code = 0;

  // get
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
  if (pTbData) goto _exit;

  // create
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;

  // one allocation: STbData + head sentinel + tail sentinel
  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
  if (pTbData == NULL) {
    code = terrno;
    goto _exit;
  }
  pTbData->suid = suid;
  pTbData->uid = uid;
  pTbData->minKey = TSKEY_MAX;
  pTbData->maxKey = TSKEY_MIN;
  pTbData->pHead = NULL;
  pTbData->pTail = NULL;
  pTbData->sl.seed = taosRand();
  pTbData->sl.size = 0;
  pTbData->sl.maxLevel = maxLevel;
  pTbData->sl.level = 0;
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
  pTbData->sl.pHead->level = maxLevel;
  pTbData->sl.pTail->level = maxLevel;
  // empty list: head links forward to tail and tail links backward to head on every level
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;

    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
  }
  taosInitRWLatch(&pTbData->lock);

  taosWLockLatch(&pMemTable->latch);

  // grow the hash once there is (on average) one table per bucket
  if (pMemTable->nTbData >= pMemTable->nBucket) {
    code = tsdbMemTableRehash(pMemTable);
    if (code) {
      taosWUnLockLatch(&pMemTable->latch);
      goto _exit;
    }
  }

  // Insert into the ordered tree BEFORE linking into the hash: the tree insert
  // is the only remaining step that can fail, and doing it first guarantees a
  // rejected entry is never left reachable through the hash (nor counted in
  // nTbData) while the caller is told the operation failed.
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
    taosWUnLockLatch(&pMemTable->latch);
    code = TSDB_CODE_INTERNAL_ERROR;
    goto _exit;
  }

  int32_t idx = TABS(uid) % pMemTable->nBucket;
  pTbData->next = pMemTable->aBucket[idx];
  pMemTable->aBucket[idx] = pTbData;
  pMemTable->nTbData++;

  taosWUnLockLatch(&pMemTable->latch);

_exit:
  if (code) {
    *ppTbData = NULL;
  } else {
    *ppTbData = pTbData;
  }
  return code;
}
468

469
// Position the per-level cursor array `pos` around pKey in pTbData's skip list.
//
//   flags & SL_MOVE_BACKWARD : search from the tail towards the head; on return
//       pos[l] is the last node visited on level l whose backward neighbour is
//       the head sentinel or has a key <= pKey.
//   otherwise               : search from the head towards the tail; on return
//       pos[l] is the last node visited on level l whose forward neighbour is
//       the tail sentinel or has a key >= pKey.
//   flags & SL_MOVE_FROM_POS : resume from the cursor already stored in `pos`
//       (pos[sl.level - 1]) instead of the end sentinel; levels at or above
//       sl.level are then left as the caller supplied them.
//
// Without SL_MOVE_FROM_POS, levels in [sl.level, sl.maxLevel) are filled with
// the starting sentinel.
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
  SMemSkipListNode *pCur;
  SMemSkipListNode *pCand;
  STsdbRowKey       candKey;
  const int32_t     isBackward = flags & SL_MOVE_BACKWARD;
  const int32_t     resume = flags & SL_MOVE_FROM_POS;

  if (isBackward) {
    pCur = pTbData->sl.pTail;

    if (!resume) {
      // levels not in use yet all point at the tail sentinel
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
        pos[iLevel] = pCur;
      }
    }

    if (pTbData->sl.level) {
      if (resume) pCur = pos[pTbData->sl.level - 1];

      // descend from the top used level, walking backward while keys are > pKey
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
        for (pCand = SL_GET_NODE_BACKWARD(pCur, iLevel); pCand != pTbData->sl.pHead;
             pCand = SL_GET_NODE_BACKWARD(pCur, iLevel)) {
          tsdbRowGetKey(&pCand->row, &candKey);
          if (tsdbRowKeyCmpr(&candKey, pKey) <= 0) {
            break;
          }
          pCur = pCand;
        }

        pos[iLevel] = pCur;
      }
    }
  } else {
    pCur = pTbData->sl.pHead;

    if (!resume) {
      // levels not in use yet all point at the head sentinel
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
        pos[iLevel] = pCur;
      }
    }

    if (pTbData->sl.level) {
      if (resume) pCur = pos[pTbData->sl.level - 1];

      // descend from the top used level, walking forward while keys are < pKey
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
        for (pCand = SL_GET_NODE_FORWARD(pCur, iLevel); pCand != pTbData->sl.pTail;
             pCand = SL_GET_NODE_FORWARD(pCur, iLevel)) {
          tsdbRowGetKey(&pCand->row, &candKey);
          if (tsdbRowKeyCmpr(&candKey, pKey) >= 0) {
            break;
          }
          pCur = pCand;
        }

        pos[iLevel] = pCur;
      }
    }
  }
}
2,147,483,647✔
536

537
// Draw a random level for a new skip-list node. Starting at 1, the level is
// raised while the low two bits of the next random number are zero, capped at
// min(maxLevel, current list level + 1). The random draw is made before the
// cap is checked, so the seed advances even on the iteration that hits the cap.
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
  int8_t cap = TMIN(pSl->maxLevel, pSl->level + 1);
  int8_t picked = 1;

  for (;;) {
    if ((taosRandR(&pSl->seed) & 0x3) != 0) break;
    if (picked >= cap) break;
    picked++;
  }

  return picked;
}
547
// Allocate a skip-list node for pRow and splice it in at the cursor `pos`.
//
//   pos     - per-level cursor produced by tbDataMovePosTo; on return every
//             level the new node occupies points at the new node
//   forward - non-zero: insert right AFTER pos[l] on each level;
//             zero:     insert right BEFORE pos[l] on each level
//
// Row-format rows are copied into the node allocation (right after the link
// array); for column-format rows only the TSDBROW descriptor is stored.
// The new node's own links are filled in first, and only then is it published
// to its neighbours with atomic stores, from the top level down.
// Returns 0 on success, or terrno when no node could be allocated.
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
                           int8_t forward) {
  int32_t           code = 0;
  SVBufPool        *pBufPool = pMemTable->pTsdb->pVnode->inUse;
  SMemSkipListNode *pNew = NULL;
  int8_t            nodeLevel;
  int64_t           linkBytes;

  // create node
  nodeLevel = tsdbMemSkipListRandLevel(&pTbData->sl);
  linkBytes = SL_NODE_SIZE(nodeLevel);
  if (pRow->type == TSDBROW_ROW_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, linkBytes + pRow->pTSRow->len);
  } else if (pRow->type == TSDBROW_COL_FMT) {
    pNew = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pBufPool, linkBytes);
  }
  if (pNew == NULL) {
    code = terrno;
    goto _exit;
  }

  pNew->level = nodeLevel;
  pNew->row = *pRow;
  if (pRow->type == TSDBROW_ROW_FMT) {
    // keep a private copy of the row payload inside the node
    pNew->row.pTSRow = (SRow *)((char *)pNew + linkBytes);
    memcpy(pNew->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
  }

  // set node: fill in the new node's links while it is still unreachable
  for (int8_t iLevel = 0; iLevel < nodeLevel; iLevel++) {
    if (forward) {
      SL_NODE_FORWARD(pNew, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
      SL_NODE_BACKWARD(pNew, iLevel) = pos[iLevel];
    } else {
      SL_NODE_FORWARD(pNew, iLevel) = pos[iLevel];
      SL_NODE_BACKWARD(pNew, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
    }
  }

  // set forward and backward: publish to the neighbours, top level first
  for (int8_t iLevel = nodeLevel - 1; iLevel >= 0; iLevel--) {
    if (forward) {
      SMemSkipListNode *pAfter = SL_NODE_FORWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNew);
      SL_SET_NODE_BACKWARD(pAfter, iLevel, pNew);
    } else {
      SMemSkipListNode *pBefore = SL_NODE_BACKWARD(pos[iLevel], iLevel);

      SL_SET_NODE_FORWARD(pBefore, iLevel, pNew);
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNew);
    }

    pos[iLevel] = pNew;
  }

  pTbData->sl.size++;
  if (pTbData->sl.level < pNew->level) {
    pTbData->sl.level = pNew->level;
  }

_exit:
  return code;
}
617

618
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
7,748,086✔
619
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
620
  int32_t code = 0;
7,748,086✔
621

622
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
7,748,086✔
623
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
7,748,086✔
624
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
7,748,086✔
625

626
  // copy and construct block data
627
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
7,748,086✔
628
  if (pBlockData == NULL) {
7,748,086✔
629
    code = terrno;
×
630
    goto _exit;
×
631
  }
632

633
  pBlockData->suid = pTbData->suid;
7,748,086✔
634
  pBlockData->uid = pTbData->uid;
7,748,086✔
635
  pBlockData->nRow = aColData[0].nVal;
7,748,086✔
636
  pBlockData->aUid = NULL;
7,748,086✔
637
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
7,748,086✔
638
  if (pBlockData->aVersion == NULL) {
7,747,702✔
639
    code = terrno;
×
640
    goto _exit;
×
641
  }
642
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
2,147,483,647✔
643
    pBlockData->aVersion[i] = version;
2,147,483,647✔
644
  }
645

646
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
7,747,318✔
647
  if (pBlockData->aTSKEY == NULL) {
7,747,702✔
648
    code = terrno;
×
649
    goto _exit;
×
650
  }
651
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
7,747,702✔
652

653
  pBlockData->nColData = nColData - 1;
7,747,702✔
654
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
7,748,086✔
655
  if (pBlockData->aColData == NULL) {
7,747,692✔
656
    code = terrno;
×
657
    goto _exit;
×
658
  }
659

660
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
23,344,583✔
661
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
15,596,497✔
662
    if (code) goto _exit;
15,596,891✔
663
  }
664

665
  // loop to add each row to the skiplist
666
  SMemSkipListNode *pos[SL_MAX_LEVEL];
7,748,086✔
667
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
7,748,086✔
668
  STsdbRowKey       key;
7,748,086✔
669

670
  // first row
671
  tsdbRowGetKey(&tRow, &key);
7,748,086✔
672
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
7,748,086✔
673
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
7,748,086✔
674
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
7,748,086✔
675

676
  // remain row
677
  ++tRow.iRow;
7,748,086✔
678
  if (tRow.iRow < pBlockData->nRow) {
7,748,086✔
679
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
38,648,815✔
680
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
30,918,284✔
681
    }
682

683
    while (tRow.iRow < pBlockData->nRow) {
2,147,483,647✔
684
      tsdbRowGetKey(&tRow, &key);
2,147,483,647✔
685

686
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
2,147,483,647✔
687
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
688
      }
689

690
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
2,147,483,647✔
691

692
      ++tRow.iRow;
2,147,483,647✔
693
    }
694
  }
695

696
  if (key.key.ts >= pTbData->maxKey) {
7,526,696✔
697
    pTbData->maxKey = key.key.ts;
7,748,086✔
698
  }
699

700
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
7,748,086✔
701
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
702
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
703
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
704
    }
705
  }
706

707
  // SMemTable
708
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
7,748,086✔
709
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
7,748,086✔
710
  pMemTable->nRow += pBlockData->nRow;
7,748,086✔
711

712
  if (affectedRows) *affectedRows = pBlockData->nRow;
7,748,086✔
713

714
_exit:
7,748,086✔
715
  return code;
7,748,086✔
716
}
717

718
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
504,955,650✔
719
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
720
  int32_t code = 0;
504,955,650✔
721

722
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
504,955,650✔
723
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
504,966,509✔
724
  STsdbRowKey       key;
504,958,709✔
725
  SMemSkipListNode *pos[SL_MAX_LEVEL];
504,950,328✔
726
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
504,950,328✔
727
  int32_t           iRow = 0;
504,947,900✔
728

729
  // backward put first data
730
  tRow.pTSRow = aRow[iRow++];
504,947,900✔
731
  tsdbRowGetKey(&tRow, &key);
504,944,295✔
732
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
504,909,789✔
733
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
504,940,985✔
734
  if (code) goto _exit;
504,917,764✔
735

736
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
504,917,764✔
737

738
  // forward put rest data
739
  if (iRow < nRow) {
504,923,974✔
740
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
561,263,961✔
741
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
441,921,327✔
742
    }
743

744
    while (iRow < nRow) {
2,147,483,647✔
745
      tRow.pTSRow = aRow[iRow];
2,147,483,647✔
746
      tsdbRowGetKey(&tRow, &key);
2,147,483,647✔
747

748
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
2,147,483,647✔
749
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
2,147,483,647✔
750
      }
751

752
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
2,147,483,647✔
753
      if (code) goto _exit;
2,147,483,647✔
754

755
      iRow++;
2,147,483,647✔
756
    }
757
  }
758

759
  if (key.key.ts >= pTbData->maxKey) {
497,997,922✔
760
    pTbData->maxKey = key.key.ts;
498,405,970✔
761
  }
762
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
504,956,594✔
763
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
×
764
  }
765

766
  // SMemTable
767
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
504,941,388✔
768
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
504,943,205✔
769
  pMemTable->nRow += nRow;
504,932,479✔
770

771
  if (affectedRows) *affectedRows = nRow;
504,925,721✔
772

773
_exit:
504,917,909✔
774
  return code;
504,917,063✔
775
}
776

777
// Number of nodes currently recorded in the table's skip list (sl.size).
int32_t tsdbGetNRowsInTbData(STbData *pTbData) {
  int32_t nNodes = pTbData->sl.size;
  return nNodes;
}
1,291✔
778

779
// Take a reference on the memtable on behalf of a query and register the query
// node with the memtable's buffer pool.
//
// The reference count is incremented atomically. If the value returned by
// atomic_fetch_add_32 is not positive an error is logged, but the call still
// proceeds. Always returns 0.
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
  if (nRef <= 0) {
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
  }

  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);

  return 0;
}
792

793
// Drop one reference on the memtable. When pNode is non-NULL it is first
// deregistered from the memtable's buffer pool. The memtable is destroyed once
// the atomically decremented reference count reaches zero.
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
  if (pNode != NULL) {
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
  }

  int32_t nLeft = atomic_sub_fetch_32(&pMemTable->nRef, 1);
  if (nLeft == 0) {
    tsdbMemTableDestroy(pMemTable, proactive);
  }
}
227,822,277✔
802

803
// Comparator over pointers to STbData pointers (each argument is an
// `STbData **`). Orders by suid first and by uid second; returns -1, 0 or 1.
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
  const STbData *lhs = *(STbData **)p1;
  const STbData *rhs = *(STbData **)p2;

  if (lhs->suid != rhs->suid) {
    return (lhs->suid < rhs->suid) ? -1 : 1;
  }

  if (lhs->uid != rhs->uid) {
    return (lhs->uid < rhs->uid) ? -1 : 1;
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc