• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.67
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "mndStream.h"
19
#include "taoserror.h"
20
#include "tunit.h"
21

22
// Serialize a stream object into pEncoder.
//
// Field order on the wire: name, create request, mainSnodeId, userStopped,
// createTime, updateTime, createUser, ownerId. tDecodeSStreamObj reads the
// fields back in the same order.
//
// Every step is wrapped in TAOS_CHECK_RETURN. When all steps succeed the
// function returns the encoder's current position (pEncoder->pos).
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // stream name followed by the request that created it
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));

  // runtime state
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));

  // trailing fields; the decoder treats these two as optional
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->createUser));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->ownerId));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
38

39
// Deserialize a stream object from pDecoder into pObj.
//
// pObj->pCreate is allocated here; tFreeStreamObj releases it.
// sver selects the create-request layout: MND_STREAM_VER_NUMBER uses the
// current deserializer, anything else goes through the "Old" deserializer.
// createUser/ownerId are only read when the buffer still has data, so records
// written without those fields remain readable.
//
// Returns 0 on success or an error code. Note that the TAOS_CHECK_RETURN steps
// leave the function immediately, while the allocation failure goes through
// _exit (which ends and clears the decoder).
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
  // Check the allocation itself. Testing pObj (already dereferenced above)
  // could never catch a failed calloc and would hand NULL to the deserializer.
  if (NULL == pObj->pCreate) {
    TAOS_CHECK_EXIT(terrno);
  }

  if (MND_STREAM_VER_NUMBER == sver) {
    TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
  } else {
    // NOTE(review): 21 is passed straight to the legacy deserializer; presumably
    // a format/field-count selector - confirm against its definition.
    TAOS_CHECK_RETURN(
      tDeserializeSCMCreateStreamReqImplOld(pDecoder, pObj->pCreate, 21));
  }

  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->createUser));
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->ownerId));
  }

_exit:

  tEndDecode(pDecoder);
  tDecoderClear(pDecoder);

  TAOS_RETURN(code);
}
73

74
// Release the create request owned by a stream object: first its contents,
// then the request struct itself. pStream is dereferenced unconditionally and
// the SStreamObj is not freed here.
void tFreeStreamObj(SStreamObj *pStream) {
  tFreeSCMCreateStreamReq(pStream->pCreate);  // members of the request
  taosMemoryFreeClear(pStream->pCreate);      // the request struct
}
×
78

79
void freeSMqConsumerEp(void* data) {
×
80
  if (data == NULL) return;
×
81
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
×
82
  taosArrayDestroy(pConsumerEp->vgs);
×
83
  pConsumerEp->vgs = NULL;
×
84
  taosArrayDestroy(pConsumerEp->offsetRows);
×
85
  pConsumerEp->offsetRows = NULL;
×
86
}
87

88
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
×
89
  int32_t tlen = 0;
×
90
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
×
91
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
×
92
  return tlen;
×
93
}
94

95
// Decode one vgroup endpoint from buf and return the position after it.
// Layout: vgId, then (sver == 1 only) a length-prefixed blob that is skipped,
// then the epSet.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    // version 1 carries an extra variable-length field; read its size and
    // step over the payload without interpreting it
    uint64_t skipLen = 0;
    buf = taosDecodeVariantU64(buf, &skipLen);
    buf = POINTER_SHIFT(buf, skipLen);
  }

  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
  return (void *)buf;
}
105

106
// Element-copy callback handed to taosArrayDup: duplicates one topic name string.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
×
107

108
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
×
109
                           SMqConsumerObj **ppConsumer) {
110
  int32_t         code = 0;
×
111
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
×
112
  if (pConsumer == NULL) {
×
113
    code = terrno;
×
114
    goto END;
×
115
  }
116

117
  pConsumer->consumerId = consumerId;
×
118
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
×
119

120
  pConsumer->epoch = 0;
×
121
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
×
122
  pConsumer->hbStatus = 0;
×
123
  pConsumer->pollStatus = 0;
×
124

125
  taosInitRWLatch(&pConsumer->lock);
×
126
  pConsumer->createTime = taosGetTimestampMs();
×
127
  pConsumer->updateType = updateType;
×
128

129
  if (updateType == CONSUMER_ADD_REB) {
×
130
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
×
131
    if (pConsumer->rebNewTopics == NULL) {
×
132
      code = terrno;
×
133
      goto END;
×
134
    }
135

136
    char *topicTmp = taosStrdup(topic);
×
137
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
×
138
      code = terrno;
×
139
      goto END;
×
140
    }
141
  } else if (updateType == CONSUMER_REMOVE_REB) {
×
142
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
×
143
    if (pConsumer->rebRemovedTopics == NULL) {
×
144
      code = terrno;
×
145
      goto END;
×
146
    }
147
    char *topicTmp = taosStrdup(topic);
×
148
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
×
149
      code = terrno;
×
150
      goto END;
×
151
    }
152
  } else if (updateType == CONSUMER_INSERT_SUB) {
×
153
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
×
154
    pConsumer->withTbName = subscribe->withTbName;
×
155
    pConsumer->autoCommit = subscribe->autoCommit;
×
156
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
×
157
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
×
158
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
×
159
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
×
160
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
×
161
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
×
162

163
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
×
164
    if (pConsumer->rebNewTopics == NULL) {
×
165
      code = terrno;
×
166
      goto END;
×
167
    }
168
    pConsumer->assignedTopics = subscribe->topicNames;
×
169
    subscribe->topicNames = NULL;
×
170
  } else if (updateType == CONSUMER_UPDATE_SUB) {
×
171
    pConsumer->assignedTopics = subscribe->topicNames;
×
172
    subscribe->topicNames = NULL;
×
173
  }
174

175
  *ppConsumer = pConsumer;
×
176
  return 0;
×
177

178
END:
×
179
  tDeleteSMqConsumerObj(pConsumer);
×
180
  return code;
×
181
}
182

183
// Destroy the four topic arrays held by a consumer (current, rebalance-new,
// rebalance-removed, assigned). The consumer struct itself is kept and the
// array pointers are not reset. NULL is ignored.
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    taosArrayDestroyP(pConsumer->currentTopics, NULL);
    taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
    taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
    taosArrayDestroyP(pConsumer->assignedTopics, NULL);
  }
}
190

191
// Fully dispose of a consumer: clear its topic arrays via
// tClearSMqConsumerObj, then free the struct.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  tClearSMqConsumerObj(pConsumer);  // owned arrays first
  taosMemoryFree(pConsumer);        // then the object itself
}
×
195

196
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
×
197
  int32_t tlen = 0;
×
198
  int32_t sz;
199
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
×
200
  tlen += taosEncodeString(buf, pConsumer->clientId);
×
201
  tlen += taosEncodeString(buf, pConsumer->cgroup);
×
202
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
×
203
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
×
204
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
×
205

206
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
×
207
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
×
208
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
×
209
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
×
210
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
×
211

212
  // current topics
213
  if (pConsumer->currentTopics) {
×
214
    sz = taosArrayGetSize(pConsumer->currentTopics);
×
215
    tlen += taosEncodeFixedI32(buf, sz);
×
216
    for (int32_t i = 0; i < sz; i++) {
×
217
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
×
218
      tlen += taosEncodeString(buf, topic);
×
219
    }
220
  } else {
221
    tlen += taosEncodeFixedI32(buf, 0);
×
222
  }
223

224
  // reb new topics
225
  if (pConsumer->rebNewTopics) {
×
226
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
×
227
    tlen += taosEncodeFixedI32(buf, sz);
×
228
    for (int32_t i = 0; i < sz; i++) {
×
229
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
×
230
      tlen += taosEncodeString(buf, topic);
×
231
    }
232
  } else {
233
    tlen += taosEncodeFixedI32(buf, 0);
×
234
  }
235

236
  // reb removed topics
237
  if (pConsumer->rebRemovedTopics) {
×
238
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
×
239
    tlen += taosEncodeFixedI32(buf, sz);
×
240
    for (int32_t i = 0; i < sz; i++) {
×
241
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
×
242
      tlen += taosEncodeString(buf, topic);
×
243
    }
244
  } else {
245
    tlen += taosEncodeFixedI32(buf, 0);
×
246
  }
247

248
  // lost topics
249
  if (pConsumer->assignedTopics) {
×
250
    sz = taosArrayGetSize(pConsumer->assignedTopics);
×
251
    tlen += taosEncodeFixedI32(buf, sz);
×
252
    for (int32_t i = 0; i < sz; i++) {
×
253
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
×
254
      tlen += taosEncodeString(buf, topic);
×
255
    }
256
  } else {
257
    tlen += taosEncodeFixedI32(buf, 0);
×
258
  }
259

260
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
×
261
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
×
262
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
×
263
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
×
264
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
×
265
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
×
266
  tlen += taosEncodeString(buf, pConsumer->user);
×
267
  tlen += taosEncodeString(buf, pConsumer->fqdn);
×
268
  return tlen;
×
269
}
270

271
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
×
272
  int32_t sz;
273
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
×
274
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
×
275
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
×
276
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
×
277
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
×
278
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
×
279

280
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
×
281
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
×
282
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
×
283
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
×
284
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
×
285

286
  // current topics
287
  buf = taosDecodeFixedI32(buf, &sz);
×
288
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
×
289
  if (pConsumer->currentTopics == NULL) {
×
290
    return NULL;
×
291
  }
292
  for (int32_t i = 0; i < sz; i++) {
×
293
    char *topic;
294
    buf = taosDecodeString(buf, &topic);
×
295
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
×
296
      return NULL;
×
297
    }
298
  }
299

300
  // reb new topics
301
  buf = taosDecodeFixedI32(buf, &sz);
×
302
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
×
303
  for (int32_t i = 0; i < sz; i++) {
×
304
    char *topic;
305
    buf = taosDecodeString(buf, &topic);
×
306
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
×
307
      return NULL;
×
308
    }
309
  }
310

311
  // reb removed topics
312
  buf = taosDecodeFixedI32(buf, &sz);
×
313
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
×
314
  for (int32_t i = 0; i < sz; i++) {
×
315
    char *topic;
316
    buf = taosDecodeString(buf, &topic);
×
317
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
×
318
      return NULL;
×
319
    }
320
  }
321

322
  // reb removed topics
323
  buf = taosDecodeFixedI32(buf, &sz);
×
324
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
×
325
  for (int32_t i = 0; i < sz; i++) {
×
326
    char *topic;
327
    buf = taosDecodeString(buf, &topic);
×
328
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
×
329
      return NULL;
×
330
    }
331
  }
332

333
  if (sver > 1) {
×
334
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
×
335
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
×
336
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
×
337
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
×
338
  }
339
  if (sver > 2) {
×
340
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
×
341
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
×
342
    buf = taosDecodeStringTo(buf, pConsumer->user);
×
343
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
×
344
  } else {
345
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
346
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
347
  }
348

349
  return (void *)buf;
×
350
}
351

352
// Encode an array of OffsetRows: an int32 count, then per entry vgId, rows,
// offset type, a type-dependent offset payload, and finally `ever`.
//
// Offset payload by type:
//   TMQ_OFFSET__SNAPSHOT_DATA / TMQ_OFFSET__SNAPSHOT_META - uid and ts
//   TMQ_OFFSET__LOG                                       - version
//   any other type                                        - nothing
//
// Returns the accumulated encoded length.
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t total = 0;
  int32_t num = taosArrayGetSize(offsetRows);
  total += taosEncodeFixedI32(buf, num);

  for (int32_t idx = 0; idx < num; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);
    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    if (pRow->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, pRow->offset.version);
    } else if (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || pRow->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      total += taosEncodeFixedI64(buf, pRow->offset.uid);
      total += taosEncodeFixedI64(buf, pRow->offset.ts);
    }
    // other offset types carry no payload

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
374

375
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
×
376
  int32_t tlen = 0;
×
377
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
×
378
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
×
379
  tlen += taosEncodeFixedI32(buf, sz);
×
380
  for (int32_t i = 0; i < sz; i++) {
×
381
    void* data = taosArrayGet(pConsumerEp->vgs, i);
×
382
    tlen += tEncodeSMqVgEp(buf, data);
×
383
  }
384

385
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
×
386
}
387

388
// Decode an OffsetRows array written by tEncodeOffRows.
//
// *offsetRows is only created when the encoded count is positive; with a count
// of 0 it is left untouched. The `ever` field is only read when sver > 2.
//
// Returns the position after the array, or NULL when the array cannot be
// created or grown. On failure *offsetRows may hold a partially filled array
// that the caller is expected to destroy.
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      if (offRows == NULL) {
        // a failed reserve must not be written through
        return NULL;
      }
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // other offset types carry no payload
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
414

415
// Decode a consumer endpoint written by tEncodeSMqConsumerEp: consumerId, the
// vgroup endpoints and, when sver > 1, the offset rows.
//
// Returns the position after the record, or NULL when an array cannot be
// created or grown (including a NULL result from tDecodeOffRows). Arrays
// already attached to pConsumerEp are left for the caller to release, e.g.
// with freeSMqConsumerEp.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
  if (pConsumerEp->vgs == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqVgEp *vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
    if (vgEp == NULL) {
      // Skipping the entry would leave its bytes unread, so everything decoded
      // afterwards would be misaligned with the stream. Fail instead.
      return NULL;
    }
    buf = tDecodeSMqVgEp(buf, vgEp, sver);
  }
  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
  }

  return (void *)buf;
}
434

435
// Allocate and initialise an empty subscription object for `key`.
//
// The new object has vgNum 0, an empty consumer hash (with freeSMqConsumerEp
// installed as its value destructor) and an empty unassignedVgs array.
// TSDB_SUBSCRIBE_KEY_LEN bytes are copied from `key`, so the caller must pass
// a buffer of at least that size.
//
// On success *ppSub receives the object and 0 is returned. On failure the
// partially built object is fully released, *ppSub is left untouched and the
// error code set by the MND_TMQ_NULL_CHECK macro is returned.
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  int32_t          lino = 0;
  PRINT_LOG_START
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);

  pSubObj->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  *ppSub = pSubObj;
  pSubObj = NULL;  // ownership handed to the caller; nothing to clean up below

END:
  if (pSubObj != NULL) {
    // Failure path: tDeleteSubscribeObj only releases the members, so the
    // struct allocated above has to be freed separately.
    tDeleteSubscribeObj(pSubObj);
    taosMemoryFree(pSubObj);
  }
  PRINT_LOG_END
  return code;
}
459

460
// Create a copy of pSub while holding its read latch.
//
// Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, unassignedVgs,
// the subscription-level offsetRows, and for every consumer its consumerId and
// vgs array. Per-consumer offsetRows are not copied; the cloned entries start
// with offsetRows == NULL.
//
// *ppSub is assigned as soon as the new struct exists. If a later step fails,
// a non-zero code is returned and *ppSub still points at the partially built
// clone. NOTE(review): callers presumably release it with tDeleteSubscribeObj
// plus a free - confirm at the call sites.
int32_t tCloneSubscribeObj(SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  int32_t          lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubNew);
  *ppSub = pSubNew;

  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubNew->consumerHash);
  taosHashSetFreeFp(pSubNew->consumerHash, freeSMqConsumerEp);

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp  newEp = {
         .consumerId = pConsumerEp->consumerId,
         .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
    };
    if (pConsumerEp->vgs != NULL && newEp.vgs == NULL) {
      // duplication failed; same condition style as the offsetRows check below
      code = terrno;
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }

    code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // the hash did not store the entry, so its vgs copy is still owned here
      taosArrayDestroy(newEp.vgs);
      // leaving the loop early: release the iterator explicitly
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
  MND_TMQ_NULL_CHECK(pSubNew->unassignedVgs);

  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  MND_TMQ_CONDITION_CHECK(pSub->offsetRows == NULL || pSubNew->offsetRows != NULL, terrno);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);

END:
  taosRUnLockLatch(&pSub->lock);
  return code;
}
506

507
// Release everything a subscription object owns: the consumer hash, the
// unassigned-vgroup array and the offset rows. Each pointer is reset after
// release. The SMqSubscribeObj struct itself is NOT freed. NULL is ignored.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub != NULL) {
    taosHashCleanup(pSub->consumerHash);
    pSub->consumerHash = NULL;

    taosArrayDestroy(pSub->unassignedVgs);
    pSub->unassignedVgs = NULL;

    taosArrayDestroy(pSub->offsetRows);
    pSub->offsetRows = NULL;
  }
}
516

517
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
×
518
  int32_t tlen = 0;
×
519
  tlen += taosEncodeString(buf, pSub->key);
×
520
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
×
521
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
×
522
  tlen += taosEncodeFixedI8(buf, pSub->subType);
×
523
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
×
524
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
×
525

526
  void   *pIter = NULL;
×
527
  int32_t sz = taosHashGetSize(pSub->consumerHash);
×
528
  tlen += taosEncodeFixedI32(buf, sz);
×
529

530
  int32_t cnt = 0;
×
531
  while (1) {
×
532
    pIter = taosHashIterate(pSub->consumerHash, pIter);
×
533
    if (pIter == NULL) break;
×
534
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
×
535
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
×
536
    cnt++;
×
537
  }
538
  if (cnt != sz) return -1;
×
539
  int32_t len = taosArrayGetSize(pSub->unassignedVgs);
×
540
  tlen += taosEncodeFixedI32(buf, len);
×
541
  for (int32_t i = 0; i < len; i++) {
×
542
    void* data = taosArrayGet(pSub->unassignedVgs, i);
×
543
    tlen += tEncodeSMqVgEp(buf, data);
×
544
  }
545
  tlen += taosEncodeString(buf, pSub->dbName);
×
546

547
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
×
548
  return tlen;
×
549
}
550

551
// Decode a subscription object written by tEncodeSubscribeObj.
//
// The consumer hash is created here with freeSMqConsumerEp as its value
// destructor, so entries stored in it are released with the hash. The
// subscription-level offset rows are only read when sver > 1.
//
// Returns the position after the object, or NULL on any failure. On failure
// members already attached to pSub are left for the caller to release (e.g.
// with tDeleteSubscribeObj); a consumer entry that could not be stored in the
// hash is released here.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }
  taosHashSetFreeFp(pSub->consumerHash, freeSMqConsumerEp);

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (buf == NULL) {
      // decoding failed part-way: drop what was built and stop, rather than
      // storing a partial entry and reading on from a NULL cursor
      freeSMqConsumerEp(&consumerEp);
      return NULL;
    }
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
        0) {
      // the hash did not store the entry, so its arrays are still owned here
      freeSMqConsumerEp(&consumerEp);
      return NULL;
    }
  }

  int32_t len = 0;
  buf = taosDecodeFixedI32(buf, &len);
  pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
  if (pSub->unassignedVgs == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < len; i++) {
    SMqVgEp *vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
    if (vgEp == NULL) {
      // skipping the entry would leave its bytes unread and misalign the rest
      return NULL;
    }
    buf = tDecodeSMqVgEp(buf, vgEp, sver);
  }

  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
  }
  return (void *)buf;
}
596

597
// Initialise a config object from a config item: copy the name and dtype, then
// the value that matches the dtype. String-like values (STRING, DIR, LOCALE,
// CHARSET, TIMEZONE) are duplicated, so pObj owns its own copy, which
// tFreeSConfigObj releases.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the string copy
// fails. pObj is owned by the caller and is never freed here: this function
// did not allocate it, and it may not be heap memory at all.
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
  pObj->dtype = pItem->dtype;
  switch (pItem->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      pObj->bval = pItem->bval;
      break;
    case CFG_DTYPE_INT32:
      pObj->i32 = pItem->i32;
      break;
    case CFG_DTYPE_INT64:
      pObj->i64 = pItem->i64;
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pObj->fval = pItem->fval;
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pObj->str = taosStrdup(pItem->str);
      if (pObj->str == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      break;
    default:
      // unknown dtype: only name and dtype are copied
      break;
  }
  return TSDB_CODE_SUCCESS;
}
630

631
// Parse the textual `value` according to pObjNew->dtype and store it in the
// matching field of pObjNew.
//
//   BOOL          - true when value equals "true" (case-insensitive) or parses
//                   to a positive base-10 integer, otherwise false
//   INT32/INT64   - parsed with taosStrHumanToInt32 / taosStrHumanToInt64
//   FLOAT/DOUBLE  - parsed with parseCfgReal into a float
//   string kinds  - value is duplicated into pObjNew->str (the previous pointer
//                   is overwritten without being freed)
//   NONE          - nothing to do
//   anything else - TSDB_CODE_INVALID_CFG
//
// `name` is not used by this function. Returns 0 on success, the parser's
// error via TAOS_CHECK_RETURN, or terrno when the string copy fails.
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  int32_t code = 0;
  switch (pObjNew->dtype) {
    case CFG_DTYPE_BOOL: {
      bool isTrue = (strcasecmp(value, "true") == 0);
      // numeric form is evaluated regardless of the textual match
      if (taosStr2Int32(value, NULL, 10) > 0) {
        isTrue = true;
      }
      pObjNew->bval = isTrue;
      break;
    }
    case CFG_DTYPE_INT32: {
      int32_t parsed32;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed32));
      pObjNew->i32 = parsed32;
      break;
    }
    case CFG_DTYPE_INT64: {
      int64_t parsed64;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed64));
      pObjNew->i64 = parsed64;
      break;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsedReal = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsedReal));
      pObjNew->fval = parsedReal;
      break;
    }
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_TIMEZONE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_STRING: {
      pObjNew->str = taosStrdup(value);
      if (pObjNew->str == NULL) {
        code = terrno;
        return code;
      }
      break;
    }
    case CFG_DTYPE_NONE:
      break;
    default:
      code = TSDB_CODE_INVALID_CFG;
      break;
  }
  return code;
}
684

685
SConfigObj mndInitConfigVersion() {
16✔
686
  SConfigObj obj;
687
  memset(&obj, 0, sizeof(SConfigObj));
16✔
688

689
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
16✔
690
  obj.dtype = CFG_DTYPE_INT32;
16✔
691
  obj.i32 = 0;
16✔
692
  return obj;
16✔
693
}
694

695
// Serialize a config object: name, dtype (as int32), then the value field that
// matches the dtype. A NULL string value is written as the empty string.
// Dtypes without a case below (e.g. CFG_DTYPE_NONE) write no value.
//
// Every step is wrapped in TAOS_CHECK_RETURN. When all steps succeed the
// function returns the encoder's current position (pEncoder->pos).
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));

  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
  switch (pObj->dtype) {
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      // substitute "" for a missing string so the field is always present
      const char *text = (pObj->str != NULL) ? pObj->str : "";
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, text));
      break;
    }
    default:
      break;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}
731

732
// Deserialize a config object written by tEncodeSConfigObj: name, dtype, then
// the value field selected by the decoded dtype. String-like values are
// decoded with tDecodeCStrAlloc into pObj->str; tFreeSConfigObj releases that
// pointer. A dtype without a case below reads no value.
//
// Every step is wrapped in TAOS_CHECK_RETURN; the function finishes with
// TSDB_CODE_SUCCESS once tEndDecode has been called.
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // header: name and data type
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  // payload: depends on the data type just read
  switch (pObj->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
763

764
// Free the string value of a config object when its dtype is one of the
// string-like kinds (STRING, DIR, LOCALE, CHARSET, TIMEZONE). The SConfigObj
// itself is not freed and obj->str is not reset. NULL is ignored.
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }

  switch (obj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      taosMemoryFree(obj->str);
      break;
    default:
      // non-string kinds own no heap memory
      break;
  }
}
773

774
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
775
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
776
//   if (pEntryNew == NULL) return NULL;
777
//   pEntryNew->epoch = pEntry->epoch;
778
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
779
//   return pEntryNew;
780
// }
781
//
782
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
783
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
784
// }
785

786
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
787
//   int32_t tlen = 0;
788
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
789
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
790
//   return tlen;
791
// }
792
//
793
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
794
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
795
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
796
//   return (void *)buf;
797
// }
798

799
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
800
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
801
//   if (pLogNew == NULL) return pLogNew;
802
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
803
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
804
//   return pLogNew;
805
// }
806
//
807
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
808
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
809
// }
810

811
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
812
//   int32_t tlen = 0;
813
//   tlen += taosEncodeString(buf, pLog->key);
814
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
815
//   return tlen;
816
// }
817
//
818
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
819
//   buf = taosDecodeStringTo(buf, pLog->key);
820
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
821
//   return (void *)buf;
822
// }
823
//
824
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
825
//   int32_t tlen = 0;
826
//   tlen += taosEncodeString(buf, pOffset->key);
827
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
828
//   return tlen;
829
// }
830
//
831
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
832
//   buf = taosDecodeStringTo(buf, pOffset->key);
833
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
834
//   return buf;
835
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc