• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4548

22 Jul 2025 02:37AM UTC coverage: 54.273% (-3.0%) from 57.287%
#4548

push

travis-ci

GitHub
Merge pull request #32061 from taosdata/new_testcases

132738 of 315239 branches covered (42.11%)

Branch coverage included in aggregate %.

201371 of 300373 relevant lines covered (67.04%)

3475977.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.26
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
×
22
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
×
23

24
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
×
25
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));
×
26

27
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
×
28
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
×
29
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
×
30
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
×
31

32
  tEndEncode(pEncoder);
×
33
  return pEncoder->pos;
×
34
}
35

36
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
×
37
  int32_t code = 0;
×
38
  int32_t lino = 0;
×
39
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
×
40

41
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
×
42
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
×
43
  if (NULL == pObj) {
×
44
    TAOS_CHECK_EXIT(terrno);
×
45
  }
46
  
47
  TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
×
48

49
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
×
50
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
×
51
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
×
52
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
×
53

54
_exit:
×
55

56
  tEndDecode(pDecoder);
×
57
  tDecoderClear(pDecoder);  
×
58
  
59
  TAOS_RETURN(code);
×
60
}
61

62
void tFreeStreamObj(SStreamObj *pStream) {
×
63
  tFreeSCMCreateStreamReq(pStream->pCreate);
×
64
  taosMemoryFreeClear(pStream->pCreate);
×
65
}
×
66

67
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
383✔
68
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
383!
69
  if (pVgEpNew == NULL) {
383!
70
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
71
    return NULL;
×
72
  }
73
  pVgEpNew->vgId = pVgEp->vgId;
383✔
74
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
75
  pVgEpNew->epSet = pVgEp->epSet;
383✔
76
  return pVgEpNew;
383✔
77
}
78

79
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
2,125✔
80
  if (pVgEp) {
2,125!
81
    //    taosMemoryFreeClear(pVgEp->qmsg);
82
    taosMemoryFree(pVgEp);
2,125!
83
  }
84
}
2,125✔
85

86
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
2,050✔
87
  int32_t tlen = 0;
2,050✔
88
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
2,050✔
89
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
90
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
2,050✔
91
  return tlen;
2,050✔
92
}
93

94
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
761✔
95
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
761!
96
  if (sver == 1) {
761!
97
    uint64_t size = 0;
×
98
    buf = taosDecodeVariantU64(buf, &size);
×
99
    buf = POINTER_SHIFT(buf, size);
×
100
  }
101
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
761✔
102
  return (void *)buf;
761✔
103
}
104

105
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
162!
106

107
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
697✔
108
                           SMqConsumerObj **ppConsumer) {
109
  int32_t         code = 0;
697✔
110
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
697!
111
  if (pConsumer == NULL) {
697!
112
    code = terrno;
×
113
    goto END;
×
114
  }
115

116
  pConsumer->consumerId = consumerId;
697✔
117
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
697✔
118

119
  pConsumer->epoch = 0;
697✔
120
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
697✔
121
  pConsumer->hbStatus = 0;
697✔
122
  pConsumer->pollStatus = 0;
697✔
123

124
  taosInitRWLatch(&pConsumer->lock);
697✔
125
  pConsumer->createTime = taosGetTimestampMs();
697✔
126
  pConsumer->updateType = updateType;
697✔
127

128
  if (updateType == CONSUMER_ADD_REB) {
697✔
129
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
162✔
130
    if (pConsumer->rebNewTopics == NULL) {
162!
131
      code = terrno;
×
132
      goto END;
×
133
    }
134

135
    char *topicTmp = taosStrdup(topic);
162!
136
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
324!
137
      code = terrno;
×
138
      goto END;
×
139
    }
140
  } else if (updateType == CONSUMER_REMOVE_REB) {
535✔
141
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
161✔
142
    if (pConsumer->rebRemovedTopics == NULL) {
161!
143
      code = terrno;
×
144
      goto END;
×
145
    }
146
    char *topicTmp = taosStrdup(topic);
161!
147
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
322!
148
      code = terrno;
×
149
      goto END;
×
150
    }
151
  } else if (updateType == CONSUMER_INSERT_SUB) {
374✔
152
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
114✔
153
    pConsumer->withTbName = subscribe->withTbName;
114✔
154
    pConsumer->autoCommit = subscribe->autoCommit;
114✔
155
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
114✔
156
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
114✔
157
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
114✔
158
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
114✔
159
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
114✔
160
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
114✔
161

162
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
114✔
163
    if (pConsumer->rebNewTopics == NULL) {
114!
164
      code = terrno;
×
165
      goto END;
×
166
    }
167
    pConsumer->assignedTopics = subscribe->topicNames;
114✔
168
    subscribe->topicNames = NULL;
114✔
169
  } else if (updateType == CONSUMER_UPDATE_SUB) {
260✔
170
    pConsumer->assignedTopics = subscribe->topicNames;
114✔
171
    subscribe->topicNames = NULL;
114✔
172
  }
173

174
  *ppConsumer = pConsumer;
697✔
175
  return 0;
697✔
176

177
END:
×
178
  tDeleteSMqConsumerObj(pConsumer);
×
179
  return code;
×
180
}
181

182
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
2,328✔
183
  if (pConsumer == NULL) return;
2,328✔
184
  taosArrayDestroyP(pConsumer->currentTopics, NULL);
1,394✔
185
  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
1,394✔
186
  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
1,394✔
187
  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
1,394✔
188
}
189

190
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
1,631✔
191
  tClearSMqConsumerObj(pConsumer);
1,631✔
192
  taosMemoryFree(pConsumer);
1,631!
193
}
1,631✔
194

195
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
1,412✔
196
  int32_t tlen = 0;
1,412✔
197
  int32_t sz;
198
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
1,412✔
199
  tlen += taosEncodeString(buf, pConsumer->clientId);
1,412✔
200
  tlen += taosEncodeString(buf, pConsumer->cgroup);
1,412✔
201
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
1,412✔
202
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
1,412✔
203
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
1,412✔
204

205
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
1,412✔
206
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
1,412✔
207
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
1,412✔
208
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
1,412✔
209
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
1,412✔
210

211
  // current topics
212
  if (pConsumer->currentTopics) {
1,412✔
213
    sz = taosArrayGetSize(pConsumer->currentTopics);
18✔
214
    tlen += taosEncodeFixedI32(buf, sz);
18✔
215
    for (int32_t i = 0; i < sz; i++) {
22✔
216
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
4✔
217
      tlen += taosEncodeString(buf, topic);
4✔
218
    }
219
  } else {
220
    tlen += taosEncodeFixedI32(buf, 0);
1,394✔
221
  }
222

223
  // reb new topics
224
  if (pConsumer->rebNewTopics) {
1,412✔
225
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
798✔
226
    tlen += taosEncodeFixedI32(buf, sz);
798✔
227
    for (int32_t i = 0; i < sz; i++) {
1,450✔
228
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
652✔
229
      tlen += taosEncodeString(buf, topic);
652✔
230
    }
231
  } else {
232
    tlen += taosEncodeFixedI32(buf, 0);
614✔
233
  }
234

235
  // reb removed topics
236
  if (pConsumer->rebRemovedTopics) {
1,412✔
237
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
568✔
238
    tlen += taosEncodeFixedI32(buf, sz);
568✔
239
    for (int32_t i = 0; i < sz; i++) {
1,216✔
240
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
648✔
241
      tlen += taosEncodeString(buf, topic);
648✔
242
    }
243
  } else {
244
    tlen += taosEncodeFixedI32(buf, 0);
844✔
245
  }
246

247
  // lost topics
248
  if (pConsumer->assignedTopics) {
1,412✔
249
    sz = taosArrayGetSize(pConsumer->assignedTopics);
474✔
250
    tlen += taosEncodeFixedI32(buf, sz);
474✔
251
    for (int32_t i = 0; i < sz; i++) {
804✔
252
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
330✔
253
      tlen += taosEncodeString(buf, topic);
330✔
254
    }
255
  } else {
256
    tlen += taosEncodeFixedI32(buf, 0);
938✔
257
  }
258

259
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
1,412✔
260
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
1,412✔
261
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
1,412✔
262
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
1,412✔
263
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
1,412✔
264
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
1,412✔
265
  tlen += taosEncodeString(buf, pConsumer->user);
1,412✔
266
  tlen += taosEncodeString(buf, pConsumer->fqdn);
1,412✔
267
  return tlen;
1,412✔
268
}
269

270
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
697✔
271
  int32_t sz;
272
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
697!
273
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
697✔
274
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
697✔
275
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
697✔
276
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
697!
277
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
697!
278

279
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
697!
280
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
697✔
281
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
697!
282
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
697!
283
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
1,394!
284

285
  // current topics
286
  buf = taosDecodeFixedI32(buf, &sz);
697✔
287
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
697✔
288
  if (pConsumer->currentTopics == NULL) {
697!
289
    return NULL;
×
290
  }
291
  for (int32_t i = 0; i < sz; i++) {
697!
292
    char *topic;
293
    buf = taosDecodeString(buf, &topic);
×
294
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
×
295
      return NULL;
×
296
    }
297
  }
298

299
  // reb new topics
300
  buf = taosDecodeFixedI32(buf, &sz);
697✔
301
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
697✔
302
  for (int32_t i = 0; i < sz; i++) {
1,021✔
303
    char *topic;
304
    buf = taosDecodeString(buf, &topic);
324✔
305
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
648!
306
      return NULL;
×
307
    }
308
  }
309

310
  // reb removed topics
311
  buf = taosDecodeFixedI32(buf, &sz);
697✔
312
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
697✔
313
  for (int32_t i = 0; i < sz; i++) {
1,020✔
314
    char *topic;
315
    buf = taosDecodeString(buf, &topic);
323✔
316
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
646!
317
      return NULL;
×
318
    }
319
  }
320

321
  // reb removed topics
322
  buf = taosDecodeFixedI32(buf, &sz);
697✔
323
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
697✔
324
  for (int32_t i = 0; i < sz; i++) {
859✔
325
    char *topic;
326
    buf = taosDecodeString(buf, &topic);
162✔
327
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
324!
328
      return NULL;
×
329
    }
330
  }
331

332
  if (sver > 1) {
697!
333
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
697✔
334
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
697✔
335
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
697!
336
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
1,394!
337
  }
338
  if (sver > 2) {
697!
339
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
697!
340
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
697!
341
    buf = taosDecodeStringTo(buf, pConsumer->user);
697✔
342
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
1,394✔
343
  } else {
344
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
345
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
346
  }
347

348
  return (void *)buf;
697✔
349
}
350

351
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
1,210✔
352
  int32_t tlen = 0;
1,210✔
353
  int32_t szVgs = taosArrayGetSize(offsetRows);
1,210✔
354
  tlen += taosEncodeFixedI32(buf, szVgs);
1,210✔
355
  for (int32_t j = 0; j < szVgs; ++j) {
2,598✔
356
    OffsetRows *offRows = taosArrayGet(offsetRows, j);
1,388✔
357
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
1,388✔
358
    tlen += taosEncodeFixedI64(buf, offRows->rows);
1,388✔
359
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
1,388✔
360
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
1,388!
361
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
60✔
362
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
120✔
363
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
1,328✔
364
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
2,560✔
365
    } else {
366
      // do nothing
367
    }
368
    tlen += taosEncodeFixedI64(buf, offRows->ever);
2,776✔
369
  }
370

371
  return tlen;
1,210✔
372
}
373

374
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
402✔
375
  int32_t tlen = 0;
402✔
376
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
402✔
377
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
402✔
378

379
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
402✔
380
}
381

382
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
483✔
383
  int32_t szVgs = 0;
483!
384
  buf = taosDecodeFixedI32(buf, &szVgs);
483✔
385
  if (szVgs > 0) {
483✔
386
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
166✔
387
    if (NULL == *offsetRows) return NULL;
166!
388
    for (int32_t j = 0; j < szVgs; ++j) {
598✔
389
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
432✔
390
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
432!
391
      buf = taosDecodeFixedI64(buf, &offRows->rows);
432!
392
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
432✔
393
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
432!
394
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
17!
395
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
34!
396
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
415✔
397
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
804!
398
      } else {
399
        // do nothing
400
      }
401
      if (sver > 2) {
432!
402
        buf = taosDecodeFixedI64(buf, &offRows->ever);
864!
403
      }
404
    }
405
  }
406
  return (void *)buf;
483✔
407
}
408

409
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
199✔
410
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
199!
411
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
199✔
412
  if (sver > 1) {
199!
413
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
199✔
414
  }
415

416
  return (void *)buf;
199✔
417
}
418

419
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
117✔
420
  int32_t          code = 0;
117✔
421
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
117!
422
  MND_TMQ_NULL_CHECK(pSubObj);
117!
423

424
  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
117✔
425
  taosInitRWLatch(&pSubObj->lock);
117✔
426
  pSubObj->vgNum = 0;
117✔
427
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
117✔
428
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
117!
429
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
117✔
430
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
117!
431
  if (ppSub) {
117!
432
    *ppSub = pSubObj;
117✔
433
  }
434
  return code;
117✔
435

436
END:
×
437
  taosMemoryFree(pSubObj);
×
438
  return code;
×
439
}
440

441
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
153✔
442
  int32_t          code = 0;
153✔
443
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
153!
444
  if (pSubNew == NULL) {
153!
445
    code = terrno;
×
446
    goto END;
×
447
  }
448
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
153✔
449
  taosInitRWLatch(&pSubNew->lock);
153✔
450

451
  pSubNew->dbUid = pSub->dbUid;
153✔
452
  pSubNew->stbUid = pSub->stbUid;
153✔
453
  pSubNew->subType = pSub->subType;
153✔
454
  pSubNew->withMeta = pSub->withMeta;
153✔
455

456
  pSubNew->vgNum = pSub->vgNum;
153✔
457
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
153✔
458

459
  void          *pIter = NULL;
153✔
460
  SMqConsumerEp *pConsumerEp = NULL;
153✔
461
  while (1) {
198✔
462
    pIter = taosHashIterate(pSub->consumerHash, pIter);
351✔
463
    if (pIter == NULL) break;
351✔
464
    pConsumerEp = (SMqConsumerEp *)pIter;
198✔
465
    SMqConsumerEp newEp = {
396✔
466
        .consumerId = pConsumerEp->consumerId,
198✔
467
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
198✔
468
    };
469
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
198!
470
                            sizeof(SMqConsumerEp))) != 0)
471
      goto END;
×
472
  }
473
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
153✔
474
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
153✔
475
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
153✔
476
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
153!
477
  if (ppSub) {
153!
478
    *ppSub = pSubNew;
153✔
479
  }
480
END:
×
481
  return code;
153✔
482
}
483

484
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
554✔
485
  if (pSub == NULL) return;
554!
486
  void *pIter = NULL;
554✔
487
  while (1) {
398✔
488
    pIter = taosHashIterate(pSub->consumerHash, pIter);
952✔
489
    if (pIter == NULL) break;
952✔
490
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
398✔
491
    taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
398✔
492
    taosArrayDestroy(pConsumerEp->offsetRows);
398✔
493
  }
494
  taosHashCleanup(pSub->consumerHash);
554✔
495
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
554✔
496
  taosMemoryFreeClear(pSub->qmsg);
554!
497
  taosArrayDestroy(pSub->offsetRows);
554✔
498
}
499

500
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
808✔
501
  int32_t tlen = 0;
808✔
502
  tlen += taosEncodeString(buf, pSub->key);
808✔
503
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
808✔
504
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
808✔
505
  tlen += taosEncodeFixedI8(buf, pSub->subType);
808✔
506
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
808✔
507
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
808✔
508

509
  void   *pIter = NULL;
808✔
510
  int32_t sz = taosHashGetSize(pSub->consumerHash);
808✔
511
  tlen += taosEncodeFixedI32(buf, sz);
808✔
512

513
  int32_t cnt = 0;
808✔
514
  while (1) {
402✔
515
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,210✔
516
    if (pIter == NULL) break;
1,210✔
517
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
402✔
518
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
402✔
519
    cnt++;
402✔
520
  }
521
  if (cnt != sz) return -1;
808!
522
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
808✔
523
  tlen += taosEncodeString(buf, pSub->dbName);
808✔
524

525
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
808✔
526
  tlen += taosEncodeString(buf, pSub->qmsg);
808✔
527
  return tlen;
808✔
528
}
529

530
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
284✔
531
  //
532
  buf = taosDecodeStringTo(buf, pSub->key);
284✔
533
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
284!
534
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
284!
535
  buf = taosDecodeFixedI8(buf, &pSub->subType);
284✔
536
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
284✔
537
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);
568!
538

539
  int32_t sz;
540
  buf = taosDecodeFixedI32(buf, &sz);
284✔
541

542
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
284✔
543
  for (int32_t i = 0; i < sz; i++) {
483✔
544
    SMqConsumerEp consumerEp = {0};
199✔
545
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
199✔
546
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
199!
547
        0)
548
      return NULL;
×
549
  }
550

551
  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
284✔
552
  buf = taosDecodeStringTo(buf, pSub->dbName);
284✔
553

554
  if (sver > 1) {
284!
555
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
284✔
556
    buf = taosDecodeString(buf, &pSub->qmsg);
568✔
557
  } else {
558
    pSub->qmsg = taosStrdup("");
×
559
  }
560
  return (void *)buf;
284✔
561
}
562

563
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
126,575✔
564
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
126,575✔
565
  pObj->dtype = pItem->dtype;
126,575✔
566
  switch (pItem->dtype) {
126,575!
567
    case CFG_DTYPE_NONE:
×
568
      break;
×
569
    case CFG_DTYPE_BOOL:
27,450✔
570
      pObj->bval = pItem->bval;
27,450✔
571
      break;
27,450✔
572
    case CFG_DTYPE_INT32:
68,625✔
573
      pObj->i32 = pItem->i32;
68,625✔
574
      break;
68,625✔
575
    case CFG_DTYPE_INT64:
7,625✔
576
      pObj->i64 = pItem->i64;
7,625✔
577
      break;
7,625✔
578
    case CFG_DTYPE_FLOAT:
3,050✔
579
    case CFG_DTYPE_DOUBLE:
580
      pObj->fval = pItem->fval;
3,050✔
581
      break;
3,050✔
582
    case CFG_DTYPE_STRING:
19,825✔
583
    case CFG_DTYPE_DIR:
584
    case CFG_DTYPE_LOCALE:
585
    case CFG_DTYPE_CHARSET:
586
    case CFG_DTYPE_TIMEZONE:
587
      pObj->str = taosStrdup(pItem->str);
19,825!
588
      if (pObj->str == NULL) {
19,825!
589
        taosMemoryFree(pObj);
×
590
        return TSDB_CODE_OUT_OF_MEMORY;
×
591
      }
592
      break;
19,825✔
593
  }
594
  return TSDB_CODE_SUCCESS;
126,575✔
595
}
596

597
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
10✔
598
  int32_t code = 0;
10✔
599
  switch (pObjNew->dtype) {
10!
600
    case CFG_DTYPE_BOOL: {
9✔
601
      bool tmp = false;
9✔
602
      if (strcasecmp(value, "true") == 0) {
9!
603
        tmp = true;
×
604
      }
605
      if (taosStr2Int32(value, NULL, 10) > 0) {
9✔
606
        tmp = true;
4✔
607
      }
608
      pObjNew->bval = tmp;
9✔
609
      break;
9✔
610
    }
611
    case CFG_DTYPE_INT32: {
×
612
      int32_t ival;
613
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &ival));
×
614
      pObjNew->i32 = ival;
×
615
      break;
×
616
    }
617
    case CFG_DTYPE_INT64: {
×
618
      int64_t ival;
619
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &ival));
×
620
      pObjNew->i64 = ival;
×
621
      break;
×
622
    }
623
    case CFG_DTYPE_FLOAT:
×
624
    case CFG_DTYPE_DOUBLE: {
625
      float dval = 0;
×
626
      TAOS_CHECK_RETURN(parseCfgReal(value, &dval));
×
627
      pObjNew->fval = dval;
×
628
      break;
×
629
    }
630
    case CFG_DTYPE_DIR:
1✔
631
    case CFG_DTYPE_TIMEZONE:
632
    case CFG_DTYPE_CHARSET:
633
    case CFG_DTYPE_LOCALE:
634
    case CFG_DTYPE_STRING: {
635
      pObjNew->str = taosStrdup(value);
1!
636
      if (pObjNew->str == NULL) {
1!
637
        code = terrno;
×
638
        return code;
×
639
      }
640
      break;
1✔
641
    }
642
    case CFG_DTYPE_NONE:
×
643
      break;
×
644
    default:
×
645
      code = TSDB_CODE_INVALID_CFG;
×
646
      break;
×
647
  }
648
  return code;
10✔
649
}
650

651
SConfigObj mndInitConfigVersion() {
1,533✔
652
  SConfigObj obj;
653
  memset(&obj, 0, sizeof(SConfigObj));
1,533✔
654

655
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
1,533✔
656
  obj.dtype = CFG_DTYPE_INT32;
1,533✔
657
  obj.i32 = 0;
1,533✔
658
  return obj;
1,533✔
659
}
660

661
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
836,224✔
662
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
836,224!
663
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
1,672,448!
664

665
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
1,672,448!
666
  switch (pObj->dtype) {
836,224!
667
    case CFG_DTYPE_BOOL:
179,190✔
668
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
358,380!
669
      break;
179,190✔
670
    case CFG_DTYPE_INT32:
457,952✔
671
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
915,904!
672
      break;
457,952✔
673
    case CFG_DTYPE_INT64:
49,770✔
674
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
99,540!
675
      break;
49,770✔
676
    case CFG_DTYPE_FLOAT:
19,908✔
677
    case CFG_DTYPE_DOUBLE:
678
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
39,816!
679
      break;
19,908✔
680
    case CFG_DTYPE_STRING:
129,404✔
681
    case CFG_DTYPE_DIR:
682
    case CFG_DTYPE_LOCALE:
683
    case CFG_DTYPE_CHARSET:
684
    case CFG_DTYPE_TIMEZONE:
685
      if (pObj->str != NULL) {
129,404!
686
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
258,808!
687
      } else {
688
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
×
689
      }
690
      break;
129,404✔
691
    default:
×
692
      break;
×
693
  }
694
  tEndEncode(pEncoder);
836,224✔
695
  return pEncoder->pos;
836,224✔
696
}
697

698
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
159,872✔
699
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
159,872!
700
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
159,872!
701
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
319,744!
702
  switch (pObj->dtype) {
159,872!
703
    case CFG_DTYPE_NONE:
×
704
      break;
×
705
    case CFG_DTYPE_BOOL:
34,207✔
706
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
34,207!
707
      break;
34,207✔
708
    case CFG_DTYPE_INT32:
87,684✔
709
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
175,368!
710
      break;
87,684✔
711
    case CFG_DTYPE_INT64:
9,505✔
712
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
19,010!
713
      break;
9,505✔
714
    case CFG_DTYPE_FLOAT:
3,802✔
715
    case CFG_DTYPE_DOUBLE:
716
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
7,604!
717
      break;
3,802✔
718
    case CFG_DTYPE_STRING:
24,674✔
719
    case CFG_DTYPE_DIR:
720
    case CFG_DTYPE_LOCALE:
721
    case CFG_DTYPE_CHARSET:
722
    case CFG_DTYPE_TIMEZONE:
723
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
49,348!
724
      break;
24,674✔
725
  }
726
  tEndDecode(pDecoder);
159,872✔
727
  TAOS_RETURN(TSDB_CODE_SUCCESS);
159,872✔
728
}
729

730
void tFreeSConfigObj(SConfigObj *obj) {
288,000✔
731
  if (obj == NULL) {
288,000!
732
    return;
×
733
  }
734
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
288,000!
735
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
250,346✔
736
    taosMemoryFree(obj->str);
44,500!
737
  }
738
}
739

740
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
741
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
742
//   if (pEntryNew == NULL) return NULL;
743
//   pEntryNew->epoch = pEntry->epoch;
744
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
745
//   return pEntryNew;
746
// }
747
//
748
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
749
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
750
// }
751

752
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
753
//   int32_t tlen = 0;
754
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
755
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
756
//   return tlen;
757
// }
758
//
759
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
760
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
761
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
762
//   return (void *)buf;
763
// }
764

765
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
766
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
767
//   if (pLogNew == NULL) return pLogNew;
768
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
769
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
770
//   return pLogNew;
771
// }
772
//
773
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
774
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
775
// }
776

777
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
778
//   int32_t tlen = 0;
779
//   tlen += taosEncodeString(buf, pLog->key);
780
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
781
//   return tlen;
782
// }
783
//
784
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
785
//   buf = taosDecodeStringTo(buf, pLog->key);
786
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
787
//   return (void *)buf;
788
// }
789
//
790
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
791
//   int32_t tlen = 0;
792
//   tlen += taosEncodeString(buf, pOffset->key);
793
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
794
//   return tlen;
795
// }
796
//
797
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
798
//   buf = taosDecodeStringTo(buf, pOffset->key);
799
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
800
//   return buf;
801
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc