• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.37
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "taoserror.h"
19
#include "tunit.h"
20

21
static void *freeStreamTasks(SArray *pTaskLevel);
22

UNCOV
23
// Serialize pObj into pEncoder, in the field order that tDecodeSStreamObj reads back.
//
// - NULL sql / ast / physicalPlan strings are written as "".
// - Tasks whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER are bumped to
//   SSTREAM_TASK_VER before being encoded (the task is modified even though
//   pObj itself is const).
// - Every step goes through TAOS_CHECK_RETURN (macro defined elsewhere;
//   presumably returns early on a failing status).
//
// On success the encoder position (pEncoder->pos) is returned.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));

  // identity / bookkeeping
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));

  // stream configuration
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));

  // source / target
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));

  // optional strings: a missing string is stored as an empty one
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, (pObj->sql != NULL) ? pObj->sql : ""));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, (pObj->ast != NULL) ? pObj->ast : ""));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, (pObj->physicalPlan != NULL) ? pObj->physicalPlan : ""));

  // tasks: level count, then per level a task count followed by the tasks
  int32_t numOfLevels = taosArrayGetSize(pObj->tasks);
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfLevels));
  for (int32_t lv = 0; lv < numOfLevels; ++lv) {
    SArray *pLevel = taosArrayGetP(pObj->tasks, lv);
    int32_t numOfTasks = taosArrayGetSize(pLevel);
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfTasks));
    for (int32_t t = 0; t < numOfTasks; ++t) {
      SStreamTask *pTask = taosArrayGetP(pLevel, t);
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
        pTask->ver = SSTREAM_TASK_VER;
      }
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
    }
  }

  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));

  // 3.0.20 ver = 2
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));

  // 3.0.50 ver = 3
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));

  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
98

UNCOV
99
// Deserialize a stream object from pDecoder into pObj.
//
// sver is the version of the serialized data and gates the trailing fields:
// checkpointFreq / igCheckUpdate (sver >= 2), checkpointId (sver >= 3) and
// subTableWithoutMd5 (sver >= 5).
//
// Any task list already attached to pObj is released before the new one is
// decoded. sql / ast / physicalPlan are decoded with tDecodeCStrAlloc.
//
// Error handling: an allocation failure for a task level is reported to the
// caller instead of being skipped (skipping would leave that level's tasks
// unread and desynchronize the decoder). The level currently being decoded,
// including the tasks already placed in it, is released before returning an
// error; levels already pushed into pObj->tasks stay attached to pObj.
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));

  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));

  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));

  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));

  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));

  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));

  // drop a previously attached task list before decoding a new one
  if (pObj->tasks != NULL) {
    pObj->tasks = freeStreamTasks(pObj->tasks);
  }

  int32_t sz = 0;
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));

  if (sz != 0) {
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
    if (pObj->tasks == NULL) {
      code = terrno;
      TAOS_RETURN(code);
    }

    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz = 0;
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));

      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
      if (pArray == NULL) {
        // the tasks of this level cannot be stored; continuing would leave
        // them unread in the stream, so fail here
        code = terrno;
        TAOS_RETURN(code);
      }

      bool failed = false;
      for (int32_t j = 0; j < innerSz; j++) {
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
        if (pTask == NULL) {
          code = terrno;
          failed = true;
          break;
        }
        if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
          // task is only partially decoded: release just the shell, as before
          taosMemoryFree(pTask);
          failed = true;
          break;
        }
        if (taosArrayPush(pArray, &pTask) == NULL) {
          code = terrno;
          tFreeStreamTask(pTask);  // fully decoded, release it completely
          failed = true;
          break;
        }
      }

      if (!failed && taosArrayPush(pObj->tasks, &pArray) == NULL) {
        code = terrno;
        failed = true;
      }

      if (failed) {
        // pArray is not reachable from pObj yet: release its tasks and itself
        int32_t numOfDecoded = taosArrayGetSize(pArray);
        for (int32_t k = 0; k < numOfDecoded; k++) {
          SStreamTask *pDecoded = taosArrayGetP(pArray, k);
          tFreeStreamTask(pDecoded);
        }
        taosArrayDestroy(pArray);
        TAOS_RETURN(code);
      }
    }
  }

  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));

  // 3.0.20
  if (sver >= 2) {
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
    // igCheckUpdate is only read when the decoder still has data left
    if (!tDecodeIsEnd(pDecoder)) {
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
    }
  }
  if (sver >= 3) {
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
  }

  if (sver >= 5) {
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
  }
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));

  tEndDecode(pDecoder);
  TAOS_RETURN(code);
}
199

UNCOV
200
// Release a two-level task list: pTaskLevel is an array of levels, each level
// an array of SStreamTask pointers. Every task is passed to tFreeStreamTask,
// then each level array and finally the outer array are destroyed.
// Always returns NULL so a caller can reset its pointer in the same statement.
void *freeStreamTasks(SArray *pTaskLevel) {
  int32_t levelCnt = taosArrayGetSize(pTaskLevel);

  for (int32_t lv = 0; lv < levelCnt; ++lv) {
    SArray *pTasks = taosArrayGetP(pTaskLevel, lv);
    int32_t taskCnt = taosArrayGetSize(pTasks);

    for (int32_t t = 0; t < taskCnt; ++t) {
      SStreamTask *pOne = taosArrayGetP(pTasks, t);
      tFreeStreamTask(pOne);
    }

    taosArrayDestroy(pTasks);
  }

  taosArrayDestroy(pTaskLevel);
  return NULL;
}
218

UNCOV
219
// Release everything owned by pStream: the sql / ast / physicalPlan strings,
// the output schema columns, both task lists (tasks and pHTasksList) and the
// tag schema columns. pStream itself is not freed.
void tFreeStreamObj(SStreamObj *pStream) {
  // task lists first; freeStreamTasks hands back NULL to clear the fields
  pStream->tasks = freeStreamTasks(pStream->tasks);
  pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);

  taosMemoryFree(pStream->sql);
  taosMemoryFree(pStream->ast);
  taosMemoryFree(pStream->physicalPlan);

  // outputSchema.pSchema
  if (pStream->outputSchema.nCols != 0 || pStream->outputSchema.pSchema != NULL) {
    taosMemoryFree(pStream->outputSchema.pSchema);
  }

  // tagSchema.pSchema
  if (pStream->tagSchema.nCols > 0) {
    taosMemoryFree(pStream->tagSchema.pSchema);
  }
}
×
236

UNCOV
237
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
×
UNCOV
238
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
×
UNCOV
239
  if (pVgEpNew == NULL) {
×
240
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
241
    return NULL;
×
242
  }
UNCOV
243
  pVgEpNew->vgId = pVgEp->vgId;
×
244
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
UNCOV
245
  pVgEpNew->epSet = pVgEp->epSet;
×
UNCOV
246
  return pVgEpNew;
×
247
}
248

UNCOV
249
// Free a vgroup endpoint object; a NULL pointer is ignored.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFree(pVgEp);
}
×
255

UNCOV
256
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
×
UNCOV
257
  int32_t tlen = 0;
×
UNCOV
258
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
×
259
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
UNCOV
260
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
×
UNCOV
261
  return tlen;
×
262
}
263

UNCOV
264
// Decode a vgroup endpoint (vgId, then epSet) from buf into pVgEp and return
// the position just past the decoded data.
// For sver == 1 a length-prefixed field sits between the two; its length is
// read and that many bytes are skipped without being interpreted
// (presumably the old qmsg string — confirm against the version 1 encoder).
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
274

UNCOV
275
// Element-duplication callback for taosArrayDup: copies one topic name string.
static void *topicNameDup(void *p) {
  char *name = (char *)p;
  return taosStrdup(name);
}
×
276

UNCOV
277
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
×
278
                           SMqConsumerObj **ppConsumer) {
UNCOV
279
  int32_t         code = 0;
×
UNCOV
280
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
×
UNCOV
281
  if (pConsumer == NULL) {
×
282
    code = terrno;
×
283
    goto END;
×
284
  }
285

UNCOV
286
  pConsumer->consumerId = consumerId;
×
UNCOV
287
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
×
288

UNCOV
289
  pConsumer->epoch = 0;
×
UNCOV
290
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
×
UNCOV
291
  pConsumer->hbStatus = 0;
×
UNCOV
292
  pConsumer->pollStatus = 0;
×
293

UNCOV
294
  taosInitRWLatch(&pConsumer->lock);
×
UNCOV
295
  pConsumer->createTime = taosGetTimestampMs();
×
UNCOV
296
  pConsumer->updateType = updateType;
×
297

UNCOV
298
  if (updateType == CONSUMER_ADD_REB) {
×
UNCOV
299
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
×
UNCOV
300
    if (pConsumer->rebNewTopics == NULL) {
×
301
      code = terrno;
×
302
      goto END;
×
303
    }
304

UNCOV
305
    char *topicTmp = taosStrdup(topic);
×
UNCOV
306
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
×
307
      code = terrno;
×
308
      goto END;
×
309
    }
UNCOV
310
  } else if (updateType == CONSUMER_REMOVE_REB) {
×
UNCOV
311
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
×
UNCOV
312
    if (pConsumer->rebRemovedTopics == NULL) {
×
313
      code = terrno;
×
314
      goto END;
×
315
    }
UNCOV
316
    char *topicTmp = taosStrdup(topic);
×
UNCOV
317
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
×
318
      code = terrno;
×
319
      goto END;
×
320
    }
UNCOV
321
  } else if (updateType == CONSUMER_INSERT_SUB) {
×
UNCOV
322
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
×
UNCOV
323
    pConsumer->withTbName = subscribe->withTbName;
×
UNCOV
324
    pConsumer->autoCommit = subscribe->autoCommit;
×
UNCOV
325
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
×
UNCOV
326
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
×
UNCOV
327
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
×
UNCOV
328
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
×
UNCOV
329
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
×
UNCOV
330
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
×
331

UNCOV
332
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
×
UNCOV
333
    if (pConsumer->rebNewTopics == NULL) {
×
334
      code = terrno;
×
335
      goto END;
×
336
    }
UNCOV
337
    pConsumer->assignedTopics = subscribe->topicNames;
×
UNCOV
338
    subscribe->topicNames = NULL;
×
UNCOV
339
  } else if (updateType == CONSUMER_UPDATE_SUB) {
×
UNCOV
340
    pConsumer->assignedTopics = subscribe->topicNames;
×
UNCOV
341
    subscribe->topicNames = NULL;
×
342
  }
343

UNCOV
344
  *ppConsumer = pConsumer;
×
UNCOV
345
  return 0;
×
346

347
END:
×
348
  tDeleteSMqConsumerObj(pConsumer);
×
349
  return code;
×
350
}
351

UNCOV
352
// Destroy the four topic lists of a consumer (current, newly rebalanced,
// removed by rebalance, assigned) via taosArrayDestroyP with a NULL element
// callback. The consumer object itself is kept; a NULL consumer is ignored.
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    taosArrayDestroyP(pConsumer->currentTopics, NULL);
    taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
    taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
    taosArrayDestroyP(pConsumer->assignedTopics, NULL);
  }
}
359

UNCOV
360
// Destroy a consumer object: clear its topic lists with tClearSMqConsumerObj,
// then free the object itself.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  tClearSMqConsumerObj(pConsumer);  // returns immediately for NULL
  taosMemoryFree(pConsumer);
}
×
364

UNCOV
365
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
×
UNCOV
366
  int32_t tlen = 0;
×
367
  int32_t sz;
UNCOV
368
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
×
UNCOV
369
  tlen += taosEncodeString(buf, pConsumer->clientId);
×
UNCOV
370
  tlen += taosEncodeString(buf, pConsumer->cgroup);
×
UNCOV
371
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
×
UNCOV
372
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
×
UNCOV
373
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
×
374

UNCOV
375
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
×
UNCOV
376
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
×
UNCOV
377
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
×
UNCOV
378
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
×
UNCOV
379
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
×
380

381
  // current topics
UNCOV
382
  if (pConsumer->currentTopics) {
×
UNCOV
383
    sz = taosArrayGetSize(pConsumer->currentTopics);
×
UNCOV
384
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
385
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
386
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
×
UNCOV
387
      tlen += taosEncodeString(buf, topic);
×
388
    }
389
  } else {
UNCOV
390
    tlen += taosEncodeFixedI32(buf, 0);
×
391
  }
392

393
  // reb new topics
UNCOV
394
  if (pConsumer->rebNewTopics) {
×
UNCOV
395
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
×
UNCOV
396
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
397
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
398
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
×
UNCOV
399
      tlen += taosEncodeString(buf, topic);
×
400
    }
401
  } else {
UNCOV
402
    tlen += taosEncodeFixedI32(buf, 0);
×
403
  }
404

405
  // reb removed topics
UNCOV
406
  if (pConsumer->rebRemovedTopics) {
×
UNCOV
407
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
×
UNCOV
408
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
409
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
410
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
×
UNCOV
411
      tlen += taosEncodeString(buf, topic);
×
412
    }
413
  } else {
UNCOV
414
    tlen += taosEncodeFixedI32(buf, 0);
×
415
  }
416

417
  // lost topics
UNCOV
418
  if (pConsumer->assignedTopics) {
×
UNCOV
419
    sz = taosArrayGetSize(pConsumer->assignedTopics);
×
UNCOV
420
    tlen += taosEncodeFixedI32(buf, sz);
×
UNCOV
421
    for (int32_t i = 0; i < sz; i++) {
×
UNCOV
422
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
×
UNCOV
423
      tlen += taosEncodeString(buf, topic);
×
424
    }
425
  } else {
UNCOV
426
    tlen += taosEncodeFixedI32(buf, 0);
×
427
  }
428

UNCOV
429
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
×
UNCOV
430
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
×
UNCOV
431
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
×
UNCOV
432
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
×
UNCOV
433
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
×
UNCOV
434
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
×
UNCOV
435
  tlen += taosEncodeString(buf, pConsumer->user);
×
UNCOV
436
  tlen += taosEncodeString(buf, pConsumer->fqdn);
×
UNCOV
437
  return tlen;
×
438
}
439

UNCOV
440
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
×
441
  int32_t sz;
UNCOV
442
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
×
UNCOV
443
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
×
UNCOV
444
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
×
UNCOV
445
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
×
UNCOV
446
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
×
UNCOV
447
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
×
448

UNCOV
449
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
×
UNCOV
450
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
×
UNCOV
451
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
×
UNCOV
452
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
×
UNCOV
453
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
×
454

455
  // current topics
UNCOV
456
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
457
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
458
  if (pConsumer->currentTopics == NULL) {
×
459
    return NULL;
×
460
  }
UNCOV
461
  for (int32_t i = 0; i < sz; i++) {
×
462
    char *topic;
UNCOV
463
    buf = taosDecodeString(buf, &topic);
×
UNCOV
464
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
×
465
      return NULL;
×
466
    }
467
  }
468

469
  // reb new topics
UNCOV
470
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
471
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
472
  for (int32_t i = 0; i < sz; i++) {
×
473
    char *topic;
UNCOV
474
    buf = taosDecodeString(buf, &topic);
×
UNCOV
475
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
×
476
      return NULL;
×
477
    }
478
  }
479

480
  // reb removed topics
UNCOV
481
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
482
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
483
  for (int32_t i = 0; i < sz; i++) {
×
484
    char *topic;
UNCOV
485
    buf = taosDecodeString(buf, &topic);
×
UNCOV
486
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
×
487
      return NULL;
×
488
    }
489
  }
490

491
  // reb removed topics
UNCOV
492
  buf = taosDecodeFixedI32(buf, &sz);
×
UNCOV
493
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
×
UNCOV
494
  for (int32_t i = 0; i < sz; i++) {
×
495
    char *topic;
UNCOV
496
    buf = taosDecodeString(buf, &topic);
×
UNCOV
497
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
×
498
      return NULL;
×
499
    }
500
  }
501

UNCOV
502
  if (sver > 1) {
×
UNCOV
503
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
×
UNCOV
504
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
×
UNCOV
505
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
×
UNCOV
506
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
×
507
  }
UNCOV
508
  if (sver > 2) {
×
UNCOV
509
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
×
UNCOV
510
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
×
UNCOV
511
    buf = taosDecodeStringTo(buf, pConsumer->user);
×
UNCOV
512
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
×
513
  } else {
514
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
515
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
516
  }
517

UNCOV
518
  return (void *)buf;
×
519
}
520

UNCOV
521
// Encode an array of OffsetRows: an I32 element count, then per element
// vgId, rows, offset.type, the type-specific offset payload, and ever.
// Payload: uid + ts for the two snapshot types, version for the log type,
// nothing for any other type. Returns the accumulated encoded length.
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t numOfRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, numOfRows);

  for (int32_t idx = 0; idx < numOfRows; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    if (pRow->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, pRow->offset.version);
    } else if (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || pRow->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      total += taosEncodeFixedI64(buf, pRow->offset.uid);
      total += taosEncodeFixedI64(buf, pRow->offset.ts);
    }
    // any other offset type carries no payload

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
543

UNCOV
544
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
×
UNCOV
545
  int32_t tlen = 0;
×
UNCOV
546
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
×
UNCOV
547
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
×
548

UNCOV
549
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
×
550
}
551

UNCOV
552
// Decode an array of OffsetRows from buf.
//
// Reads an I32 element count; when it is positive, *offsetRows is set to a new
// array and one element per entry is decoded (vgId, rows, offset.type and the
// type-specific payload). The ever field is only present for sver > 2; for
// older data it is not assigned here. When the count is not positive,
// *offsetRows is left untouched.
//
// Returns the position after the decoded data, or NULL when the array or one
// of its slots cannot be allocated. A slot allocation failure used to be
// dereferenced unchecked. On that path *offsetRows keeps the elements decoded
// so far and stays owned by the caller.
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      if (offRows == NULL) {
        return NULL;
      }
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // other offset types carry no payload
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
578

UNCOV
579
// Decode a consumer endpoint: consumerId, then the vgs array (elements decoded
// by tDecodeSMqVgEp). The offsetRows array is only present, and only read,
// when sver > 1. Returns the position after the decoded data.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);

  if (sver <= 1) {
    return (void *)cursor;
  }
  return tDecodeOffRows(cursor, &pConsumerEp->offsetRows, sver);
}
588

UNCOV
589
// Allocate a zero-initialized subscribe object for key, with an empty consumer
// hash and an empty unassigned-vgroup list, and store it in *ppSub.
//
// TSDB_SUBSCRIBE_KEY_LEN bytes are copied from key, so the caller's buffer
// must be at least that large.
//
// Returns 0 on success. Allocation failures are routed to END through
// MND_TMQ_NULL_CHECK (macro defined elsewhere; presumably stores the error in
// code before jumping). At END everything allocated so far is released,
// including the consumer hash, which used to leak when the unassigned-vgroup
// list could not be created.
//
// NOTE(review): when ppSub is NULL the new object is not handed to anyone and
// is not freed here; behavior kept as-is.
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  if (ppSub) {
    *ppSub = pSubObj;
  }
  return code;

END:
  if (pSubObj != NULL) {
    // unassignedVgs is the last allocation, so only the hash can be live here
    if (pSubObj->consumerHash != NULL) {
      taosHashCleanup(pSubObj->consumerHash);
    }
    taosMemoryFree(pSubObj);
  }
  return code;
}
610

UNCOV
611
// Create a copy of pSub and store it in *ppSub.
//
// Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg, the
// unassigned vgroups, offsetRows, and for every consumer in consumerHash its
// consumerId and vgs list. The per-consumer offsetRows are not copied; the new
// entries have none. The new object gets its own freshly initialized lock.
//
// Returns 0 on success, otherwise an error code. On failure the partially
// built clone is now released (it used to leak) and *ppSub is left untouched.
// The clone is zero-allocated so that this cleanup never sees uninitialized
// pointers.
//
// NOTE(review): results of the taosArrayDup / taosStrdup calls for
// unassignedVgs, offsetRows and qmsg are still not checked, because a NULL
// result may be legitimate for a NULL source — confirm against those helpers.
// NOTE(review): leaving the hash iteration early on a failed taosHashPut may
// require cancelling the iterator — confirm against the hash API.
// NOTE(review): when ppSub is NULL the clone is not handed to anyone and is
// not freed here; behavior kept as-is.
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    return terrno;
  }
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    // nothing but the object itself has been allocated so far
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pSubNew);
    return code;
  }

  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
    };
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
                            sizeof(SMqConsumerEp))) != 0) {
      // the entry never reached the hash, so its vgs copy must be freed here
      taosArrayDestroyP(newEp.vgs, (FDelete)tDeleteSMqVgEp);
      goto END;
    }
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
  if (ppSub) {
    *ppSub = pSubNew;
  }
  return code;

END:
  // release everything the clone owns, then the clone itself
  tDeleteSubscribeObj(pSubNew);
  taosMemoryFree(pSubNew);
  return code;
}
653

UNCOV
654
// Release everything owned by a subscribe object: each consumer entry's vgs
// and offsetRows, the consumer hash, the unassigned vgroups, qmsg and the
// top-level offsetRows. pSub itself is not freed; a NULL pSub is ignored.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return;
  }

  // per-consumer arrays must go before the hash that stores the entries
  void *pIter = NULL;
  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEntry = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEntry->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEntry->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}
669

UNCOV
670
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
×
UNCOV
671
  int32_t tlen = 0;
×
UNCOV
672
  tlen += taosEncodeString(buf, pSub->key);
×
UNCOV
673
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
×
UNCOV
674
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
×
UNCOV
675
  tlen += taosEncodeFixedI8(buf, pSub->subType);
×
UNCOV
676
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
×
UNCOV
677
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
×
678

UNCOV
679
  void   *pIter = NULL;
×
UNCOV
680
  int32_t sz = taosHashGetSize(pSub->consumerHash);
×
UNCOV
681
  tlen += taosEncodeFixedI32(buf, sz);
×
682

UNCOV
683
  int32_t cnt = 0;
×
UNCOV
684
  while (1) {
×
UNCOV
685
    pIter = taosHashIterate(pSub->consumerHash, pIter);
×
UNCOV
686
    if (pIter == NULL) break;
×
UNCOV
687
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
×
UNCOV
688
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
×
UNCOV
689
    cnt++;
×
690
  }
UNCOV
691
  if (cnt != sz) return -1;
×
UNCOV
692
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
×
UNCOV
693
  tlen += taosEncodeString(buf, pSub->dbName);
×
694

UNCOV
695
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
×
UNCOV
696
  tlen += taosEncodeString(buf, pSub->qmsg);
×
UNCOV
697
  return tlen;
×
698
}
699

UNCOV
700
/*
 * Deserialize a subscribe object from buf into pSub, reading the fields in
 * the same order tEncodeSubscribeObj writes them.
 *
 * sver selects the layout: offsetRows and qmsg are only read when sver > 1;
 * otherwise qmsg is set to a freshly duplicated empty string.
 *
 * Returns the buffer position after the decoded object, or NULL when the
 * consumer hash cannot be created, a consumer cannot be inserted into it, or
 * the empty qmsg cannot be allocated. On a NULL return pSub may be partially
 * populated.
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    // Without this check an object with zero consumers would decode
    // "successfully" while carrying a NULL hash.
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
        0) {
      // NOTE(review): members decoded into consumerEp are not released on
      // this path; confirm whether the caller is expected to clean up.
      return NULL;
    }
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      // tEncodeSubscribeObj passes qmsg to taosEncodeString, so a NULL here
      // must not be reported as a successful decode.
      return NULL;
    }
  }
  return (void *)buf;
}
732

733
/*
 * Allocate a zeroed SConfigObj and fill it from a config item: the name is
 * copied (bounded by CFG_NAME_MAX_LEN), the dtype is copied, and the value
 * member matching that dtype is copied. String-like values are duplicated
 * with taosStrdup.
 *
 * Returns the new object, or NULL if either allocation fails.
 */
SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
  SConfigObj *pCfg = taosMemoryCalloc(1, sizeof(SConfigObj));
  if (pCfg == NULL) return NULL;

  tstrncpy(pCfg->name, pItem->name, CFG_NAME_MAX_LEN);
  pCfg->dtype = pItem->dtype;

  switch (pItem->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pCfg->str = taosStrdup(pItem->str);
      if (pCfg->str == NULL) {
        // Do not hand back a half-initialized object.
        taosMemoryFree(pCfg);
        return NULL;
      }
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pCfg->fval = pItem->fval;
      break;
    case CFG_DTYPE_INT64:
      pCfg->i64 = pItem->i64;
      break;
    case CFG_DTYPE_INT32:
      pCfg->i32 = pItem->i32;
      break;
    case CFG_DTYPE_BOOL:
      pCfg->bval = pItem->bval;
      break;
    case CFG_DTYPE_NONE:
      // No value to copy.
      break;
  }
  return pCfg;
}
770

771
/*
 * Parse the textual `value` according to pObjNew->dtype and store the result
 * in the matching member of pObjNew.
 *
 *  - BOOL: true when value equals "true" (case-insensitive) or parses to a
 *    positive base-10 integer; false otherwise.
 *  - INT32 / INT64: parsed with taosStrHumanToInt32 / taosStrHumanToInt64.
 *  - FLOAT / DOUBLE: parsed with parseCfgReal into fval.
 *  - string-like types: value is duplicated with taosStrdup.
 *  - NONE: nothing is stored.
 *
 * `name` is not used by this function.
 *
 * Returns 0 on success, terrno if the string duplication fails,
 * TSDB_CODE_INVALID_CFG for an unrecognized dtype, or whatever
 * TAOS_CHECK_RETURN propagates from a failed parse helper.
 */
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  switch (pObjNew->dtype) {
    case CFG_DTYPE_NONE:
      break;

    case CFG_DTYPE_BOOL: {
      // Both interpretations are always evaluated, in this order.
      bool    saysTrue = (strcasecmp(value, "true") == 0);
      int32_t asNumber = taosStr2Int32(value, NULL, 10);
      pObjNew->bval = (saysTrue || asNumber > 0);
      break;
    }

    case CFG_DTYPE_INT32: {
      int32_t parsed32 = 0;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed32));
      pObjNew->i32 = parsed32;
      break;
    }

    case CFG_DTYPE_INT64: {
      int64_t parsed64 = 0;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed64));
      pObjNew->i64 = parsed64;
      break;
    }

    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsedReal = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsedReal));
      pObjNew->fval = parsedReal;
      break;
    }

    case CFG_DTYPE_STRING:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
    case CFG_DTYPE_DIR: {
      pObjNew->str = taosStrdup(value);
      if (pObjNew->str == NULL) {
        return terrno;
      }
      break;
    }

    default:
      return TSDB_CODE_INVALID_CFG;
  }
  return 0;
}
824

825
SConfigObj *mndInitConfigVersion() {
12✔
826
  SConfigObj *pObj = taosMemoryCalloc(1, sizeof(SConfigObj));
12!
827
  if (pObj == NULL) {
12!
828
    return NULL;
×
829
  }
830
  tstrncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
12✔
831
  pObj->dtype = CFG_DTYPE_INT32;
12✔
832
  pObj->i32 = 0;
12✔
833
  return pObj;
12✔
834
}
835

836
/*
 * Encode a config object: name, dtype, then the single value member selected
 * by dtype. String-like types encode str, or an empty string when str is
 * NULL. Any other dtype (including NONE) encodes no value.
 *
 * Every encoder call is wrapped in TAOS_CHECK_RETURN (project macro;
 * presumably returns early with the failing code). On success the function
 * returns pEncoder->pos as it stands after tEndEncode.
 */
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));

  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
  switch (pObj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      // A missing string is written as "" so a value is always present.
      const char *text = (pObj->str == NULL) ? "" : pObj->str;
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, text));
      break;
    }
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;
    default:
      break;
  }
  tEndEncode(pEncoder);
  return pEncoder->pos;
}
872

873
/*
 * Decode a config object written by tEncodeSConfigObj: name, dtype, then the
 * value member selected by dtype. String-like values are read with
 * tDecodeCStrAlloc into pObj->str. NONE and unrecognized dtypes read no value.
 *
 * Decoder calls are wrapped in TAOS_CHECK_RETURN (project macro; presumably
 * returns early with the failing code). On success TSDB_CODE_SUCCESS is
 * returned through TAOS_RETURN.
 */
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  switch (pObj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;
    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;
    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;
    case CFG_DTYPE_NONE:
    default:
      // Nothing follows the dtype for these.
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
904

905
/*
 * Release the string value of a config object. Only string-like dtypes
 * (STRING, DIR, LOCALE, CHARSET, TIMEZONE) carry a str to free; other dtypes
 * are left untouched. The SConfigObj struct itself is not freed here.
 *
 * After freeing, str is reset to NULL so that calling this function again on
 * the same object does not free the same pointer twice. A NULL obj is
 * ignored.
 */
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
    taosMemoryFree(obj->str);
    obj->str = NULL;
  }
}
914

915
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
916
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
917
//   if (pEntryNew == NULL) return NULL;
918
//   pEntryNew->epoch = pEntry->epoch;
919
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
920
//   return pEntryNew;
921
// }
922
//
923
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
924
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
925
// }
926

927
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
928
//   int32_t tlen = 0;
929
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
930
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
931
//   return tlen;
932
// }
933
//
934
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
935
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
936
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
937
//   return (void *)buf;
938
// }
939

940
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
941
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
942
//   if (pLogNew == NULL) return pLogNew;
943
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
944
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
945
//   return pLogNew;
946
// }
947
//
948
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
949
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
950
// }
951

952
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
953
//   int32_t tlen = 0;
954
//   tlen += taosEncodeString(buf, pLog->key);
955
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
956
//   return tlen;
957
// }
958
//
959
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
960
//   buf = taosDecodeStringTo(buf, pLog->key);
961
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
962
//   return (void *)buf;
963
// }
964
//
965
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
966
//   int32_t tlen = 0;
967
//   tlen += taosEncodeString(buf, pOffset->key);
968
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
969
//   return tlen;
970
// }
971
//
972
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
973
//   buf = taosDecodeStringTo(buf, pOffset->key);
974
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
975
//   return buf;
976
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc