• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.4
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndDef.h"
17
#include "mndConsumer.h"
18
#include "taoserror.h"
19

20
static void *freeStreamTasks(SArray *pTaskLevel);
21

22
// Serializes a stream object into pEncoder.
//
// Fields are written in a fixed order: identity and timestamps, stream
// configuration, source/target database info, the three text blobs
// (sql, ast, physicalPlan), the two-level task array, the output schema and
// finally the fields appended by later format versions.
//
// Each step is wrapped in TAOS_CHECK_RETURN, which is expected to return
// early when the wrapped call fails. On success the encoder position
// (pEncoder->pos) is returned.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));

  // identity, timestamps and versioning
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));

  // stream configuration
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));

  // source and target
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));

  // text blobs: a NULL pointer is written as the empty string
  const char *sqlText = (pObj->sql != NULL) ? pObj->sql : "";
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sqlText));

  const char *astText = (pObj->ast != NULL) ? pObj->ast : "";
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, astText));

  const char *planText = (pObj->physicalPlan != NULL) ? pObj->physicalPlan : "";
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, planText));

  // tasks: outer array of levels, each level is an array of SStreamTask pointers
  int32_t numOfLevels = taosArrayGetSize(pObj->tasks);
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfLevels));
  for (int32_t level = 0; level < numOfLevels; level++) {
    SArray *pLevelTasks = taosArrayGetP(pObj->tasks, level);
    int32_t numOfTasks = taosArrayGetSize(pLevelTasks);
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfTasks));

    for (int32_t t = 0; t < numOfTasks; t++) {
      SStreamTask *pTask = taosArrayGetP(pLevelTasks, t);
      // Tasks older than SSTREAM_TASK_SUBTABLE_CHANGED_VER are stamped with the
      // current SSTREAM_TASK_VER before encoding. This writes through the task
      // pointer even though pObj itself is const.
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
        pTask->ver = SSTREAM_TASK_VER;
      }
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
    }
  }

  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));

  // 3.0.20 ver =2
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));

  // 3.0.50 ver = 3
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));

  // reserved bytes, written without the final byte of the buffer
  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
97

98
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
3,123✔
99
  int32_t code = 0;
3,123✔
100
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
3,123!
101
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
3,123!
102

103
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
6,246!
104
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
6,246!
105
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
6,246!
106
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
6,246!
107
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
6,246!
108

109
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
6,246!
110
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
6,246!
111

112
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
6,246!
113
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
6,246!
114
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
6,246!
115
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
6,246!
116
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
6,246!
117

118
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
6,246!
119
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
6,246!
120
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
3,123!
121
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
3,123!
122
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
3,123!
123
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
6,246!
124
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
6,246!
125

126
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
6,246!
127
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
6,246!
128
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
6,246!
129

130
  if (pObj->tasks != NULL) {
3,123!
131
    pObj->tasks = freeStreamTasks(pObj->tasks);
×
132
  }
133

134
  int32_t sz;
135
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
3,123!
136

137
  if (sz != 0) {
3,123!
138
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
3,123✔
139
    if (pObj->tasks == NULL) {
3,123!
140
      code = terrno;
×
141
      TAOS_RETURN(code);
×
142
    }
143

144
    for (int32_t i = 0; i < sz; i++) {
9,412✔
145
      int32_t innerSz;
146
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
6,289!
147
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
6,289✔
148
      if (pArray != NULL) {
6,289!
149
        for (int32_t j = 0; j < innerSz; j++) {
22,231✔
150
          SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
15,942✔
151
          if (pTask == NULL) {
15,942!
152
            taosArrayDestroy(pArray);
×
153
            code = terrno;
×
154
            TAOS_RETURN(code);
×
155
          }
156
          if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
15,942!
157
            taosMemoryFree(pTask);
×
158
            taosArrayDestroy(pArray);
×
159
            TAOS_RETURN(code);
×
160
          }
161
          if (taosArrayPush(pArray, &pTask) == NULL) {
31,884!
162
            taosMemoryFree(pTask);
×
163
            taosArrayDestroy(pArray);
×
164
            code = terrno;
×
165
            TAOS_RETURN(code);
×
166
          }
167
        }
168
      }
169
      if (taosArrayPush(pObj->tasks, &pArray) == NULL) {
12,578!
170
        taosArrayDestroy(pArray);
×
171
        code = terrno;
×
172
        TAOS_RETURN(code);
×
173
      }
174
    }
175
  }
176

177
  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
6,246!
178

179
  // 3.0.20
180
  if (sver >= 2) {
3,123!
181
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
6,246!
182
    if (!tDecodeIsEnd(pDecoder)) {
3,123!
183
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
6,246!
184
    }
185
  }
186
  if (sver >= 3) {
3,123!
187
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
6,246!
188
  }
189

190
  if (sver >= 5) {
3,123!
191
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
6,246!
192
  }
193
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
3,123!
194

195
  tEndDecode(pDecoder);
3,123✔
196
  TAOS_RETURN(code);
3,123✔
197
}
198

199
// Frees a two-level task container: every SStreamTask in every level is
// released with tFreeStreamTask, then each level array and finally the outer
// array are destroyed.
//
// Always returns NULL so callers can write `p = freeStreamTasks(p);`.
void *freeStreamTasks(SArray *pTaskLevel) {
  int32_t levelCount = taosArrayGetSize(pTaskLevel);
  int32_t levelIdx = 0;

  while (levelIdx < levelCount) {
    SArray *pTasksOfLevel = taosArrayGetP(pTaskLevel, levelIdx);
    int32_t taskCount = taosArrayGetSize(pTasksOfLevel);

    int32_t taskIdx = 0;
    while (taskIdx < taskCount) {
      SStreamTask *pCurrent = taosArrayGetP(pTasksOfLevel, taskIdx);
      tFreeStreamTask(pCurrent);
      taskIdx++;
    }

    taosArrayDestroy(pTasksOfLevel);
    levelIdx++;
  }

  taosArrayDestroy(pTaskLevel);
  return NULL;
}
217

218
// Releases everything a stream object owns: the sql/ast/physicalPlan strings,
// the output and tag schema column arrays, and both task lists.
//
// The SStreamObj structure itself is not freed. Every released pointer is
// reset to NULL so that calling this function again on the same object does
// not free the same memory twice. A NULL pStream is ignored.
void tFreeStreamObj(SStreamObj *pStream) {
  if (pStream == NULL) return;

  taosMemoryFreeClear(pStream->sql);
  taosMemoryFreeClear(pStream->ast);
  taosMemoryFreeClear(pStream->physicalPlan);

  // Freeing a NULL schema pointer is a no-op, so no nCols/pSchema guard is
  // required for the output schema.
  taosMemoryFreeClear(pStream->outputSchema.pSchema);

  pStream->tasks = freeStreamTasks(pStream->tasks);
  pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);

  // tagSchema.pSchema is only released when columns are recorded, as before.
  if (pStream->tagSchema.nCols > 0) {
    taosMemoryFreeClear(pStream->tagSchema.pSchema);
  }
}
4,175✔
235

236
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
2,098✔
237
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
2,098✔
238
  if (pVgEpNew == NULL) {
2,098!
239
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
240
    return NULL;
×
241
  }
242
  pVgEpNew->vgId = pVgEp->vgId;
2,098✔
243
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
244
  pVgEpNew->epSet = pVgEp->epSet;
2,098✔
245
  return pVgEpNew;
2,098✔
246
}
247

248
// Frees a single SMqVgEp entry; a NULL pointer is ignored.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) return;
  taosMemoryFree(pVgEp);
}
10,672✔
254

255
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
8,892✔
256
  int32_t tlen = 0;
8,892✔
257
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
8,892✔
258
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
259
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
8,892✔
260
  return tlen;
8,892✔
261
}
262

263
// Decodes one SMqVgEp from buf and returns the position after it.
//
// For sver == 1 a variable-length u64 size is read after vgId and that many
// bytes are skipped before the epSet is decoded (presumably the legacy qmsg
// string that is no longer stored -- confirm against the old encoder).
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
273

274
// Element-duplication callback passed to taosArrayDup: returns a taosStrdup
// copy of one topic-name string.
static void *topicNameDup(void *p) {
  char *topicName = (char *)p;
  return taosStrdup(topicName);
}
615✔
275

276
// Allocates and initializes a consumer object for the given update type.
//
// consumerId : id stored in the new object.
// cgroup     : consumer group name; TSDB_CGROUP_LEN bytes are copied from it,
//              so the buffer must be at least that large.
// updateType : selects how the topic lists are populated:
//   CONSUMER_ADD_REB    - rebNewTopics holds a copy of `topic`.
//   CONSUMER_REMOVE_REB - rebRemovedTopics holds a copy of `topic`.
//   CONSUMER_INSERT_SUB - client settings are copied from `subscribe`;
//                         rebNewTopics is a deep copy of subscribe->topicNames
//                         and assignedTopics takes ownership of the original
//                         array (subscribe->topicNames is set to NULL).
//   CONSUMER_UPDATE_SUB - assignedTopics takes ownership of
//                         subscribe->topicNames.
// topic      : topic name, used by the two *_REB update types.
// subscribe  : subscribe request, used by the two *_SUB update types.
// ppConsumer : receives the new object on success; untouched on failure.
//
// Returns 0 on success or an error code; on failure everything allocated
// here is released.
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
                                   char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer) {
  int32_t code = 0;
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
    code = terrno;
    goto END;
  }

  pConsumer->consumerId = consumerId;
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
  pConsumer->hbStatus = 0;
  pConsumer->pollStatus = 0;

  taosInitRWLatch(&pConsumer->lock);
  pConsumer->createTime = taosGetTimestampMs();
  pConsumer->updateType = updateType;

  if (updateType == CONSUMER_ADD_REB || updateType == CONSUMER_REMOVE_REB) {
    // both rebalance types build a one-element list; only the target differs
    SArray **ppTopics =
        (updateType == CONSUMER_ADD_REB) ? &pConsumer->rebNewTopics : &pConsumer->rebRemovedTopics;

    *ppTopics = taosArrayInit(0, sizeof(void *));
    if (*ppTopics == NULL) {
      code = terrno;
      goto END;
    }

    char *topicTmp = taosStrdup(topic);
    if (topicTmp == NULL) {
      // never store a NULL topic name in the list
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    if (taosArrayPush(*ppTopics, &topicTmp) == NULL) {
      code = terrno;
      taosMemoryFree(topicTmp);  // the list did not take ownership
      goto END;
    }
  } else if (updateType == CONSUMER_INSERT_SUB) {
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
    pConsumer->withTbName = subscribe->withTbName;
    pConsumer->autoCommit = subscribe->autoCommit;
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);

    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
    if (pConsumer->rebNewTopics == NULL) {
      code = terrno;
      goto END;
    }
    // ownership of the request's topic list moves to the consumer
    pConsumer->assignedTopics = subscribe->topicNames;
    subscribe->topicNames = NULL;
  } else if (updateType == CONSUMER_UPDATE_SUB) {
    pConsumer->assignedTopics = subscribe->topicNames;
    subscribe->topicNames = NULL;
  }

  *ppConsumer = pConsumer;
  return 0;

END:
  tDeleteSMqConsumerObj(pConsumer);
  return code;
}
350

351
// Destroys the four topic-name lists owned by a consumer (current, newly
// rebalanced, removed by rebalance, assigned), freeing every stored string
// with taosMemoryFree. The consumer structure itself is left allocated.
// A NULL pConsumer is ignored.
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    SArray *topicLists[] = {
        pConsumer->currentTopics,
        pConsumer->rebNewTopics,
        pConsumer->rebRemovedTopics,
        pConsumer->assignedTopics,
    };
    for (size_t k = 0; k < sizeof(topicLists) / sizeof(topicLists[0]); k++) {
      taosArrayDestroyP(topicLists[k], (FDelete)taosMemoryFree);
    }
  }
}
358

359
// Destroys a consumer object: first its topic lists (via
// tClearSMqConsumerObj, which ignores NULL), then the structure itself.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  // release owned lists before the container that points at them
  tClearSMqConsumerObj(pConsumer);

  taosMemoryFree(pConsumer);
}
6,242✔
363

364
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
5,978✔
365
  int32_t tlen = 0;
5,978✔
366
  int32_t sz;
367
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
5,978✔
368
  tlen += taosEncodeString(buf, pConsumer->clientId);
5,978✔
369
  tlen += taosEncodeString(buf, pConsumer->cgroup);
5,978✔
370
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
5,978✔
371
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
5,978✔
372
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
5,978✔
373

374
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
5,978✔
375
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
5,978✔
376
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
5,978✔
377
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
5,978✔
378
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
5,978✔
379

380
  // current topics
381
  if (pConsumer->currentTopics) {
5,978✔
382
    sz = taosArrayGetSize(pConsumer->currentTopics);
750✔
383
    tlen += taosEncodeFixedI32(buf, sz);
750✔
384
    for (int32_t i = 0; i < sz; i++) {
1,186✔
385
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
436✔
386
      tlen += taosEncodeString(buf, topic);
436✔
387
    }
388
  } else {
389
    tlen += taosEncodeFixedI32(buf, 0);
5,228✔
390
  }
391

392
  // reb new topics
393
  if (pConsumer->rebNewTopics) {
5,978✔
394
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
4,010✔
395
    tlen += taosEncodeFixedI32(buf, sz);
4,010✔
396
    for (int32_t i = 0; i < sz; i++) {
6,694✔
397
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
2,684✔
398
      tlen += taosEncodeString(buf, topic);
2,684✔
399
    }
400
  } else {
401
    tlen += taosEncodeFixedI32(buf, 0);
1,968✔
402
  }
403

404
  // reb removed topics
405
  if (pConsumer->rebRemovedTopics) {
5,978✔
406
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
2,698✔
407
    tlen += taosEncodeFixedI32(buf, sz);
2,698✔
408
    for (int32_t i = 0; i < sz; i++) {
4,762✔
409
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
2,064✔
410
      tlen += taosEncodeString(buf, topic);
2,064✔
411
    }
412
  } else {
413
    tlen += taosEncodeFixedI32(buf, 0);
3,280✔
414
  }
415

416
  // lost topics
417
  if (pConsumer->assignedTopics) {
5,978✔
418
    sz = taosArrayGetSize(pConsumer->assignedTopics);
2,772✔
419
    tlen += taosEncodeFixedI32(buf, sz);
2,772✔
420
    for (int32_t i = 0; i < sz; i++) {
4,646✔
421
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
1,874✔
422
      tlen += taosEncodeString(buf, topic);
1,874✔
423
    }
424
  } else {
425
    tlen += taosEncodeFixedI32(buf, 0);
3,206✔
426
  }
427

428
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
5,978✔
429
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
5,978✔
430
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
5,978✔
431
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
5,978✔
432
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
5,978✔
433
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
5,978✔
434
  tlen += taosEncodeString(buf, pConsumer->user);
5,978✔
435
  tlen += taosEncodeString(buf, pConsumer->fqdn);
5,978✔
436
  return tlen;
5,978✔
437
}
438

439
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
2,658✔
440
  int32_t sz;
441
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
2,658!
442
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
2,658✔
443
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
2,658✔
444
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
2,658✔
445
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
2,658!
446
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
2,658!
447

448
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
2,658!
449
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
2,658✔
450
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
2,658!
451
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
2,658!
452
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
5,316!
453

454
  // current topics
455
  buf = taosDecodeFixedI32(buf, &sz);
2,658✔
456
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
2,658✔
457
  if (pConsumer->currentTopics == NULL) {
2,658!
458
    return NULL;
×
459
  }
460
  for (int32_t i = 0; i < sz; i++) {
2,677✔
461
    char *topic;
462
    buf = taosDecodeString(buf, &topic);
19✔
463
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
38!
464
      return NULL;
×
465
    }
466
  }
467

468
  // reb new topics
469
  buf = taosDecodeFixedI32(buf, &sz);
2,658✔
470
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
2,658✔
471
  for (int32_t i = 0; i < sz; i++) {
3,908✔
472
    char *topic;
473
    buf = taosDecodeString(buf, &topic);
1,250✔
474
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
2,500!
475
      return NULL;
×
476
    }
477
  }
478

479
  // reb removed topics
480
  buf = taosDecodeFixedI32(buf, &sz);
2,658✔
481
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
2,658✔
482
  for (int32_t i = 0; i < sz; i++) {
3,695✔
483
    char *topic;
484
    buf = taosDecodeString(buf, &topic);
1,037✔
485
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
2,074!
486
      return NULL;
×
487
    }
488
  }
489

490
  // reb removed topics
491
  buf = taosDecodeFixedI32(buf, &sz);
2,658✔
492
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
2,658✔
493
  for (int32_t i = 0; i < sz; i++) {
3,301✔
494
    char *topic;
495
    buf = taosDecodeString(buf, &topic);
643✔
496
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
1,286!
497
      return NULL;
×
498
    }
499
  }
500

501
  if (sver > 1) {
2,658!
502
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
2,658✔
503
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
2,658✔
504
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
2,658!
505
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
5,316!
506
  }
507
  if (sver > 2){
2,658!
508
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
2,658!
509
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
2,658!
510
    buf = taosDecodeStringTo(buf, pConsumer->user);
2,658✔
511
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
5,316✔
512
  } else{
513
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
514
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
515
  }
516

517
  return (void *)buf;
2,658✔
518
}
519

520
// Encodes an array of OffsetRows entries through buf: a fixed i32 count, then
// for each entry vgId, rows, offset.type, the type-dependent offset payload
// and `ever`.
//
// Offset payload: uid and ts for the two snapshot types, version for
// TMQ_OFFSET__LOG, nothing for any other type.
// Returns the summed lengths reported by the encode calls.
int32_t tEncodeOffRows(void **buf, SArray *offsetRows){
  int32_t numOfRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, numOfRows);

  for (int32_t idx = 0; idx < numOfRows; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    bool isSnapshot = (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_DATA) ||
                      (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_META);
    if (isSnapshot) {
      total += taosEncodeFixedI64(buf, pRow->offset.uid);
      total += taosEncodeFixedI64(buf, pRow->offset.ts);
    } else if (pRow->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, pRow->offset.version);
    }
    // other offset types carry no payload

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
542

543
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
1,794✔
544
  int32_t tlen = 0;
1,794✔
545
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
1,794✔
546
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
1,794✔
547

548

549
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
1,794✔
550
}
551

552
// Decodes an array of OffsetRows entries from buf.
//
// buf        : position of the encoded fixed i32 entry count.
// offsetRows : receives a newly created array when the count is positive;
//              left untouched when the count is zero or negative.
// sver       : format version; the `ever` field is only present for sver > 2.
//
// Returns the position after the decoded data, or NULL when the array or one
// of its slots cannot be allocated. On such a failure *offsetRows may hold a
// partially filled array that the caller still owns.
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs <= 0) {
    return (void *)buf;
  }

  *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
  if (NULL == *offsetRows) return NULL;

  for (int32_t j = 0; j < szVgs; ++j) {
    OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
    if (offRows == NULL) return NULL;

    // Some fields are only decoded conditionally (offset payload by type,
    // `ever` by version); start from zero so none of them is left undefined.
    (void)memset(offRows, 0, sizeof(OffsetRows));

    buf = taosDecodeFixedI32(buf, &offRows->vgId);
    buf = taosDecodeFixedI64(buf, &offRows->rows);
    buf = taosDecodeFixedI8(buf, &offRows->offset.type);
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
      buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
      buf = taosDecodeFixedI64(buf, &offRows->offset.version);
    } else {
      // other offset types carry no payload
    }
    if (sver > 2) {
      buf = taosDecodeFixedI64(buf, &offRows->ever);
    }
  }
  return (void *)buf;
}
578

579
// Decodes one consumer endpoint entry: consumerId, the vgs array (each
// element via tDecodeSMqVgEp) and, for sver > 1, the offsetRows array.
//
// Returns the position after the entry, or NULL when a nested decoder reports
// failure. Arrays created before the failure stay in pConsumerEp for the
// caller to release.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  // NOTE(review): taosDecodeArray is assumed to return NULL on allocation
  // failure -- confirm. Without this guard a NULL cursor would be dereferenced
  // by tDecodeOffRows below.
  if (buf == NULL) {
    return NULL;
  }
  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
  }

  return (void *)buf;
}
588

589
// Allocates a zero-initialized subscribe object for the given key, with an
// empty consumer hash and an empty unassigned-vgroup list.
//
// key   : subscribe key; TSDB_SUBSCRIBE_KEY_LEN bytes are copied from it, so
//         the buffer must be at least that large.
// ppSub : receives the new object on success. When it is NULL there is nobody
//         to hand the object to, so the object is released again.
//
// Returns 0 on success or an error code. MND_TMQ_NULL_CHECK is expected to
// set `code` and jump to END when its argument is NULL.
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t code = 0;
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);

  if (ppSub != NULL) {
    *ppSub = pSubObj;
    return code;
  }
  // no output slot: fall through and release what was just built (code == 0)

END:
  if (pSubObj != NULL) {
    // the hash may already exist when a later allocation failed
    if (pSubObj->consumerHash != NULL) {
      taosHashCleanup(pSubObj->consumerHash);
    }
    taosArrayDestroy(pSubObj->unassignedVgs);
    taosMemoryFree(pSubObj);
  }
  return code;
}
610

611
// Creates a copy of a subscribe object.
//
// Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg, the
// unassigned vgroup list (deep copy), the offsetRows array, and for every
// consumer in consumerHash its consumerId and a deep copy of its vgs list.
// The per-consumer offsetRows are not copied (they start out NULL in the
// clone), and the clone gets a freshly initialized lock.
//
// pSub  : object to copy.
// ppSub : receives the clone on success. When it is NULL the clone is
//         released again because there is nobody to hand it to.
//
// Returns 0 on success or an error code; on failure everything allocated
// here is released.
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t code = 0;
  // zero-initialized so that every field not set below is well defined and the
  // error path can tell which members were already allocated
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    code = terrno;
    goto END;
  }
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
    };
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
                            sizeof(SMqConsumerEp))) != 0) {
      // the entry never reached the hash, so its vgroup list is still ours
      taosArrayDestroyP(newEp.vgs, (FDelete)tDeleteSMqVgEp);
      // NOTE(review): the iteration over pSub->consumerHash is abandoned here;
      // check whether the hash API requires cancelling an unfinished iteration.
      goto END;
    }
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
  if (ppSub) {
    *ppSub = pSubNew;
    pSubNew = NULL;  // ownership transferred to the caller
  }

END:
  // whatever is still owned here (error, or no output slot) is released
  if (pSubNew != NULL) {
    if (pSubNew->consumerHash != NULL) {
      tDeleteSubscribeObj(pSubNew);  // frees members, not the struct
    }
    taosMemoryFree(pSubNew);
  }
  return code;
}
653

654
// Releases everything a subscribe object owns: each consumer entry's vgroup
// list and offsetRows array, the consumer hash itself, the unassigned vgroup
// list, the qmsg string and the top-level offsetRows array.
//
// The SMqSubscribeObj structure itself is not freed. NULL is ignored.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub == NULL) return;

  for (void *pEntry = taosHashIterate(pSub->consumerHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pSub->consumerHash, pEntry)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pEntry;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEp->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}
669

670
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
3,446✔
671
  int32_t tlen = 0;
3,446✔
672
  tlen += taosEncodeString(buf, pSub->key);
3,446✔
673
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
3,446✔
674
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
3,446✔
675
  tlen += taosEncodeFixedI8(buf, pSub->subType);
3,446✔
676
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
3,446✔
677
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
3,446✔
678

679
  void   *pIter = NULL;
3,446✔
680
  int32_t sz = taosHashGetSize(pSub->consumerHash);
3,446✔
681
  tlen += taosEncodeFixedI32(buf, sz);
3,446✔
682

683
  int32_t cnt = 0;
3,446✔
684
  while (1) {
1,794✔
685
    pIter = taosHashIterate(pSub->consumerHash, pIter);
5,240✔
686
    if (pIter == NULL) break;
5,240✔
687
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
1,794✔
688
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
1,794✔
689
    cnt++;
1,794✔
690
  }
691
  if (cnt != sz) return -1;
3,446!
692
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
3,446✔
693
  tlen += taosEncodeString(buf, pSub->dbName);
3,446✔
694

695
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
3,446✔
696
  tlen += taosEncodeString(buf, pSub->qmsg);
3,446✔
697
  return tlen;
3,446✔
698
}
699

700
// Decodes a subscribe object from buf into pSub.
//
// sver selects the trailing fields: for sver > 1 the offsetRows array and the
// qmsg string are read; for older data qmsg becomes an empty string.
//
// Returns the position after the decoded object, or NULL on failure (hash or
// string allocation failure, a nested decoder reporting failure, or a failed
// hash insert). On failure pSub may be partially filled; members already
// stored in it stay owned by pSub (presumably released by the caller through
// tDeleteSubscribeObj -- verify against callers).
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (buf == NULL ||
        taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
            0) {
      // the entry never reached the hash, so its arrays must be released here
      taosArrayDestroyP(consumerEp.vgs, (FDelete)tDeleteSMqVgEp);
      taosArrayDestroy(consumerEp.offsetRows);
      return NULL;
    }
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  // NOTE(review): taosDecodeArray is assumed to return NULL on failure -- confirm
  if (buf == NULL) {
    return NULL;
  }
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    if (buf == NULL) {
      return NULL;  // do not hand a NULL cursor to the string decoder
    }
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      return NULL;
    }
  }
  return (void *)buf;
}
732

733
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
734
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
735
//   if (pEntryNew == NULL) return NULL;
736
//   pEntryNew->epoch = pEntry->epoch;
737
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
738
//   return pEntryNew;
739
// }
740
//
741
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
742
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
743
// }
744

745
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
746
//   int32_t tlen = 0;
747
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
748
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
749
//   return tlen;
750
// }
751
//
752
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
753
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
754
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
755
//   return (void *)buf;
756
// }
757

758
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
759
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
760
//   if (pLogNew == NULL) return pLogNew;
761
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
762
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
763
//   return pLogNew;
764
// }
765
//
766
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
767
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
768
// }
769

770
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
771
//   int32_t tlen = 0;
772
//   tlen += taosEncodeString(buf, pLog->key);
773
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
774
//   return tlen;
775
// }
776
//
777
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
778
//   buf = taosDecodeStringTo(buf, pLog->key);
779
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
780
//   return (void *)buf;
781
// }
782
//
783
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
784
//   int32_t tlen = 0;
785
//   tlen += taosEncodeString(buf, pOffset->key);
786
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
787
//   return tlen;
788
// }
789
//
790
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
791
//   buf = taosDecodeStringTo(buf, pOffset->key);
792
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
793
//   return buf;
794
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc