• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3520

06 Nov 2024 11:19AM UTC coverage: 56.943% (-0.8%) from 57.706%
#3520

push

travis-ci

web-flow
Merge pull request #28535 from taosdata/fix/TD-30837

feat(stream):stream interp && twa

108576 of 248375 branches covered (43.71%)

Branch coverage included in aggregate %.

2353 of 3792 new or added lines in 40 files covered. (62.05%)

8730 existing lines in 213 files now uncovered.

188369 of 273100 relevant lines covered (68.97%)

2375614.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.4
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndDef.h"
17
#include "mndConsumer.h"
18
#include "taoserror.h"
19

20
static void *freeStreamTasks(SArray *pTaskLevel);
21

22
/*
 * Serialize a stream object into pEncoder.
 *
 * Fields are written in a fixed order: name, timestamps, version/level info,
 * ids and status, trigger configuration, source/target database and table
 * info, the sql/ast/physical-plan strings, every task of every level, the
 * output schema, and finally the checkpoint fields, subTableWithoutMd5 and
 * the reserve area.
 *
 * Each step is wrapped in TAOS_CHECK_RETURN (defined elsewhere; presumably it
 * returns early when the wrapped call fails). On the normal path the function
 * returns pEncoder->pos after tEndEncode().
 */
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  // A missing text field is encoded as an empty string.
  const char *sqlText = (pObj->sql != NULL) ? pObj->sql : "";
  const char *astText = (pObj->ast != NULL) ? pObj->ast : "";
  const char *planText = (pObj->physicalPlan != NULL) ? pObj->physicalPlan : "";

  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));

  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));

  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sqlText));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, astText));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, planText));

  // Task levels: level count, then for each level its task count and tasks.
  int32_t numOfLevels = taosArrayGetSize(pObj->tasks);
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfLevels));
  for (int32_t level = 0; level < numOfLevels; ++level) {
    SArray *pLevelTasks = taosArrayGetP(pObj->tasks, level);
    int32_t numOfTasks = taosArrayGetSize(pLevelTasks);
    TAOS_CHECK_RETURN(tEncodeI32(pEncoder, numOfTasks));

    for (int32_t idx = 0; idx < numOfTasks; ++idx) {
      SStreamTask *pTask = taosArrayGetP(pLevelTasks, idx);
      // Tasks below SSTREAM_TASK_SUBTABLE_CHANGED_VER are stamped with the
      // current task version before encoding (modifies the task in place).
      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
        pTask->ver = SSTREAM_TASK_VER;
      }
      TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
    }
  }

  TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));

  // 3.0.20 ver =2
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));

  // 3.0.50 ver = 3
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));

  TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
97

98
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
511✔
99
  int32_t code = 0;
511✔
100
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
511!
101
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
511!
102

103
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
1,022!
104
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
1,022!
105
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
1,022!
106
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
1,022!
107
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
1,022!
108

109
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
1,022!
110
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
1,022!
111

112
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
1,022!
113
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
1,022!
114
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
1,022!
115
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
1,022!
116
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
1,022!
117

118
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
1,022!
119
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
1,022!
120
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
511!
121
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
511!
122
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
511!
123
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
1,022!
124
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
1,022!
125

126
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
1,022!
127
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
1,022!
128
  TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
1,022!
129

130
  if (pObj->tasks != NULL) {
511!
131
    pObj->tasks = freeStreamTasks(pObj->tasks);
×
132
  }
133

134
  int32_t sz;
135
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
511!
136

137
  if (sz != 0) {
511!
138
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
511✔
139
    if (pObj->tasks == NULL) {
511!
140
      code = terrno;
×
141
      TAOS_RETURN(code);
×
142
    }
143

144
    for (int32_t i = 0; i < sz; i++) {
1,409✔
145
      int32_t innerSz;
146
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
898!
147
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
898✔
148
      if (pArray != NULL) {
898!
149
        for (int32_t j = 0; j < innerSz; j++) {
2,676✔
150
          SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
1,778✔
151
          if (pTask == NULL) {
1,778!
152
            taosArrayDestroy(pArray);
×
153
            code = terrno;
×
154
            TAOS_RETURN(code);
×
155
          }
156
          if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
1,778!
157
            taosMemoryFree(pTask);
×
158
            taosArrayDestroy(pArray);
×
159
            TAOS_RETURN(code);
×
160
          }
161
          if (taosArrayPush(pArray, &pTask) == NULL) {
3,556!
162
            taosMemoryFree(pTask);
×
163
            taosArrayDestroy(pArray);
×
164
            code = terrno;
×
165
            TAOS_RETURN(code);
×
166
          }
167
        }
168
      }
169
      if (taosArrayPush(pObj->tasks, &pArray) == NULL) {
1,796!
170
        taosArrayDestroy(pArray);
×
171
        code = terrno;
×
172
        TAOS_RETURN(code);
×
173
      }
174
    }
175
  }
176

177
  TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
1,022!
178

179
  // 3.0.20
180
  if (sver >= 2) {
511!
181
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
1,022!
182
    if (!tDecodeIsEnd(pDecoder)) {
511!
183
      TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
1,022!
184
    }
185
  }
186
  if (sver >= 3) {
511!
187
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
1,022!
188
  }
189

190
  if (sver >= 5) {
511!
191
    TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
1,022!
192
  }
193
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
511!
194

195
  tEndDecode(pDecoder);
511✔
196
  TAOS_RETURN(code);
511✔
197
}
198

199
/*
 * Release a two-level task list: an array whose elements are arrays of
 * SStreamTask pointers. Every task is released with tFreeStreamTask(), then
 * each inner array and finally the outer array are destroyed.
 *
 * Always returns NULL so callers can write `p = freeStreamTasks(p);`.
 */
void *freeStreamTasks(SArray *pTaskLevel) {
  const int32_t levelCount = taosArrayGetSize(pTaskLevel);
  int32_t       levelIdx = 0;

  while (levelIdx < levelCount) {
    SArray       *pTasks = taosArrayGetP(pTaskLevel, levelIdx);
    const int32_t taskCount = taosArrayGetSize(pTasks);

    for (int32_t taskIdx = 0; taskIdx < taskCount; ++taskIdx) {
      SStreamTask *pOne = taosArrayGetP(pTasks, taskIdx);
      tFreeStreamTask(pOne);
    }

    taosArrayDestroy(pTasks);
    ++levelIdx;
  }

  taosArrayDestroy(pTaskLevel);
  return NULL;
}
217

218
/*
 * Release everything owned by a stream object. The SStreamObj itself is not
 * freed.
 *
 * Every released member is reset to NULL, matching what was already done for
 * tasks and pHTasksList, so the object holds no dangling pointers afterwards
 * and a repeated call does not free the same memory twice. A NULL pStream is
 * ignored.
 */
void tFreeStreamObj(SStreamObj *pStream) {
  if (pStream == NULL) return;

  taosMemoryFreeClear(pStream->sql);
  taosMemoryFreeClear(pStream->ast);
  taosMemoryFreeClear(pStream->physicalPlan);

  if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
    taosMemoryFreeClear(pStream->outputSchema.pSchema);
  }

  pStream->tasks = freeStreamTasks(pStream->tasks);
  pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);

  // tagSchema.pSchema is only released when the schema reports columns.
  if (pStream->tagSchema.nCols > 0) {
    taosMemoryFreeClear(pStream->tagSchema.pSchema);
  }
}
830✔
235

236
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
303✔
237
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
303✔
238
  if (pVgEpNew == NULL) {
303!
239
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
240
    return NULL;
×
241
  }
242
  pVgEpNew->vgId = pVgEp->vgId;
303✔
243
  //  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
244
  pVgEpNew->epSet = pVgEp->epSet;
303✔
245
  return pVgEpNew;
303✔
246
}
247

248
/* Free a vgroup endpoint object; a NULL pointer is ignored. */
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFree(pVgEp);
}
1,663✔
254

255
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
1,692✔
256
  int32_t tlen = 0;
1,692✔
257
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
1,692✔
258
  //  tlen += taosEncodeString(buf, pVgEp->qmsg);
259
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
1,692✔
260
  return tlen;
1,692✔
261
}
262

263
/*
 * Decode a vgroup endpoint: vgId first, endpoint set last.
 *
 * With sver == 1 an extra length-prefixed field sits between the two; its
 * length is read and the payload is skipped without being stored.
 *
 * Returns the position just past the decoded data.
 */
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}
273

274
/* Element-duplication callback for taosArrayDup: copies one topic name string. */
static void *topicNameDup(void *p) {
  char *topicName = (char *)p;
  return taosStrdup(topicName);
}
171✔
275

276
/*
 * Create a consumer object describing one update of the given kind.
 *
 * consumerId  id stored in the new object
 * cgroup      consumer group; TSDB_CGROUP_LEN bytes are copied from it
 *             (NOTE(review): assumes the caller's buffer is at least that
 *             large - confirm against callers)
 * updateType  selects which fields are filled:
 *               CONSUMER_ADD_REB     rebNewTopics     = [copy of topic]
 *               CONSUMER_REMOVE_REB  rebRemovedTopics = [copy of topic]
 *               CONSUMER_INSERT_SUB  settings copied from subscribe,
 *                                    rebNewTopics = copy of its topic names,
 *                                    assignedTopics takes over the list
 *               CONSUMER_UPDATE_SUB  assignedTopics takes over the list
 * topic       topic name, read for the two *_REB types
 * subscribe   subscribe request, read for the two *_SUB types; its
 *             topicNames is moved into the consumer and set to NULL
 * ppConsumer  receives the new object on success; untouched on failure
 *
 * Returns 0 on success, otherwise a non-zero error code. On failure the
 * partially built object is released.
 */
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
                                   char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer) {
  int32_t code = 0;
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
    code = terrno;
    goto END;
  }

  pConsumer->consumerId = consumerId;
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
  pConsumer->hbStatus = 0;
  pConsumer->pollStatus = 0;

  taosInitRWLatch(&pConsumer->lock);
  pConsumer->createTime = taosGetTimestampMs();
  pConsumer->updateType = updateType;

  if (updateType == CONSUMER_ADD_REB || updateType == CONSUMER_REMOVE_REB) {
    // Both rebalance kinds build a one-element list; only the target differs.
    SArray **ppTopics = (updateType == CONSUMER_ADD_REB) ? &pConsumer->rebNewTopics : &pConsumer->rebRemovedTopics;

    *ppTopics = taosArrayInit(0, sizeof(void *));
    if (*ppTopics == NULL) {
      code = terrno;
      goto END;
    }

    char *topicTmp = taosStrdup(topic);
    if (topicTmp == NULL) {
      code = terrno;
      goto END;
    }
    if (taosArrayPush(*ppTopics, &topicTmp) == NULL) {
      code = terrno;
      taosMemoryFree(topicTmp);  // not owned by the array yet
      goto END;
    }
  } else if (updateType == CONSUMER_INSERT_SUB) {
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
    pConsumer->withTbName = subscribe->withTbName;
    pConsumer->autoCommit = subscribe->autoCommit;
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);

    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
    if (pConsumer->rebNewTopics == NULL) {
      code = terrno;
      goto END;
    }
    // Ownership of the request's topic list moves to the consumer.
    pConsumer->assignedTopics = subscribe->topicNames;
    subscribe->topicNames = NULL;
  } else if (updateType == CONSUMER_UPDATE_SUB) {
    pConsumer->assignedTopics = subscribe->topicNames;
    subscribe->topicNames = NULL;
  }

  *ppConsumer = pConsumer;
  return 0;

END:
  // Never report success on a failure path: *ppConsumer has not been set.
  if (code == 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  tDeleteSMqConsumerObj(pConsumer);
  return code;
}
350

351
/*
 * Destroy the four topic lists owned by a consumer object (current, newly
 * rebalanced, removed and assigned topics), freeing each stored name with
 * taosMemoryFree. The consumer object itself is not freed. NULL is ignored.
 */
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
    taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
    taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
    taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
  }
}
358

359
/*
 * Destroy a consumer object: release its topic lists via
 * tClearSMqConsumerObj(), then free the object's own storage.
 */
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  // Owned lists first, the containing object last.
  tClearSMqConsumerObj(pConsumer);

  taosMemoryFree(pConsumer);
}
1,604✔
363

364
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
1,436✔
365
  int32_t tlen = 0;
1,436✔
366
  int32_t sz;
367
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
1,436✔
368
  tlen += taosEncodeString(buf, pConsumer->clientId);
1,436✔
369
  tlen += taosEncodeString(buf, pConsumer->cgroup);
1,436✔
370
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
1,436✔
371
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
1,436✔
372
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
1,436✔
373

374
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
1,436✔
375
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
1,436✔
376
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
1,436✔
377
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
1,436✔
378
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
1,436✔
379

380
  // current topics
381
  if (pConsumer->currentTopics) {
1,436✔
382
    sz = taosArrayGetSize(pConsumer->currentTopics);
66✔
383
    tlen += taosEncodeFixedI32(buf, sz);
66✔
384
    for (int32_t i = 0; i < sz; i++) {
116✔
385
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
50✔
386
      tlen += taosEncodeString(buf, topic);
50✔
387
    }
388
  } else {
389
    tlen += taosEncodeFixedI32(buf, 0);
1,370✔
390
  }
391

392
  // reb new topics
393
  if (pConsumer->rebNewTopics) {
1,436✔
394
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
866✔
395
    tlen += taosEncodeFixedI32(buf, sz);
866✔
396
    for (int32_t i = 0; i < sz; i++) {
1,550✔
397
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
684✔
398
      tlen += taosEncodeString(buf, topic);
684✔
399
    }
400
  } else {
401
    tlen += taosEncodeFixedI32(buf, 0);
570✔
402
  }
403

404
  // reb removed topics
405
  if (pConsumer->rebRemovedTopics) {
1,436✔
406
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
586✔
407
    tlen += taosEncodeFixedI32(buf, sz);
586✔
408
    for (int32_t i = 0; i < sz; i++) {
1,206✔
409
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
620✔
410
      tlen += taosEncodeString(buf, topic);
620✔
411
    }
412
  } else {
413
    tlen += taosEncodeFixedI32(buf, 0);
850✔
414
  }
415

416
  // lost topics
417
  if (pConsumer->assignedTopics) {
1,436✔
418
    sz = taosArrayGetSize(pConsumer->assignedTopics);
524✔
419
    tlen += taosEncodeFixedI32(buf, sz);
524✔
420
    for (int32_t i = 0; i < sz; i++) {
912✔
421
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
388✔
422
      tlen += taosEncodeString(buf, topic);
388✔
423
    }
424
  } else {
425
    tlen += taosEncodeFixedI32(buf, 0);
912✔
426
  }
427

428
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
1,436✔
429
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
1,436✔
430
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
1,436✔
431
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
1,436✔
432
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
1,436✔
433
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
1,436✔
434
  tlen += taosEncodeString(buf, pConsumer->user);
1,436✔
435
  tlen += taosEncodeString(buf, pConsumer->fqdn);
1,436✔
436
  return tlen;
1,436✔
437
}
438

439
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
692✔
440
  int32_t sz;
441
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
692!
442
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
692✔
443
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
692✔
444
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
692✔
445
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
692!
446
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
692!
447

448
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
692!
449
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
692✔
450
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
692!
451
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
692!
452
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
1,384!
453

454
  // current topics
455
  buf = taosDecodeFixedI32(buf, &sz);
692✔
456
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
692✔
457
  if (pConsumer->currentTopics == NULL) {
692!
458
    return NULL;
×
459
  }
460
  for (int32_t i = 0; i < sz; i++) {
695✔
461
    char *topic;
462
    buf = taosDecodeString(buf, &topic);
3✔
463
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
6!
464
      return NULL;
×
465
    }
466
  }
467

468
  // reb new topics
469
  buf = taosDecodeFixedI32(buf, &sz);
692✔
470
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
692✔
471
  for (int32_t i = 0; i < sz; i++) {
1,038✔
472
    char *topic;
473
    buf = taosDecodeString(buf, &topic);
346✔
474
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
692!
475
      return NULL;
×
476
    }
477
  }
478

479
  // reb removed topics
480
  buf = taosDecodeFixedI32(buf, &sz);
692✔
481
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
692✔
482
  for (int32_t i = 0; i < sz; i++) {
1,000✔
483
    char *topic;
484
    buf = taosDecodeString(buf, &topic);
308✔
485
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
616!
486
      return NULL;
×
487
    }
488
  }
489

490
  // reb removed topics
491
  buf = taosDecodeFixedI32(buf, &sz);
692✔
492
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
692✔
493
  for (int32_t i = 0; i < sz; i++) {
868✔
494
    char *topic;
495
    buf = taosDecodeString(buf, &topic);
176✔
496
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
352!
497
      return NULL;
×
498
    }
499
  }
500

501
  if (sver > 1) {
692!
502
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
692✔
503
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
692✔
504
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
692!
505
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
1,384!
506
  }
507
  if (sver > 2){
692!
508
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
692!
509
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
692!
510
    buf = taosDecodeStringTo(buf, pConsumer->user);
692✔
511
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
1,384✔
512
  } else{
513
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
514
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
515
  }
516

517
  return (void *)buf;
692✔
518
}
519

520
/*
 * Encode an array of OffsetRows entries: an int32 count, then per entry
 * vgId, rows, offset.type, the type-dependent offset payload and ever.
 *
 * Offset payload by type:
 *   TMQ_OFFSET__SNAPSHOT_DATA / TMQ_OFFSET__SNAPSHOT_META -> uid, ts
 *   TMQ_OFFSET__LOG                                       -> version
 *   anything else                                         -> nothing
 *
 * Returns the accumulated length reported by the encoders.
 */
int32_t tEncodeOffRows(void **buf, SArray *offsetRows){
  int32_t numOfRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, numOfRows);

  int32_t idx = 0;
  while (idx < numOfRows) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    if (pRow->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || pRow->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      total += taosEncodeFixedI64(buf, pRow->offset.uid);
      total += taosEncodeFixedI64(buf, pRow->offset.ts);
    } else if (pRow->offset.type == TMQ_OFFSET__LOG) {
      total += taosEncodeFixedI64(buf, pRow->offset.version);
    }

    total += taosEncodeFixedI64(buf, pRow->ever);
    ++idx;
  }

  return total;
}
542

543
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
458✔
544
  int32_t tlen = 0;
458✔
545
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
458✔
546
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
458✔
547

548

549
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
458✔
550
}
551

552
/*
 * Decode an array of OffsetRows entries written by tEncodeOffRows.
 *
 * buf         encoded data
 * offsetRows  receives a newly created array when the stored count is > 0;
 *             left untouched otherwise
 * sver        the per-entry `ever` field is read only when sver > 2
 *
 * Returns the position after the decoded data, or NULL when the array cannot
 * be created or extended. On a NULL return *offsetRows may hold a partially
 * filled array that the caller must destroy.
 */
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs <= 0) {
    return (void *)buf;
  }

  *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
  if (NULL == *offsetRows) return NULL;

  for (int32_t j = 0; j < szVgs; ++j) {
    OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
    if (offRows == NULL) {
      // Fail here rather than decode through a NULL slot.
      return NULL;
    }

    buf = taosDecodeFixedI32(buf, &offRows->vgId);
    buf = taosDecodeFixedI64(buf, &offRows->rows);
    buf = taosDecodeFixedI8(buf, &offRows->offset.type);
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
      buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
      buf = taosDecodeFixedI64(buf, &offRows->offset.version);
    } else {
      // other offset types carry no payload
    }
    if (sver > 2) {
      buf = taosDecodeFixedI64(buf, &offRows->ever);
    }
  }

  return (void *)buf;
}
578

579
/*
 * Decode one consumer endpoint entry: consumerId, then the vgroup list
 * (elements decoded by tDecodeSMqVgEp), then - only when sver > 1 - the
 * offset rows.
 * Returns the position after the decoded data (whatever the last decoder
 * returned).
 */
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);

  if (sver <= 1) {
    return (void *)cursor;
  }
  return tDecodeOffRows(cursor, &pConsumerEp->offsetRows, sver);
}
588

589
/*
 * Create an empty subscribe object for the given key.
 *
 * key    subscribe key; TSDB_SUBSCRIBE_KEY_LEN bytes are copied from it
 *        (NOTE(review): assumes the caller's buffer is at least that large -
 *        confirm against callers)
 * ppSub  receives the new object when non-NULL
 *
 * The object starts with vgNum 0, an empty consumer hash and an empty
 * unassigned-vgroup array.
 *
 * Returns 0 on success. On allocation failure MND_TMQ_NULL_CHECK (defined
 * elsewhere; it is expected to set `code` and jump to END) routes to the
 * cleanup below, which releases everything allocated so far.
 */
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t code = 0;
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  if (ppSub){
    *ppSub = pSubObj;
  }
  return code;

END:
  if (pSubObj != NULL) {
    // The hash may already exist when the array allocation is what failed.
    if (pSubObj->consumerHash != NULL) {
      taosHashCleanup(pSubObj->consumerHash);
    }
    taosMemoryFree(pSubObj);
  }
  return code;
}
610

611
/*
 * Create a deep copy of a subscribe object.
 *
 * pSub   object to copy
 * ppSub  receives the copy on success when non-NULL
 *
 * Copied: key, dbUid, stbUid, subType, withMeta, vgNum, dbName, qmsg, the
 * unassigned vgroups, the object-level offset rows, and for every consumer
 * its id and vgroup list. Per-consumer offset rows are not copied (they are
 * left NULL in the clone).
 *
 * Returns 0 on success or a non-zero error code. On failure the partially
 * built clone is released and *ppSub is left untouched.
 */
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t code = 0;
  void   *pIter = NULL;

  // Zero-initialised so the cleanup path can tell which members exist.
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) {
    code = terrno;
    goto END;
  }
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pSubNew->consumerHash == NULL) {
    code = terrno;
    goto END;
  }

  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp  newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
    };
    // A NULL result is only an error when there was something to copy.
    if (pConsumerEp->vgs != NULL && newEp.vgs == NULL) {
      code = terrno;
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }

    code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // The entry was not stored, so its vgroup copy is still ours to free.
      taosArrayDestroyP(newEp.vgs, (FDelete)tDeleteSMqVgEp);
      taosHashCancelIterate(pSub->consumerHash, pIter);
      goto END;
    }
  }

  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
  if (pSub->unassignedVgs != NULL && pSubNew->unassignedVgs == NULL) {
    code = terrno;
    goto END;
  }

  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  if (pSub->offsetRows != NULL && pSubNew->offsetRows == NULL) {
    code = terrno;
    goto END;
  }

  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);

  pSubNew->qmsg = taosStrdup(pSub->qmsg);
  if (pSub->qmsg != NULL && pSubNew->qmsg == NULL) {
    code = terrno;
    goto END;
  }

  if (ppSub) {
    *ppSub = pSubNew;
  }
  return code;

END:
  // Only failure paths reach this label; never report success from here.
  if (code == 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (pSubNew != NULL) {
    // Without a hash nothing else has been allocated yet.
    if (pSubNew->consumerHash != NULL) {
      tDeleteSubscribeObj(pSubNew);
    }
    taosMemoryFree(pSubNew);
  }
  return code;
}
653

654
/*
 * Release everything owned by a subscribe object: each consumer entry's
 * vgroup list and offset rows, the consumer hash, the unassigned vgroups,
 * qmsg and the object-level offset rows. The SMqSubscribeObj itself is not
 * freed. NULL is ignored.
 */
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub == NULL) return;

  for (void *pEntry = taosHashIterate(pSub->consumerHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pSub->consumerHash, pEntry)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pEntry;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEp->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}
669

670
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
846✔
671
  int32_t tlen = 0;
846✔
672
  tlen += taosEncodeString(buf, pSub->key);
846✔
673
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
846✔
674
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
846✔
675
  tlen += taosEncodeFixedI8(buf, pSub->subType);
846✔
676
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
846✔
677
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
846✔
678

679
  void   *pIter = NULL;
846✔
680
  int32_t sz = taosHashGetSize(pSub->consumerHash);
846✔
681
  tlen += taosEncodeFixedI32(buf, sz);
846✔
682

683
  int32_t cnt = 0;
846✔
684
  while (1) {
458✔
685
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,304✔
686
    if (pIter == NULL) break;
1,304✔
687
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
458✔
688
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
458✔
689
    cnt++;
458✔
690
  }
691
  if (cnt != sz) return -1;
846!
692
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
846✔
693
  tlen += taosEncodeString(buf, pSub->dbName);
846✔
694

695
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
846✔
696
  tlen += taosEncodeString(buf, pSub->qmsg);
846✔
697
  return tlen;
846✔
698
}
699

700
/*
 * Decode a subscribe object written by tEncodeSubscribeObj.
 *
 * buf   encoded data
 * pSub  destination; consumerHash, unassignedVgs, offsetRows and qmsg are
 *       created here
 * sver  stored data version; the object-level offset rows and qmsg are read
 *       only when sver > 1, otherwise qmsg becomes an empty string
 *
 * Returns the position after the decoded data, or NULL on failure. On
 * failure pSub may be partially filled and must be released by the caller
 * (e.g. with tDeleteSubscribeObj).
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (buf == NULL ||
        taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
            0) {
      // The entry never reached the hash, so its arrays are released here,
      // the same way tDeleteSubscribeObj releases stored entries.
      taosArrayDestroyP(consumerEp.vgs, (FDelete)tDeleteSMqVgEp);
      taosArrayDestroy(consumerEp.offsetRows);
      return NULL;
    }
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  // NOTE(review): presumably taosDecodeArray reports failure with NULL -
  // confirm; the check is harmless otherwise.
  if (buf == NULL) {
    return NULL;
  }
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
    if (buf == NULL) {
      // tDecodeOffRows returns NULL when it cannot build the array.
      return NULL;
    }
    buf = taosDecodeString(buf, &pSub->qmsg);
  } else {
    pSub->qmsg = taosStrdup("");
  }
  return (void *)buf;
}
732

733
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
734
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
735
//   if (pEntryNew == NULL) return NULL;
736
//   pEntryNew->epoch = pEntry->epoch;
737
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
738
//   return pEntryNew;
739
// }
740
//
741
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
742
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
743
// }
744

745
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
746
//   int32_t tlen = 0;
747
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
748
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
749
//   return tlen;
750
// }
751
//
752
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
753
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
754
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
755
//   return (void *)buf;
756
// }
757

758
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
759
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
760
//   if (pLogNew == NULL) return pLogNew;
761
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
762
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
763
//   return pLogNew;
764
// }
765
//
766
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
767
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
768
// }
769

770
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
771
//   int32_t tlen = 0;
772
//   tlen += taosEncodeString(buf, pLog->key);
773
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
774
//   return tlen;
775
// }
776
//
777
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
778
//   buf = taosDecodeStringTo(buf, pLog->key);
779
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
780
//   return (void *)buf;
781
// }
782
//
783
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
784
//   int32_t tlen = 0;
785
//   tlen += taosEncodeString(buf, pOffset->key);
786
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
787
//   return tlen;
788
// }
789
//
790
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
791
//   buf = taosDecodeStringTo(buf, pOffset->key);
792
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
793
//   return buf;
794
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc