• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.93
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
#ifdef USE_STREAM
22
static void *freeStreamTasks(SArray *pTaskLevel);
23

24
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
2✔
25
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
2!
26
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
4!
27

28
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
4!
29
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
4!
30
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
4!
31
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
4!
32
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));
4!
33

34
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
4!
35
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));
4!
36

37
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
4!
38
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
4!
39
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
4!
40
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
4!
41
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));
4!
42

43
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
4!
44
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
4!
45
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
4!
46
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
4!
47
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
4!
48
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
4!
49
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));
4!
50

51
  if (pObj->sql != NULL) {
2!
UNCOV
52
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sql));
×
53
  } else {
54
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
2!
55
  }
56

57
  if (pObj->ast != NULL) {
2!
UNCOV
58
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->ast));
×
59
  } else {
60
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
2!
61
  }
62

63
  if (pObj->physicalPlan != NULL) {
2!
UNCOV
64
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->physicalPlan));
×
65
  } else {
66
    TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
2!
67
  }
68

69
  int32_t sz = taosArrayGetSize(pObj->tasks);
2✔
70
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, sz));
2!
71
  for (int32_t i = 0; i < sz; i++) {
4✔
72
    SArray *pArray = taosArrayGetP(pObj->tasks, i);
2✔
73
    int32_t innerSz = taosArrayGetSize(pArray);
2✔
74
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, innerSz));
2!
75
    for (int32_t j = 0; j < innerSz; j++) {
4✔
76
      SStreamTask *pTask = taosArrayGetP(pArray, j);
2✔
77
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
2✔
78
        pTask->ver = SSTREAM_TASK_VER;
1✔
79
      }
80
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
2!
81
    }
82
  }
83

84
  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));
4!
85

86
  // 3.0.20 ver =2
87
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
4!
88
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));
4!
89

90
  // 3.0.50 ver = 3
91
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
4!
92
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));
4!
93

94
  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));
4!
95

96
  tEndEncode(pEncoder);
2✔
97
  return pEncoder->pos;
2✔
98
}
99

UNCOV
100
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
×
UNCOV
101
  int32_t code = 0;
×
UNCOV
102
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
×
UNCOV
103
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
×
104

UNCOV
105
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
×
UNCOV
106
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
×
UNCOV
107
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
×
UNCOV
108
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
×
UNCOV
109
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
×
110

UNCOV
111
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
×
UNCOV
112
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
×
113

UNCOV
114
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
×
UNCOV
115
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
×
UNCOV
116
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
×
UNCOV
117
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
×
UNCOV
118
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
×
119

UNCOV
120
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
×
UNCOV
121
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
×
UNCOV
122
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
×
UNCOV
123
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
×
UNCOV
124
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
×
UNCOV
125
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
×
UNCOV
126
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
×
127

UNCOV
128
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
×
UNCOV
129
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
×
UNCOV
130
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
×
131

132
  if (pObj->tasks != NULL) {
×
UNCOV
133
    pObj->tasks = freeStreamTasks(pObj->tasks);
×
134
  }
135

136
  int32_t sz;
UNCOV
137
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
×
138

UNCOV
139
  if (sz != 0) {
×
UNCOV
140
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
×
141
    if (pObj->tasks == NULL) {
×
142
      code = terrno;
×
UNCOV
143
      TAOS_RETURN(code);
×
144
    }
145

UNCOV
146
    for (int32_t i = 0; i < sz; i++) {
×
147
      int32_t innerSz;
UNCOV
148
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
×
UNCOV
149
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
×
UNCOV
150
      if (pArray != NULL) {
×
UNCOV
151
        for (int32_t j = 0; j < innerSz; j++) {
×
UNCOV
152
          SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
×
153
          if (pTask == NULL) {
×
154
            taosArrayDestroy(pArray);
×
155
            code = terrno;
×
UNCOV
156
            TAOS_RETURN(code);
×
157
          }
158
          if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
×
159
            taosMemoryFree(pTask);
×
160
            taosArrayDestroy(pArray);
×
UNCOV
161
            TAOS_RETURN(code);
×
162
          }
163
          if (taosArrayPush(pArray, &pTask) == NULL) {
×
164
            taosMemoryFree(pTask);
×
165
            taosArrayDestroy(pArray);
×
166
            code = terrno;
×
UNCOV
167
            TAOS_RETURN(code);
×
168
          }
169
        }
170
      }
171
      if (taosArrayPush(pObj->tasks, &pArray) == NULL) {
×
172
        taosArrayDestroy(pArray);
×
173
        code = terrno;
×
UNCOV
174
        TAOS_RETURN(code);
×
175
      }
176
    }
177
  }
178

UNCOV
179
  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
×
180

181
  // 3.0.20
UNCOV
182
  if (sver >= 2) {
×
UNCOV
183
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
×
UNCOV
184
    if (!tDecodeIsEnd(pDecoder)) {
×
UNCOV
185
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
×
186
    }
187
  }
UNCOV
188
  if (sver >= 3) {
×
UNCOV
189
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
×
190
  }
191

UNCOV
192
  if (sver >= 5) {
×
UNCOV
193
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
×
194
  }
UNCOV
195
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
×
196

UNCOV
197
  tEndDecode(pDecoder);
×
UNCOV
198
  TAOS_RETURN(code);
×
199
}
200

201
void *freeStreamTasks(SArray *pTaskLevel) {
2✔
202
  if (pTaskLevel == NULL) return NULL;
2!
203
  int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
2✔
204

205
  for (int32_t i = 0; i < numOfLevel; i++) {
3✔
206
    SArray *pLevel = taosArrayGetP(pTaskLevel, i);
1✔
207
    int32_t taskSz = taosArrayGetSize(pLevel);
1✔
208
    for (int32_t j = 0; j < taskSz; j++) {
2✔
209
      SStreamTask *pTask = taosArrayGetP(pLevel, j);
1✔
210
      tFreeStreamTask(pTask);
1✔
211
    }
212

213
    taosArrayDestroy(pLevel);
1✔
214
  }
215

216
  taosArrayDestroy(pTaskLevel);
2✔
217

218
  return NULL;
2✔
219
}
220

221
void tFreeStreamObj(SStreamObj *pStream) {
1✔
222
  taosMemoryFree(pStream->sql);
1!
223
  taosMemoryFree(pStream->ast);
1!
224
  taosMemoryFree(pStream->physicalPlan);
1!
225

226
  if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
1!
UNCOV
227
    taosMemoryFree(pStream->outputSchema.pSchema);
×
228
  }
229

230
  pStream->tasks = freeStreamTasks(pStream->tasks);
1✔
231
  pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
1✔
232

233
  // tagSchema.pSchema
234
  if (pStream->tagSchema.nCols > 0) {
1!
UNCOV
235
    taosMemoryFree(pStream->tagSchema.pSchema);
×
236
  }
237
}
1✔
238
#endif
239

240
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
×
241
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
×
UNCOV
242
  if (pVgEpNew == NULL) {
×
UNCOV
243
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
244
    return NULL;
×
245
  }
UNCOV
246
  pVgEpNew->vgId = pVgEp->vgId;
×
247
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
UNCOV
248
  pVgEpNew->epSet = pVgEp->epSet;
×
UNCOV
249
  return pVgEpNew;
×
250
}
251

UNCOV
252
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
×
UNCOV
253
  if (pVgEp) {
×
254
    //    taosMemoryFreeClear(pVgEp->qmsg);
UNCOV
255
    taosMemoryFree(pVgEp);
×
256
  }
UNCOV
257
}
×
258

UNCOV
259
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
×
UNCOV
260
  int32_t tlen = 0;
×
UNCOV
261
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
×
262
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
UNCOV
263
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
×
UNCOV
264
  return tlen;
×
265
}
266

267
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
×
268
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
×
269
  if (sver == 1) {
×
UNCOV
270
    uint64_t size = 0;
×
UNCOV
271
    buf = taosDecodeVariantU64(buf, &size);
×
UNCOV
272
    buf = POINTER_SHIFT(buf, size);
×
273
  }
UNCOV
274
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
×
UNCOV
275
  return (void *)buf;
×
276
}
277

UNCOV
278
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
×
279

UNCOV
280
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
×
281
                           SMqConsumerObj **ppConsumer) {
282
  int32_t         code = 0;
×
283
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
×
UNCOV
284
  if (pConsumer == NULL) {
×
UNCOV
285
    code = terrno;
×
UNCOV
286
    goto END;
×
287
  }
288

UNCOV
289
  pConsumer->consumerId = consumerId;
×
UNCOV
290
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
×
291

UNCOV
292
  pConsumer->epoch = 0;
×
UNCOV
293
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
×
UNCOV
294
  pConsumer->hbStatus = 0;
×
UNCOV
295
  pConsumer->pollStatus = 0;
×
296

UNCOV
297
  taosInitRWLatch(&pConsumer->lock);
×
UNCOV
298
  pConsumer->createTime = taosGetTimestampMs();
×
UNCOV
299
  pConsumer->updateType = updateType;
×
300

301
  if (updateType == CONSUMER_ADD_REB) {
×
302
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
×
UNCOV
303
    if (pConsumer->rebNewTopics == NULL) {
×
UNCOV
304
      code = terrno;
×
UNCOV
305
      goto END;
×
306
    }
307

308
    char *topicTmp = taosStrdup(topic);
×
UNCOV
309
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
×
UNCOV
310
      code = terrno;
×
UNCOV
311
      goto END;
×
312
    }
313
  } else if (updateType == CONSUMER_REMOVE_REB) {
×
314
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
×
UNCOV
315
    if (pConsumer->rebRemovedTopics == NULL) {
×
UNCOV
316
      code = terrno;
×
UNCOV
317
      goto END;
×
318
    }
319
    char *topicTmp = taosStrdup(topic);
×
UNCOV
320
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
×
UNCOV
321
      code = terrno;
×
UNCOV
322
      goto END;
×
323
    }
UNCOV
324
  } else if (updateType == CONSUMER_INSERT_SUB) {
×
UNCOV
325
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
×
UNCOV
326
    pConsumer->withTbName = subscribe->withTbName;
×
UNCOV
327
    pConsumer->autoCommit = subscribe->autoCommit;
×
UNCOV
328
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
×
UNCOV
329
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
×
UNCOV
330
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
×
UNCOV
331
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
×
UNCOV
332
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
×
UNCOV
333
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
×
334

335
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
×
UNCOV
336
    if (pConsumer->rebNewTopics == NULL) {
×
UNCOV
337
      code = terrno;
×
UNCOV
338
      goto END;
×
339
    }
UNCOV
340
    pConsumer->assignedTopics = subscribe->topicNames;
×
UNCOV
341
    subscribe->topicNames = NULL;
×
UNCOV
342
  } else if (updateType == CONSUMER_UPDATE_SUB) {
×
UNCOV
343
    pConsumer->assignedTopics = subscribe->topicNames;
×
UNCOV
344
    subscribe->topicNames = NULL;
×
345
  }
346

347
  *ppConsumer = pConsumer;
×
348
  return 0;
×
349

UNCOV
350
END:
×
UNCOV
351
  tDeleteSMqConsumerObj(pConsumer);
×
UNCOV
352
  return code;
×
353
}
354

UNCOV
355
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
×
UNCOV
356
  if (pConsumer == NULL) return;
×
UNCOV
357
  taosArrayDestroyP(pConsumer->currentTopics, NULL);
×
UNCOV
358
  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
×
UNCOV
359
  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
×
UNCOV
360
  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
×
361
}
362

UNCOV
363
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
×
UNCOV
364
  tClearSMqConsumerObj(pConsumer);
×
UNCOV
365
  taosMemoryFree(pConsumer);
×
UNCOV
366
}
×
367

UNCOV
368
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
×
UNCOV
369
  int32_t tlen = 0;
×
370
  int32_t sz;
UNCOV
371
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
×
UNCOV
372
  tlen += taosEncodeString(buf, pConsumer->clientId);
×
UNCOV
373
  tlen += taosEncodeString(buf, pConsumer->cgroup);
×
UNCOV
374
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
×
UNCOV
375
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
×
UNCOV
376
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
×
377

UNCOV
378
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
×
UNCOV
379
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
×
UNCOV
380
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
×
UNCOV
381
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
×
UNCOV
382
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
×
383

384
  // current topics
UNCOV
385
  if (pConsumer->currentTopics) {
×
UNCOV
386
    sz = taosArrayGetSize(pConsumer->currentTopics);
×
UNCOV
387
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
388
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
389
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
×
UNCOV
390
      tlen += taosEncodeString(buf, topic);
×
391
    }
392
  } else {
UNCOV
393
    tlen += taosEncodeFixedI32(buf, 0);
×
394
  }
395

396
  // reb new topics
UNCOV
397
  if (pConsumer->rebNewTopics) {
×
UNCOV
398
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
×
UNCOV
399
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
400
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
401
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
×
UNCOV
402
      tlen += taosEncodeString(buf, topic);
×
403
    }
404
  } else {
UNCOV
405
    tlen += taosEncodeFixedI32(buf, 0);
×
406
  }
407

408
  // reb removed topics
UNCOV
409
  if (pConsumer->rebRemovedTopics) {
×
UNCOV
410
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
×
UNCOV
411
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
412
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
413
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
×
UNCOV
414
      tlen += taosEncodeString(buf, topic);
×
415
    }
416
  } else {
UNCOV
417
    tlen += taosEncodeFixedI32(buf, 0);
×
418
  }
419

420
  // lost topics
UNCOV
421
  if (pConsumer->assignedTopics) {
×
UNCOV
422
    sz = taosArrayGetSize(pConsumer->assignedTopics);
×
UNCOV
423
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
424
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
425
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
×
UNCOV
426
      tlen += taosEncodeString(buf, topic);
×
427
    }
428
  } else {
UNCOV
429
    tlen += taosEncodeFixedI32(buf, 0);
×
430
  }
431

UNCOV
432
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
×
UNCOV
433
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
×
UNCOV
434
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
×
UNCOV
435
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
×
UNCOV
436
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
×
UNCOV
437
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
×
UNCOV
438
  tlen += taosEncodeString(buf, pConsumer->user);
×
UNCOV
439
  tlen += taosEncodeString(buf, pConsumer->fqdn);
×
UNCOV
440
  return tlen;
×
441
}
442

UNCOV
443
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
×
444
  int32_t sz;
UNCOV
445
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
×
UNCOV
446
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
×
UNCOV
447
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
×
UNCOV
448
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
×
UNCOV
449
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
×
UNCOV
450
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
×
451

UNCOV
452
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
×
UNCOV
453
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
×
UNCOV
454
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
×
UNCOV
455
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
×
UNCOV
456
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
×
457

458
  // current topics
459
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
460
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
461
  if (pConsumer->currentTopics == NULL) {
×
UNCOV
462
    return NULL;
×
463
  }
UNCOV
464
  for (int32_t i = 0; i < sz; i++) {
×
465
    char *topic;
UNCOV
466
    buf = taosDecodeString(buf, &topic);
×
UNCOV
467
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
×
UNCOV
468
      return NULL;
×
469
    }
470
  }
471

472
  // reb new topics
UNCOV
473
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
474
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
475
  for (int32_t i = 0; i < sz; i++) {
×
476
    char *topic;
UNCOV
477
    buf = taosDecodeString(buf, &topic);
×
UNCOV
478
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
×
UNCOV
479
      return NULL;
×
480
    }
481
  }
482

483
  // reb removed topics
UNCOV
484
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
485
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
486
  for (int32_t i = 0; i < sz; i++) {
×
487
    char *topic;
UNCOV
488
    buf = taosDecodeString(buf, &topic);
×
UNCOV
489
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
×
UNCOV
490
      return NULL;
×
491
    }
492
  }
493

494
  // reb removed topics
UNCOV
495
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
496
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
497
  for (int32_t i = 0; i < sz; i++) {
×
498
    char *topic;
UNCOV
499
    buf = taosDecodeString(buf, &topic);
×
UNCOV
500
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
×
UNCOV
501
      return NULL;
×
502
    }
503
  }
504

UNCOV
505
  if (sver > 1) {
×
UNCOV
506
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
×
UNCOV
507
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
×
UNCOV
508
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
×
UNCOV
509
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
×
510
  }
UNCOV
511
  if (sver > 2) {
×
UNCOV
512
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
×
UNCOV
513
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
×
514
    buf = taosDecodeStringTo(buf, pConsumer->user);
×
515
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
×
516
  } else {
UNCOV
517
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
UNCOV
518
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
519
  }
520

UNCOV
521
  return (void *)buf;
×
522
}
523

UNCOV
524
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
×
UNCOV
525
  int32_t tlen = 0;
×
UNCOV
526
  int32_t szVgs = taosArrayGetSize(offsetRows);
×
UNCOV
527
  tlen += taosEncodeFixedI32(buf, szVgs);
×
UNCOV
528
  for (int32_t j = 0; j < szVgs; ++j) {
×
UNCOV
529
    OffsetRows *offRows = taosArrayGet(offsetRows, j);
×
UNCOV
530
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
×
UNCOV
531
    tlen += taosEncodeFixedI64(buf, offRows->rows);
×
UNCOV
532
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
×
UNCOV
533
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
×
UNCOV
534
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
×
UNCOV
535
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
×
UNCOV
536
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
×
UNCOV
537
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
×
538
    } else {
539
      // do nothing
540
    }
UNCOV
541
    tlen += taosEncodeFixedI64(buf, offRows->ever);
×
542
  }
543

UNCOV
544
  return tlen;
×
545
}
546

UNCOV
547
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
×
UNCOV
548
  int32_t tlen = 0;
×
UNCOV
549
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
×
UNCOV
550
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
×
551

UNCOV
552
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
×
553
}
554

UNCOV
555
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
×
UNCOV
556
  int32_t szVgs = 0;
×
UNCOV
557
  buf = taosDecodeFixedI32(buf, &szVgs);
×
UNCOV
558
  if (szVgs > 0) {
×
UNCOV
559
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
×
UNCOV
560
    if (NULL == *offsetRows) return NULL;
×
UNCOV
561
    for (int32_t j = 0; j < szVgs; ++j) {
×
UNCOV
562
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
×
UNCOV
563
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
×
UNCOV
564
      buf = taosDecodeFixedI64(buf, &offRows->rows);
×
UNCOV
565
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
×
UNCOV
566
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
×
UNCOV
567
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
×
UNCOV
568
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
×
UNCOV
569
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
×
UNCOV
570
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
×
571
      } else {
572
        // do nothing
573
      }
UNCOV
574
      if (sver > 2) {
×
UNCOV
575
        buf = taosDecodeFixedI64(buf, &offRows->ever);
×
576
      }
577
    }
578
  }
UNCOV
579
  return (void *)buf;
×
580
}
581

UNCOV
582
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
×
UNCOV
583
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
×
UNCOV
584
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
×
UNCOV
585
  if (sver > 1) {
×
UNCOV
586
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
×
587
  }
588

UNCOV
589
  return (void *)buf;
×
590
}
591

UNCOV
592
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
×
UNCOV
593
  int32_t          code = 0;
×
UNCOV
594
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
×
UNCOV
595
  MND_TMQ_NULL_CHECK(pSubObj);
×
596

UNCOV
597
  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
×
UNCOV
598
  taosInitRWLatch(&pSubObj->lock);
×
UNCOV
599
  pSubObj->vgNum = 0;
×
UNCOV
600
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
UNCOV
601
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
×
UNCOV
602
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
603
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
×
UNCOV
604
  if (ppSub) {
×
UNCOV
605
    *ppSub = pSubObj;
×
606
  }
607
  return code;
×
608

UNCOV
609
END:
×
UNCOV
610
  taosMemoryFree(pSubObj);
×
UNCOV
611
  return code;
×
612
}
613

UNCOV
614
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
×
615
  int32_t          code = 0;
×
616
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
×
UNCOV
617
  if (pSubNew == NULL) {
×
UNCOV
618
    code = terrno;
×
UNCOV
619
    goto END;
×
620
  }
UNCOV
621
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
×
UNCOV
622
  taosInitRWLatch(&pSubNew->lock);
×
623

UNCOV
624
  pSubNew->dbUid = pSub->dbUid;
×
UNCOV
625
  pSubNew->stbUid = pSub->stbUid;
×
UNCOV
626
  pSubNew->subType = pSub->subType;
×
UNCOV
627
  pSubNew->withMeta = pSub->withMeta;
×
628

UNCOV
629
  pSubNew->vgNum = pSub->vgNum;
×
UNCOV
630
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
631

UNCOV
632
  void          *pIter = NULL;
×
UNCOV
633
  SMqConsumerEp *pConsumerEp = NULL;
×
UNCOV
634
  while (1) {
×
UNCOV
635
    pIter = taosHashIterate(pSub->consumerHash, pIter);
×
UNCOV
636
    if (pIter == NULL) break;
×
UNCOV
637
    pConsumerEp = (SMqConsumerEp *)pIter;
×
UNCOV
638
    SMqConsumerEp newEp = {
×
UNCOV
639
        .consumerId = pConsumerEp->consumerId,
×
UNCOV
640
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
×
641
    };
UNCOV
642
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
×
643
                            sizeof(SMqConsumerEp))) != 0)
UNCOV
644
      goto END;
×
645
  }
UNCOV
646
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
×
UNCOV
647
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
×
UNCOV
648
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
×
UNCOV
649
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
×
650
  if (ppSub) {
×
UNCOV
651
    *ppSub = pSubNew;
×
652
  }
UNCOV
653
END:
×
UNCOV
654
  return code;
×
655
}
656

UNCOV
657
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
×
UNCOV
658
  if (pSub == NULL) return;
×
UNCOV
659
  void *pIter = NULL;
×
UNCOV
660
  while (1) {
×
UNCOV
661
    pIter = taosHashIterate(pSub->consumerHash, pIter);
×
UNCOV
662
    if (pIter == NULL) break;
×
UNCOV
663
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
×
UNCOV
664
    taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
×
UNCOV
665
    taosArrayDestroy(pConsumerEp->offsetRows);
×
666
  }
UNCOV
667
  taosHashCleanup(pSub->consumerHash);
×
UNCOV
668
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
×
UNCOV
669
  taosMemoryFreeClear(pSub->qmsg);
×
UNCOV
670
  taosArrayDestroy(pSub->offsetRows);
×
671
}
672

UNCOV
673
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
×
UNCOV
674
  int32_t tlen = 0;
×
UNCOV
675
  tlen += taosEncodeString(buf, pSub->key);
×
UNCOV
676
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
×
UNCOV
677
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
×
UNCOV
678
  tlen += taosEncodeFixedI8(buf, pSub->subType);
×
UNCOV
679
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
×
UNCOV
680
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
×
681

UNCOV
682
  void   *pIter = NULL;
×
UNCOV
683
  int32_t sz = taosHashGetSize(pSub->consumerHash);
×
UNCOV
684
  tlen += taosEncodeFixedI32(buf, sz);
×
685

UNCOV
686
  int32_t cnt = 0;
×
UNCOV
687
  while (1) {
×
UNCOV
688
    pIter = taosHashIterate(pSub->consumerHash, pIter);
×
UNCOV
689
    if (pIter == NULL) break;
×
UNCOV
690
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
×
UNCOV
691
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
×
UNCOV
692
    cnt++;
×
693
  }
UNCOV
694
  if (cnt != sz) return -1;
×
UNCOV
695
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
×
UNCOV
696
  tlen += taosEncodeString(buf, pSub->dbName);
×
697

UNCOV
698
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
×
UNCOV
699
  tlen += taosEncodeString(buf, pSub->qmsg);
×
UNCOV
700
  return tlen;
×
701
}
702

UNCOV
703
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
×
704
  //
UNCOV
705
  buf = taosDecodeStringTo(buf, pSub->key);
×
UNCOV
706
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
×
UNCOV
707
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
×
UNCOV
708
  buf = taosDecodeFixedI8(buf, &pSub->subType);
×
UNCOV
709
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
×
UNCOV
710
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);
×
711

712
  int32_t sz;
UNCOV
713
  buf = taosDecodeFixedI32(buf, &sz);
×
714

UNCOV
715
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
UNCOV
716
  for (int32_t i = 0; i < sz; i++) {
×
UNCOV
717
    SMqConsumerEp consumerEp = {0};
×
718
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
×
UNCOV
719
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
×
720
        0)
UNCOV
721
      return NULL;
×
722
  }
723

UNCOV
724
  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
×
UNCOV
725
  buf = taosDecodeStringTo(buf, pSub->dbName);
×
726

UNCOV
727
  if (sver > 1) {
×
728
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
×
UNCOV
729
    buf = taosDecodeString(buf, &pSub->qmsg);
×
730
  } else {
UNCOV
731
    pSub->qmsg = taosStrdup("");
×
732
  }
UNCOV
733
  return (void *)buf;
×
734
}
735

UNCOV
736
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
×
737
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
×
738
  pObj->dtype = pItem->dtype;
×
UNCOV
739
  switch (pItem->dtype) {
×
UNCOV
740
    case CFG_DTYPE_NONE:
×
UNCOV
741
      break;
×
UNCOV
742
    case CFG_DTYPE_BOOL:
×
UNCOV
743
      pObj->bval = pItem->bval;
×
UNCOV
744
      break;
×
UNCOV
745
    case CFG_DTYPE_INT32:
×
UNCOV
746
      pObj->i32 = pItem->i32;
×
UNCOV
747
      break;
×
UNCOV
748
    case CFG_DTYPE_INT64:
×
UNCOV
749
      pObj->i64 = pItem->i64;
×
UNCOV
750
      break;
×
UNCOV
751
    case CFG_DTYPE_FLOAT:
×
752
    case CFG_DTYPE_DOUBLE:
UNCOV
753
      pObj->fval = pItem->fval;
×
UNCOV
754
      break;
×
UNCOV
755
    case CFG_DTYPE_STRING:
×
756
    case CFG_DTYPE_DIR:
757
    case CFG_DTYPE_LOCALE:
758
    case CFG_DTYPE_CHARSET:
759
    case CFG_DTYPE_TIMEZONE:
760
      pObj->str = taosStrdup(pItem->str);
×
UNCOV
761
      if (pObj->str == NULL) {
×
UNCOV
762
        taosMemoryFree(pObj);
×
UNCOV
763
        return TSDB_CODE_OUT_OF_MEMORY;
×
764
      }
UNCOV
765
      break;
×
766
  }
UNCOV
767
  return TSDB_CODE_SUCCESS;
×
768
}
769

UNCOV
770
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
×
UNCOV
771
  int32_t code = 0;
×
UNCOV
772
  switch (pObjNew->dtype) {
×
773
    case CFG_DTYPE_BOOL: {
×
UNCOV
774
      bool tmp = false;
×
UNCOV
775
      if (strcasecmp(value, "true") == 0) {
×
UNCOV
776
        tmp = true;
×
777
      }
UNCOV
778
      if (taosStr2Int32(value, NULL, 10) > 0) {
×
UNCOV
779
        tmp = true;
×
780
      }
UNCOV
781
      pObjNew->bval = tmp;
×
UNCOV
782
      break;
×
783
    }
UNCOV
784
    case CFG_DTYPE_INT32: {
×
785
      int32_t ival;
UNCOV
786
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &ival));
×
UNCOV
787
      pObjNew->i32 = ival;
×
UNCOV
788
      break;
×
789
    }
UNCOV
790
    case CFG_DTYPE_INT64: {
×
791
      int64_t ival;
UNCOV
792
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &ival));
×
UNCOV
793
      pObjNew->i64 = ival;
×
UNCOV
794
      break;
×
795
    }
UNCOV
796
    case CFG_DTYPE_FLOAT:
×
797
    case CFG_DTYPE_DOUBLE: {
UNCOV
798
      float dval = 0;
×
UNCOV
799
      TAOS_CHECK_RETURN(parseCfgReal(value, &dval));
×
UNCOV
800
      pObjNew->fval = dval;
×
UNCOV
801
      break;
×
802
    }
UNCOV
803
    case CFG_DTYPE_DIR:
×
804
    case CFG_DTYPE_TIMEZONE:
805
    case CFG_DTYPE_CHARSET:
806
    case CFG_DTYPE_LOCALE:
807
    case CFG_DTYPE_STRING: {
808
      pObjNew->str = taosStrdup(value);
×
UNCOV
809
      if (pObjNew->str == NULL) {
×
UNCOV
810
        code = terrno;
×
UNCOV
811
        return code;
×
812
      }
813
      break;
×
814
    }
815
    case CFG_DTYPE_NONE:
×
816
      break;
×
UNCOV
817
    default:
×
UNCOV
818
      code = TSDB_CODE_INVALID_CFG;
×
UNCOV
819
      break;
×
820
  }
UNCOV
821
  return code;
×
822
}
823

824
SConfigObj mndInitConfigVersion() {
8✔
825
  SConfigObj obj;
826
  memset(&obj, 0, sizeof(SConfigObj));
8✔
827

828
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
8✔
829
  obj.dtype = CFG_DTYPE_INT32;
8✔
830
  obj.i32 = 0;
8✔
831
  return obj;
8✔
832
}
833

834
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
48✔
835
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
48!
836
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
96!
837

838
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
96!
839
  switch (pObj->dtype) {
48!
UNCOV
840
    case CFG_DTYPE_BOOL:
×
UNCOV
841
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
×
UNCOV
842
      break;
×
843
    case CFG_DTYPE_INT32:
48✔
844
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
96!
845
      break;
48✔
UNCOV
846
    case CFG_DTYPE_INT64:
×
UNCOV
847
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
×
UNCOV
848
      break;
×
UNCOV
849
    case CFG_DTYPE_FLOAT:
×
850
    case CFG_DTYPE_DOUBLE:
UNCOV
851
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
×
UNCOV
852
      break;
×
UNCOV
853
    case CFG_DTYPE_STRING:
×
854
    case CFG_DTYPE_DIR:
855
    case CFG_DTYPE_LOCALE:
856
    case CFG_DTYPE_CHARSET:
857
    case CFG_DTYPE_TIMEZONE:
858
      if (pObj->str != NULL) {
×
UNCOV
859
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
×
860
      } else {
861
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
×
862
      }
UNCOV
863
      break;
×
UNCOV
864
    default:
×
UNCOV
865
      break;
×
866
  }
867
  tEndEncode(pEncoder);
48✔
868
  return pEncoder->pos;
48✔
869
}
870

871
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
8✔
872
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
8!
873
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
8!
874
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
16!
875
  switch (pObj->dtype) {
8!
UNCOV
876
    case CFG_DTYPE_NONE:
×
UNCOV
877
      break;
×
UNCOV
878
    case CFG_DTYPE_BOOL:
×
UNCOV
879
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
×
UNCOV
880
      break;
×
881
    case CFG_DTYPE_INT32:
8✔
882
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
16!
883
      break;
8✔
UNCOV
884
    case CFG_DTYPE_INT64:
×
UNCOV
885
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
×
UNCOV
886
      break;
×
UNCOV
887
    case CFG_DTYPE_FLOAT:
×
888
    case CFG_DTYPE_DOUBLE:
UNCOV
889
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
×
UNCOV
890
      break;
×
UNCOV
891
    case CFG_DTYPE_STRING:
×
892
    case CFG_DTYPE_DIR:
893
    case CFG_DTYPE_LOCALE:
894
    case CFG_DTYPE_CHARSET:
895
    case CFG_DTYPE_TIMEZONE:
UNCOV
896
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
×
UNCOV
897
      break;
×
898
  }
899
  tEndDecode(pDecoder);
8✔
900
  TAOS_RETURN(TSDB_CODE_SUCCESS);
8✔
901
}
902

903
void tFreeSConfigObj(SConfigObj *obj) {
16✔
904
  if (obj == NULL) {
16!
UNCOV
905
    return;
×
906
  }
907
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
16!
908
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
16!
UNCOV
909
    taosMemoryFree(obj->str);
×
910
  }
911
}
912

913
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
914
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
915
//   if (pEntryNew == NULL) return NULL;
916
//   pEntryNew->epoch = pEntry->epoch;
917
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
918
//   return pEntryNew;
919
// }
920
//
921
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
922
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
923
// }
924

925
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
926
//   int32_t tlen = 0;
927
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
928
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
929
//   return tlen;
930
// }
931
//
932
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
933
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
934
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
935
//   return (void *)buf;
936
// }
937

938
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
939
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
940
//   if (pLogNew == NULL) return pLogNew;
941
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
942
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
943
//   return pLogNew;
944
// }
945
//
946
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
947
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
948
// }
949

950
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
951
//   int32_t tlen = 0;
952
//   tlen += taosEncodeString(buf, pLog->key);
953
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
954
//   return tlen;
955
// }
956
//
957
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
958
//   buf = taosDecodeStringTo(buf, pLog->key);
959
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
960
//   return (void *)buf;
961
// }
962
//
963
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
964
//   int32_t tlen = 0;
965
//   tlen += taosEncodeString(buf, pOffset->key);
966
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
967
//   return tlen;
968
// }
969
//
970
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
971
//   buf = taosDecodeStringTo(buf, pOffset->key);
972
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
973
//   return buf;
974
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc