• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.43
/source/dnode/mnode/sdb/src/sdbHash.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sdb.h"
18

19
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow);
20

21
const char *sdbTableName(ESdbType type) {
2,433,699✔
22
  switch (type) {
2,433,699!
23
    case SDB_TRANS:
265,144✔
24
      return "trans";
265,144✔
25
    case SDB_CLUSTER:
25,147✔
26
      return "cluster";
25,147✔
27
    case SDB_MNODE:
66,027✔
28
      return "mnode";
66,027✔
29
    case SDB_QNODE:
24,447✔
30
      return "qnode";
24,447✔
31
    case SDB_SNODE:
7,331✔
32
      return "snode";
7,331✔
33
    case SDB_DNODE:
272,154✔
34
      return "dnode";
272,154✔
35
    case SDB_USER:
178,305✔
36
      return "user";
178,305✔
37
    case SDB_AUTH:
2✔
38
      return "auth";
2✔
39
    case SDB_ACCT:
13,350✔
40
      return "acct";
13,350✔
41
    case SDB_STREAM_CK:
2✔
42
      return "stream_ck";
2✔
43
    case SDB_STREAM:
50,022✔
44
      return "stream";
50,022✔
45
    case SDB_OFFSET:
2✔
46
      return "offset";
2✔
47
    case SDB_SUBSCRIBE:
11,063✔
48
      return "subscribe";
11,063✔
49
    case SDB_CONSUMER:
14,798✔
50
      return "consumer";
14,798✔
51
    case SDB_TOPIC:
9,786✔
52
      return "topic";
9,786✔
53
    case SDB_VGROUP:
592,649✔
54
      return "vgroup";
592,649✔
55
    case SDB_SMA:
33,239✔
56
      return "sma";
33,239✔
57
    case SDB_STB:
118,942✔
58
      return "stb";
118,942✔
59
    case SDB_DB:
179,166✔
60
      return "db";
179,166✔
61
    case SDB_FUNC:
7,341✔
62
      return "func";
7,341✔
63
    case SDB_IDX:
54,977✔
64
      return "idx";
54,977✔
65
    case SDB_VIEW:
8,339✔
66
      return "view";
8,339✔
67
    case SDB_STREAM_SEQ:
7,158✔
68
      return "stream_seq";
7,158✔
69
    case SDB_COMPACT:
7,188✔
70
      return "compact";
7,188✔
71
    case SDB_COMPACT_DETAIL:
7,338✔
72
      return "compact_detail";
7,338✔
73
    case SDB_GRANT:
15,761✔
74
      return "grant";
15,761✔
75
    case SDB_ARBGROUP:
7,259✔
76
      return "arb_group";
7,259✔
77
    case SDB_ANODE:
7,169✔
78
      return "anode";
7,169✔
79
    case SDB_CFG:
449,633✔
80
      return "config";
449,633✔
81
    default:
×
82
      return "undefine";
×
83
  }
84
}
85

86
const char *sdbStatusName(ESdbStatus status) {
1,741,714✔
87
  switch (status) {
1,741,714!
88
    case SDB_STATUS_CREATING:
29,196✔
89
      return "creating";
29,196✔
90
    case SDB_STATUS_DROPPING:
9,427✔
91
      return "dropping";
9,427✔
92
    case SDB_STATUS_READY:
1,624,493✔
93
      return "ready";
1,624,493✔
94
    case SDB_STATUS_DROPPED:
68,044✔
95
      return "dropped";
68,044✔
96
    case SDB_STATUS_INIT:
10,549✔
97
      return "init";
10,549✔
UNCOV
98
    case SDB_STATUS_UPDATE:
×
UNCOV
99
      return "update";
×
100
    default:
5✔
101
      return "undefine";
5✔
102
  }
103
}
104

105
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
397,388,133✔
106
#if 1
107
  EKeyType keyType = pSdb->keyTypes[pRow->type];
397,388,133✔
108

109
  if (keyType == SDB_KEY_BINARY) {
397,388,133✔
110
    mTrace("%s:%s, ref:%d oper:%s row:%p row->pObj:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj,
49,133,840✔
111
           pRow->refCount, oper, pRow, pRow->pObj, sdbStatusName(pRow->status));
112
  } else if (keyType == SDB_KEY_INT32) {
348,254,293✔
113
    mTrace("%s:%d, ref:%d oper:%s row:%p row->pObj:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj,
346,770,741✔
114
           pRow->refCount, oper, pRow, pRow->pObj, sdbStatusName(pRow->status));
115
  } else if (keyType == SDB_KEY_INT64) {
1,483,552!
116
    mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p row->pObj:%p status:%s", sdbTableName(pRow->type),
1,505,803✔
117
           *(int64_t *)pRow->pObj, pRow->refCount, oper, pRow, pRow->pObj, sdbStatusName(pRow->status));
118
  } else {
119
  }
120
#endif
121
}
397,388,142✔
122

123
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
211,275,837✔
124
  if (type >= SDB_MAX || type < 0) {
211,275,837!
125
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
×
126
    return NULL;
1✔
127
  }
128

129
  SHashObj *hash = pSdb->hashObjs[type];
211,276,931✔
130
  if (hash == NULL) {
211,276,931✔
131
    terrno = TSDB_CODE_APP_ERROR;
4✔
132
    return NULL;
4✔
133
  }
134

135
  return hash;
211,276,927✔
136
}
137

138
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
35,973,047✔
139
  int32_t  keySize = 0;
35,973,047✔
140
  EKeyType keyType = pSdb->keyTypes[type];
35,973,047✔
141

142
  if (keyType == SDB_KEY_INT32) {
35,973,047✔
143
    keySize = sizeof(int32_t);
22,937,296✔
144
  } else if (keyType == SDB_KEY_BINARY) {
13,035,751✔
145
    keySize = strlen(pKey) + 1;
12,470,093✔
146
  } else {
147
    keySize = sizeof(int64_t);
565,658✔
148
  }
149

150
  return keySize;
35,973,047✔
151
}
152

153
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
265,386✔
154
  int32_t type = pRow->type;
265,386✔
155
  sdbWriteLock(pSdb, type);
265,386✔
156

157
  SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
265,386✔
158
  if (pOldRow != NULL) {
265,386!
159
    sdbUnLock(pSdb, type);
×
160
    sdbFreeRow(pSdb, pRow, false);
×
161
    terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
×
162
    return terrno;
×
163
  }
164

165
  pRow->refCount = 0;
265,386✔
166
  pRow->status = pRaw->status;
265,386✔
167
  sdbPrintOper(pSdb, pRow, "insert");
265,386✔
168

169
  int32_t code = 0;
265,386✔
170
  if ((code = taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *))) != 0) {
265,386!
171
    sdbUnLock(pSdb, type);
×
172
    sdbFreeRow(pSdb, pRow, false);
×
173
    return code;
×
174
  }
175

176
  SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
265,386✔
177
  if (insertFp != NULL) {
265,386!
178
    code = (*insertFp)(pSdb, pRow->pObj);
265,386✔
179
    if (code != 0) {
265,386!
180
      if (taosHashRemove(hash, pRow->pObj, keySize) != 0) {
×
181
        mError("failed to remove row from hash");
×
182
      }
183
      sdbFreeRow(pSdb, pRow, false);
×
184
      sdbUnLock(pSdb, type);
×
185
      terrno = code;
×
186
      return terrno;
×
187
    }
188
  }
189

190
  sdbUnLock(pSdb, type);
265,386✔
191

192
  if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
265,386✔
193
    pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
71,265✔
194
  }
195
  if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
265,386✔
196
    pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int64_t *)pRow->pObj));
2,329✔
197
  }
198
  pSdb->tableVer[pRow->type]++;
265,386✔
199

200
  return 0;
265,386✔
201
}
202

203
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
364,627✔
204
  int32_t type = pNewRow->type;
364,627✔
205
  sdbWriteLock(pSdb, type);
364,627✔
206

207
  SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
364,627✔
208
  if (ppOldRow == NULL || *ppOldRow == NULL) {
364,627!
209
    sdbUnLock(pSdb, type);
250,270✔
210
    return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
250,270✔
211
  }
212

213
  SSdbRow *pOldRow = *ppOldRow;
114,357✔
214
  pOldRow->status = pRaw->status;
114,357✔
215
  sdbPrintOper(pSdb, pOldRow, "update");
114,357✔
216

217
  int32_t     code = 0;
114,357✔
218
  SdbUpdateFp updateFp = pSdb->updateFps[type];
114,357✔
219
  if (updateFp != NULL) {
114,357!
220
    code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
114,357✔
221
  }
222
  sdbUnLock(pSdb, type);
114,357✔
223

224
  // sdbUnLock(pSdb, type);
225
  sdbFreeRow(pSdb, pNewRow, false);
114,357✔
226

227
  pSdb->tableVer[pOldRow->type]++;
114,357✔
228
  return code;
114,357✔
229
}
230

231
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
73,623✔
232
  int32_t type = pRow->type;
73,623✔
233
  sdbWriteLock(pSdb, type);
73,623✔
234

235
  SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
73,623✔
236
  if (ppOldRow == NULL || *ppOldRow == NULL) {
73,623!
237
    sdbUnLock(pSdb, type);
15✔
238
    sdbFreeRow(pSdb, pRow, false);
15✔
239
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
15✔
240
    return terrno;
15✔
241
  }
242
  SSdbRow *pOldRow = *ppOldRow;
73,608✔
243
  pOldRow->status = pRaw->status;
73,608✔
244

245
  (void)atomic_add_fetch_32(&pOldRow->refCount, 1);
73,608✔
246
  sdbPrintOper(pSdb, pOldRow, "delete");
73,608✔
247

248
  if (taosHashRemove(hash, pOldRow->pObj, keySize) != 0) {
73,608!
249
    sdbUnLock(pSdb, type);
×
250
    sdbFreeRow(pSdb, pRow, false);
×
251
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
252
    return terrno;
×
253
  }
254
  pSdb->tableVer[pOldRow->type]++;
73,608✔
255
  sdbUnLock(pSdb, type);
73,608✔
256

257
  sdbFreeRow(pSdb, pRow, false);
73,608✔
258

259
  sdbCheckRow(pSdb, pOldRow);
73,608✔
260
  return 0;
73,608✔
261
}
262

263
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
453,368✔
264
  if (pRaw->type == SDB_CFG) {
453,368✔
265
    mTrace("sdb write cfg");
158,937✔
266
  }
267
  SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
453,368✔
268
  if (hash == NULL) return terrno;
453,368✔
269

270
  SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->type];
453,366✔
271
  SSdbRow    *pRow = (*decodeFp)(pRaw);
453,366✔
272
  if (pRow == NULL) return terrno;
453,366!
273

274
  pRow->type = pRaw->type;
453,366✔
275

276
  int32_t keySize = sdbGetkeySize(pSdb, pRow->type, pRow->pObj);
453,366✔
277
  int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
453,366✔
278

279
  switch (pRaw->status) {
453,366!
280
    case SDB_STATUS_CREATING:
15,116✔
281
      code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
15,116✔
282
      break;
15,116✔
283
    case SDB_STATUS_READY:
364,627✔
284
    case SDB_STATUS_UPDATE:
285
    case SDB_STATUS_DROPPING:
286
      code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
364,627✔
287
      break;
364,627✔
288
    case SDB_STATUS_DROPPED:
73,623✔
289
      code = sdbDeleteRow(pSdb, hash, pRaw, pRow, keySize);
73,623✔
290
      break;
73,623✔
291
  }
292

293
  return code;
453,366✔
294
}
295

296
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
54,326✔
297
  int32_t code = sdbWriteWithoutFree(pSdb, pRaw);
54,326✔
298
  sdbFreeRaw(pRaw);
54,326✔
299
  return code;
54,326✔
300
}
301

302
void *sdbAcquireAll(SSdb *pSdb, ESdbType type, const void *pKey, bool onlyReady) {
35,520,037✔
303
  terrno = 0;
35,520,037✔
304

305
  SHashObj *hash = sdbGetHash(pSdb, type);
35,519,842✔
306
  if (hash == NULL) return NULL;
35,519,420✔
307

308
  void   *pRet = NULL;
35,519,419✔
309
  int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
35,519,419✔
310

311
  sdbReadLock(pSdb, type);
35,520,109✔
312

313
  SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
35,522,260✔
314
  if (ppRow == NULL || *ppRow == NULL) {
35,522,925!
315
    sdbUnLock(pSdb, type);
2,377,357✔
316
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
2,377,524✔
317
    return NULL;
2,377,523✔
318
  }
319

320
  SSdbRow *pRow = *ppRow;
33,145,568✔
321
  switch (pRow->status) {
33,145,568✔
322
    case SDB_STATUS_READY:
33,137,855✔
323
      (void)atomic_add_fetch_32(&pRow->refCount, 1);
33,137,855✔
324
      pRet = pRow->pObj;
33,138,338✔
325
      sdbPrintOper(pSdb, pRow, "acquire");
33,138,338✔
326
      break;
33,137,858✔
327
    case SDB_STATUS_CREATING:
4,963✔
328
      terrno = TSDB_CODE_SDB_OBJ_CREATING;
4,963✔
329
      break;
4,963✔
330
    case SDB_STATUS_DROPPING:
2,520✔
331
      terrno = TSDB_CODE_SDB_OBJ_DROPPING;
2,520✔
332
      break;
2,520✔
333
    default:
230✔
334
      terrno = TSDB_CODE_APP_ERROR;
230✔
335
      break;
×
336
  }
337

338
  if (pRet == NULL) {
33,145,341✔
339
    if (!onlyReady) {
7,483!
340
      terrno = 0;
×
341
      (void)atomic_add_fetch_32(&pRow->refCount, 1);
×
342
      pRet = pRow->pObj;
×
343
      sdbPrintOper(pSdb, pRow, "acquire");
×
344
    }
345
  }
346

347
  sdbUnLock(pSdb, type);
33,145,341✔
348
  return pRet;
33,145,193✔
349
}
350

351
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { return sdbAcquireAll(pSdb, type, pKey, true); }
35,516,341✔
352
void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey) {
2,344✔
353
  return sdbAcquireAll(pSdb, type, pKey, false);
2,344✔
354
}
355

356
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
73,608✔
357
  int32_t type = pRow->type;
73,608✔
358
  sdbWriteLock(pSdb, type);
73,608✔
359

360
  int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
73,608✔
361
  sdbPrintOper(pSdb, pRow, "check");
73,608✔
362
  if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
73,608!
363
    sdbFreeRow(pSdb, pRow, true);
18,596✔
364
  }
365

366
  sdbUnLock(pSdb, type);
73,608✔
367
}
73,608✔
368

369
void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
198,448,952✔
370
  if (pObj == NULL) return;
198,448,952✔
371

372
  SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
198,010,210✔
373
  if (pRow->type >= SDB_MAX) return;
198,010,210✔
374

375
  int32_t type = pRow->type;
198,010,209✔
376
  if (lock) {
198,010,209!
377
    sdbWriteLock(pSdb, type);
198,013,694✔
378
  }
379

380
  int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
198,034,897✔
381
  sdbPrintOper(pSdb, pRow, "release");
198,046,792✔
382
  if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
198,043,352✔
383
    sdbFreeRow(pSdb, pRow, true);
55,012✔
384
  }
385

386
  if (lock) {
198,043,352✔
387
    sdbUnLock(pSdb, type);
198,039,685✔
388
  }
389
}
390

391
void sdbRelease(SSdb *pSdb, void *pObj) { sdbReleaseLock(pSdb, pObj, true); }
198,442,221✔
392

393
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
166,847,655✔
394
  *ppObj = NULL;
166,847,655✔
395

396
  SHashObj *hash = sdbGetHash(pSdb, type);
166,847,655✔
397
  if (hash == NULL) return NULL;
166,847,047✔
398

399
  sdbReadLock(pSdb, type);
166,847,046✔
400

401
  SSdbRow **ppRow = taosHashIterate(hash, pIter);
166,850,726✔
402
  while (ppRow != NULL) {
166,890,127✔
403
    SSdbRow *pRow = *ppRow;
163,971,514✔
404
    if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
163,971,514!
405
      ppRow = taosHashIterate(hash, ppRow);
34,414✔
406
      continue;
34,194✔
407
    }
408

409
    (void)atomic_add_fetch_32(&pRow->refCount, 1);
163,937,100✔
410
    sdbPrintOper(pSdb, pRow, "fetch");
163,936,204✔
411
    *ppObj = pRow->pObj;
163,931,202✔
412
    break;
163,931,202✔
413
  }
414
  sdbUnLock(pSdb, type);
166,849,815✔
415

416
  return ppRow;
166,852,104✔
417
}
418

419
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status, bool lock) {
1,092,684✔
420
  *ppObj = NULL;
1,092,684✔
421

422
  SHashObj *hash = sdbGetHash(pSdb, type);
1,092,684✔
423
  if (hash == NULL) return NULL;
1,092,678!
424

425
  if (lock) {
1,092,678✔
426
    sdbReadLock(pSdb, type);
1,086,310✔
427
  }
428

429
  SSdbRow **ppRow = taosHashIterate(hash, pIter);
1,092,699✔
430
  while (ppRow != NULL) {
1,092,736✔
431
    SSdbRow *pRow = *ppRow;
968,567✔
432
    if (pRow == NULL) {
968,567!
433
      ppRow = taosHashIterate(hash, ppRow);
×
434
      continue;
×
435
    }
436

437
    (void)atomic_add_fetch_32(&pRow->refCount, 1);
968,567✔
438
    sdbPrintOper(pSdb, pRow, "fetch");
968,556✔
439
    *ppObj = pRow->pObj;
968,532✔
440
    *status = pRow->status;
968,532✔
441
    break;
968,532✔
442
  }
443
  if (lock) {
1,092,701✔
444
    sdbUnLock(pSdb, type);
1,086,328✔
445
  }
446

447
  return ppRow;
1,092,694✔
448
}
449

450
void sdbCancelFetch(SSdb *pSdb, void *pIter) {
5,134,242✔
451
  if (pIter == NULL) return;
5,134,242✔
452
  SSdbRow *pRow = *(SSdbRow **)pIter;
5,132,089✔
453
  mTrace("cancel fetch row:%p", pRow);
5,132,089✔
454
  SHashObj *hash = sdbGetHash(pSdb, pRow->type);
5,132,089✔
455
  if (hash == NULL) return;
5,132,089!
456

457
  int32_t type = pRow->type;
5,132,089✔
458
  sdbReadLock(pSdb, type);
5,132,089✔
459
  taosHashCancelIterate(hash, pIter);
5,132,111✔
460
  sdbUnLock(pSdb, type);
5,132,122✔
461
}
462

463
void sdbCancelFetchByType(SSdb *pSdb, void *pIter, ESdbType type) {
×
464
  if (pIter == NULL) return;
×
465
  if (type >= SDB_MAX || type < 0) return;
×
466
  SHashObj *hash = sdbGetHash(pSdb, type);
×
467
  if (hash == NULL) return;
×
468

469
  sdbReadLock(pSdb, type);
×
470
  taosHashCancelIterate(hash, pIter);
×
471
  sdbUnLock(pSdb, type);
×
472
}
473

474
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
686,819✔
475
  SHashObj *hash = sdbGetHash(pSdb, type);
686,819✔
476
  if (hash == NULL) return;
686,819!
477

478
  sdbReadLock(pSdb, type);
686,819✔
479

480
  SSdbRow **ppRow = taosHashIterate(hash, NULL);
686,823✔
481
  while (ppRow != NULL) {
32,576,451✔
482
    SSdbRow *pRow = *ppRow;
31,892,359✔
483
    if (pRow->status == SDB_STATUS_READY) {
31,892,359✔
484
      bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3);
31,875,969✔
485
      if (!isContinue) {
31,827,280✔
486
        taosHashCancelIterate(hash, ppRow);
2✔
487
        break;
2✔
488
      }
489
    }
490

491
    ppRow = taosHashIterate(hash, ppRow);
31,843,668✔
492
  }
493

494
  sdbUnLock(pSdb, type);
684,094✔
495
}
496

497
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
1,510,950✔
498
  SHashObj *hash = sdbGetHash(pSdb, type);
1,510,950✔
499
  if (hash == NULL) return 0;
1,510,666!
500

501
  sdbReadLock(pSdb, type);
1,510,666✔
502
  int32_t size = taosHashGetSize(hash);
1,511,034✔
503
  sdbUnLock(pSdb, type);
1,510,939✔
504

505
  return size;
1,511,000✔
506
}
507

508
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
60,079✔
509
  SHashObj *hash = sdbGetHash(pSdb, type);
60,079✔
510
  if (hash == NULL) return -1;
60,079✔
511

512
  if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1;
60,078✔
513

514
  int32_t maxId = 0;
60,072✔
515
  sdbReadLock(pSdb, type);
60,072✔
516

517
  SSdbRow **ppRow = taosHashIterate(hash, NULL);
60,072✔
518
  while (ppRow != NULL) {
134,600✔
519
    SSdbRow *pRow = *ppRow;
74,528✔
520
    int32_t  id = *(int32_t *)pRow->pObj;
74,528✔
521
    maxId = TMAX(id, maxId);
74,528✔
522
    ppRow = taosHashIterate(hash, ppRow);
74,528✔
523
  }
524

525
  sdbUnLock(pSdb, type);
60,072✔
526
  maxId = TMAX(maxId, pSdb->maxId[type]);
60,072✔
527
  return maxId + 1;
60,072✔
528
}
529

530
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
627,112✔
531
  if (type >= SDB_MAX || type < 0) {
627,112✔
532
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
1✔
533
    return -1;
1✔
534
  }
535

536
  return pSdb->tableVer[type];
627,111✔
537
}
538

539
bool countValid(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
540
  int32_t *pInt = p1;
×
541
  (*pInt) += 1;
×
542
  return true;
×
543
}
544

545
int32_t sdbGetValidSize(SSdb *pSdb, ESdbType type) {
×
546
  int32_t num = 0;
×
547
  sdbTraverse(pSdb, type, countValid, &num, 0, 0);
×
548
  return num;
×
549
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc