• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4416

03 Jul 2025 10:49AM UTC coverage: 61.007% (-1.2%) from 62.241%
#4416

push

travis-ci

GitHub
Merge pull request #31575 from taosdata/fix/huoh/taos_log

150735 of 316232 branches covered (47.67%)

Branch coverage included in aggregate %.

233783 of 314057 relevant lines covered (74.44%)

6782670.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.54
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
#ifdef USE_STREAM
22
static void *freeStreamTasks(SArray *pTaskLevel);
23

24
/**
 * Serialize a stream object through the given encoder.
 *
 * Fields are written in a fixed order; tDecodeSStreamObj() reads them back in
 * the same order, so the two functions have to be changed together.
 *
 * Side effect: a task whose `ver` is below SSTREAM_TASK_SUBTABLE_CHANGED_VER
 * has `ver` overwritten with SSTREAM_TASK_VER before it is encoded.
 *
 * @param pEncoder  destination encoder
 * @param pObj      stream object to write; NULL sql/ast/physicalPlan are
 *                  written as empty strings
 * @return pEncoder->pos once everything has been written. Every step is
 *         wrapped in TAOS_CHECK_RETURN (defined outside this file), which
 *         presumably returns early with the step's error code on failure.
 */
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // identity and bookkeeping
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));

  // stream configuration
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));

  // source / target description
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));

  // optional strings: an absent value is stored as ""
  const char *sqlText = (pObj->sql == NULL) ? "" : pObj->sql;
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sqlText));

  const char *astText = (pObj->ast == NULL) ? "" : pObj->ast;
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, astText));

  const char *planText = (pObj->physicalPlan == NULL) ? "" : pObj->physicalPlan;
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, planText));

  // task list: level count, then per level a task count followed by the tasks
  int32_t numLevels = taosArrayGetSize(pObj->pTaskList);
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numLevels));
  for (int32_t lvl = 0; lvl < numLevels; ++lvl) {
    SArray *pLevel = taosArrayGetP(pObj->pTaskList, lvl);
    int32_t numTasks = taosArrayGetSize(pLevel);
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numTasks));
    for (int32_t t = 0; t < numTasks; ++t) {
      SStreamTask *pTask = taosArrayGetP(pLevel, t);
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
        pTask->ver = SSTREAM_TASK_VER;
      }
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
    }
  }

  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));

  // 3.0.20 ver =2
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));

  // 3.0.50 ver = 3
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));

  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
99

100
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
2,523✔
101
  int32_t code = 0;
2,523✔
102
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
2,523!
103
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
2,523!
104

105
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
5,046!
106
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
5,046!
107
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
5,046!
108
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
5,046!
109
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
5,046!
110

111
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
5,046!
112
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
5,046!
113

114
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
5,046!
115
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
5,046!
116
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
5,046!
117
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
5,046!
118
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
5,046!
119

120
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
5,046!
121
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
5,046!
122
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
2,523!
123
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
2,523!
124
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
2,523!
125
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
5,046!
126
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
5,046!
127

128
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
5,046!
129
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
5,046!
130
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
5,046!
131

132
  if (pObj->pTaskList != NULL) {
2,523!
133
    pObj->pTaskList = freeStreamTasks(pObj->pTaskList);
×
134
  }
135

136
  int32_t sz;
137
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
2,523!
138

139
  if (sz != 0) {
2,523✔
140
    pObj->pTaskList = taosArrayInit(sz, sizeof(void *));
2,516✔
141
    if (pObj->pTaskList == NULL) {
2,516!
142
      code = terrno;
×
143
      TAOS_RETURN(code);
×
144
    }
145

146
    for (int32_t i = 0; i < sz; i++) {
7,365✔
147
      int32_t innerSz;
148
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
4,849!
149
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
4,849✔
150
      if (pArray != NULL) {
4,849!
151
        for (int32_t j = 0; j < innerSz; j++) {
16,398✔
152
          SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
11,549!
153
          if (pTask == NULL) {
11,549!
154
            taosArrayDestroy(pArray);
×
155
            code = terrno;
×
156
            TAOS_RETURN(code);
×
157
          }
158
          if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
11,549!
159
            taosMemoryFree(pTask);
×
160
            taosArrayDestroy(pArray);
×
161
            TAOS_RETURN(code);
×
162
          }
163
          if (taosArrayPush(pArray, &pTask) == NULL) {
23,098!
164
            taosMemoryFree(pTask);
×
165
            taosArrayDestroy(pArray);
×
166
            code = terrno;
×
167
            TAOS_RETURN(code);
×
168
          }
169
        }
170
      }
171
      if (taosArrayPush(pObj->pTaskList, &pArray) == NULL) {
9,698!
172
        taosArrayDestroy(pArray);
×
173
        code = terrno;
×
174
        TAOS_RETURN(code);
×
175
      }
176
    }
177
  }
178

179
  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
5,046!
180

181
  // 3.0.20
182
  if (sver >= 2) {
2,523!
183
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
5,046!
184
    if (!tDecodeIsEnd(pDecoder)) {
2,523!
185
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
5,046!
186
    }
187
  }
188
  if (sver >= 3) {
2,523!
189
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
5,046!
190
  }
191

192
  if (sver >= 5) {
2,523!
193
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
5,046!
194
  }
195
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
2,523!
196

197
  tEndDecode(pDecoder);
2,523✔
198
  TAOS_RETURN(code);
2,523✔
199
}
200

201
/**
 * Release a two-level task list: every task, every level array, and finally
 * the outer array.
 *
 * @param pTaskLevel  array of level arrays, each holding SStreamTask pointers;
 *                    NULL is accepted
 * @return always NULL, so callers can write `p = freeStreamTasks(p);`
 */
void *freeStreamTasks(SArray *pTaskLevel) {
  if (pTaskLevel != NULL) {
    int32_t levelCnt = taosArrayGetSize(pTaskLevel);

    for (int32_t lvl = 0; lvl < levelCnt; ++lvl) {
      SArray *pTasks = taosArrayGetP(pTaskLevel, lvl);
      int32_t taskCnt = taosArrayGetSize(pTasks);

      for (int32_t t = 0; t < taskCnt; ++t) {
        SStreamTask *pOne = taosArrayGetP(pTasks, t);
        tFreeStreamTask(pOne);
      }
      taosArrayDestroy(pTasks);
    }

    taosArrayDestroy(pTaskLevel);
  }

  return NULL;
}
220

221
/**
 * Release everything a stream object owns. The SStreamObj itself is not freed.
 *
 * Each released pointer is reset to NULL, so calling this again on the same
 * object does not free anything twice.
 *
 * @param pStream  stream object to clean; must not be NULL
 */
void tFreeStreamObj(SStreamObj *pStream) {
  taosMemoryFreeClear(pStream->sql);
  taosMemoryFreeClear(pStream->ast);
  taosMemoryFreeClear(pStream->physicalPlan);

  if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
    taosMemoryFreeClear(pStream->outputSchema.pSchema);
  }

  pStream->pTaskList = freeStreamTasks(pStream->pTaskList);
  pStream->pHTaskList = freeStreamTasks(pStream->pHTaskList);

  // tagSchema.pSchema is only released when the schema reports columns
  if (pStream->tagSchema.nCols > 0) {
    taosMemoryFreeClear(pStream->tagSchema.pSchema);
  }

  qDestroyQueryPlan(pStream->pPlan);
  pStream->pPlan = NULL;

  tSimpleHashCleanup(pStream->pVTableMap);
  pStream->pVTableMap = NULL;
}
3,661✔
244
#endif
245

246
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
2,141✔
247
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
2,141!
248
  if (pVgEpNew == NULL) {
2,141!
249
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
250
    return NULL;
×
251
  }
252
  pVgEpNew->vgId = pVgEp->vgId;
2,141✔
253
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
254
  pVgEpNew->epSet = pVgEp->epSet;
2,141✔
255
  return pVgEpNew;
2,141✔
256
}
257

258
/**
 * Free a vgroup endpoint entry.
 *
 * @param pVgEp  entry to free; NULL is ignored
 */
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFree(pVgEp);
}
11,228✔
264

265
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
9,444✔
266
  int32_t tlen = 0;
9,444✔
267
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
9,444✔
268
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
269
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
9,444✔
270
  return tlen;
9,444✔
271
}
272

273
/**
 * Decode a vgroup endpoint entry written by tEncodeSMqVgEp().
 *
 * @param buf    read position
 * @param pVgEp  entry to fill in (vgId and epSet)
 * @param sver   stored version; version 1 has an extra length-prefixed field
 *               between vgId and epSet, which is skipped
 * @return read position after the entry
 */
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    // read the length prefix, then step over that many bytes
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
283

284
/* Element-copy callback for taosArrayDup(): duplicates one topic-name string. */
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
626!
285

286
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
2,690✔
287
                           SMqConsumerObj **ppConsumer) {
288
  int32_t         code = 0;
2,690✔
289
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
2,690!
290
  if (pConsumer == NULL) {
2,690!
291
    code = terrno;
×
292
    goto END;
×
293
  }
294

295
  pConsumer->consumerId = consumerId;
2,690✔
296
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
2,690✔
297

298
  pConsumer->epoch = 0;
2,690✔
299
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
2,690✔
300
  pConsumer->hbStatus = 0;
2,690✔
301
  pConsumer->pollStatus = 0;
2,690✔
302

303
  taosInitRWLatch(&pConsumer->lock);
2,690✔
304
  pConsumer->createTime = taosGetTimestampMs();
2,690✔
305
  pConsumer->updateType = updateType;
2,690✔
306

307
  if (updateType == CONSUMER_ADD_REB) {
2,690✔
308
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
632✔
309
    if (pConsumer->rebNewTopics == NULL) {
632!
310
      code = terrno;
×
311
      goto END;
×
312
    }
313

314
    char *topicTmp = taosStrdup(topic);
632!
315
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
1,264!
316
      code = terrno;
×
317
      goto END;
×
318
    }
319
  } else if (updateType == CONSUMER_REMOVE_REB) {
2,058✔
320
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
529✔
321
    if (pConsumer->rebRemovedTopics == NULL) {
529!
322
      code = terrno;
×
323
      goto END;
×
324
    }
325
    char *topicTmp = taosStrdup(topic);
529!
326
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
1,058!
327
      code = terrno;
×
328
      goto END;
×
329
    }
330
  } else if (updateType == CONSUMER_INSERT_SUB) {
1,529✔
331
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
564✔
332
    pConsumer->withTbName = subscribe->withTbName;
564✔
333
    pConsumer->autoCommit = subscribe->autoCommit;
564✔
334
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
564✔
335
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
564✔
336
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
564✔
337
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
564✔
338
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
564✔
339
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
564✔
340

341
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
564✔
342
    if (pConsumer->rebNewTopics == NULL) {
564!
343
      code = terrno;
×
344
      goto END;
×
345
    }
346
    pConsumer->assignedTopics = subscribe->topicNames;
564✔
347
    subscribe->topicNames = NULL;
564✔
348
  } else if (updateType == CONSUMER_UPDATE_SUB) {
965✔
349
    pConsumer->assignedTopics = subscribe->topicNames;
476✔
350
    subscribe->topicNames = NULL;
476✔
351
  }
352

353
  *ppConsumer = pConsumer;
2,690✔
354
  return 0;
2,690✔
355

356
END:
×
357
  tDeleteSMqConsumerObj(pConsumer);
×
358
  return code;
×
359
}
360

361
/**
 * Destroy the four topic arrays of a consumer object (current, rebalance-new,
 * rebalance-removed, assigned). The object itself is not freed and the array
 * members are not reset.
 *
 * @param pConsumer  consumer to clear; NULL is ignored
 */
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    SArray *topicLists[] = {
        pConsumer->currentTopics,
        pConsumer->rebNewTopics,
        pConsumer->rebRemovedTopics,
        pConsumer->assignedTopics,
    };
    for (size_t i = 0; i < sizeof(topicLists) / sizeof(topicLists[0]); ++i) {
      taosArrayDestroyP(topicLists[i], NULL);
    }
  }
}
368

369
/**
 * Destroy a consumer object: release its topic arrays, then the object.
 *
 * @param pConsumer  consumer to destroy; tClearSMqConsumerObj() ignores NULL
 */
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  tClearSMqConsumerObj(pConsumer);  // topic arrays first
  taosMemoryFree(pConsumer);        // then the struct itself
}
6,758✔
373

374
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
6,088✔
375
  int32_t tlen = 0;
6,088✔
376
  int32_t sz;
377
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
6,088✔
378
  tlen += taosEncodeString(buf, pConsumer->clientId);
6,088✔
379
  tlen += taosEncodeString(buf, pConsumer->cgroup);
6,088✔
380
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
6,088✔
381
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
6,088✔
382
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
6,088✔
383

384
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
6,088✔
385
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
6,088✔
386
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
6,088✔
387
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
6,088✔
388
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
6,088✔
389

390
  // current topics
391
  if (pConsumer->currentTopics) {
6,088✔
392
    sz = taosArrayGetSize(pConsumer->currentTopics);
734✔
393
    tlen += taosEncodeFixedI32(buf, sz);
734✔
394
    for (int32_t i = 0; i < sz; i++) {
1,166✔
395
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
432✔
396
      tlen += taosEncodeString(buf, topic);
432✔
397
    }
398
  } else {
399
    tlen += taosEncodeFixedI32(buf, 0);
5,354✔
400
  }
401

402
  // reb new topics
403
  if (pConsumer->rebNewTopics) {
6,088✔
404
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
4,052✔
405
    tlen += taosEncodeFixedI32(buf, sz);
4,052✔
406
    for (int32_t i = 0; i < sz; i++) {
6,796✔
407
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
2,744✔
408
      tlen += taosEncodeString(buf, topic);
2,744✔
409
    }
410
  } else {
411
    tlen += taosEncodeFixedI32(buf, 0);
2,036✔
412
  }
413

414
  // reb removed topics
415
  if (pConsumer->rebRemovedTopics) {
6,088✔
416
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
2,718✔
417
    tlen += taosEncodeFixedI32(buf, sz);
2,718✔
418
    for (int32_t i = 0; i < sz; i++) {
4,814✔
419
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
2,096✔
420
      tlen += taosEncodeString(buf, topic);
2,096✔
421
    }
422
  } else {
423
    tlen += taosEncodeFixedI32(buf, 0);
3,370✔
424
  }
425

426
  // lost topics
427
  if (pConsumer->assignedTopics) {
6,088✔
428
    sz = taosArrayGetSize(pConsumer->assignedTopics);
2,788✔
429
    tlen += taosEncodeFixedI32(buf, sz);
2,788✔
430
    for (int32_t i = 0; i < sz; i++) {
4,696✔
431
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
1,908✔
432
      tlen += taosEncodeString(buf, topic);
1,908✔
433
    }
434
  } else {
435
    tlen += taosEncodeFixedI32(buf, 0);
3,300✔
436
  }
437

438
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
6,088✔
439
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
6,088✔
440
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
6,088✔
441
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
6,088✔
442
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
6,088✔
443
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
6,088✔
444
  tlen += taosEncodeString(buf, pConsumer->user);
6,088✔
445
  tlen += taosEncodeString(buf, pConsumer->fqdn);
6,088✔
446
  return tlen;
6,088✔
447
}
448

449
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
2,730✔
450
  int32_t sz;
451
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
2,730!
452
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
2,730✔
453
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
2,730✔
454
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
2,730✔
455
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
2,730!
456
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
2,730!
457

458
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
2,730!
459
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
2,730✔
460
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
2,730!
461
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
2,730!
462
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
5,460!
463

464
  // current topics
465
  buf = taosDecodeFixedI32(buf, &sz);
2,730✔
466
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
2,730✔
467
  if (pConsumer->currentTopics == NULL) {
2,730!
468
    return NULL;
×
469
  }
470
  for (int32_t i = 0; i < sz; i++) {
2,753✔
471
    char *topic;
472
    buf = taosDecodeString(buf, &topic);
23✔
473
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
46!
474
      return NULL;
×
475
    }
476
  }
477

478
  // reb new topics
479
  buf = taosDecodeFixedI32(buf, &sz);
2,730✔
480
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
2,730✔
481
  for (int32_t i = 0; i < sz; i++) {
4,009✔
482
    char *topic;
483
    buf = taosDecodeString(buf, &topic);
1,279✔
484
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
2,558!
485
      return NULL;
×
486
    }
487
  }
488

489
  // reb removed topics
490
  buf = taosDecodeFixedI32(buf, &sz);
2,730✔
491
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
2,730✔
492
  for (int32_t i = 0; i < sz; i++) {
3,786✔
493
    char *topic;
494
    buf = taosDecodeString(buf, &topic);
1,056✔
495
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
2,112!
496
      return NULL;
×
497
    }
498
  }
499

500
  // reb removed topics
501
  buf = taosDecodeFixedI32(buf, &sz);
2,730✔
502
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
2,730✔
503
  for (int32_t i = 0; i < sz; i++) {
3,393✔
504
    char *topic;
505
    buf = taosDecodeString(buf, &topic);
663✔
506
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
1,326!
507
      return NULL;
×
508
    }
509
  }
510

511
  if (sver > 1) {
2,730!
512
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
2,730✔
513
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
2,730✔
514
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
2,730!
515
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
5,460!
516
  }
517
  if (sver > 2) {
2,730!
518
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
2,730!
519
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
2,730!
520
    buf = taosDecodeStringTo(buf, pConsumer->user);
2,730✔
521
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
5,460✔
522
  } else {
523
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
524
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
525
  }
526

527
  return (void *)buf;
2,730✔
528
}
529

530
/**
 * Encode an array of OffsetRows: an int32 count, then per entry vgId, rows,
 * offset type, the type-specific offset payload, and `ever`.
 *
 * @param buf         encode cursor handed to the taosEncode* helpers
 * @param offsetRows  array of OffsetRows values
 * @return sum of the lengths reported by the individual encode calls
 */
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t numRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, numRows);

  for (int32_t idx = 0; idx < numRows; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    // payload depends on the offset type; other types carry none
    if (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || pRow->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      total += taosEncodeFixedI64(buf, pRow->offset.uid);
      total += taosEncodeFixedI64(buf, pRow->offset.ts);
    } else if (pRow->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, pRow->offset.version);
    }

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
552

553
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
1,836✔
554
  int32_t tlen = 0;
1,836✔
555
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
1,836✔
556
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
1,836✔
557

558
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
1,836✔
559
}
560

561
/**
 * Decode an OffsetRows array written by tEncodeOffRows().
 *
 * @param buf         read position
 * @param offsetRows  receives a newly created array when the stored count is
 *                    positive; left untouched otherwise
 * @param sver        stored version; `ever` is read only for sver > 2
 * @return read position after the array, or NULL if the array or one of its
 *         slots cannot be allocated. In the latter case *offsetRows still
 *         holds the partially filled array.
 */
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      if (offRows == NULL) return NULL;  // do not write through a failed reservation

      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // other offset types carry no payload
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
587

588
/**
 * Decode a consumer endpoint entry written by tEncodeSMqConsumerEp().
 *
 * @param buf          read position
 * @param pConsumerEp  entry to fill in
 * @param sver         stored version; offset rows are read only for sver > 1
 * @return read position after the entry (whatever the last decode returned)
 */
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  const void *pos = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  pos = taosDecodeArray(pos, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);

  if (sver <= 1) {
    return (void *)pos;
  }
  return tDecodeOffRows(pos, &pConsumerEp->offsetRows, sver);
}
597

598
/**
 * Allocate a subscribe object with an empty consumer hash and an empty
 * unassigned-vgroup array.
 *
 * @param key    subscribe key; TSDB_SUBSCRIBE_KEY_LEN bytes are copied from it
 * @param ppSub  receives the new object when not NULL
 * @return 0 on success; otherwise the code set by MND_TMQ_NULL_CHECK (macro
 *         defined outside this file, presumably storing an error in `code`
 *         and jumping to END). On failure everything allocated here is freed.
 */
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  if (ppSub) {
    *ppSub = pSubObj;
  }
  return code;

END:
  // The hash exists only if the failure was the unassignedVgs allocation.
  if (pSubObj != NULL && pSubObj->consumerHash != NULL) {
    taosHashCleanup(pSubObj->consumerHash);
  }
  taosMemoryFree(pSubObj);
  return code;
}
619

620
/**
 * Create a copy of a subscribe object.
 *
 * Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg, the
 * unassigned vgroups, the object-level offset rows, and for every consumer its
 * id and vgs. Per-consumer offsetRows are not copied (they start out NULL in
 * the clone). The results of the final taosArrayDup/taosStrdup calls are
 * stored unchecked.
 *
 * @param pSub   object to copy
 * @param ppSub  receives the clone on success; when NULL the clone is
 *               destroyed again, since no caller could reach it
 * @return 0 on success, otherwise an error code; on failure the partially
 *         built clone is destroyed and *ppSub is not written
 */
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t code = 0;
  // zero-initialised so the cleanup path never sees indeterminate pointers
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    code = terrno;
    goto END;
  }
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    code = terrno;
    goto END;
  }

  void *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp  newEp = {
         .consumerId = pConsumerEp->consumerId,
         .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
    };
    code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // the hash did not take the entry, so its vgs copy is still ours
      taosArrayDestroyP(newEp.vgs, (FDelete)tDeleteSMqVgEp);
      // leaving the loop early: release the iterator
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }
  }

  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
  pSubNew->qmsg = taosStrdup(pSub->qmsg);

  if (ppSub) {
    *ppSub = pSubNew;
  } else {
    tDeleteSubscribeObj(pSubNew);
    taosMemoryFree(pSubNew);
  }
  return code;

END:
  if (pSubNew != NULL) {
    // With no hash, nothing else has been allocated yet.
    if (pSubNew->consumerHash != NULL) {
      tDeleteSubscribeObj(pSubNew);
    }
    taosMemoryFree(pSubNew);
  }
  return code;
}
662

663
/**
 * Release everything a subscribe object owns: each consumer entry's vgs and
 * offsetRows, the consumer hash, the unassigned vgroups, qmsg, and the
 * object-level offset rows. The SMqSubscribeObj itself is not freed.
 *
 * @param pSub  object to clean; NULL is ignored
 */
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub == NULL) return;

  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEp->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}
678

679
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
3,526✔
680
  int32_t tlen = 0;
3,526✔
681
  tlen += taosEncodeString(buf, pSub->key);
3,526✔
682
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
3,526✔
683
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
3,526✔
684
  tlen += taosEncodeFixedI8(buf, pSub->subType);
3,526✔
685
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
3,526✔
686
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
3,526✔
687

688
  void   *pIter = NULL;
3,526✔
689
  int32_t sz = taosHashGetSize(pSub->consumerHash);
3,526✔
690
  tlen += taosEncodeFixedI32(buf, sz);
3,526✔
691

692
  int32_t cnt = 0;
3,526✔
693
  while (1) {
1,836✔
694
    pIter = taosHashIterate(pSub->consumerHash, pIter);
5,362✔
695
    if (pIter == NULL) break;
5,362✔
696
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
1,836✔
697
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
1,836✔
698
    cnt++;
1,836✔
699
  }
700
  if (cnt != sz) return -1;
3,526!
701
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
3,526✔
702
  tlen += taosEncodeString(buf, pSub->dbName);
3,526✔
703

704
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
3,526✔
705
  tlen += taosEncodeString(buf, pSub->qmsg);
3,526✔
706
  return tlen;
3,526✔
707
}
708

709
/*
 * Deserialize a subscription object written by tEncodeSubscribeObj.
 *
 * @param buf   Read cursor into the encoded bytes.
 * @param pSub  Destination object; its consumerHash, unassignedVgs,
 *              offsetRows and qmsg members are (re)assigned here.
 * @param sver  Serialized version. Versions > 1 carry offsetRows and qmsg;
 *              older versions get an empty qmsg string instead.
 * @return      The advanced read cursor on success, NULL on failure
 *              (hash creation/insertion failure or string allocation failure).
 *
 * On a NULL return pSub may be partially populated; whatever was already
 * stored in it can be released with tDeleteSubscribeObj().
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    // Without a table there is nowhere to store the consumers; fail before decoding (and leaking) any of them.
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
        0) {
      // The entry never made it into the hash, so tDeleteSubscribeObj() cannot reach it:
      // release the arrays decoded into the local copy here (same calls tDeleteSubscribeObj uses per entry).
      taosArrayDestroyP(consumerEp.vgs, (FDelete)tDeleteSMqVgEp);
      taosArrayDestroy(consumerEp.offsetRows);
      return NULL;
    }
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    // Older records have no qmsg; substitute an empty string so the encoder always has one to write.
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      return NULL;
    }
  }
  return (void *)buf;
}
741

742
/*
 * Populate a config object from a config item.
 *
 * Copies the item's name and dtype, then copies the value member that
 * corresponds to the dtype. String-like types (STRING, DIR, LOCALE, CHARSET,
 * TIMEZONE) get a freshly duplicated string in pObj->str, which can later be
 * released with tFreeSConfigObj().
 *
 * @param pItem  Source config item.
 * @param pObj   Caller-owned destination object. This function never frees
 *               it: the storage was not allocated here and may not even be
 *               heap memory.
 * @return       TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the string
 *               value could not be duplicated (pObj->str is NULL in that case).
 */
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
  pObj->dtype = pItem->dtype;
  switch (pItem->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      pObj->bval = pItem->bval;
      break;
    case CFG_DTYPE_INT32:
      pObj->i32 = pItem->i32;
      break;
    case CFG_DTYPE_INT64:
      pObj->i64 = pItem->i64;
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pObj->fval = pItem->fval;
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pObj->str = taosStrdup(pItem->str);
      if (pObj->str == NULL) {
        // Only report the failure. Ownership of pObj stays with the caller, so freeing it here
        // would be an invalid free for non-heap objects and a double free / dangling pointer otherwise.
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      break;
    default:
      break;
  }
  return TSDB_CODE_SUCCESS;
}
775

776
/*
 * Parse a textual value and store it in a config object according to the
 * object's dtype.
 *
 * @param pObjNew  Object to update; its dtype selects the parser.
 * @param name     Not used by this function.
 * @param value    Textual representation of the new value.
 * @return         0 on success; the parser's error code for INT32/INT64/
 *                 FLOAT/DOUBLE; terrno when duplicating a string value
 *                 fails; TSDB_CODE_INVALID_CFG for an unrecognized dtype.
 *
 * BOOL is true when value equals "true" (case-insensitive) or parses to a
 * positive base-10 integer. CFG_DTYPE_NONE leaves the object untouched.
 */
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  int32_t ret = 0;

  switch (pObjNew->dtype) {
    case CFG_DTYPE_BOOL: {
      // Both checks are always evaluated, in this order.
      const bool isTrueWord = (strcasecmp(value, "true") == 0);
      const bool isPositive = (taosStr2Int32(value, NULL, 10) > 0);
      pObjNew->bval = isTrueWord || isPositive;
      break;
    }

    case CFG_DTYPE_INT32: {
      int32_t parsed;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed));
      pObjNew->i32 = parsed;
      break;
    }

    case CFG_DTYPE_INT64: {
      int64_t parsed;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed));
      pObjNew->i64 = parsed;
      break;
    }

    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsed = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsed));
      pObjNew->fval = parsed;
      break;
    }

    case CFG_DTYPE_DIR:
    case CFG_DTYPE_TIMEZONE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_STRING: {
      char *copy = taosStrdup(value);
      pObjNew->str = copy;
      if (copy == NULL) {
        return terrno;
      }
      break;
    }

    case CFG_DTYPE_NONE:
      break;

    default:
      ret = TSDB_CODE_INVALID_CFG;
      break;
  }

  return ret;
}
829

830
SConfigObj mndInitConfigVersion() {
1,925✔
831
  SConfigObj obj;
832
  memset(&obj, 0, sizeof(SConfigObj));
1,925✔
833

834
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
1,925✔
835
  obj.dtype = CFG_DTYPE_INT32;
1,925✔
836
  obj.i32 = 0;
1,925✔
837
  return obj;
1,925✔
838
}
839

840
/*
 * Serialize a config object with the SEncoder API.
 *
 * Layout: name (C string), dtype (i32), then one value whose encoding
 * depends on dtype:
 *   BOOL -> i8, INT32 -> i32, INT64 -> i64, FLOAT/DOUBLE -> float,
 *   STRING/DIR/LOCALE/CHARSET/TIMEZONE -> C string ("" when str is NULL).
 * Any other dtype writes no value.
 *
 * Returns the encoder position (pEncoder->pos) after tEndEncode() on success;
 * a failing encode step returns early through TAOS_CHECK_RETURN.
 */
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // Header: name followed by the type tag that tells the decoder how to read the value.
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));

  switch (pObj->dtype) {
    case CFG_DTYPE_BOOL: {
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;
    }
    case CFG_DTYPE_INT32: {
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;
    }
    case CFG_DTYPE_INT64: {
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;
    }
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      // A missing string is written as an empty one so a string field is always present.
      const char *text = (pObj->str != NULL) ? pObj->str : "";
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, text));
      break;
    }
    default:
      break;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
876

877
/*
 * Deserialize a config object with the SDecoder API.
 *
 * Reads the name into pObj->name and the type tag into pObj->dtype, then
 * reads the value member selected by that tag. For string-like types the
 * string is obtained through tDecodeCStrAlloc() into pObj->str.
 * CFG_DTYPE_NONE and unlisted dtypes carry no value.
 *
 * Returns TSDB_CODE_SUCCESS via TAOS_RETURN; a failing decode step returns
 * early through TAOS_CHECK_RETURN.
 */
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // Header: name and type tag.
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  // Value: which member is read depends on the tag just decoded.
  switch (pObj->dtype) {
    case CFG_DTYPE_NONE: {
      break;
    }
    case CFG_DTYPE_BOOL: {
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;
    }
    case CFG_DTYPE_INT32: {
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;
    }
    case CFG_DTYPE_INT64: {
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;
    }
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
    }
    default:
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
908

909
/*
 * Release the string value held by a config object, if it has one.
 *
 * Only the string-like dtypes (STRING, DIR, LOCALE, CHARSET, TIMEZONE) use
 * obj->str; for every other dtype nothing is freed. The SConfigObj itself is
 * not freed. A NULL obj is a no-op.
 */
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }

  switch (obj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      taosMemoryFree(obj->str);
      break;
    default:
      // Non-string values need no cleanup.
      break;
  }
}
918

919
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
920
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
921
//   if (pEntryNew == NULL) return NULL;
922
//   pEntryNew->epoch = pEntry->epoch;
923
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
924
//   return pEntryNew;
925
// }
926
//
927
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
928
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
929
// }
930

931
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
932
//   int32_t tlen = 0;
933
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
934
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
935
//   return tlen;
936
// }
937
//
938
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
939
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
940
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
941
//   return (void *)buf;
942
// }
943

944
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
945
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
946
//   if (pLogNew == NULL) return pLogNew;
947
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
948
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
949
//   return pLogNew;
950
// }
951
//
952
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
953
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
954
// }
955

956
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
957
//   int32_t tlen = 0;
958
//   tlen += taosEncodeString(buf, pLog->key);
959
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
960
//   return tlen;
961
// }
962
//
963
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
964
//   buf = taosDecodeStringTo(buf, pLog->key);
965
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
966
//   return (void *)buf;
967
// }
968
//
969
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
970
//   int32_t tlen = 0;
971
//   tlen += taosEncodeString(buf, pOffset->key);
972
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
973
//   return tlen;
974
// }
975
//
976
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
977
//   buf = taosDecodeStringTo(buf, pOffset->key);
978
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
979
//   return buf;
980
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc