• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.63
/source/dnode/vnode/src/tsdb/tsdbMemTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "util/tsimplehash.h"
18

19
#define MEM_MIN_HASH 1024
20
#define SL_MAX_LEVEL 5
21

22
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
23
#define SL_NODE_SIZE(l)               (sizeof(SMemSkipListNode) + ((l) << 4))
24
#define SL_NODE_FORWARD(n, l)         ((n)->forwards[l])
25
#define SL_NODE_BACKWARD(n, l)        ((n)->forwards[(n)->level + (l)])
26
#define SL_GET_NODE_FORWARD(n, l)     ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_FORWARD(n, l)))
27
#define SL_GET_NODE_BACKWARD(n, l)    ((SMemSkipListNode *)atomic_load_ptr(&SL_NODE_BACKWARD(n, l)))
28
#define SL_SET_NODE_FORWARD(n, l, p)  atomic_store_ptr(&SL_NODE_FORWARD(n, l), p)
29
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_ptr(&SL_NODE_BACKWARD(n, l), p)
30

31
#define SL_MOVE_BACKWARD 0x1
32
#define SL_MOVE_FROM_POS 0x2
33

34
static void    tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags);
35
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
36
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
37
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
38
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
39
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
40

41
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
1,654,915✔
42
  STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
1,654,915✔
43
  STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
1,654,915✔
44
  if (tbData1->suid < tbData2->suid) return -1;
1,654,915✔
45
  if (tbData1->suid > tbData2->suid) return 1;
1,602,586✔
46
  if (tbData1->uid < tbData2->uid) return -1;
1,462,399✔
47
  if (tbData1->uid > tbData2->uid) return 1;
1,270,470!
48
  return 0;
×
49
}
50

51
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
38,531✔
52
  int32_t    code = 0;
38,531✔
53
  SMemTable *pMemTable = NULL;
38,531✔
54

55
  pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
38,531✔
56
  if (pMemTable == NULL) {
38,541!
57
    code = terrno;
×
58
    goto _err;
×
59
  }
60
  taosInitRWLatch(&pMemTable->latch);
38,541✔
61
  pMemTable->pTsdb = pTsdb;
38,542✔
62
  pMemTable->pPool = pTsdb->pVnode->inUse;
38,542✔
63
  pMemTable->nRef = 1;
38,542✔
64
  pMemTable->minVer = VERSION_MAX;
38,542✔
65
  pMemTable->maxVer = VERSION_MIN;
38,542✔
66
  pMemTable->minKey = TSKEY_MAX;
38,542✔
67
  pMemTable->maxKey = TSKEY_MIN;
38,542✔
68
  pMemTable->nRow = 0;
38,542✔
69
  pMemTable->nDel = 0;
38,542✔
70
  pMemTable->nTbData = 0;
38,542✔
71
  pMemTable->nBucket = MEM_MIN_HASH;
38,542✔
72
  pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *));
38,542✔
73
  if (pMemTable->aBucket == NULL) {
38,537!
74
    code = terrno;
×
75
    taosMemoryFree(pMemTable);
×
76
    goto _err;
×
77
  }
78
  vnodeBufPoolRef(pMemTable->pPool);
38,537✔
79
  tRBTreeCreate(pMemTable->tbDataTree, tTbDataCmprFn);
38,539✔
80

81
  *ppMemTable = pMemTable;
38,540✔
82
  return code;
38,540✔
83

84
_err:
×
85
  *ppMemTable = NULL;
×
86
  return code;
×
87
}
88

89
void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) {
38,545✔
90
  if (pMemTable) {
38,545!
91
    vnodeBufPoolUnRef(pMemTable->pPool, proactive);
38,545✔
92
    taosMemoryFree(pMemTable->aBucket);
38,545✔
93
    taosMemoryFree(pMemTable);
38,544✔
94
  }
95
}
38,544✔
96

97
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
98
  STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
5,777,325✔
99

100
  while (pTbData) {
5,793,349✔
101
    if (pTbData->uid == uid) break;
4,620,401✔
102
    pTbData = pTbData->next;
16,024✔
103
  }
104

105
  return pTbData;
5,777,325✔
106
}
107

108
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
2,879,449✔
109
  STbData *pTbData;
110

111
  taosRLockLatch(&pMemTable->latch);
2,879,449✔
112
  pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
2,882,896✔
113
  taosRUnLockLatch(&pMemTable->latch);
2,882,896✔
114

115
  return pTbData;
2,882,821✔
116
}
117

118
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
2,881,575✔
119
  int32_t    code = 0;
2,881,575✔
120
  SMemTable *pMemTable = pTsdb->mem;
2,881,575✔
121
  STbData   *pTbData = NULL;
2,881,575✔
122
  tb_uid_t   suid = pSubmitTbData->suid;
2,881,575✔
123
  tb_uid_t   uid = pSubmitTbData->uid;
2,881,575✔
124

125
  if (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) {
2,881,575!
UNCOV
126
    goto _err;
×
127
  }
128

129
  // create/get STbData to op
130
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
2,881,575✔
131
  if (code) {
2,880,497!
UNCOV
132
    goto _err;
×
133
  }
134

135
  // do insert impl
136
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
2,880,497✔
137
    code = tsdbInsertColDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
148✔
138
  } else {
139
    code = tsdbInsertRowDataToTable(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
2,880,349✔
140
  }
141
  if (code) goto _err;
2,884,105!
142

143
  // update
144
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
2,884,105✔
145
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
2,884,105✔
146

147
  return code;
2,884,105✔
148

UNCOV
149
_err:
×
UNCOV
150
  terrno = code;
×
UNCOV
151
  return code;
×
152
}
153

154
int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
12,937✔
155
  int32_t    code = 0;
12,937✔
156
  SMemTable *pMemTable = pTsdb->mem;
12,937✔
157
  STbData   *pTbData = NULL;
12,937✔
158
  SVBufPool *pPool = pTsdb->pVnode->inUse;
12,937✔
159

160
  // check if table exists
161
  SMetaInfo info;
162
  code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
12,937✔
163
  if (code) {
12,937!
164
    code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
165
    goto _err;
×
166
  }
167
  if (info.suid != suid) {
12,937!
UNCOV
168
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
169
    goto _err;
×
170
  }
171

172
  code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
12,937✔
173
  if (code) {
12,937!
UNCOV
174
    goto _err;
×
175
  }
176

177
  // do delete
178
  SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
12,937✔
179
  if (pDelData == NULL) {
12,937!
UNCOV
180
    code = terrno;
×
UNCOV
181
    goto _err;
×
182
  }
183
  pDelData->version = version;
12,937✔
184
  pDelData->sKey = sKey;
12,937✔
185
  pDelData->eKey = eKey;
12,937✔
186
  pDelData->pNext = NULL;
12,937✔
187
  taosWLockLatch(&pTbData->lock);
12,937✔
188
  if (pTbData->pHead == NULL) {
12,937✔
189
    pTbData->pHead = pTbData->pTail = pDelData;
7,410✔
190
  } else {
191
    pTbData->pTail->pNext = pDelData;
5,527✔
192
    pTbData->pTail = pDelData;
5,527✔
193
  }
194
  taosWUnLockLatch(&pTbData->lock);
12,937✔
195

196
  pMemTable->nDel++;
12,937✔
197
  pMemTable->minVer = TMIN(pMemTable->minVer, version);
12,937✔
198
  pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
12,937✔
199

200
  if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
12,937!
UNCOV
201
    tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
×
202
              " eKey:%" PRId64 " at version %" PRId64,
203
              TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
204
  }
205

206
  tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
12,937✔
207
            " at version %" PRId64,
208
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
209
  return code;
12,937✔
210

211
_err:
×
UNCOV
212
  tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
×
213
            " at version %" PRId64 " since %s",
214
            TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
UNCOV
215
  return code;
×
216
}
217

218
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter) {
1,505,502✔
219
  int32_t code = 0;
1,505,502✔
220

221
  (*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter));
1,505,502✔
222
  if ((*ppIter) == NULL) {
1,505,579!
UNCOV
223
    code = terrno;
×
UNCOV
224
    goto _exit;
×
225
  }
226

227
  tsdbTbDataIterOpen(pTbData, pFrom, backward, *ppIter);
1,505,579✔
228

229
_exit:
1,503,728✔
230
  return code;
1,503,728✔
231
}
232

233
void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
1,503,483✔
234
  if (pIter) {
1,503,483!
235
    taosMemoryFree(pIter);
1,503,508✔
236
  }
237
  return NULL;
1,505,273✔
238
}
239

240
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter) {
2,524,271✔
241
  SMemSkipListNode *pos[SL_MAX_LEVEL];
242
  SMemSkipListNode *pHead;
243
  SMemSkipListNode *pTail;
244

245
  pHead = pTbData->sl.pHead;
2,524,271✔
246
  pTail = pTbData->sl.pTail;
2,524,271✔
247
  pIter->pTbData = pTbData;
2,524,271✔
248
  pIter->backward = backward;
2,524,271✔
249
  pIter->pRow = NULL;
2,524,271✔
250
  if (pFrom == NULL) {
2,524,271✔
251
    // create from head or tail
252
    if (backward) {
395!
253
      pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
395✔
254
    } else {
UNCOV
255
      pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
×
256
    }
257
  } else {
258
    // create from a key
259
    if (backward) {
2,523,876✔
260
      tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
135,021✔
261
      pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
134,781✔
262
    } else {
263
      tbDataMovePosTo(pTbData, pos, pFrom, 0);
2,388,855✔
264
      pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
2,381,047✔
265
    }
266
  }
267
}
2,515,110✔
268

269
bool tsdbTbDataIterNext(STbDataIter *pIter) {
760,500,616✔
270
  pIter->pRow = NULL;
760,500,616✔
271
  if (pIter->backward) {
760,500,616✔
272
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
42,280,449!
UNCOV
273
      return false;
×
274
    }
275

276
    pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
42,280,449✔
277
    if (pIter->pNode == pIter->pTbData->sl.pHead) {
42,157,352✔
278
      return false;
128,470✔
279
    }
280
  } else {
281
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
718,220,167!
UNCOV
282
      return false;
×
283
    }
284

285
    pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
718,220,167✔
286
    if (pIter->pNode == pIter->pTbData->sl.pTail) {
716,645,027✔
287
      return false;
1,402,754✔
288
    }
289
  }
290

291
  return true;
757,271,155✔
292
}
293

294
int64_t tsdbCountTbDataRows(STbData *pTbData) {
×
295
  SMemSkipListNode *pNode = pTbData->sl.pHead;
×
296
  int64_t           rowsNum = 0;
×
297

UNCOV
298
  while (NULL != pNode) {
×
UNCOV
299
    pNode = SL_GET_NODE_FORWARD(pNode, 0);
×
300
    if (pNode == pTbData->sl.pTail) {
×
UNCOV
301
      return rowsNum;
×
302
    }
303

UNCOV
304
    rowsNum++;
×
305
  }
306

307
  return rowsNum;
×
308
}
309

310
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
×
311
  taosRLockLatch(&pMemTable->latch);
×
312
  for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
×
313
    STbData *pTbData = pMemTable->aBucket[i];
×
314
    while (pTbData) {
×
UNCOV
315
      void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
×
UNCOV
316
      if (p == NULL) {
×
317
        pTbData = pTbData->next;
×
318
        continue;
×
319
      }
320

321
      *rowsNum += tsdbCountTbDataRows(pTbData);
×
322
      pTbData = pTbData->next;
×
323
    }
324
  }
UNCOV
325
  taosRUnLockLatch(&pMemTable->latch);
×
UNCOV
326
}
×
327

328
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
12✔
329
  int32_t code = 0;
12✔
330

331
  int32_t   nBucket = pMemTable->nBucket * 2;
12✔
332
  STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *));
12✔
333
  if (aBucket == NULL) {
12!
UNCOV
334
    code = terrno;
×
UNCOV
335
    goto _exit;
×
336
  }
337

338
  for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
46,092✔
339
    STbData *pTbData = pMemTable->aBucket[iBucket];
46,080✔
340

341
    while (pTbData) {
92,160✔
342
      STbData *pNext = pTbData->next;
46,080✔
343

344
      int32_t idx = TABS(pTbData->uid) % nBucket;
46,080✔
345
      pTbData->next = aBucket[idx];
46,080✔
346
      aBucket[idx] = pTbData;
46,080✔
347

348
      pTbData = pNext;
46,080✔
349
    }
350
  }
351

352
  taosMemoryFree(pMemTable->aBucket);
12✔
353
  pMemTable->nBucket = nBucket;
12✔
354
  pMemTable->aBucket = aBucket;
12✔
355

356
_exit:
12✔
357
  return code;
12✔
358
}
359

360
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
2,894,429✔
361
  int32_t code = 0;
2,894,429✔
362

363
  // get
364
  STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
2,894,429✔
365
  if (pTbData) goto _exit;
2,894,429✔
366

367
  // create
368
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
128,108✔
369
  int8_t     maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
128,108✔
370

371
  pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
128,108✔
372
  if (pTbData == NULL) {
128,269!
UNCOV
373
    code = terrno;
×
UNCOV
374
    goto _exit;
×
375
  }
376
  pTbData->suid = suid;
128,269✔
377
  pTbData->uid = uid;
128,269✔
378
  pTbData->minKey = TSKEY_MAX;
128,269✔
379
  pTbData->maxKey = TSKEY_MIN;
128,269✔
380
  pTbData->pHead = NULL;
128,269✔
381
  pTbData->pTail = NULL;
128,269✔
382
  pTbData->sl.seed = taosRand();
128,269✔
383
  pTbData->sl.size = 0;
128,290✔
384
  pTbData->sl.maxLevel = maxLevel;
128,290✔
385
  pTbData->sl.level = 0;
128,290✔
386
  pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1];
128,290✔
387
  pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel));
128,290✔
388
  pTbData->sl.pHead->level = maxLevel;
128,290✔
389
  pTbData->sl.pTail->level = maxLevel;
128,290✔
390
  for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
769,655✔
391
    SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail;
641,365✔
392
    SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead;
641,365✔
393

394
    SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL;
641,365✔
395
    SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
641,365✔
396
  }
397
  taosInitRWLatch(&pTbData->lock);
128,290✔
398

399
  taosWLockLatch(&pMemTable->latch);
128,276✔
400

401
  if (pMemTable->nTbData >= pMemTable->nBucket) {
128,287✔
402
    code = tsdbMemTableRehash(pMemTable);
12✔
403
    if (code) {
12!
UNCOV
404
      taosWUnLockLatch(&pMemTable->latch);
×
UNCOV
405
      goto _exit;
×
406
    }
407
  }
408

409
  int32_t idx = TABS(uid) % pMemTable->nBucket;
128,287✔
410
  pTbData->next = pMemTable->aBucket[idx];
128,287✔
411
  pMemTable->aBucket[idx] = pTbData;
128,287✔
412
  pMemTable->nTbData++;
128,287✔
413

414
  if (tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn) == NULL) {
128,287!
UNCOV
415
    taosWUnLockLatch(&pMemTable->latch);
×
UNCOV
416
    code = TSDB_CODE_INTERNAL_ERROR;
×
UNCOV
417
    goto _exit;
×
418
  }
419

420
  taosWUnLockLatch(&pMemTable->latch);
128,265✔
421

422
_exit:
2,894,233✔
423
  if (code) {
2,894,233!
UNCOV
424
    *ppTbData = NULL;
×
425
  } else {
426
    *ppTbData = pTbData;
2,894,233✔
427
  }
428
  return code;
2,894,233✔
429
}
430

431
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, STsdbRowKey *pKey, int32_t flags) {
22,726,810✔
432
  SMemSkipListNode *px;
433
  SMemSkipListNode *pn;
434
  STsdbRowKey       tKey;
435
  int32_t           backward = flags & SL_MOVE_BACKWARD;
22,726,810✔
436
  int32_t           fromPos = flags & SL_MOVE_FROM_POS;
22,726,810✔
437

438
  if (backward) {
22,726,810✔
439
    px = pTbData->sl.pTail;
3,013,569✔
440

441
    if (!fromPos) {
3,013,569✔
442
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
6,256,741✔
443
        pos[iLevel] = px;
3,243,216✔
444
      }
445
    }
446

447
    if (pTbData->sl.level) {
3,013,569✔
448
      if (fromPos) px = pos[pTbData->sl.level - 1];
2,884,055!
449

450
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
14,493,494✔
451
        pn = SL_GET_NODE_BACKWARD(px, iLevel);
11,627,769✔
452
        while (pn != pTbData->sl.pHead) {
13,593,219✔
453
          tsdbRowGetKey(&pn->row, &tKey);
13,516,364✔
454

455
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
13,511,798✔
456
          if (c <= 0) {
13,505,052✔
457
            break;
11,532,584✔
458
          } else {
459
            px = pn;
1,972,468✔
460
            pn = SL_GET_NODE_BACKWARD(px, iLevel);
1,972,468✔
461
          }
462
        }
463

464
        pos[iLevel] = px;
11,609,439✔
465
      }
466
    }
467
  } else {
468
    px = pTbData->sl.pHead;
19,713,241✔
469

470
    if (!fromPos) {
19,713,241✔
471
      for (int8_t iLevel = pTbData->sl.level; iLevel < pTbData->sl.maxLevel; iLevel++) {
5,876,281✔
472
        pos[iLevel] = px;
3,487,412✔
473
      }
474
    }
475

476
    if (pTbData->sl.level) {
19,713,241✔
477
      if (fromPos) px = pos[pTbData->sl.level - 1];
19,711,624✔
478

479
      for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
114,543,700✔
480
        pn = SL_GET_NODE_FORWARD(px, iLevel);
94,844,509✔
481
        while (pn != pTbData->sl.pTail) {
325,582,945✔
482
          tsdbRowGetKey(&pn->row, &tKey);
324,952,992✔
483

484
          int32_t c = tsdbRowKeyCmpr(&tKey, pKey);
324,954,998✔
485
          if (c >= 0) {
325,064,383✔
486
            break;
94,202,123✔
487
          } else {
488
            px = pn;
230,862,260✔
489
            pn = SL_GET_NODE_FORWARD(px, iLevel);
230,862,260✔
490
          }
491
        }
492

493
        pos[iLevel] = px;
94,832,076✔
494
      }
495
    }
496
  }
497
}
22,696,047✔
498

499
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
500
  int8_t level = 1;
149,823,122✔
501
  int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
149,823,122✔
502

503
  while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) {
199,491,992✔
504
    level++;
49,668,870✔
505
  }
506

507
  return level;
149,705,909✔
508
}
509
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
149,823,122✔
510
                           int8_t forward) {
511
  int32_t           code = 0;
149,823,122✔
512
  int8_t            level;
513
  SMemSkipListNode *pNode = NULL;
149,823,122✔
514
  SVBufPool        *pPool = pMemTable->pTsdb->pVnode->inUse;
149,823,122✔
515
  int64_t           nSize;
516

517
  // create node
518
  level = tsdbMemSkipListRandLevel(&pTbData->sl);
149,823,122✔
519
  nSize = SL_NODE_SIZE(level);
149,705,909✔
520
  if (pRow->type == TSDBROW_ROW_FMT) {
149,705,909!
521
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->pTSRow->len);
149,735,360✔
522
  } else if (pRow->type == TSDBROW_COL_FMT) {
×
523
    pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize);
253✔
524
  }
525
  if (pNode == NULL) {
149,713,612!
UNCOV
526
    code = terrno;
×
UNCOV
527
    goto _exit;
×
528
  }
529

530
  pNode->level = level;
149,713,612✔
531
  pNode->row = *pRow;
149,713,612✔
532
  if (pRow->type == TSDBROW_ROW_FMT) {
149,713,612!
533
    pNode->row.pTSRow = (SRow *)((char *)pNode + nSize);
149,720,991✔
534
    memcpy(pNode->row.pTSRow, pRow->pTSRow, pRow->pTSRow->len);
149,720,991✔
535
  }
536

537
  // set node
538
  if (forward) {
149,713,612✔
539
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
342,414,627✔
540
      SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
195,537,590✔
541
      SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
195,537,590✔
542
    }
543
  } else {
544
    for (int8_t iLevel = 0; iLevel < level; iLevel++) {
6,594,744✔
545
      SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
3,758,169✔
546
      SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
3,758,169✔
547
    }
548
  }
549

550
  // set forward and backward
551
  if (forward) {
149,713,612✔
552
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
342,592,851✔
553
      SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
195,583,146✔
554

555
      SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
195,583,146✔
556
      SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
195,744,028✔
557

558
      pos[iLevel] = pNode;
195,714,187✔
559
    }
560
  } else {
561
    for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
6,645,852✔
562
      SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
3,761,945✔
563

564
      SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
3,761,945✔
565
      SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
3,781,164✔
566

567
      pos[iLevel] = pNode;
3,810,904✔
568
    }
569
  }
570

571
  pTbData->sl.size++;
149,893,612✔
572
  if (pTbData->sl.level < pNode->level) {
149,893,612✔
573
    pTbData->sl.level = pNode->level;
355,720✔
574
  }
575

576
_exit:
149,537,892✔
577
  return code;
149,893,612✔
578
}
579

580
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
148✔
581
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
582
  int32_t code = 0;
148✔
583

584
  SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
148✔
585
  int32_t    nColData = TARRAY_SIZE(pSubmitTbData->aCol);
148✔
586
  SColData  *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
148✔
587

588
  // copy and construct block data
589
  SBlockData *pBlockData = vnodeBufPoolMalloc(pPool, sizeof(*pBlockData));
148✔
590
  if (pBlockData == NULL) {
148!
UNCOV
591
    code = terrno;
×
UNCOV
592
    goto _exit;
×
593
  }
594

595
  pBlockData->suid = pTbData->suid;
148✔
596
  pBlockData->uid = pTbData->uid;
148✔
597
  pBlockData->nRow = aColData[0].nVal;
148✔
598
  pBlockData->aUid = NULL;
148✔
599
  pBlockData->aVersion = vnodeBufPoolMalloc(pPool, aColData[0].nData);
148✔
600
  if (pBlockData->aVersion == NULL) {
148!
UNCOV
601
    code = terrno;
×
UNCOV
602
    goto _exit;
×
603
  }
604
  for (int32_t i = 0; i < pBlockData->nRow; i++) {  // todo: here can be optimized
401✔
605
    pBlockData->aVersion[i] = version;
253✔
606
  }
607

608
  pBlockData->aTSKEY = vnodeBufPoolMalloc(pPool, aColData[0].nData);
148✔
609
  if (pBlockData->aTSKEY == NULL) {
148!
UNCOV
610
    code = terrno;
×
UNCOV
611
    goto _exit;
×
612
  }
613
  memcpy(pBlockData->aTSKEY, aColData[0].pData, aColData[0].nData);
148✔
614

615
  pBlockData->nColData = nColData - 1;
148✔
616
  pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
148✔
617
  if (pBlockData->aColData == NULL) {
148!
UNCOV
618
    code = terrno;
×
UNCOV
619
    goto _exit;
×
620
  }
621

622
  for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
664✔
623
    code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
516✔
624
    if (code) goto _exit;
516!
625
  }
626

627
  // loop to add each row to the skiplist
628
  SMemSkipListNode *pos[SL_MAX_LEVEL];
629
  TSDBROW           tRow = tsdbRowFromBlockData(pBlockData, 0);
148✔
630
  STsdbRowKey       key;
631

632
  // first row
633
  tsdbRowGetKey(&tRow, &key);
148✔
634
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
148✔
635
  if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
148!
636
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
148✔
637

638
  // remain row
639
  ++tRow.iRow;
148✔
640
  if (tRow.iRow < pBlockData->nRow) {
148✔
641
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
189✔
642
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
151✔
643
    }
644

645
    while (tRow.iRow < pBlockData->nRow) {
143✔
646
      tsdbRowGetKey(&tRow, &key);
105✔
647

648
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
105!
UNCOV
649
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
×
650
      }
651

652
      if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit;
105!
653

654
      ++tRow.iRow;
105✔
655
    }
656
  }
657

658
  if (key.key.ts >= pTbData->maxKey) {
148✔
659
    pTbData->maxKey = key.key.ts;
147✔
660
  }
661

662
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
148!
UNCOV
663
    if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
×
UNCOV
664
      tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
×
665
                TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
666
    }
667
  }
668

669
  // SMemTable
670
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
148✔
671
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
148✔
672
  pMemTable->nRow += pBlockData->nRow;
148✔
673

674
  if (affectedRows) *affectedRows = pBlockData->nRow;
148!
675

UNCOV
676
_exit:
×
677
  return code;
148✔
678
}
679

680
static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
2,880,021✔
681
                                        SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
682
  int32_t code = 0;
2,880,021✔
683

684
  int32_t           nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
2,880,021✔
685
  SRow            **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
2,880,021✔
686
  STsdbRowKey       key;
687
  SMemSkipListNode *pos[SL_MAX_LEVEL];
688
  TSDBROW           tRow = {.type = TSDBROW_ROW_FMT, .version = version};
2,880,021✔
689
  int32_t           iRow = 0;
2,880,021✔
690

691
  // backward put first data
692
  tRow.pTSRow = aRow[iRow++];
2,880,021✔
693
  tsdbRowGetKey(&tRow, &key);
2,880,021✔
694
  tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
2,878,613✔
695
  code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
2,852,246✔
696
  if (code) goto _exit;
2,884,870!
697

698
  pTbData->minKey = TMIN(pTbData->minKey, key.key.ts);
2,884,870✔
699

700
  // forward put rest data
701
  if (iRow < nRow) {
2,884,870✔
702
    for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
2,887,489✔
703
      pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
2,272,726✔
704
    }
705

706
    while (iRow < nRow) {
147,615,520✔
707
      tRow.pTSRow = aRow[iRow];
146,992,399✔
708
      tsdbRowGetKey(&tRow, &key);
146,992,399✔
709

710
      if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
146,982,279✔
711
        tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
17,323,631✔
712
      }
713

714
      code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
146,981,892✔
715
      if (code) goto _exit;
147,000,757!
716

717
      iRow++;
147,000,757✔
718
    }
719
  }
720

721
  if (key.key.ts >= pTbData->maxKey) {
2,893,228✔
722
    pTbData->maxKey = key.key.ts;
2,824,223✔
723
  }
724
  if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
2,893,228✔
725
    TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
20,437✔
726
  }
727

728
  // SMemTable
729
  pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
2,884,197✔
730
  pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
2,884,197✔
731
  pMemTable->nRow += nRow;
2,884,197✔
732

733
  if (affectedRows) *affectedRows = nRow;
2,884,197!
734

UNCOV
735
_exit:
×
736
  return code;
2,884,197✔
737
}
738

739
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
3✔
740

741
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) {
888,520✔
742
  int32_t code = 0;
888,520✔
743

744
  int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
888,520✔
745
  if (nRef <= 0) {
888,838!
UNCOV
746
    tsdbError("vgId:%d, memtable ref count is invalid, ref:%d", TD_VID(pMemTable->pTsdb->pVnode), nRef);
×
747
  }
748

749
  vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode);
888,838✔
750

751
_exit:
888,844✔
752
  return code;
888,844✔
753
}
754

755
void tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) {
915,069✔
756
  if (pNode) {
915,069✔
757
    vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive);
888,835✔
758
  }
759

760
  if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) {
915,097✔
761
    tsdbMemTableDestroy(pMemTable, proactive);
26,242✔
762
  }
763
}
915,117✔
764

765
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
766
  STbData *pTbData1 = *(STbData **)p1;
767
  STbData *pTbData2 = *(STbData **)p2;
768

769
  if (pTbData1->suid < pTbData2->suid) {
770
    return -1;
771
  } else if (pTbData1->suid > pTbData2->suid) {
772
    return 1;
773
  }
774

775
  if (pTbData1->uid < pTbData2->uid) {
776
    return -1;
777
  } else if (pTbData1->uid > pTbData2->uid) {
778
    return 1;
779
  }
780

781
  return 0;
782
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc