• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5058

17 May 2026 01:15AM UTC coverage: 73.387% (-0.02%) from 73.406%
#5058

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281656 of 383795 relevant lines covered (73.39%)

135114337.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.3
/source/util/src/tref.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tref.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
// Capacity of the global reference-set table; valid rsetIds lie in [0, TSDB_REF_OBJECTS).
#define TSDB_REF_OBJECTS        2000

// Life-cycle states stored in SRefSet.state.
#define TSDB_REF_STATE_EMPTY    0  // slot is free and may be handed out
#define TSDB_REF_STATE_ACTIVE   1  // set is open and accepts new references
#define TSDB_REF_STATE_DELETED  2  // set was closed but has not been cleaned up yet

// A warning is logged when locating a rid takes at least this many steps in one bucket.
#define TSDB_REF_ITER_THRESHOLD 100
27

28
// One tracked resource, linked into the doubly linked list of its hash bucket.
typedef struct SRefNode {
  struct SRefNode *prev;     // neighbour towards the bucket head, NULL for the head itself
  struct SRefNode *next;     // neighbour towards the bucket tail, NULL for the last node
  void            *p;        // the resource guarded by this reference
  int64_t          rid;      // reference ID under which the resource was registered
  int32_t          count;    // references currently held on this node
  int32_t          removed;  // 1 once the node has been marked removed, 0 otherwise
} SRefNode;
36

37
typedef struct {
38
  SRefNode **nodeList;  // array of SRefNode linked list
39
  int32_t    state;     // 0: empty, 1: active;  2: deleted
40
  int32_t    rsetId;    // refSet ID, global unique
41
  int64_t    rid;       // increase by one for each new reference
42
  int32_t    max;       // mod
43
  int32_t    count;     // total number of SRefNodes in this set
44
  int64_t   *lockedBy;
45
  void (*fp)(void *);
46
} SRefSet;
47

48
static SRefSet       tsRefSetList[TSDB_REF_OBJECTS];
49
static TdThreadOnce  tsRefModuleInit = PTHREAD_ONCE_INIT;
50
static TdThreadMutex tsRefMutex;
51
static int32_t       tsRefSetNum = 0;
52
static int32_t       tsNextId = 0;
53

54
static void    taosInitRefModule(void);
55
static void    taosLockList(int64_t *lockedBy);
56
static void    taosUnlockList(int64_t *lockedBy);
57
static void    taosIncRsetCount(SRefSet *pSet);
58
static void    taosDecRsetCount(SRefSet *pSet);
59
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t *isReleased);
60

61
// Open a new reference set and return its ID.
//
// max: number of hash buckets. It must be positive, because every reference ID is mapped
//      to a bucket with "rid % max".
// fp:  callback invoked with a resource when its node is finally freed.
//
// Returns the rsetId (1 .. TSDB_REF_OBJECTS - 1) on success. On failure returns
// TSDB_CODE_INVALID_PARA (max not positive), terrno as left by a failed allocation,
// or TSDB_CODE_REF_FULL when every slot is in use.
int32_t taosOpenRef(int32_t max, RefFp fp) {
  SRefNode **nodeList;
  SRefSet   *pSet;
  int64_t   *lockedBy;
  int32_t    i, rsetId;

  // a non-positive bucket count would later divide by zero in "rid % max"
  if (max <= 0) {
    uError("failed to open ref set, invalid max:%d", max);
    return terrno = TSDB_CODE_INVALID_PARA;
  }

  (void)taosThreadOnce(&tsRefModuleInit, taosInitRefModule);

  // allocate outside the mutex so that the critical section stays short
  nodeList = taosMemoryCalloc((size_t)max, sizeof(SRefNode *));
  if (nodeList == NULL) {
    return terrno;
  }

  lockedBy = taosMemoryCalloc((size_t)max, sizeof(int64_t));
  if (lockedBy == NULL) {
    taosMemoryFree(nodeList);
    return terrno;
  }

  (void)taosThreadMutexLock(&tsRefMutex);

  // probe at most TSDB_REF_OBJECTS slots, starting after the one handed out last time
  for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
    tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
    if (tsNextId == 0) tsNextId = 1;  // dont use 0 as rsetId
    if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
  }

  if (i < TSDB_REF_OBJECTS) {
    rsetId = tsNextId;
    pSet = tsRefSetList + rsetId;
    pSet->max = max;
    pSet->nodeList = nodeList;
    pSet->lockedBy = lockedBy;
    pSet->fp = fp;
    pSet->rid = 1;
    pSet->rsetId = rsetId;
    pSet->state = TSDB_REF_STATE_ACTIVE;
    taosIncRsetCount(pSet);  // the open handle itself holds the set until taosCloseRef

    tsRefSetNum++;
    uTrace("rsetId:%d, is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
  } else {
    rsetId = TSDB_CODE_REF_FULL;
    terrno = TSDB_CODE_REF_FULL;
    taosMemoryFree(nodeList);
    taosMemoryFree(lockedBy);
    uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
  }

  (void)taosThreadMutexUnlock(&tsRefMutex);

  return rsetId;
}
113

114
void taosCloseRef(int32_t rsetId) {
37,648,973✔
115
  SRefSet *pSet;
116
  int32_t  deleted = 0;
37,648,973✔
117

118
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
37,648,973✔
119
    uTrace("rsetId:%d, is invalid, out of range", rsetId);
158✔
120
    return;
158✔
121
  }
122

123
  pSet = tsRefSetList + rsetId;
37,648,815✔
124

125
  (void)taosThreadMutexLock(&tsRefMutex);
37,648,815✔
126

127
  if (pSet->state == TSDB_REF_STATE_ACTIVE) {
37,648,815✔
128
    pSet->state = TSDB_REF_STATE_DELETED;
35,145,549✔
129
    deleted = 1;
35,145,549✔
130
    uTrace("rsetId:%d, is closed, count:%d", rsetId, pSet->count);
35,145,549✔
131
  } else {
132
    uTrace("rsetId:%d, is already closed, count:%d", rsetId, pSet->count);
2,503,266✔
133
  }
134

135
  (void)taosThreadMutexUnlock(&tsRefMutex);
37,648,815✔
136

137
  if (deleted) taosDecRsetCount(pSet);
37,648,815✔
138
}
139

140
int32_t taosGetRefSetCount(int32_t rsetId, int32_t *pCount) {
1,081,153,457✔
141
  SRefSet *pSet;
142

143
  if (pCount == NULL) {
1,081,153,457✔
144
    return terrno = TSDB_CODE_INVALID_PARA;
×
145
  }
146

147
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
1,081,153,457✔
148
    uTrace("rsetId:%d, failed to get state/count, out of range", rsetId);
×
149
    return terrno = TSDB_CODE_REF_INVALID_ID;
×
150
  }
151

152
  (void)taosThreadOnce(&tsRefModuleInit, taosInitRefModule);
1,081,153,457✔
153

154
  pSet = tsRefSetList + rsetId;
1,081,153,457✔
155

156
  (void)taosThreadMutexLock(&tsRefMutex);
1,081,153,457✔
157
  *pCount = atomic_load_32(&pSet->count);
1,081,153,457✔
158
  (void)taosThreadMutexUnlock(&tsRefMutex);
1,081,153,457✔
159

160
  return TSDB_CODE_SUCCESS;
1,081,153,457✔
161
}
162

163
int64_t taosAddRef(int32_t rsetId, void *p) {
2,147,483,647✔
164
  int32_t   hash;
165
  SRefNode *pNode;
166
  SRefSet  *pSet;
167
  int64_t   rid = 0;
2,147,483,647✔
168

169
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
2,147,483,647✔
170
    uTrace("p:%p, failed to add, rsetId not valid, rsetId:%d", p, rsetId);
×
171
    return terrno = TSDB_CODE_REF_INVALID_ID;
×
172
  }
173

174
  pSet = tsRefSetList + rsetId;
2,147,483,647✔
175
  taosIncRsetCount(pSet);
2,147,483,647✔
176
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
2,147,483,647✔
177
    taosDecRsetCount(pSet);
×
178
    uTrace("p:%p, failed to add, not active, rsetId:%d", p, rsetId);
×
179
    return terrno = TSDB_CODE_REF_ID_REMOVED;
×
180
  }
181

182
  pNode = taosMemoryCalloc(sizeof(SRefNode), 1);
2,147,483,647✔
183
  if (pNode == NULL) {
2,147,483,647✔
184
    taosDecRsetCount(pSet);
×
185
    uError("p:%p, failed to add, out of memory, rsetId:%d", p, rsetId);
×
186
    return terrno;
×
187
  }
188

189
  rid = atomic_add_fetch_64(&pSet->rid, 1);
2,147,483,647✔
190
  hash = rid % pSet->max;
2,147,483,647✔
191
  taosLockList(pSet->lockedBy + hash);
2,147,483,647✔
192

193
  pNode->p = p;
2,147,483,647✔
194
  pNode->rid = rid;
2,147,483,647✔
195
  pNode->count = 1;
2,147,483,647✔
196

197
  pNode->prev = NULL;
2,147,483,647✔
198
  pNode->next = pSet->nodeList[hash];
2,147,483,647✔
199
  if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
2,147,483,647✔
200
  pSet->nodeList[hash] = pNode;
2,147,483,647✔
201

202
  uTrace("p:%p, rid:0x%" PRIx64 " is added, count:%d, remain count:%d rsetId:%d", p, rid, pSet->count, pNode->count,
2,147,483,647✔
203
         rsetId);
204

205
  taosUnlockList(pSet->lockedBy + hash);
2,147,483,647✔
206

207
  return rid;
2,147,483,647✔
208
}
209

210
// Drop one reference on rid and mark its node as removed; forwards to taosDecRefCount.
int32_t taosRemoveRef(int32_t rsetId, int64_t rid) {
  const int32_t markRemoved = 1;
  return taosDecRefCount(rsetId, rid, markRemoved, NULL);
}
2,147,483,647✔
211

212
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
213
void *taosAcquireRef(int32_t rsetId, int64_t rid) {
2,147,483,647✔
214
  int32_t   hash;
215
  SRefNode *pNode;
216
  SRefSet  *pSet;
217
  int32_t   iter = 0;
2,147,483,647✔
218
  void     *p = NULL;
2,147,483,647✔
219

220
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
2,147,483,647✔
221
    // uTrace("rsetId:%d, rid:0x%" PRIx64 " failed to acquire, rsetId not valid", rsetId, rid);
222
    terrno = TSDB_CODE_REF_INVALID_ID;
514✔
223
    return NULL;
48,411✔
224
  }
225

226
  if (rid <= 0) {
2,147,483,647✔
227
    uTrace("rid:0x%" PRIx64 ", failed to acquire, rid not valid, rsetId:%d", rid, rsetId);
284,507,516✔
228
    terrno = TSDB_CODE_REF_NOT_EXIST;
284,507,516✔
229
    return NULL;
284,507,516✔
230
  }
231

232
  pSet = tsRefSetList + rsetId;
2,147,483,647✔
233
  taosIncRsetCount(pSet);
2,147,483,647✔
234
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
2,147,483,647✔
235
    uTrace("rid:0x%" PRIx64 ", failed to acquire, not active, rsetId:%d", rid, rsetId);
×
236
    taosDecRsetCount(pSet);
×
237
    terrno = TSDB_CODE_REF_ID_REMOVED;
×
238
    return NULL;
×
239
  }
240

241
  hash = rid % pSet->max;
2,147,483,647✔
242
  taosLockList(pSet->lockedBy + hash);
2,147,483,647✔
243

244
  pNode = pSet->nodeList[hash];
2,147,483,647✔
245

246
  while (pNode) {
2,147,483,647✔
247
    if (pNode->rid == rid) {
2,147,483,647✔
248
      break;
2,147,483,647✔
249
    }
250
    iter++;
16,332,655✔
251
    pNode = pNode->next;
16,332,655✔
252
  }
253
 
254
 if (iter >= TSDB_REF_ITER_THRESHOLD) {
2,147,483,647✔
255
    uWarn("rsetId:%d rid:%" PRId64 " iter:%d", rsetId, rid, iter);
×
256
  }
257

258
  if (pNode) {
2,147,483,647✔
259
    if (pNode->removed == 0) {
2,147,483,647✔
260
      pNode->count++;
2,147,483,647✔
261
      p = pNode->p;
2,147,483,647✔
262
      uTrace("p:%p, rid:0x%" PRIx64 " is acquired, remain count:%d, rsetId:%d", pNode->p, rid, pNode->count, rsetId);
2,147,483,647✔
263
    } else {
264
      terrno = TSDB_CODE_REF_NOT_EXIST;
10,529,619✔
265
      uTrace("p:%p, rid:0x%" PRIx64 " is already removed, failed to acquire, rsetId:%d", pNode->p, rid, rsetId);
10,828,736✔
266
    }
267
  } else {
268
    terrno = TSDB_CODE_REF_NOT_EXIST;
1,606,031,364✔
269
    uTrace("rid:0x%" PRIx64 ", is not there, failed to acquire, rsetId:%d", rid, rsetId);
1,606,021,728✔
270
  }
271

272
  taosUnlockList(pSet->lockedBy + hash);
2,147,483,647✔
273

274
  taosDecRsetCount(pSet);
2,147,483,647✔
275

276
  return p;
2,147,483,647✔
277
}
278

279
// Drop one reference on rid without marking its node removed; forwards to taosDecRefCount.
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) {
  const int32_t keepEntry = 0;
  return taosDecRefCount(rsetId, rid, keepEntry, NULL);
}
2,147,483,647✔
280
// Same as taosReleaseRef, and additionally passes isReleased through to taosDecRefCount,
// which stores 1 in it when the node was freed by this call and 0 otherwise.
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t *isReleased) {
  const int32_t keepEntry = 0;
  return taosDecRefCount(rsetId, rid, keepEntry, isReleased);
}
283

284
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
285
void *taosIterateRef(int32_t rsetId, int64_t rid) {
285,431,466✔
286
  SRefNode *pNode = NULL;
285,431,466✔
287
  SRefSet  *pSet;
288

289
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
285,431,466✔
290
    uTrace("rid:0x%" PRIx64 ", failed to iterate, rsetId not valid, rsetId:%d", rid, rsetId);
×
291
    terrno = TSDB_CODE_REF_INVALID_ID;
×
292
    return NULL;
×
293
  }
294

295
  if (rid < 0) {
285,431,466✔
296
    uTrace("rid:0x%" PRIx64 ", failed to iterate, rid not valid, rsetId:%d", rid, rsetId);
×
297
    terrno = TSDB_CODE_REF_NOT_EXIST;
×
298
    return NULL;
×
299
  }
300

301
  void *newP = NULL;
285,431,466✔
302
  pSet = tsRefSetList + rsetId;
285,431,466✔
303
  taosIncRsetCount(pSet);
285,431,466✔
304
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
285,431,466✔
305
    uTrace("rid:0x%" PRIx64 ", failed to iterate, rset not active, rsetId:%d", rid, rsetId);
×
306
    terrno = TSDB_CODE_REF_ID_REMOVED;
×
307
    taosDecRsetCount(pSet);
×
308
    return NULL;
×
309
  }
310

311
  do {
312
    newP = NULL;
285,431,466✔
313
    int32_t hash = 0;
285,431,466✔
314
    int32_t iter = 0;
285,431,466✔
315
    if (rid > 0) {
285,431,466✔
316
      hash = rid % pSet->max;
220,535,854✔
317
      taosLockList(pSet->lockedBy + hash);
220,535,854✔
318

319
      pNode = pSet->nodeList[hash];
220,535,854✔
320
      while (pNode) {
224,662,410✔
321
        if (pNode->rid == rid) break;
224,662,410✔
322
        pNode = pNode->next;
4,126,556✔
323
        iter++;
4,126,556✔
324
      }
325

326
      if (iter >= TSDB_REF_ITER_THRESHOLD) {
220,535,854✔
327
        uWarn("rid:0x%" PRIx64 ", iter:%d, rsetId:%d", rid, iter, rsetId);
×
328
      }
329

330
      if (pNode == NULL) {
220,535,854✔
331
        uError("rid:0x%" PRIx64 ", not there and quit, rsetId:%d", rid, rsetId);
×
332
        terrno = TSDB_CODE_REF_NOT_EXIST;
×
333
        taosUnlockList(pSet->lockedBy + hash);
×
334
        taosDecRsetCount(pSet);
×
335
        return NULL;
×
336
      }
337

338
      // rid is there
339
      pNode = pNode->next;
220,535,854✔
340
      // check first place
341
      while (pNode) {
220,535,854✔
342
        if (!pNode->removed) break;
3,821,544✔
343
        pNode = pNode->next;
×
344
      }
345
      if (pNode == NULL) {
220,535,854✔
346
        taosUnlockList(pSet->lockedBy + hash);
216,714,310✔
347
        hash++;
216,714,310✔
348
      }
349
    }
350

351
    if (pNode == NULL) {
285,431,466✔
352
      for (; hash < pSet->max; ++hash) {
2,147,483,647✔
353
        taosLockList(pSet->lockedBy + hash);
2,147,483,647✔
354
        pNode = pSet->nodeList[hash];
2,147,483,647✔
355
        if (pNode) {
2,147,483,647✔
356
          // check first place
357
          while (pNode) {
216,714,310✔
358
            if (!pNode->removed) break;
216,714,310✔
359
            pNode = pNode->next;
×
360
          }
361
          if (pNode) break;
216,714,310✔
362
        }
363
        taosUnlockList(pSet->lockedBy + hash);
2,147,483,647✔
364
      }
365
    }
366

367
    if (pNode) {
285,431,466✔
368
      pNode->count++;  // acquire it
220,535,854✔
369
      newP = pNode->p;
220,535,854✔
370
      taosUnlockList(pSet->lockedBy + hash);
220,535,854✔
371
      uTrace("p:%p, rid:0x%" PRIx64 " is returned, rsetId:%d", newP, rid, rsetId);
220,535,854✔
372
    } else {
373
      uTrace("rsetId:%d, the list is over", rsetId);
64,895,612✔
374
    }
375

376
    if (rid > 0) taosReleaseRef(rsetId, rid);  // release the current one
285,431,466✔
377
    if (pNode) rid = pNode->rid;
285,431,466✔
378
  } while (newP && pNode->removed);
285,431,466✔
379

380
  taosDecRsetCount(pSet);
285,431,466✔
381

382
  return newP;
285,431,466✔
383
}
384

385
int32_t taosListRef() {
×
386
  SRefSet  *pSet;
387
  SRefNode *pNode;
388
  int32_t   num = 0;
×
389

390
  (void)taosThreadMutexLock(&tsRefMutex);
×
391

392
  for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) {
×
393
    pSet = tsRefSetList + i;
×
394

395
    if (pSet->state == TSDB_REF_STATE_EMPTY) continue;
×
396

397
    uInfo("rsetId:%d, state:%d count:%d", i, pSet->state, pSet->count);
×
398

399
    for (int32_t j = 0; j < pSet->max; ++j) {
×
400
      pNode = pSet->nodeList[j];
×
401

402
      while (pNode) {
×
403
        uInfo("p:%p, rid:0x%" PRIx64 " count:%d, rsetId:%d", pNode->p, pNode->rid, pNode->count, i);
×
404
        pNode = pNode->next;
×
405
        num++;
×
406
      }
407
    }
408
  }
409

410
  (void)taosThreadMutexUnlock(&tsRefMutex);
×
411

412
  return num;
×
413
}
414

415
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t *isReleased) {
2,147,483,647✔
416
  int32_t   hash;
417
  SRefSet  *pSet;
418
  SRefNode *pNode;
419
  int32_t   iter = 0;
2,147,483,647✔
420
  int32_t   released = 0;
2,147,483,647✔
421
  int32_t   code = 0;
2,147,483,647✔
422

423
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
2,147,483,647✔
424
    uTrace("rid:0x%" PRIx64 ", failed to remove, rsetId not valid, rsetId:%d", rid, rsetId);
607✔
425
    return terrno = TSDB_CODE_REF_INVALID_ID;
607✔
426
  }
427

428
  if (rid <= 0) {
2,147,483,647✔
429
    uTrace("rid:0x%" PRIx64 ", failed to remove, rid not valid, rsetId:%d", rid, rsetId);
737,133,290✔
430
    return terrno = TSDB_CODE_REF_NOT_EXIST;
737,133,290✔
431
  }
432

433
  pSet = tsRefSetList + rsetId;
2,147,483,647✔
434
  if (pSet->state == TSDB_REF_STATE_EMPTY) {
2,147,483,647✔
435
    uTrace("rid:0x%" PRIx64 ", failed to remove, cleaned, rsetId:%d", rid, rsetId);
×
436
    return terrno = TSDB_CODE_REF_ID_REMOVED;
×
437
  }
438

439
  hash = rid % pSet->max;
2,147,483,647✔
440
  taosLockList(pSet->lockedBy + hash);
2,147,483,647✔
441

442
  pNode = pSet->nodeList[hash];
2,147,483,647✔
443
  while (pNode) {
2,147,483,647✔
444
    if (pNode->rid == rid) break;
2,147,483,647✔
445

446
    pNode = pNode->next;
33,407,048✔
447
    iter++;
33,407,048✔
448
  }
449

450
  if (iter >= TSDB_REF_ITER_THRESHOLD) {
2,147,483,647✔
451
    uWarn("rid:0x%" PRIx64 ", iter:%d, rsetId:%d", rid, iter, rsetId);
×
452
  }
453

454
  if (pNode) {
2,147,483,647✔
455
    pNode->count--;
2,147,483,647✔
456
    if (remove) pNode->removed = 1;
2,147,483,647✔
457

458
    if (pNode->count <= 0) {
2,147,483,647✔
459
      if (pNode->prev) {
2,147,483,647✔
460
        pNode->prev->next = pNode->next;
5,320,915✔
461
      } else {
462
        pSet->nodeList[hash] = pNode->next;
2,147,483,647✔
463
      }
464

465
      if (pNode->next) {
2,147,483,647✔
466
        pNode->next->prev = pNode->prev;
1,736,357✔
467
      }
468
      released = 1;
2,147,483,647✔
469
    } else {
470
      uTrace("p:%p, rid:0x%" PRIx64 " is released, remain count:%d, rsetId:%d", pNode->p, rid, pNode->count, rsetId);
2,147,483,647✔
471
    }
472
  } else {
473
    uTrace("rid:0x%" PRIx64 ", is not there, failed to release/remove, rsetId:%d", rid, rsetId);
1,330,869✔
474
    terrno = TSDB_CODE_REF_NOT_EXIST;
1,330,869✔
475
    code = terrno;
1,330,869✔
476
  }
477

478
  taosUnlockList(pSet->lockedBy + hash);
2,147,483,647✔
479

480
  if (released) {
2,147,483,647✔
481
    uTrace("p:%p, rid:0x%" PRIx64 " is removed, count:%d, free mem:%p, rsetId:%d", pNode->p, rid, pSet->count, pNode,
2,147,483,647✔
482
           rsetId);
483
    (*pSet->fp)(pNode->p);
2,147,483,647✔
484
    taosMemoryFree(pNode);
2,147,483,647✔
485

486
    taosDecRsetCount(pSet);
2,147,483,647✔
487
  }
488

489
  if (isReleased) {
2,147,483,647✔
490
    *isReleased = released;
1,026,314,233✔
491
  }
492

493
  return code;
2,147,483,647✔
494
}
495

496
// Spin until the lock word *lockedBy changes from 0 to the calling thread's ID.
// The thread yields the CPU after every 100 failed attempts.
static void taosLockList(int64_t *lockedBy) {
  int64_t tid = taosGetSelfPthreadId();
  int32_t spins = 0;

  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    // reset the counter instead of letting it grow, so a long wait cannot overflow it
    if (++spins >= 100) {
      spins = 0;
      (void)sched_yield();
    }
  }
}
2,147,483,647✔
506

507
// Reset the lock word *lockedBy to 0, but only if it currently holds the calling
// thread's ID; otherwise the word is left untouched.
static void taosUnlockList(int64_t *lockedBy) {
  const int64_t self = taosGetSelfPthreadId();

  (void)atomic_val_compare_exchange_64(lockedBy, self, 0);
}
2,147,483,647✔
511

512
// One-time module set-up, run through taosThreadOnce: initialises tsRefMutex.
static void taosInitRefModule(void) {
  (void)taosThreadMutexInit(&tsRefMutex, NULL);
}
3,180,231✔
513

514
// Atomically add one hold to the set's count.
static void taosIncRsetCount(SRefSet *pSet) {
  int32_t *pCounter = &pSet->count;

  (void)atomic_add_fetch_32(pCounter, 1);
}
2,147,483,647✔
518

519
// Atomically drop one hold on the set. When the count reaches zero the set is cleaned
// up under tsRefMutex: its state becomes empty, its bucket and lock arrays are freed
// and the slot can be handed out again by taosOpenRef.
static void taosDecRsetCount(SRefSet *pSet) {
  int32_t count = atomic_sub_fetch_32(&pSet->count, 1);

  if (count > 0) return;

  (void)taosThreadMutexLock(&tsRefMutex);

  // The count dropped to zero before the mutex was taken. In that window the slot may
  // have been reopened, or another caller may have taken a new hold, so look at the
  // count again. Whoever lowers it to zero last performs the clean-up.
  if (pSet->state != TSDB_REF_STATE_EMPTY && atomic_load_32(&pSet->count) <= 0) {
    pSet->state = TSDB_REF_STATE_EMPTY;
    pSet->max = 0;
    pSet->fp = NULL;

    taosMemoryFreeClear(pSet->nodeList);
    taosMemoryFreeClear(pSet->lockedBy);

    tsRefSetNum--;
    uTrace("rsetId:%d, is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
  }

  (void)taosThreadMutexUnlock(&tsRefMutex);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc