• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3578

11 Jan 2025 11:19AM UTC coverage: 63.183% (-0.03%) from 63.211%
#3578

push

travis-ci

web-flow
Merge pull request #29546 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

139873 of 284461 branches covered (49.17%)

Branch coverage included in aggregate %.

20 of 26 new or added lines in 2 files covered. (76.92%)

717 existing lines in 102 files now uncovered.

217827 of 281671 relevant lines covered (77.33%)

19620733.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.11
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
static void *freeStreamTasks(SArray *pTaskLevel);
22

23
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
14,706✔
24
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
14,706!
25
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
29,412!
26

27
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
29,412!
28
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
29,412!
29
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
29,412!
30
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
29,412!
31
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));
29,412!
32

33
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
29,412!
34
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));
29,412!
35

36
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
29,412!
37
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
29,412!
38
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
29,412!
39
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
29,412!
40
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));
29,412!
41

42
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
29,412!
43
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
29,412!
44
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
29,412!
45
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
29,412!
46
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
29,412!
47
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
29,412!
48
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));
29,412!
49

50
  if (pObj->sql != NULL) {
14,706✔
51
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sql));
29,396!
52
  } else {
53
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
8!
54
  }
55

56
  if (pObj->ast != NULL) {
14,706✔
57
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->ast));
29,396!
58
  } else {
59
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
8!
60
  }
61

62
  if (pObj->physicalPlan != NULL) {
14,706✔
63
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->physicalPlan));
29,396!
64
  } else {
65
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
8!
66
  }
67

68
  int32_t sz = taosArrayGetSize(pObj->tasks);
14,706✔
69
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, sz));
14,706!
70
  for (int32_t i = 0; i < sz; i++) {
44,248✔
71
    SArray *pArray = taosArrayGetP(pObj->tasks, i);
29,542✔
72
    int32_t innerSz = taosArrayGetSize(pArray);
29,542✔
73
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, innerSz));
29,542!
74
    for (int32_t j = 0; j < innerSz; j++) {
102,972✔
75
      SStreamTask *pTask = taosArrayGetP(pArray, j);
73,430✔
76
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
73,430✔
77
        pTask->ver = SSTREAM_TASK_VER;
1✔
78
      }
79
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
73,430!
80
    }
81
  }
82

83
  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));
29,412!
84

85
  // 3.0.20 ver =2
86
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
29,412!
87
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));
29,412!
88

89
  // 3.0.50 ver = 3
90
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
29,412!
91
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));
29,412!
92

93
  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));
29,412!
94

95
  tEndEncode(pEncoder);
14,706✔
96
  return pEncoder->pos;
14,706✔
97
}
98

99
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
6,696✔
100
  int32_t code = 0;
6,696✔
101
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
6,696!
102
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
6,696!
103

104
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
13,392!
105
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
13,392!
106
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
13,392!
107
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
13,392!
108
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
13,392!
109

110
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
13,392!
111
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
13,392!
112

113
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
13,392!
114
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
13,392!
115
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
13,392!
116
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
13,392!
117
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
13,392!
118

119
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
13,392!
120
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
13,392!
121
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
6,696!
122
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
6,696!
123
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
6,696!
124
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
13,392!
125
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
13,392!
126

127
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
13,392!
128
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
13,392!
129
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
13,392!
130

131
  if (pObj->tasks != NULL) {
6,696!
132
    pObj->tasks = freeStreamTasks(pObj->tasks);
×
133
  }
134

135
  int32_t sz;
136
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
6,696!
137

138
  if (sz != 0) {
6,696✔
139
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
6,693✔
140
    if (pObj->tasks == NULL) {
6,693!
141
      code = terrno;
×
142
      TAOS_RETURN(code);
×
143
    }
144

145
    for (int32_t i = 0; i < sz; i++) {
20,245✔
146
      int32_t innerSz;
147
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
13,552!
148
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
13,552✔
149
      if (pArray != NULL) {
13,552!
150
        for (int32_t j = 0; j < innerSz; j++) {
47,460✔
151
          SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
33,908!
152
          if (pTask == NULL) {
33,908!
153
            taosArrayDestroy(pArray);
×
154
            code = terrno;
×
155
            TAOS_RETURN(code);
×
156
          }
157
          if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
33,908!
158
            taosMemoryFree(pTask);
×
159
            taosArrayDestroy(pArray);
×
160
            TAOS_RETURN(code);
×
161
          }
162
          if (taosArrayPush(pArray, &pTask) == NULL) {
67,816!
163
            taosMemoryFree(pTask);
×
164
            taosArrayDestroy(pArray);
×
165
            code = terrno;
×
166
            TAOS_RETURN(code);
×
167
          }
168
        }
169
      }
170
      if (taosArrayPush(pObj->tasks, &pArray) == NULL) {
27,104!
171
        taosArrayDestroy(pArray);
×
172
        code = terrno;
×
173
        TAOS_RETURN(code);
×
174
      }
175
    }
176
  }
177

178
  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
13,392!
179

180
  // 3.0.20
181
  if (sver >= 2) {
6,696!
182
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
13,392!
183
    if (!tDecodeIsEnd(pDecoder)) {
6,696!
184
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
13,392!
185
    }
186
  }
187
  if (sver >= 3) {
6,696!
188
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
13,392!
189
  }
190

191
  if (sver >= 5) {
6,696!
192
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
13,392!
193
  }
194
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
6,696!
195

196
  tEndDecode(pDecoder);
6,696✔
197
  TAOS_RETURN(code);
6,696✔
198
}
199

200
void *freeStreamTasks(SArray *pTaskLevel) {
16,838✔
201
  int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
16,838✔
202

203
  for (int32_t i = 0; i < numOfLevel; i++) {
35,509✔
204
    SArray *pLevel = taosArrayGetP(pTaskLevel, i);
18,671✔
205
    int32_t taskSz = taosArrayGetSize(pLevel);
18,671✔
206
    for (int32_t j = 0; j < taskSz; j++) {
66,346✔
207
      SStreamTask *pTask = taosArrayGetP(pLevel, j);
47,675✔
208
      tFreeStreamTask(pTask);
47,675✔
209
    }
210

211
    taosArrayDestroy(pLevel);
18,671✔
212
  }
213

214
  taosArrayDestroy(pTaskLevel);
16,838✔
215

216
  return NULL;
16,838✔
217
}
218

219
void tFreeStreamObj(SStreamObj *pStream) {
8,419✔
220
  taosMemoryFree(pStream->sql);
8,419!
221
  taosMemoryFree(pStream->ast);
8,419!
222
  taosMemoryFree(pStream->physicalPlan);
8,419!
223

224
  if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
8,419✔
225
    taosMemoryFree(pStream->outputSchema.pSchema);
8,415!
226
  }
227

228
  pStream->tasks = freeStreamTasks(pStream->tasks);
8,419✔
229
  pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
8,419✔
230

231
  // tagSchema.pSchema
232
  if (pStream->tagSchema.nCols > 0) {
8,419✔
233
    taosMemoryFree(pStream->tagSchema.pSchema);
284!
234
  }
235
}
8,419✔
236

237
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
1,707✔
238
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
1,707!
239
  if (pVgEpNew == NULL) {
1,707!
240
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
241
    return NULL;
×
242
  }
243
  pVgEpNew->vgId = pVgEp->vgId;
1,707✔
244
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
245
  pVgEpNew->epSet = pVgEp->epSet;
1,707✔
246
  return pVgEpNew;
1,707✔
247
}
248

249
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
9,118✔
250
  if (pVgEp) {
9,118!
251
    //    taosMemoryFreeClear(pVgEp->qmsg);
252
    taosMemoryFree(pVgEp);
9,118!
253
  }
254
}
9,118✔
255

256
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
7,462✔
257
  int32_t tlen = 0;
7,462✔
258
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
7,462✔
259
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
260
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
7,462✔
261
  return tlen;
7,462✔
262
}
263

264
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
3,342✔
265
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
3,342!
266
  if (sver == 1) {
3,342!
267
    uint64_t size = 0;
×
268
    buf = taosDecodeVariantU64(buf, &size);
×
269
    buf = POINTER_SHIFT(buf, size);
×
270
  }
271
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
3,342✔
272
  return (void *)buf;
3,342✔
273
}
274

275
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
478!
276

277
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
2,257✔
278
                           SMqConsumerObj **ppConsumer) {
279
  int32_t         code = 0;
2,257✔
280
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
2,257!
281
  if (pConsumer == NULL) {
2,257!
282
    code = terrno;
×
283
    goto END;
×
284
  }
285

286
  pConsumer->consumerId = consumerId;
2,257✔
287
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
2,257✔
288

289
  pConsumer->epoch = 0;
2,257✔
290
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
2,257✔
291
  pConsumer->hbStatus = 0;
2,257✔
292
  pConsumer->pollStatus = 0;
2,257✔
293

294
  taosInitRWLatch(&pConsumer->lock);
2,257✔
295
  pConsumer->createTime = taosGetTimestampMs();
2,257✔
296
  pConsumer->updateType = updateType;
2,257✔
297

298
  if (updateType == CONSUMER_ADD_REB) {
2,257✔
299
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
483✔
300
    if (pConsumer->rebNewTopics == NULL) {
483!
301
      code = terrno;
×
302
      goto END;
×
303
    }
304

305
    char *topicTmp = taosStrdup(topic);
483!
306
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
966!
307
      code = terrno;
×
308
      goto END;
×
309
    }
310
  } else if (updateType == CONSUMER_REMOVE_REB) {
1,774✔
311
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
481✔
312
    if (pConsumer->rebRemovedTopics == NULL) {
481!
313
      code = terrno;
×
314
      goto END;
×
315
    }
316
    char *topicTmp = taosStrdup(topic);
481!
317
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
962!
318
      code = terrno;
×
319
      goto END;
×
320
    }
321
  } else if (updateType == CONSUMER_INSERT_SUB) {
1,293✔
322
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
418✔
323
    pConsumer->withTbName = subscribe->withTbName;
418✔
324
    pConsumer->autoCommit = subscribe->autoCommit;
418✔
325
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
418✔
326
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
418✔
327
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
418✔
328
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
418✔
329
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
418✔
330
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
418✔
331

332
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
418✔
333
    if (pConsumer->rebNewTopics == NULL) {
418!
334
      code = terrno;
×
335
      goto END;
×
336
    }
337
    pConsumer->assignedTopics = subscribe->topicNames;
418✔
338
    subscribe->topicNames = NULL;
418✔
339
  } else if (updateType == CONSUMER_UPDATE_SUB) {
875✔
340
    pConsumer->assignedTopics = subscribe->topicNames;
431✔
341
    subscribe->topicNames = NULL;
431✔
342
  }
343

344
  *ppConsumer = pConsumer;
2,257✔
345
  return 0;
2,257✔
346

347
END:
×
348
  tDeleteSMqConsumerObj(pConsumer);
×
349
  return code;
×
350
}
351

352
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
7,634✔
353
  if (pConsumer == NULL) return;
7,634✔
354
  taosArrayDestroyP(pConsumer->currentTopics, NULL);
4,549✔
355
  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
4,549✔
356
  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
4,549✔
357
  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
4,549✔
358
}
359

360
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
5,342✔
361
  tClearSMqConsumerObj(pConsumer);
5,342✔
362
  taosMemoryFree(pConsumer);
5,342!
363
}
5,342✔
364

365
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
4,634✔
366
  int32_t tlen = 0;
4,634✔
367
  int32_t sz;
368
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
4,634✔
369
  tlen += taosEncodeString(buf, pConsumer->clientId);
4,634✔
370
  tlen += taosEncodeString(buf, pConsumer->cgroup);
4,634✔
371
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
4,634✔
372
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
4,634✔
373
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
4,634✔
374

375
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
4,634✔
376
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
4,634✔
377
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
4,634✔
378
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
4,634✔
379
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
4,634✔
380

381
  // current topics
382
  if (pConsumer->currentTopics) {
4,634✔
383
    sz = taosArrayGetSize(pConsumer->currentTopics);
148✔
384
    tlen += taosEncodeFixedI32(buf, sz);
148✔
385
    for (int32_t i = 0; i < sz; i++) {
206✔
386
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
58✔
387
      tlen += taosEncodeString(buf, topic);
58✔
388
    }
389
  } else {
390
    tlen += taosEncodeFixedI32(buf, 0);
4,486✔
391
  }
392

393
  // reb new topics
394
  if (pConsumer->rebNewTopics) {
4,634✔
395
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
2,784✔
396
    tlen += taosEncodeFixedI32(buf, sz);
2,784✔
397
    for (int32_t i = 0; i < sz; i++) {
4,718✔
398
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
1,934✔
399
      tlen += taosEncodeString(buf, topic);
1,934✔
400
    }
401
  } else {
402
    tlen += taosEncodeFixedI32(buf, 0);
1,850✔
403
  }
404

405
  // reb removed topics
406
  if (pConsumer->rebRemovedTopics) {
4,634✔
407
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
1,944✔
408
    tlen += taosEncodeFixedI32(buf, sz);
1,944✔
409
    for (int32_t i = 0; i < sz; i++) {
3,854✔
410
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
1,910✔
411
      tlen += taosEncodeString(buf, topic);
1,910✔
412
    }
413
  } else {
414
    tlen += taosEncodeFixedI32(buf, 0);
2,690✔
415
  }
416

417
  // lost topics
418
  if (pConsumer->assignedTopics) {
4,634✔
419
    sz = taosArrayGetSize(pConsumer->assignedTopics);
1,818✔
420
    tlen += taosEncodeFixedI32(buf, sz);
1,818✔
421
    for (int32_t i = 0; i < sz; i++) {
2,836✔
422
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
1,018✔
423
      tlen += taosEncodeString(buf, topic);
1,018✔
424
    }
425
  } else {
426
    tlen += taosEncodeFixedI32(buf, 0);
2,816✔
427
  }
428

429
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
4,634✔
430
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
4,634✔
431
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
4,634✔
432
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
4,634✔
433
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
4,634✔
434
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
4,634✔
435
  tlen += taosEncodeString(buf, pConsumer->user);
4,634✔
436
  tlen += taosEncodeString(buf, pConsumer->fqdn);
4,634✔
437
  return tlen;
4,634✔
438
}
439

440
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
2,292✔
441
  int32_t sz;
442
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
2,292!
443
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
2,292✔
444
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
2,292✔
445
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
2,292✔
446
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
2,292!
447
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
2,292!
448

449
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
2,292!
450
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
2,292✔
451
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
2,292!
452
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
2,292!
453
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
4,584!
454

455
  // current topics
456
  buf = taosDecodeFixedI32(buf, &sz);
2,292✔
457
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
2,292✔
458
  if (pConsumer->currentTopics == NULL) {
2,292!
459
    return NULL;
×
460
  }
461
  for (int32_t i = 0; i < sz; i++) {
2,312✔
462
    char *topic;
463
    buf = taosDecodeString(buf, &topic);
20✔
464
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
40!
465
      return NULL;
×
466
    }
467
  }
468

469
  // reb new topics
470
  buf = taosDecodeFixedI32(buf, &sz);
2,292✔
471
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
2,292✔
472
  for (int32_t i = 0; i < sz; i++) {
3,272✔
473
    char *topic;
474
    buf = taosDecodeString(buf, &topic);
980✔
475
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
1,960!
476
      return NULL;
×
477
    }
478
  }
479

480
  // reb removed topics
481
  buf = taosDecodeFixedI32(buf, &sz);
2,292✔
482
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
2,292✔
483
  for (int32_t i = 0; i < sz; i++) {
3,254✔
484
    char *topic;
485
    buf = taosDecodeString(buf, &topic);
962✔
486
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
1,924!
487
      return NULL;
×
488
    }
489
  }
490

491
  // reb removed topics
492
  buf = taosDecodeFixedI32(buf, &sz);
2,292✔
493
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
2,292✔
494
  for (int32_t i = 0; i < sz; i++) {
2,801✔
495
    char *topic;
496
    buf = taosDecodeString(buf, &topic);
509✔
497
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
1,018!
498
      return NULL;
×
499
    }
500
  }
501

502
  if (sver > 1) {
2,292!
503
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
2,292✔
504
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
2,292✔
505
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
2,292!
506
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
4,584!
507
  }
508
  if (sver > 2) {
2,292!
509
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
2,292!
510
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
2,292!
511
    buf = taosDecodeStringTo(buf, pConsumer->user);
2,292✔
512
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
4,584✔
513
  } else {
514
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
515
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
516
  }
517

518
  return (void *)buf;
2,292✔
519
}
520

521
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
3,872✔
522
  int32_t tlen = 0;
3,872✔
523
  int32_t szVgs = taosArrayGetSize(offsetRows);
3,872✔
524
  tlen += taosEncodeFixedI32(buf, szVgs);
3,872✔
525
  for (int32_t j = 0; j < szVgs; ++j) {
9,012✔
526
    OffsetRows *offRows = taosArrayGet(offsetRows, j);
5,140✔
527
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
5,140✔
528
    tlen += taosEncodeFixedI64(buf, offRows->rows);
5,140✔
529
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
5,140✔
530
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
5,140!
531
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
366✔
532
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
732✔
533
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
4,774✔
534
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
8,576✔
535
    } else {
536
      // do nothing
537
    }
538
    tlen += taosEncodeFixedI64(buf, offRows->ever);
10,280✔
539
  }
540

541
  return tlen;
3,872✔
542
}
543

544
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
1,156✔
545
  int32_t tlen = 0;
1,156✔
546
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
1,156✔
547
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
1,156✔
548

549
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
1,156✔
550
}
551

552
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
1,753✔
553
  int32_t szVgs = 0;
1,753!
554
  buf = taosDecodeFixedI32(buf, &szVgs);
1,753✔
555
  if (szVgs > 0) {
1,753✔
556
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
763✔
557
    if (NULL == *offsetRows) return NULL;
763!
558
    for (int32_t j = 0; j < szVgs; ++j) {
2,930✔
559
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
2,167✔
560
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
2,167!
561
      buf = taosDecodeFixedI64(buf, &offRows->rows);
2,167!
562
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
2,167✔
563
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
2,167!
564
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
171!
565
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
342!
566
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
1,996✔
567
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
3,540!
568
      } else {
569
        // do nothing
570
      }
571
      if (sver > 2) {
2,167!
572
        buf = taosDecodeFixedI64(buf, &offRows->ever);
4,334!
573
      }
574
    }
575
  }
576
  return (void *)buf;
1,753✔
577
}
578

579
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
576✔
580
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
576!
581
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
576✔
582
  if (sver > 1) {
576!
583
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
576✔
584
  }
585

586
  return (void *)buf;
576✔
587
}
588

589
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
375✔
590
  int32_t          code = 0;
375✔
591
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
375!
592
  MND_TMQ_NULL_CHECK(pSubObj);
375!
593

594
  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
375✔
595
  taosInitRWLatch(&pSubObj->lock);
375✔
596
  pSubObj->vgNum = 0;
375✔
597
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
375✔
598
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
375!
599
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
375✔
600
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
375!
601
  if (ppSub) {
375!
602
    *ppSub = pSubObj;
375✔
603
  }
604
  return code;
375✔
605

606
END:
×
607
  taosMemoryFree(pSubObj);
×
608
  return code;
×
609
}
610

611
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
791✔
612
  int32_t          code = 0;
791✔
613
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
791!
614
  if (pSubNew == NULL) {
791!
615
    code = terrno;
×
616
    goto END;
×
617
  }
618
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
791✔
619
  taosInitRWLatch(&pSubNew->lock);
791✔
620

621
  pSubNew->dbUid = pSub->dbUid;
791✔
622
  pSubNew->stbUid = pSub->stbUid;
791✔
623
  pSubNew->subType = pSub->subType;
791✔
624
  pSubNew->withMeta = pSub->withMeta;
791✔
625

626
  pSubNew->vgNum = pSub->vgNum;
791✔
627
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
791✔
628

629
  void          *pIter = NULL;
791✔
630
  SMqConsumerEp *pConsumerEp = NULL;
791✔
631
  while (1) {
811✔
632
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,602✔
633
    if (pIter == NULL) break;
1,602✔
634
    pConsumerEp = (SMqConsumerEp *)pIter;
811✔
635
    SMqConsumerEp newEp = {
1,622✔
636
        .consumerId = pConsumerEp->consumerId,
811✔
637
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
811✔
638
    };
639
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
811!
640
                            sizeof(SMqConsumerEp))) != 0)
641
      goto END;
×
642
  }
643
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
791✔
644
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
791✔
645
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
791✔
646
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
791!
647
  if (ppSub) {
791!
648
    *ppSub = pSubNew;
791✔
649
  }
650
END:
×
651
  return code;
791✔
652
}
653

654
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
2,343✔
655
  if (pSub == NULL) return;
2,343!
656
  void *pIter = NULL;
2,343✔
657
  while (1) {
1,344✔
658
    pIter = taosHashIterate(pSub->consumerHash, pIter);
3,687✔
659
    if (pIter == NULL) break;
3,687✔
660
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
1,344✔
661
    taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
1,344✔
662
    taosArrayDestroy(pConsumerEp->offsetRows);
1,344✔
663
  }
664
  taosHashCleanup(pSub->consumerHash);
2,343✔
665
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
2,343✔
666
  taosMemoryFreeClear(pSub->qmsg);
2,343!
667
  taosArrayDestroy(pSub->offsetRows);
2,343✔
668
}
669

670
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
2,716✔
671
  int32_t tlen = 0;
2,716✔
672
  tlen += taosEncodeString(buf, pSub->key);
2,716✔
673
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
2,716✔
674
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
2,716✔
675
  tlen += taosEncodeFixedI8(buf, pSub->subType);
2,716✔
676
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
2,716✔
677
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
2,716✔
678

679
  void   *pIter = NULL;
2,716✔
680
  int32_t sz = taosHashGetSize(pSub->consumerHash);
2,716✔
681
  tlen += taosEncodeFixedI32(buf, sz);
2,716✔
682

683
  int32_t cnt = 0;
2,716✔
684
  while (1) {
1,156✔
685
    pIter = taosHashIterate(pSub->consumerHash, pIter);
3,872✔
686
    if (pIter == NULL) break;
3,872✔
687
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
1,156✔
688
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
1,156✔
689
    cnt++;
1,156✔
690
  }
691
  if (cnt != sz) return -1;
2,716!
692
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
2,716✔
693
  tlen += taosEncodeString(buf, pSub->dbName);
2,716✔
694

695
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
2,716✔
696
  tlen += taosEncodeString(buf, pSub->qmsg);
2,716✔
697
  return tlen;
2,716✔
698
}
699

700
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
1,177✔
701
  //
702
  buf = taosDecodeStringTo(buf, pSub->key);
1,177✔
703
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
1,177!
704
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
1,177!
705
  buf = taosDecodeFixedI8(buf, &pSub->subType);
1,177✔
706
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
1,177✔
707
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);
2,354!
708

709
  int32_t sz;
710
  buf = taosDecodeFixedI32(buf, &sz);
1,177✔
711

712
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,177✔
713
  for (int32_t i = 0; i < sz; i++) {
1,753✔
714
    SMqConsumerEp consumerEp = {0};
576✔
715
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
576✔
716
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
576!
717
        0)
718
      return NULL;
×
719
  }
720

721
  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
1,177✔
722
  buf = taosDecodeStringTo(buf, pSub->dbName);
1,177✔
723

724
  if (sver > 1) {
1,177!
725
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
1,177✔
726
    buf = taosDecodeString(buf, &pSub->qmsg);
2,354✔
727
  } else {
728
    pSub->qmsg = taosStrdup("");
×
729
  }
730
  return (void *)buf;
1,177✔
731
}
732

733
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
114,576✔
734
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
114,576✔
735
  pObj->dtype = pItem->dtype;
114,576✔
736
  switch (pItem->dtype) {
114,576!
737
    case CFG_DTYPE_NONE:
×
738
      break;
×
739
    case CFG_DTYPE_BOOL:
22,176✔
740
      pObj->bval = pItem->bval;
22,176✔
741
      break;
22,176✔
742
    case CFG_DTYPE_INT32:
60,368✔
743
      pObj->i32 = pItem->i32;
60,368✔
744
      break;
60,368✔
745
    case CFG_DTYPE_INT64:
11,088✔
746
      pObj->i64 = pItem->i64;
11,088✔
747
      break;
11,088✔
748
    case CFG_DTYPE_FLOAT:
3,696✔
749
    case CFG_DTYPE_DOUBLE:
750
      pObj->fval = pItem->fval;
3,696✔
751
      break;
3,696✔
752
    case CFG_DTYPE_STRING:
17,248✔
753
    case CFG_DTYPE_DIR:
754
    case CFG_DTYPE_LOCALE:
755
    case CFG_DTYPE_CHARSET:
756
    case CFG_DTYPE_TIMEZONE:
757
      pObj->str = taosStrdup(pItem->str);
17,248!
758
      if (pObj->str == NULL) {
17,248!
759
        taosMemoryFree(pObj);
×
NEW
760
        return TSDB_CODE_OUT_OF_MEMORY;
×
761
      }
762
      break;
17,248✔
763
  }
764
  return TSDB_CODE_SUCCESS;
114,576✔
765
}
766

767
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
1,034✔
768
  int32_t code = 0;
1,034✔
769
  switch (pObjNew->dtype) {
1,034!
770
    case CFG_DTYPE_BOOL: {
34✔
771
      bool tmp = false;
34✔
772
      if (strcasecmp(value, "true") == 0) {
34!
773
        tmp = true;
×
774
      }
775
      if (taosStr2Int32(value, NULL, 10) > 0) {
34✔
776
        tmp = true;
18✔
777
      }
778
      pObjNew->bval = tmp;
34✔
779
      break;
34✔
780
    }
781
    case CFG_DTYPE_INT32: {
236✔
782
      int32_t ival;
783
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &ival));
236!
784
      pObjNew->i32 = ival;
236✔
785
      break;
236✔
786
    }
787
    case CFG_DTYPE_INT64: {
14✔
788
      int64_t ival;
789
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &ival));
14!
790
      pObjNew->i64 = ival;
14✔
791
      break;
14✔
792
    }
793
    case CFG_DTYPE_FLOAT:
4✔
794
    case CFG_DTYPE_DOUBLE: {
795
      float dval = 0;
4✔
796
      TAOS_CHECK_RETURN(parseCfgReal(value, &dval));
4!
797
      pObjNew->fval = dval;
4✔
798
      break;
4✔
799
    }
800
    case CFG_DTYPE_DIR:
746✔
801
    case CFG_DTYPE_TIMEZONE:
802
    case CFG_DTYPE_CHARSET:
803
    case CFG_DTYPE_LOCALE:
804
    case CFG_DTYPE_STRING: {
805
      pObjNew->str = taosStrdup(value);
746!
806
      if (pObjNew->str == NULL) {
746!
807
        code = terrno;
×
808
        return code;
×
809
      }
810
      break;
746✔
811
    }
812
    case CFG_DTYPE_NONE:
×
813
      break;
×
814
    default:
×
815
      code = TSDB_CODE_INVALID_CFG;
×
816
      break;
×
817
  }
818
  return code;
1,034✔
819
}
820

821
SConfigObj mndInitConfigVersion() {
1,240✔
822
  SConfigObj obj;
823
  memset(&obj, 0, sizeof(SConfigObj));
1,240✔
824

825
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
1,240✔
826
  obj.dtype = CFG_DTYPE_INT32;
1,240✔
827
  obj.i32 = 0;
1,240✔
828
  return obj;
1,240✔
829
}
830

831
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
802,178✔
832
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
802,178!
833
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
1,604,356!
834

835
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
1,604,356!
836
  switch (pObj->dtype) {
802,178!
837
    case CFG_DTYPE_BOOL:
152,852✔
838
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
305,704!
839
      break;
152,852✔
840
    case CFG_DTYPE_INT32:
427,110✔
841
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
854,220!
842
      break;
427,110✔
843
    case CFG_DTYPE_INT64:
76,420✔
844
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
152,840!
845
      break;
76,420✔
846
    case CFG_DTYPE_FLOAT:
25,472✔
847
    case CFG_DTYPE_DOUBLE:
848
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
50,944!
849
      break;
25,472✔
850
    case CFG_DTYPE_STRING:
120,324✔
851
    case CFG_DTYPE_DIR:
852
    case CFG_DTYPE_LOCALE:
853
    case CFG_DTYPE_CHARSET:
854
    case CFG_DTYPE_TIMEZONE:
855
      if (pObj->str != NULL) {
120,324!
856
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
240,648!
857
      } else {
858
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
×
859
      }
860
      break;
120,324✔
861
    default:
×
862
      break;
×
863
  }
864
  tEndEncode(pEncoder);
802,178✔
865
  return pEncoder->pos;
802,178✔
866
}
867

868
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
164,225✔
869
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
164,225!
870
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
164,225!
871
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
328,450!
872
  switch (pObj->dtype) {
164,225!
873
    case CFG_DTYPE_NONE:
×
874
      break;
×
875
    case CFG_DTYPE_BOOL:
31,060✔
876
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
31,060!
877
      break;
31,060✔
878
    case CFG_DTYPE_INT32:
87,605✔
879
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
175,210!
880
      break;
87,605✔
881
    case CFG_DTYPE_INT64:
15,526✔
882
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
31,052!
883
      break;
15,526✔
884
    case CFG_DTYPE_FLOAT:
5,174✔
885
    case CFG_DTYPE_DOUBLE:
886
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
10,348!
887
      break;
5,174✔
888
    case CFG_DTYPE_STRING:
24,860✔
889
    case CFG_DTYPE_DIR:
890
    case CFG_DTYPE_LOCALE:
891
    case CFG_DTYPE_CHARSET:
892
    case CFG_DTYPE_TIMEZONE:
893
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
49,720!
894
      break;
24,860✔
895
  }
896
  tEndDecode(pDecoder);
164,225✔
897
  TAOS_RETURN(TSDB_CODE_SUCCESS);
164,225✔
898
}
899

900
void tFreeSConfigObj(SConfigObj *obj) {
282,015✔
901
  if (obj == NULL) {
282,015!
902
    return;
×
903
  }
904
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
282,015!
905
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
245,081✔
906
    taosMemoryFree(obj->str);
42,840!
907
  }
908
}
909

910
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
911
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
912
//   if (pEntryNew == NULL) return NULL;
913
//   pEntryNew->epoch = pEntry->epoch;
914
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
915
//   return pEntryNew;
916
// }
917
//
918
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
919
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
920
// }
921

922
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
923
//   int32_t tlen = 0;
924
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
925
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
926
//   return tlen;
927
// }
928
//
929
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
930
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
931
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
932
//   return (void *)buf;
933
// }
934

935
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
936
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
937
//   if (pLogNew == NULL) return pLogNew;
938
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
939
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
940
//   return pLogNew;
941
// }
942
//
943
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
944
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
945
// }
946

947
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
948
//   int32_t tlen = 0;
949
//   tlen += taosEncodeString(buf, pLog->key);
950
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
951
//   return tlen;
952
// }
953
//
954
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
955
//   buf = taosDecodeStringTo(buf, pLog->key);
956
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
957
//   return (void *)buf;
958
// }
959
//
960
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
961
//   int32_t tlen = 0;
962
//   tlen += taosEncodeString(buf, pOffset->key);
963
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
964
//   return tlen;
965
// }
966
//
967
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
968
//   buf = taosDecodeStringTo(buf, pOffset->key);
969
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
970
//   return buf;
971
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc