• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.09
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
// Serialize a stream object into pEncoder.
//
// Written in order: name, the create-stream request (pObj->pCreate),
// mainSnodeId, userStopped, createTime, updateTime.
// Every step is wrapped in TAOS_CHECK_RETURN (defined elsewhere; presumably it
// returns the failing step's code -- confirm against the macro definition).
// On the success path the function returns pEncoder->pos.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // identity and the request the stream was created from
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));

  // scalar state
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));

  tEndEncode(pEncoder);

  return pEncoder->pos;
}
35

36
// Deserialize a stream object from pDecoder into pObj.
//
// Reads, in order: name, the create-stream request (into a freshly allocated
// pObj->pCreate), mainSnodeId, userStopped, createTime, updateTime.
// The sver parameter is not used by this function.
//
// Returns `code`, which is 0 unless the allocation-failure path stored terrno
// in it. Steps wrapped in TAOS_CHECK_RETURN leave the function directly on
// failure (macro defined elsewhere -- presumably returning the step's code).
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
  // Check the allocation itself. The previous test was against pObj, which has
  // already been dereferenced above and therefore could never catch a failed
  // allocation.
  if (NULL == pObj->pCreate) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));

  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));

_exit:

  tEndDecode(pDecoder);
  tDecoderClear(pDecoder);

  TAOS_RETURN(code);
}
61

62
// Release the create-stream request owned by pStream.
//
// The request's contents are released with tFreeSCMCreateStreamReq and then
// the request itself with taosMemoryFreeClear. pStream itself is not freed.
// A NULL pStream is ignored, matching the other free/clear helpers in this
// file.
void tFreeStreamObj(SStreamObj *pStream) {
  if (pStream == NULL) {
    return;
  }
  tFreeSCMCreateStreamReq(pStream->pCreate);
  taosMemoryFreeClear(pStream->pCreate);
}
65,229✔
66

67
void freeSMqConsumerEp(void* data) {
42,871✔
68
  if (data == NULL) return;
42,871!
69
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
42,871✔
70
  taosArrayDestroy(pConsumerEp->vgs);
42,871✔
71
  pConsumerEp->vgs = NULL;
42,871✔
72
  taosArrayDestroy(pConsumerEp->offsetRows);
42,871✔
73
  pConsumerEp->offsetRows = NULL;
42,871✔
74
}
75

76
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
215,824✔
77
  int32_t tlen = 0;
215,824✔
78
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
215,824✔
79
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
215,824✔
80
  return tlen;
215,824✔
81
}
82

83
// Decode one vgroup endpoint (vgId, then epSet) and return the advanced
// cursor.
//
// When sver == 1 the stream carries an extra length-prefixed blob between the
// two fields; its length is read and the cursor is moved past it without
// storing the contents.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
93

94
// Element duplicator handed to taosArrayDup: returns a taosStrdup() copy of
// the string that p points to.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
12,890!
95

96
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
63,627✔
97
                           SMqConsumerObj **ppConsumer) {
98
  int32_t         code = 0;
63,627✔
99
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
63,627!
100
  if (pConsumer == NULL) {
63,627!
101
    code = terrno;
×
102
    goto END;
×
103
  }
104

105
  pConsumer->consumerId = consumerId;
63,627✔
106
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
63,627!
107

108
  pConsumer->epoch = 0;
63,627✔
109
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
63,627✔
110
  pConsumer->hbStatus = 0;
63,627✔
111
  pConsumer->pollStatus = 0;
63,627✔
112

113
  taosInitRWLatch(&pConsumer->lock);
63,627✔
114
  pConsumer->createTime = taosGetTimestampMs();
63,627✔
115
  pConsumer->updateType = updateType;
63,627✔
116

117
  if (updateType == CONSUMER_ADD_REB) {
63,627✔
118
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
12,890✔
119
    if (pConsumer->rebNewTopics == NULL) {
12,890!
120
      code = terrno;
×
121
      goto END;
×
122
    }
123

124
    char *topicTmp = taosStrdup(topic);
12,890!
125
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
25,780!
126
      code = terrno;
×
127
      goto END;
×
128
    }
129
  } else if (updateType == CONSUMER_REMOVE_REB) {
50,737✔
130
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
12,877✔
131
    if (pConsumer->rebRemovedTopics == NULL) {
12,877!
132
      code = terrno;
×
133
      goto END;
×
134
    }
135
    char *topicTmp = taosStrdup(topic);
12,877!
136
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
25,754!
137
      code = terrno;
×
138
      goto END;
×
139
    }
140
  } else if (updateType == CONSUMER_INSERT_SUB) {
37,860✔
141
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
12,504!
142
    pConsumer->withTbName = subscribe->withTbName;
12,504✔
143
    pConsumer->autoCommit = subscribe->autoCommit;
12,504✔
144
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
12,504✔
145
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
12,504✔
146
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
12,504✔
147
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
12,504✔
148
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
12,504!
149
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
12,504!
150

151
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
12,504✔
152
    if (pConsumer->rebNewTopics == NULL) {
12,504!
153
      code = terrno;
×
154
      goto END;
×
155
    }
156
    pConsumer->assignedTopics = subscribe->topicNames;
12,504✔
157
    subscribe->topicNames = NULL;
12,504✔
158
  } else if (updateType == CONSUMER_UPDATE_SUB) {
25,356✔
159
    pConsumer->assignedTopics = subscribe->topicNames;
11,490✔
160
    subscribe->topicNames = NULL;
11,490✔
161
  }
162

163
  *ppConsumer = pConsumer;
63,627✔
164
  return 0;
63,627✔
165

166
END:
×
167
  tDeleteSMqConsumerObj(pConsumer);
×
168
  return code;
×
169
}
170

171
// Destroy the four topic arrays owned by a consumer object.
//
// The consumer itself is not freed. A NULL consumer is ignored. Each field is
// reset to NULL after its array is destroyed, as the other clear helpers in
// this file do, so that clearing the same object again does not hand stale
// pointers to taosArrayDestroyP.
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) return;

  taosArrayDestroyP(pConsumer->currentTopics, NULL);
  pConsumer->currentTopics = NULL;

  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
  pConsumer->rebNewTopics = NULL;

  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
  pConsumer->rebRemovedTopics = NULL;

  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
  pConsumer->assignedTopics = NULL;
}
178

179
// Fully dispose of a consumer object: first the topic arrays it owns (via
// tClearSMqConsumerObj, which ignores NULL), then the object's own storage.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  // owned arrays first
  tClearSMqConsumerObj(pConsumer);

  // then the struct itself
  taosMemoryFree(pConsumer);
}
158,725✔
183

184
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
129,856✔
185
  int32_t tlen = 0;
129,856✔
186
  int32_t sz;
187
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
129,856✔
188
  tlen += taosEncodeString(buf, pConsumer->clientId);
129,856!
189
  tlen += taosEncodeString(buf, pConsumer->cgroup);
129,856!
190
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
129,856✔
191
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
129,856✔
192
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
129,856✔
193

194
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
129,856✔
195
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
129,856✔
196
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
129,856✔
197
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
129,856✔
198
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
129,856✔
199

200
  // current topics
201
  if (pConsumer->currentTopics) {
129,856✔
202
    sz = taosArrayGetSize(pConsumer->currentTopics);
2,602✔
203
    tlen += taosEncodeFixedI32(buf, sz);
2,602✔
204
    for (int32_t i = 0; i < sz; i++) {
4,068✔
205
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
1,466✔
206
      tlen += taosEncodeString(buf, topic);
1,466✔
207
    }
208
  } else {
209
    tlen += taosEncodeFixedI32(buf, 0);
127,254✔
210
  }
211

212
  // reb new topics
213
  if (pConsumer->rebNewTopics) {
129,856✔
214
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
76,370✔
215
    tlen += taosEncodeFixedI32(buf, sz);
76,370✔
216
    for (int32_t i = 0; i < sz; i++) {
127,930✔
217
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
51,560✔
218
      tlen += taosEncodeString(buf, topic);
51,560✔
219
    }
220
  } else {
221
    tlen += taosEncodeFixedI32(buf, 0);
53,486✔
222
  }
223

224
  // reb removed topics
225
  if (pConsumer->rebRemovedTopics) {
129,856✔
226
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
51,336✔
227
    tlen += taosEncodeFixedI32(buf, sz);
51,336✔
228
    for (int32_t i = 0; i < sz; i++) {
100,842✔
229
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
49,506✔
230
      tlen += taosEncodeString(buf, topic);
49,506✔
231
    }
232
  } else {
233
    tlen += taosEncodeFixedI32(buf, 0);
78,520✔
234
  }
235

236
  // lost topics
237
  if (pConsumer->assignedTopics) {
129,856✔
238
    sz = taosArrayGetSize(pConsumer->assignedTopics);
50,590✔
239
    tlen += taosEncodeFixedI32(buf, sz);
50,590✔
240
    for (int32_t i = 0; i < sz; i++) {
77,836✔
241
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
27,246✔
242
      tlen += taosEncodeString(buf, topic);
27,246✔
243
    }
244
  } else {
245
    tlen += taosEncodeFixedI32(buf, 0);
79,266✔
246
  }
247

248
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
129,856✔
249
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
129,856✔
250
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
129,856✔
251
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
129,856✔
252
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
129,856✔
253
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
129,856✔
254
  tlen += taosEncodeString(buf, pConsumer->user);
129,856!
255
  tlen += taosEncodeString(buf, pConsumer->fqdn);
129,856!
256
  return tlen;
129,856✔
257
}
258

259
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
64,151✔
260
  int32_t sz;
64,151✔
261
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
64,151!
262
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
64,151✔
263
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
64,151✔
264
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
64,151✔
265
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
64,151!
266
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
64,151!
267

268
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
64,151!
269
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
64,151✔
270
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
64,151!
271
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
64,151!
272
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
128,302!
273

274
  // current topics
275
  buf = taosDecodeFixedI32(buf, &sz);
64,151✔
276
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
64,151✔
277
  if (pConsumer->currentTopics == NULL) {
64,151!
278
    return NULL;
×
279
  }
280
  for (int32_t i = 0; i < sz; i++) {
64,871✔
281
    char *topic;
720✔
282
    buf = taosDecodeString(buf, &topic);
720✔
283
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
1,440!
284
      return NULL;
×
285
    }
286
  }
287

288
  // reb new topics
289
  buf = taosDecodeFixedI32(buf, &sz);
64,151✔
290
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
64,151✔
291
  for (int32_t i = 0; i < sz; i++) {
89,931✔
292
    char *topic;
25,780✔
293
    buf = taosDecodeString(buf, &topic);
25,780✔
294
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
51,560!
295
      return NULL;
×
296
    }
297
  }
298

299
  // reb removed topics
300
  buf = taosDecodeFixedI32(buf, &sz);
64,151✔
301
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
64,151✔
302
  for (int32_t i = 0; i < sz; i++) {
88,904✔
303
    char *topic;
24,753✔
304
    buf = taosDecodeString(buf, &topic);
24,753✔
305
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
49,506!
306
      return NULL;
×
307
    }
308
  }
309

310
  // reb removed topics
311
  buf = taosDecodeFixedI32(buf, &sz);
64,151✔
312
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
64,151✔
313
  for (int32_t i = 0; i < sz; i++) {
77,761✔
314
    char *topic;
13,610✔
315
    buf = taosDecodeString(buf, &topic);
13,610✔
316
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
27,220!
317
      return NULL;
×
318
    }
319
  }
320

321
  if (sver > 1) {
64,151!
322
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
64,151✔
323
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
64,151✔
324
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
64,151!
325
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
128,302!
326
  }
327
  if (sver > 2) {
64,151!
328
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
64,151!
329
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
64,151!
330
    buf = taosDecodeStringTo(buf, pConsumer->user);
64,151✔
331
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
128,302✔
332
  } else {
333
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
334
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
335
  }
336

337
  return (void *)buf;
64,151✔
338
}
339

340
// Encode an array of OffsetRows: an int32 element count, then per element
// vgId, rows, offset.type, the type-specific offset payload and `ever`.
//
// Payload by offset.type:
//   TMQ_OFFSET__SNAPSHOT_DATA / TMQ_OFFSET__SNAPSHOT_META -> uid, ts
//   TMQ_OFFSET__LOG                                       -> version
//   anything else                                         -> nothing
// Returns the accumulated length reported by the encode calls.
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t nRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, nRows);

  for (int32_t idx = 0; idx < nRows; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    switch (pRow->offset.type) {
      case TMQ_OFFSET__SNAPSHOT_DATA:
      case TMQ_OFFSET__SNAPSHOT_META:
        total += taosEncodeFixedI64(buf, pRow->offset.uid);
        total += taosEncodeFixedI64(buf, pRow->offset.ts);
        break;
      case TMQ_OFFSET__LOG:
        total += taosEncodeFixedI64(buf, pRow->offset.version);
        break;
      default:
        // no payload for other offset types
        break;
    }

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
362

363
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
30,740✔
364
  int32_t tlen = 0;
30,740✔
365
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
30,740✔
366
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
30,740✔
367
  tlen += taosEncodeFixedI32(buf, sz);
30,740✔
368
  for (int32_t i = 0; i < sz; i++) {
114,838✔
369
    void* data = taosArrayGet(pConsumerEp->vgs, i);
84,098✔
370
    tlen += tEncodeSMqVgEp(buf, data);
84,098✔
371
  }
372

373
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
30,740✔
374
}
375

376
// Decode an OffsetRows array written by tEncodeOffRows.
//
// Reads an int32 count; when it is > 0 a new array is created in *offsetRows
// and filled. When the count is <= 0, *offsetRows is left untouched.
// The trailing `ever` field is read only when sver > 2.
//
// Returns the advanced cursor, or NULL if the array cannot be created or a
// slot cannot be reserved.
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      // the slot is written through immediately below, so a failed reserve
      // must stop the decode instead of dereferencing NULL
      if (NULL == offRows) return NULL;
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // do nothing
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
402

403
// Decode a consumer endpoint written by tEncodeSMqConsumerEp.
//
// Reads consumerId and the vgroup-endpoint count, creates pConsumerEp->vgs and
// decodes each element into it; when sver > 1 the offset rows follow.
//
// Returns the advanced cursor, or NULL if the array cannot be created or a
// slot cannot be reserved; the result of tDecodeOffRows (which may itself be
// NULL) is returned as-is. Arrays created before a failure stay attached to
// pConsumerEp.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
  if (pConsumerEp->vgs == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqVgEp *vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
    if (vgEp == NULL) {
      // Skipping the element would leave the cursor in front of an undecoded
      // entry, so everything after it would be read at the wrong offset.
      return NULL;
    }
    buf = tDecodeSMqVgEp(buf, vgEp, sver);
  }
  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
  }

  return (void *)buf;
}
422

423
// Allocate a zeroed subscribe object for `key` and publish it through *ppSub.
//
// After the allocation check the object is stored in *ppSub right away, then
// its key, latch, vgNum, consumer hash (with freeSMqConsumerEp as value free
// callback) and the empty unassignedVgs array are set up.
// Each allocation is checked with MND_TMQ_NULL_CHECK (defined elsewhere;
// presumably it stores an error in `code` and jumps to END -- confirm).
// Returns `code`.
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pNew);
  *ppSub = pNew;

  // plain fields
  (void)memcpy(pNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pNew->lock);
  pNew->vgNum = 0;

  // consumerId -> SMqConsumerEp
  pNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pNew->consumerHash);
  taosHashSetFreeFp(pNew->consumerHash, freeSMqConsumerEp);

  // vgroups not yet assigned to a consumer
  pNew->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(pNew->unassignedVgs);

END:
  return code;
}
442

443
// Create a copy of pSub and publish it through *ppSub.
//
// Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg,
// unassignedVgs, offsetRows, and for every consumer in pSub->consumerHash its
// consumerId and a copy of its vgs array (the per-consumer offsetRows are not
// copied). The new object's lock is freshly initialised.
//
// Returns 0 on success or an error code. The object is zero-allocated, so when
// an error is returned after *ppSub was set, every field that was not reached
// is NULL/0 rather than indeterminate.
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    code = terrno;
    goto END;
  }
  *ppSub = pSubNew;

  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    code = terrno;
    goto END;
  }
  taosHashSetFreeFp(pSubNew->consumerHash, freeSMqConsumerEp);

  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
    };
    // same rule as for offsetRows below: a NULL copy is an error only when
    // there was something to copy
    if (pConsumerEp->vgs != NULL && newEp.vgs == NULL) {
      code = terrno;
      taosHashCancelIterate(pSub->consumerHash, pIter);  // leaving the loop early
      goto END;
    }
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
                            sizeof(SMqConsumerEp))) != 0) {
      taosArrayDestroy(newEp.vgs);  // the entry was not stored, so the copy is still ours
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
  if (pSubNew->unassignedVgs == NULL) {
    code = terrno;
    goto END;
  }
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  if (pSub->offsetRows != NULL && pSubNew->offsetRows == NULL) {
    code = terrno;
    goto END;
  }
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
  if (pSubNew->qmsg == NULL) {
    code = terrno;
    goto END;
  }

END:
  return code;
}
502

503
// Release everything a subscribe object owns -- the consumer hash, the
// unassignedVgs array, the qmsg string and the offsetRows array -- and reset
// those fields. The object itself is not freed. NULL is ignored.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub != NULL) {
    taosHashCleanup(pSub->consumerHash);
    pSub->consumerHash = NULL;

    taosArrayDestroy(pSub->unassignedVgs);
    pSub->unassignedVgs = NULL;

    taosMemoryFreeClear(pSub->qmsg);

    taosArrayDestroy(pSub->offsetRows);
    pSub->offsetRows = NULL;
  }
}
513

514
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
71,844✔
515
  int32_t tlen = 0;
71,844✔
516
  tlen += taosEncodeString(buf, pSub->key);
71,844!
517
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
71,844✔
518
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
71,844✔
519
  tlen += taosEncodeFixedI8(buf, pSub->subType);
71,844✔
520
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
71,844✔
521
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
71,844✔
522

523
  void   *pIter = NULL;
71,844✔
524
  int32_t sz = taosHashGetSize(pSub->consumerHash);
71,844✔
525
  tlen += taosEncodeFixedI32(buf, sz);
71,844✔
526

527
  int32_t cnt = 0;
71,844✔
528
  while (1) {
30,740✔
529
    pIter = taosHashIterate(pSub->consumerHash, pIter);
102,584✔
530
    if (pIter == NULL) break;
102,584✔
531
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
30,740✔
532
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
30,740✔
533
    cnt++;
30,740✔
534
  }
535
  if (cnt != sz) return -1;
71,844!
536
  int32_t len = taosArrayGetSize(pSub->unassignedVgs);
71,844✔
537
  tlen += taosEncodeFixedI32(buf, len);
71,844✔
538
  for (int32_t i = 0; i < len; i++) {
203,570✔
539
    void* data = taosArrayGet(pSub->unassignedVgs, i);
131,726✔
540
    tlen += tEncodeSMqVgEp(buf, data);
131,726✔
541
  }
542
  tlen += taosEncodeString(buf, pSub->dbName);
71,844!
543

544
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
71,844✔
545
  tlen += taosEncodeString(buf, pSub->qmsg);
71,844!
546
  return tlen;
71,844✔
547
}
548

549
// Decode a subscribe object written by tEncodeSubscribeObj.
//
// Creates pSub->consumerHash (with freeSMqConsumerEp as value free callback)
// and pSub->unassignedVgs and fills them from the stream. When sver > 1 the
// offset rows and qmsg are read from the stream; otherwise qmsg becomes an
// empty string.
//
// Returns the advanced cursor, or NULL on any allocation/decode failure.
// Containers created before a failure stay attached to pSub.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }
  taosHashSetFreeFp(pSub->consumerHash, freeSMqConsumerEp);

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    // tDecodeSMqConsumerEp can return NULL; carrying on would decode through a
    // NULL cursor. Either way the local endpoint's arrays were not handed to
    // the hash, so release them before bailing out.
    if (buf == NULL ||
        taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp,
                    sizeof(SMqConsumerEp)) != 0) {
      freeSMqConsumerEp(&consumerEp);
      return NULL;
    }
  }

  int32_t len = 0;
  buf = taosDecodeFixedI32(buf, &len);
  pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
  if (pSub->unassignedVgs == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < len; i++) {
    SMqVgEp *vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
    if (vgEp == NULL) {
      // skipping the element would leave the cursor misaligned with the stream
      return NULL;
    }
    buf = tDecodeSMqVgEp(buf, vgEp, sver);
  }

  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    if (buf == NULL) {
      return NULL;
    }
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      return NULL;
    }
  }
  return (void *)buf;
}
597

598
// Fill a caller-provided config object from a config item.
//
// Copies the name and dtype, then the value field selected by dtype:
// bval, i32, i64, fval (FLOAT and DOUBLE), or a taosStrdup() copy of str
// (STRING, DIR, LOCALE, CHARSET, TIMEZONE). CFG_DTYPE_NONE and any other
// dtype copy no value.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the string copy
// fails. pObj belongs to the caller and is never freed here; on failure
// pObj->str is NULL.
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
  pObj->dtype = pItem->dtype;
  switch (pItem->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      pObj->bval = pItem->bval;
      break;
    case CFG_DTYPE_INT32:
      pObj->i32 = pItem->i32;
      break;
    case CFG_DTYPE_INT64:
      pObj->i64 = pItem->i64;
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pObj->fval = pItem->fval;
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pObj->str = taosStrdup(pItem->str);
      if (pObj->str == NULL) {
        // Do not free pObj: it was supplied by the caller, who still holds the
        // pointer (and may have it on the stack).
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      break;
    default:
      break;
  }
  return TSDB_CODE_SUCCESS;
}
631

632
// Parse `value` according to pObjNew->dtype and store it in the matching
// field of pObjNew. The `name` parameter is not used by this function.
//
//   BOOL          - true when value equals "true" (case-insensitive) or parses
//                   to an integer > 0, false otherwise
//   INT32 / INT64 - parsed with taosStrHumanToInt32 / taosStrHumanToInt64
//   FLOAT/DOUBLE  - parsed with parseCfgReal into fval
//   string kinds  - str is replaced by a taosStrdup() copy (the previous str
//                   pointer is overwritten, not freed)
//   NONE          - nothing is stored
//   anything else - TSDB_CODE_INVALID_CFG
//
// Returns 0 on success, terrno when the string copy fails; parse failures
// leave through TAOS_CHECK_RETURN (macro defined elsewhere).
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  int32_t result = 0;

  switch (pObjNew->dtype) {
    case CFG_DTYPE_BOOL: {
      // both checks always run, in this order
      bool isTrueWord = (strcasecmp(value, "true") == 0);
      bool isPositive = (taosStr2Int32(value, NULL, 10) > 0);
      pObjNew->bval = (isTrueWord || isPositive);
      break;
    }
    case CFG_DTYPE_INT32: {
      int32_t parsed32;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed32));
      pObjNew->i32 = parsed32;
      break;
    }
    case CFG_DTYPE_INT64: {
      int64_t parsed64;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed64));
      pObjNew->i64 = parsed64;
      break;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsedReal = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsedReal));
      pObjNew->fval = parsedReal;
      break;
    }
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_TIMEZONE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_STRING: {
      pObjNew->str = taosStrdup(value);
      if (pObjNew->str == NULL) {
        return terrno;
      }
      break;
    }
    case CFG_DTYPE_NONE:
      break;
    default:
      result = TSDB_CODE_INVALID_CFG;
      break;
  }

  return result;
}
685

686
SConfigObj mndInitConfigVersion() {
79,015✔
687
  SConfigObj obj;
78,957✔
688
  memset(&obj, 0, sizeof(SConfigObj));
79,015!
689

690
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
79,015!
691
  obj.dtype = CFG_DTYPE_INT32;
79,015✔
692
  obj.i32 = 0;
79,015✔
693
  return obj;
79,015✔
694
}
695

696
// Serialize a config object: name, dtype (as int32), then the value selected
// by dtype.
//
//   BOOL -> I8, INT32 -> I32, INT64 -> I64, FLOAT/DOUBLE -> Float (fval),
//   STRING/DIR/LOCALE/CHARSET/TIMEZONE -> C string, with a NULL str written
//   as "". Any other dtype writes no value.
//
// Returns pEncoder->pos on the success path; failing steps leave through
// TAOS_CHECK_RETURN (macro defined elsewhere).
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // header: name + type tag
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));

  // payload
  switch (pObj->dtype) {
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      const char *text = (pObj->str != NULL) ? pObj->str : "";
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, text));
      break;
    }
    default:
      break;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
732

733
// Deserialize a config object: name, dtype, then the value selected by dtype.
//
//   BOOL -> tDecodeBool, INT32 -> I32, INT64 -> I64, FLOAT/DOUBLE -> Float,
//   STRING/DIR/LOCALE/CHARSET/TIMEZONE -> tDecodeCStrAlloc into pObj->str.
//   CFG_DTYPE_NONE and any other dtype read no value.
//
// Finishes with TAOS_RETURN(TSDB_CODE_SUCCESS); failing steps leave through
// TAOS_CHECK_RETURN (both macros are defined elsewhere).
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // header: name + type tag
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  // payload
  switch (pObj->dtype) {
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
    case CFG_DTYPE_NONE:
    default:
      // nothing follows the type tag
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
764

765
// Free the string held by a config object when its dtype is one of the
// string kinds (STRING, DIR, LOCALE, CHARSET, TIMEZONE). Other dtypes own no
// memory here. The object itself is not freed and str is not reset.
// NULL is ignored.
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }

  switch (obj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      taosMemoryFree(obj->str);
      break;
    default:
      break;
  }
}
774

775
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
776
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
777
//   if (pEntryNew == NULL) return NULL;
778
//   pEntryNew->epoch = pEntry->epoch;
779
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
780
//   return pEntryNew;
781
// }
782
//
783
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
784
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
785
// }
786

787
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
788
//   int32_t tlen = 0;
789
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
790
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
791
//   return tlen;
792
// }
793
//
794
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
795
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
796
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
797
//   return (void *)buf;
798
// }
799

800
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
801
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
802
//   if (pLogNew == NULL) return pLogNew;
803
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
804
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
805
//   return pLogNew;
806
// }
807
//
808
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
809
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
810
// }
811

812
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
813
//   int32_t tlen = 0;
814
//   tlen += taosEncodeString(buf, pLog->key);
815
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
816
//   return tlen;
817
// }
818
//
819
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
820
//   buf = taosDecodeStringTo(buf, pLog->key);
821
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
822
//   return (void *)buf;
823
// }
824
//
825
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
826
//   int32_t tlen = 0;
827
//   tlen += taosEncodeString(buf, pOffset->key);
828
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
829
//   return tlen;
830
// }
831
//
832
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
833
//   buf = taosDecodeStringTo(buf, pOffset->key);
834
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
835
//   return buf;
836
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc