• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.59
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
// Serialize a stream object through the given encoder.
//
// Write order: name, the create-stream request, mainSnodeId, userStopped,
// createTime, updateTime. Every step is wrapped in TAOS_CHECK_RETURN (macro
// defined elsewhere; presumably returns the failing code immediately).
//
// Returns pEncoder->pos after tEndEncode() when every step succeeds.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // Identity and the original creation request.
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));

  // Runtime state and timestamps.
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
35

36
// Decode a stream object previously written by tEncodeSStreamObj.
//
// pObj->pCreate is allocated here (zero-initialised) and then filled by
// tDeserializeSCMCreateStreamReqImpl; tFreeStreamObj releases it.
//
// sver is accepted for interface compatibility and is not consulted here.
//
// NOTE(review): decode failures reported through TAOS_CHECK_RETURN leave the
// function without reaching the _exit cleanup below; only the allocation
// failure is routed through TAOS_CHECK_EXIT. Confirm this is intended.
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
  // Check the allocation itself: pObj was already dereferenced above, so
  // testing pObj here could never detect an out-of-memory condition.
  if (NULL == pObj->pCreate) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));

  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));

_exit:

  tEndDecode(pDecoder);
  tDecoderClear(pDecoder);

  TAOS_RETURN(code);
}
61

62
// Release the create-request owned by a stream object: first its contents via
// tFreeSCMCreateStreamReq, then the request structure itself.
// pStream itself is not freed and must not be NULL (it is dereferenced).
void tFreeStreamObj(SStreamObj *pStream) {
  tFreeSCMCreateStreamReq(pStream->pCreate);
  taosMemoryFreeClear(pStream->pCreate);
}
×
66

67
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
12✔
68
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
12!
69
  if (pVgEpNew == NULL) {
12!
70
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
71
    return NULL;
×
72
  }
73
  pVgEpNew->vgId = pVgEp->vgId;
12✔
74
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
75
  pVgEpNew->epSet = pVgEp->epSet;
12✔
76
  return pVgEpNew;
12✔
77
}
78

79
// Free a vgroup endpoint allocated on the heap; a NULL pointer is ignored.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFree(pVgEp);
}
63✔
85

86
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
60✔
87
  int32_t tlen = 0;
60✔
88
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
60✔
89
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
90
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
60✔
91
  return tlen;
60✔
92
}
93

94
// Decode a vgroup endpoint and return the read position after it.
//
// For sver == 1 a length-prefixed field sits between vgId and epSet; its
// length is read and the payload is skipped without being stored.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
104

105
// Element-duplication callback for taosArrayDup: treats the element as a C
// string and returns a duplicate made with taosStrdup.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
12!
106

107
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
45✔
108
                           SMqConsumerObj **ppConsumer) {
109
  int32_t         code = 0;
45✔
110
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
45!
111
  if (pConsumer == NULL) {
45!
112
    code = terrno;
×
113
    goto END;
×
114
  }
115

116
  pConsumer->consumerId = consumerId;
45✔
117
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
45✔
118

119
  pConsumer->epoch = 0;
45✔
120
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
45✔
121
  pConsumer->hbStatus = 0;
45✔
122
  pConsumer->pollStatus = 0;
45✔
123

124
  taosInitRWLatch(&pConsumer->lock);
45✔
125
  pConsumer->createTime = taosGetTimestampMs();
45✔
126
  pConsumer->updateType = updateType;
45✔
127

128
  if (updateType == CONSUMER_ADD_REB) {
45✔
129
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
12✔
130
    if (pConsumer->rebNewTopics == NULL) {
12!
131
      code = terrno;
×
132
      goto END;
×
133
    }
134

135
    char *topicTmp = taosStrdup(topic);
12!
136
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
24!
137
      code = terrno;
×
138
      goto END;
×
139
    }
140
  } else if (updateType == CONSUMER_REMOVE_REB) {
33✔
141
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
12✔
142
    if (pConsumer->rebRemovedTopics == NULL) {
12!
143
      code = terrno;
×
144
      goto END;
×
145
    }
146
    char *topicTmp = taosStrdup(topic);
12!
147
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
24!
148
      code = terrno;
×
149
      goto END;
×
150
    }
151
  } else if (updateType == CONSUMER_INSERT_SUB) {
21✔
152
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
6✔
153
    pConsumer->withTbName = subscribe->withTbName;
6✔
154
    pConsumer->autoCommit = subscribe->autoCommit;
6✔
155
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
6✔
156
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
6✔
157
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
6✔
158
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
6✔
159
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
6✔
160
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
6✔
161

162
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
6✔
163
    if (pConsumer->rebNewTopics == NULL) {
6!
164
      code = terrno;
×
165
      goto END;
×
166
    }
167
    pConsumer->assignedTopics = subscribe->topicNames;
6✔
168
    subscribe->topicNames = NULL;
6✔
169
  } else if (updateType == CONSUMER_UPDATE_SUB) {
15✔
170
    pConsumer->assignedTopics = subscribe->topicNames;
6✔
171
    subscribe->topicNames = NULL;
6✔
172
  }
173

174
  *ppConsumer = pConsumer;
45✔
175
  return 0;
45✔
176

177
END:
×
178
  tDeleteSMqConsumerObj(pConsumer);
×
179
  return code;
×
180
}
181

182
// Destroy the four topic arrays held by a consumer object (current, newly
// added, removed and assigned topics). The consumer structure itself is not
// freed and its array pointers are not reset here. NULL is ignored.
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    taosArrayDestroyP(pConsumer->currentTopics, NULL);
    taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
    taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
    taosArrayDestroyP(pConsumer->assignedTopics, NULL);
  }
}
189

190
// Destroy a heap-allocated consumer object: release its topic arrays through
// tClearSMqConsumerObj, then free the structure. Called with NULL on the
// allocation-failure path of tNewSMqConsumerObj.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  tClearSMqConsumerObj(pConsumer);
  taosMemoryFree(pConsumer);
}
114✔
194

195
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
90✔
196
  int32_t tlen = 0;
90✔
197
  int32_t sz;
198
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
90✔
199
  tlen += taosEncodeString(buf, pConsumer->clientId);
90✔
200
  tlen += taosEncodeString(buf, pConsumer->cgroup);
90✔
201
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
90✔
202
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
90✔
203
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
90✔
204

205
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
90✔
206
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
90✔
207
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
90✔
208
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
90✔
209
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
90✔
210

211
  // current topics
212
  if (pConsumer->currentTopics) {
90!
213
    sz = taosArrayGetSize(pConsumer->currentTopics);
×
214
    tlen += taosEncodeFixedI32(buf, sz);
×
215
    for (int32_t i = 0; i < sz; i++) {
×
216
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
×
217
      tlen += taosEncodeString(buf, topic);
×
218
    }
219
  } else {
220
    tlen += taosEncodeFixedI32(buf, 0);
90✔
221
  }
222

223
  // reb new topics
224
  if (pConsumer->rebNewTopics) {
90✔
225
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
48✔
226
    tlen += taosEncodeFixedI32(buf, sz);
48✔
227
    for (int32_t i = 0; i < sz; i++) {
96✔
228
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
48✔
229
      tlen += taosEncodeString(buf, topic);
48✔
230
    }
231
  } else {
232
    tlen += taosEncodeFixedI32(buf, 0);
42✔
233
  }
234

235
  // reb removed topics
236
  if (pConsumer->rebRemovedTopics) {
90✔
237
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
36✔
238
    tlen += taosEncodeFixedI32(buf, sz);
36✔
239
    for (int32_t i = 0; i < sz; i++) {
84✔
240
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
48✔
241
      tlen += taosEncodeString(buf, topic);
48✔
242
    }
243
  } else {
244
    tlen += taosEncodeFixedI32(buf, 0);
54✔
245
  }
246

247
  // lost topics
248
  if (pConsumer->assignedTopics) {
90✔
249
    sz = taosArrayGetSize(pConsumer->assignedTopics);
24✔
250
    tlen += taosEncodeFixedI32(buf, sz);
24✔
251
    for (int32_t i = 0; i < sz; i++) {
48✔
252
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
24✔
253
      tlen += taosEncodeString(buf, topic);
24✔
254
    }
255
  } else {
256
    tlen += taosEncodeFixedI32(buf, 0);
66✔
257
  }
258

259
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
90✔
260
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
90✔
261
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
90✔
262
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
90✔
263
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
90✔
264
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
90✔
265
  tlen += taosEncodeString(buf, pConsumer->user);
90✔
266
  tlen += taosEncodeString(buf, pConsumer->fqdn);
90✔
267
  return tlen;
90✔
268
}
269

270
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
45✔
271
  int32_t sz;
272
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
45!
273
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
45✔
274
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
45✔
275
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
45✔
276
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
45!
277
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
45!
278

279
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
45!
280
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
45✔
281
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
45!
282
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
45!
283
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
90!
284

285
  // current topics
286
  buf = taosDecodeFixedI32(buf, &sz);
45✔
287
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
45✔
288
  if (pConsumer->currentTopics == NULL) {
45!
289
    return NULL;
×
290
  }
291
  for (int32_t i = 0; i < sz; i++) {
45!
292
    char *topic;
293
    buf = taosDecodeString(buf, &topic);
×
294
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
×
295
      return NULL;
×
296
    }
297
  }
298

299
  // reb new topics
300
  buf = taosDecodeFixedI32(buf, &sz);
45✔
301
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
45✔
302
  for (int32_t i = 0; i < sz; i++) {
69✔
303
    char *topic;
304
    buf = taosDecodeString(buf, &topic);
24✔
305
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
48!
306
      return NULL;
×
307
    }
308
  }
309

310
  // reb removed topics
311
  buf = taosDecodeFixedI32(buf, &sz);
45✔
312
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
45✔
313
  for (int32_t i = 0; i < sz; i++) {
69✔
314
    char *topic;
315
    buf = taosDecodeString(buf, &topic);
24✔
316
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
48!
317
      return NULL;
×
318
    }
319
  }
320

321
  // reb removed topics
322
  buf = taosDecodeFixedI32(buf, &sz);
45✔
323
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
45✔
324
  for (int32_t i = 0; i < sz; i++) {
57✔
325
    char *topic;
326
    buf = taosDecodeString(buf, &topic);
12✔
327
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
24!
328
      return NULL;
×
329
    }
330
  }
331

332
  if (sver > 1) {
45!
333
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
45✔
334
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
45✔
335
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
45!
336
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
90!
337
  }
338
  if (sver > 2) {
45!
339
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
45!
340
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
45!
341
    buf = taosDecodeStringTo(buf, pConsumer->user);
45✔
342
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
90✔
343
  } else {
344
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
345
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
346
  }
347

348
  return (void *)buf;
45✔
349
}
350

351
// Encode an array of OffsetRows records: an i32 count, then for each record
// vgId, rows, the offset type, the type-specific offset payload and `ever`.
//
// Offset payload by type:
//   snapshot data / snapshot meta -> uid, ts
//   log                           -> version
//   anything else                 -> nothing
//
// Returns the accumulated length reported by the individual encoders.
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t count = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, count);

  for (int32_t idx = 0; idx < count; ++idx) {
    OffsetRows *row = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, row->vgId);
    total += taosEncodeFixedI64(buf, row->rows);
    total += taosEncodeFixedI8(buf, row->offset.type);

    if (row->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, row->offset.version);
    } else if (row->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || row->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      total += taosEncodeFixedI64(buf, row->offset.uid);
      total += taosEncodeFixedI64(buf, row->offset.ts);
    }

    total += taosEncodeFixedI64(buf, row->ever);
  }

  return total;
}
373

374
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
30✔
375
  int32_t tlen = 0;
30✔
376
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
30✔
377
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
30✔
378

379
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
30✔
380
}
381

382
// Decode an array of OffsetRows records written by tEncodeOffRows.
//
// When the stored count is <= 0, *offsetRows is left untouched. Otherwise a
// new array is created in *offsetRows and filled record by record. The `ever`
// field is only present in the stream when sver > 2.
// NOTE(review): for sver <= 2 `ever` keeps whatever taosArrayReserve left in
// the slot — confirm the slot is zeroed or give it an explicit default.
//
// Returns the read position after the records, or NULL when the array cannot
// be created or grown. On such a failure *offsetRows may reference a partially
// filled array that the owner is expected to destroy.
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      // Growing the array can fail; do not write through a NULL slot.
      if (NULL == offRows) return NULL;
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // other offset types carry no payload
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
408

409
// Decode a consumer endpoint: consumerId, then the vgroup array (elements via
// tDecodeSMqVgEp) and, only when sver > 1, the offset rows.
// Returns the read position produced by the last decoder that ran.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);

  if (sver <= 1) {
    return (void *)cursor;
  }

  cursor = tDecodeOffRows(cursor, &pConsumerEp->offsetRows, sver);
  return (void *)cursor;
}
418

419
// Allocate a zero-initialised subscribe object with an empty consumer hash
// (keyed by BIGINT) and an empty array of unassigned vgroups.
//
// `key` must reference at least TSDB_SUBSCRIBE_KEY_LEN readable bytes because
// that many bytes are copied unconditionally.
//
// On success returns 0 and, when ppSub is not NULL, stores the object there.
// NOTE(review): with ppSub == NULL the new object is not handed to anyone —
// confirm no caller relies on that.
// On failure everything allocated so far is released and the code set by
// MND_TMQ_NULL_CHECK (macro defined elsewhere; it jumps to END) is returned.
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  if (ppSub) {
    *ppSub = pSubObj;
  }
  return code;

END:
  if (pSubObj != NULL) {
    // The hash may already exist when the array allocation is what failed.
    if (pSubObj->consumerHash != NULL) {
      taosHashCleanup(pSubObj->consumerHash);
    }
    taosMemoryFree(pSubObj);
  }
  return code;
}
440

441
// Deep-copy a subscribe object.
//
// Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg, the
// unassigned vgroup array, the offset rows, and for every consumer in the hash
// its consumerId plus a cloned vgroup array. The per-consumer offsetRows are
// not copied (the clone's entries start with a NULL offsetRows).
//
// On success returns 0 and, when ppSub is not NULL, stores the clone there.
// On failure returns a non-zero code and releases the partially built clone;
// *ppSub is left untouched. A clone that cannot be handed out because ppSub is
// NULL is released as well.
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  // Zero-initialise so that the cleanup at END never sees garbage pointers.
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    code = terrno;
    goto END;
  }
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    code = terrno;
    goto END;
  }

  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
    };
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
                            sizeof(SMqConsumerEp))) != 0) {
      // The entry never reached the hash, so its vgroup copy must be freed here.
      // NOTE(review): the iteration over pSub->consumerHash is abandoned
      // mid-way; check whether the hash API requires cancelling the iterator.
      taosArrayDestroyP(newEp.vgs, (FDelete)tDeleteSMqVgEp);
      goto END;
    }
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
  if (ppSub) {
    *ppSub = pSubNew;
    pSubNew = NULL;  // ownership handed to the caller
  }

END:
  if (pSubNew != NULL) {
    // Still owned here: either an error occurred or there is nobody to receive
    // the clone. With a NULL hash nothing else has been allocated yet.
    if (pSubNew->consumerHash != NULL) {
      tDeleteSubscribeObj(pSubNew);
    }
    taosMemoryFree(pSubNew);
  }
  return code;
}
483

484
// Release everything a subscribe object owns: per-consumer vgroup arrays and
// offset rows, the consumer hash, the unassigned vgroup array, qmsg and the
// object-level offset rows. The structure itself is not freed. NULL is ignored.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return;
  }

  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEntry = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEntry->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEntry->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}
499

500
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
60✔
501
  int32_t tlen = 0;
60✔
502
  tlen += taosEncodeString(buf, pSub->key);
60✔
503
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
60✔
504
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
60✔
505
  tlen += taosEncodeFixedI8(buf, pSub->subType);
60✔
506
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
60✔
507
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
60✔
508

509
  void   *pIter = NULL;
60✔
510
  int32_t sz = taosHashGetSize(pSub->consumerHash);
60✔
511
  tlen += taosEncodeFixedI32(buf, sz);
60✔
512

513
  int32_t cnt = 0;
60✔
514
  while (1) {
30✔
515
    pIter = taosHashIterate(pSub->consumerHash, pIter);
90✔
516
    if (pIter == NULL) break;
90✔
517
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
30✔
518
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
30✔
519
    cnt++;
30✔
520
  }
521
  if (cnt != sz) return -1;
60!
522
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
60✔
523
  tlen += taosEncodeString(buf, pSub->dbName);
60✔
524

525
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
60✔
526
  tlen += taosEncodeString(buf, pSub->qmsg);
60✔
527
  return tlen;
60✔
528
}
529

530
// Decode a subscribe object written by tEncodeSubscribeObj.
//
// The consumer hash is created here (BIGINT keys, no locking). Offset rows and
// qmsg are only present in the stream when sver > 1; otherwise qmsg becomes a
// duplicate of the empty string.
//
// Returns the read position after the object, or NULL when the hash cannot be
// created, a consumer entry cannot be decoded or stored, the offset rows
// cannot be decoded, or the empty qmsg cannot be allocated. Whatever was
// attached to pSub before a failure stays attached for the caller to release.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    // tDecodeOffRows (reached through tDecodeSMqConsumerEp) reports failure
    // with a NULL position; a failed put leaves the entry outside the hash.
    // Either way the decoded arrays are still owned here.
    if (buf == NULL || taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp,
                                   sizeof(SMqConsumerEp)) != 0) {
      taosArrayDestroyP(consumerEp.vgs, (FDelete)tDeleteSMqVgEp);
      taosArrayDestroy(consumerEp.offsetRows);
      return NULL;
    }
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    if (buf == NULL) {
      return NULL;
    }
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      return NULL;
    }
  }
  return (void *)buf;
}
562

563
// Fill a config object from a config item: copy the name and dtype, then the
// value field that matches the dtype. String-like types (string, dir, locale,
// charset, timezone) receive their own duplicate of pItem->str.
//
// pObj is owned by the caller and is never freed here, not even on failure:
// this function did not allocate it and cannot know whether it lives on the
// heap.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the string
// duplicate cannot be allocated (pObj->str is NULL in that case).
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
  pObj->dtype = pItem->dtype;
  switch (pItem->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      pObj->bval = pItem->bval;
      break;
    case CFG_DTYPE_INT32:
      pObj->i32 = pItem->i32;
      break;
    case CFG_DTYPE_INT64:
      pObj->i64 = pItem->i64;
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pObj->fval = pItem->fval;
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pObj->str = taosStrdup(pItem->str);
      if (pObj->str == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      break;
    default:
      // Unknown dtype: only name and dtype are copied.
      break;
  }
  return TSDB_CODE_SUCCESS;
}
596

597
// Parse `value` according to pObjNew->dtype and store it in the matching
// field of pObjNew.
//
//  - bool:   true when value equals "true" (case-insensitive) or parses to a
//            base-10 integer greater than 0; false otherwise.
//  - int32 / int64: parsed with taosStrHumanToInt32 / taosStrHumanToInt64.
//  - float / double: parsed with parseCfgReal into the float field.
//  - string-like types: pObjNew->str is replaced by a duplicate of value.
//    NOTE(review): the previous str pointer is overwritten without being
//    freed — confirm the caller owns or has released the old string.
//  - none:   nothing is stored.
//
// `name` is not used. Returns 0 on success, the parser's error for numeric
// types, terrno when the duplicate cannot be allocated, and
// TSDB_CODE_INVALID_CFG for an unknown dtype.
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  switch (pObjNew->dtype) {
    case CFG_DTYPE_BOOL: {
      // Both tests are always evaluated, exactly as two independent checks.
      const bool isTrueWord = (strcasecmp(value, "true") == 0);
      const bool isPositive = (taosStr2Int32(value, NULL, 10) > 0);
      pObjNew->bval = (isTrueWord || isPositive);
      return 0;
    }
    case CFG_DTYPE_INT32: {
      int32_t parsed32;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed32));
      pObjNew->i32 = parsed32;
      return 0;
    }
    case CFG_DTYPE_INT64: {
      int64_t parsed64;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed64));
      pObjNew->i64 = parsed64;
      return 0;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsedReal = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsedReal));
      pObjNew->fval = parsedReal;
      return 0;
    }
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_TIMEZONE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_STRING: {
      pObjNew->str = taosStrdup(value);
      if (pObjNew->str == NULL) {
        return terrno;
      }
      return 0;
    }
    case CFG_DTYPE_NONE:
      return 0;
    default:
      return TSDB_CODE_INVALID_CFG;
  }
}
650

651
SConfigObj mndInitConfigVersion() {
13✔
652
  SConfigObj obj;
653
  memset(&obj, 0, sizeof(SConfigObj));
13✔
654

655
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
13✔
656
  obj.dtype = CFG_DTYPE_INT32;
13✔
657
  obj.i32 = 0;
13✔
658
  return obj;
13✔
659
}
660

661
// Serialize a config object: name, dtype (as i32), then the value in the
// representation matching the dtype.
//
//   bool                      -> i8
//   int32 / int64             -> i32 / i64
//   float / double            -> float (fval)
//   string-like types         -> C string; a NULL str is written as ""
//   any other dtype           -> no value
//
// Returns pEncoder->pos after tEndEncode() when every step succeeds.
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));

  switch (pObj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, (pObj->str != NULL) ? pObj->str : ""));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;
    default:
      break;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
697

698
// Decode a config object written by tEncodeSConfigObj: name, dtype, then the
// value that matches the dtype. For string-like types pObj->str receives
// storage obtained through tDecodeCStrAlloc; tFreeSConfigObj frees it.
// A dtype without a case below decodes no value.
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  switch (pObj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;
    case CFG_DTYPE_NONE:
      break;
    default:
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
729

730
// Free the string owned by a config object when its dtype is string-like
// (string, dir, locale, charset, timezone). Other dtypes own nothing, and the
// object itself is not freed. NULL is ignored.
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }

  switch (obj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      taosMemoryFree(obj->str);
      break;
    default:
      break;
  }
}
739

740
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
741
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
742
//   if (pEntryNew == NULL) return NULL;
743
//   pEntryNew->epoch = pEntry->epoch;
744
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
745
//   return pEntryNew;
746
// }
747
//
748
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
749
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
750
// }
751

752
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
753
//   int32_t tlen = 0;
754
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
755
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
756
//   return tlen;
757
// }
758
//
759
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
760
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
761
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
762
//   return (void *)buf;
763
// }
764

765
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
766
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
767
//   if (pLogNew == NULL) return pLogNew;
768
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
769
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
770
//   return pLogNew;
771
// }
772
//
773
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
774
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
775
// }
776

777
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
778
//   int32_t tlen = 0;
779
//   tlen += taosEncodeString(buf, pLog->key);
780
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
781
//   return tlen;
782
// }
783
//
784
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
785
//   buf = taosDecodeStringTo(buf, pLog->key);
786
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
787
//   return (void *)buf;
788
// }
789
//
790
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
791
//   int32_t tlen = 0;
792
//   tlen += taosEncodeString(buf, pOffset->key);
793
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
794
//   return tlen;
795
// }
796
//
797
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
798
//   buf = taosDecodeStringTo(buf, pOffset->key);
799
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
800
//   return buf;
801
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc