• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.96
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "mndStream.h"
19
#include "taoserror.h"
20
#include "tunit.h"
21

22
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
508,924✔
23
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
508,924✔
24

25
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
1,017,848✔
26
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));
508,924✔
27

28
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
1,017,848✔
29
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
1,017,848✔
30
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
1,017,848✔
31
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
1,017,848✔
32
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->createUser));
1,017,848✔
33
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->ownerId));
1,017,848✔
34

35
  tEndEncode(pEncoder);
508,924✔
36
  return pEncoder->pos;
508,924✔
37
}
38

39
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
137,041✔
40
  int32_t code = 0;
137,041✔
41
  int32_t lino = 0;
137,041✔
42
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
137,041✔
43

44
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
137,041✔
45
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
137,041✔
46
  if (NULL == pObj) {
137,041✔
UNCOV
47
    TAOS_CHECK_EXIT(terrno);
×
48
  }
49
  
50
  if (MND_STREAM_VER_NUMBER == sver) {
137,041✔
51
    TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
137,041✔
52
  } else {
UNCOV
53
    TAOS_CHECK_RETURN(
×
54
      tDeserializeSCMCreateStreamReqImplOld(pDecoder, pObj->pCreate, 21));
55
  }
56

57
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
274,082✔
58
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
274,082✔
59
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
274,082✔
60
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
274,082✔
61
  if (!tDecodeIsEnd(pDecoder)) {
137,041✔
62
    TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->createUser));
137,041✔
63
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->ownerId));
274,082✔
64
  }
65

66
_exit:
137,041✔
67

68
  tEndDecode(pDecoder);
137,041✔
69
  tDecoderClear(pDecoder);  
137,041✔
70
  
71
  TAOS_RETURN(code);
137,041✔
72
}
73

74
void tFreeStreamObj(SStreamObj *pStream) {
268,365✔
75
  tFreeSCMCreateStreamReq(pStream->pCreate);
268,365✔
76
  taosMemoryFreeClear(pStream->pCreate);
268,365✔
77
}
268,365✔
78

79
void freeSMqConsumerEp(void* data) {
478,969✔
80
  if (data == NULL) return;
478,969✔
81
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
478,969✔
82
  taosArrayDestroy(pConsumerEp->vgs);
478,969✔
83
  pConsumerEp->vgs = NULL;
478,969✔
84
  taosArrayDestroy(pConsumerEp->offsetRows);
478,969✔
85
  pConsumerEp->offsetRows = NULL;
478,969✔
86
}
87

88
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
2,180,550✔
89
  int32_t tlen = 0;
2,180,550✔
90
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
2,180,550✔
91
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
2,180,550✔
92
  return tlen;
2,180,550✔
93
}
94

95
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
1,000,392✔
96
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
1,000,392✔
97
  if (sver == 1) {
1,000,392✔
UNCOV
98
    uint64_t size = 0;
×
UNCOV
99
    buf = taosDecodeVariantU64(buf, &size);
×
UNCOV
100
    buf = POINTER_SHIFT(buf, size);
×
101
  }
102
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
1,000,392✔
103
  return (void *)buf;
1,000,392✔
104
}
105

106
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
139,557✔
107

108
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
601,304✔
109
                           SMqConsumerObj **ppConsumer) {
110
  int32_t         code = 0;
601,304✔
111
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
601,304✔
112
  if (pConsumer == NULL) {
601,304✔
UNCOV
113
    code = terrno;
×
UNCOV
114
    goto END;
×
115
  }
116

117
  pConsumer->consumerId = consumerId;
601,304✔
118
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
601,304✔
119

120
  pConsumer->epoch = 0;
601,304✔
121
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
601,304✔
122
  pConsumer->hbStatus = 0;
601,304✔
123
  pConsumer->pollStatus = 0;
601,304✔
124

125
  taosInitRWLatch(&pConsumer->lock);
601,304✔
126
  pConsumer->createTime = taosGetTimestampMs();
601,304✔
127
  pConsumer->updateType = updateType;
601,304✔
128

129
  if (updateType == CONSUMER_ADD_REB) {
601,304✔
130
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
141,348✔
131
    if (pConsumer->rebNewTopics == NULL) {
141,348✔
132
      code = terrno;
×
133
      goto END;
×
134
    }
135

136
    char *topicTmp = taosStrdup(topic);
141,348✔
137
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
282,696✔
138
      code = terrno;
×
139
      goto END;
×
140
    }
141
  } else if (updateType == CONSUMER_REMOVE_REB) {
459,956✔
142
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
107,440✔
143
    if (pConsumer->rebRemovedTopics == NULL) {
107,440✔
144
      code = terrno;
×
UNCOV
145
      goto END;
×
146
    }
147
    char *topicTmp = taosStrdup(topic);
107,440✔
148
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
214,880✔
UNCOV
149
      code = terrno;
×
UNCOV
150
      goto END;
×
151
    }
152
  } else if (updateType == CONSUMER_INSERT_SUB) {
352,516✔
153
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
137,145✔
154
    pConsumer->withTbName = subscribe->withTbName;
137,145✔
155
    pConsumer->autoCommit = subscribe->autoCommit;
137,145✔
156
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
137,145✔
157
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
137,145✔
158
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
137,145✔
159
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
137,145✔
160
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
137,145✔
161
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
137,145✔
162

163
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
137,145✔
164
    if (pConsumer->rebNewTopics == NULL) {
137,145✔
UNCOV
165
      code = terrno;
×
UNCOV
166
      goto END;
×
167
    }
168
    pConsumer->assignedTopics = subscribe->topicNames;
137,145✔
169
    subscribe->topicNames = NULL;
137,145✔
170
  } else if (updateType == CONSUMER_UPDATE_SUB) {
215,371✔
171
    pConsumer->assignedTopics = subscribe->topicNames;
110,023✔
172
    subscribe->topicNames = NULL;
110,023✔
173
  }
174

175
  *ppConsumer = pConsumer;
601,304✔
176
  return 0;
601,304✔
177

UNCOV
178
END:
×
UNCOV
179
  tDeleteSMqConsumerObj(pConsumer);
×
UNCOV
180
  return code;
×
181
}
182

183
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
2,313,605✔
184
  if (pConsumer == NULL) return;
2,313,605✔
185
  taosArrayDestroyP(pConsumer->currentTopics, NULL);
1,208,385✔
186
  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
1,208,385✔
187
  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
1,208,385✔
188
  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
1,208,385✔
189
}
190

191
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
1,706,524✔
192
  tClearSMqConsumerObj(pConsumer);
1,706,524✔
193
  taosMemoryFree(pConsumer);
1,706,524✔
194
}
1,706,524✔
195

196
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
1,409,258✔
197
  int32_t tlen = 0;
1,409,258✔
198
  int32_t sz;
199
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
1,409,258✔
200
  tlen += taosEncodeString(buf, pConsumer->clientId);
1,409,258✔
201
  tlen += taosEncodeString(buf, pConsumer->cgroup);
1,409,258✔
202
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
1,409,258✔
203
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
1,409,258✔
204
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
1,409,258✔
205

206
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
1,409,258✔
207
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
1,409,258✔
208
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
1,409,258✔
209
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
1,409,258✔
210
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
1,409,258✔
211

212
  // current topics
213
  if (pConsumer->currentTopics) {
1,409,258✔
214
    sz = taosArrayGetSize(pConsumer->currentTopics);
219,208✔
215
    tlen += taosEncodeFixedI32(buf, sz);
219,208✔
216
    for (int32_t i = 0; i < sz; i++) {
350,612✔
217
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
131,404✔
218
      tlen += taosEncodeString(buf, topic);
131,404✔
219
    }
220
  } else {
221
    tlen += taosEncodeFixedI32(buf, 0);
1,190,050✔
222
  }
223

224
  // reb new topics
225
  if (pConsumer->rebNewTopics) {
1,409,258✔
226
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
983,682✔
227
    tlen += taosEncodeFixedI32(buf, sz);
983,682✔
228
    for (int32_t i = 0; i < sz; i++) {
1,620,306✔
229
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
636,624✔
230
      tlen += taosEncodeString(buf, topic);
636,624✔
231
    }
232
  } else {
233
    tlen += taosEncodeFixedI32(buf, 0);
425,576✔
234
  }
235

236
  // reb removed topics
237
  if (pConsumer->rebRemovedTopics) {
1,409,258✔
238
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
641,576✔
239
    tlen += taosEncodeFixedI32(buf, sz);
641,576✔
240
    for (int32_t i = 0; i < sz; i++) {
1,064,588✔
241
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
423,012✔
242
      tlen += taosEncodeString(buf, topic);
423,012✔
243
    }
244
  } else {
245
    tlen += taosEncodeFixedI32(buf, 0);
767,682✔
246
  }
247

248
  // lost topics
249
  if (pConsumer->assignedTopics) {
1,409,258✔
250
    sz = taosArrayGetSize(pConsumer->assignedTopics);
700,986✔
251
    tlen += taosEncodeFixedI32(buf, sz);
700,986✔
252
    for (int32_t i = 0; i < sz; i++) {
1,186,318✔
253
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
485,332✔
254
      tlen += taosEncodeString(buf, topic);
485,332✔
255
    }
256
  } else {
257
    tlen += taosEncodeFixedI32(buf, 0);
708,272✔
258
  }
259

260
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
1,409,258✔
261
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
1,409,258✔
262
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
1,409,258✔
263
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
1,409,258✔
264
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
1,409,258✔
265
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
1,409,258✔
266
  tlen += taosEncodeString(buf, pConsumer->user);
1,409,258✔
267
  tlen += taosEncodeString(buf, pConsumer->fqdn);
1,409,258✔
268
  return tlen;
1,409,258✔
269
}
270

271
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
607,081✔
272
  int32_t sz;
606,325✔
273
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
607,081✔
274
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
607,081✔
275
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
607,081✔
276
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
607,081✔
277
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
607,081✔
278
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
607,081✔
279

280
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
607,081✔
281
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
607,081✔
282
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
607,081✔
283
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
607,081✔
284
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
1,214,162✔
285

286
  // current topics
287
  buf = taosDecodeFixedI32(buf, &sz);
607,081✔
288
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
607,081✔
289
  if (pConsumer->currentTopics == NULL) {
607,081✔
290
    return NULL;
×
291
  }
292
  for (int32_t i = 0; i < sz; i++) {
612,743✔
293
    char *topic;
5,662✔
294
    buf = taosDecodeString(buf, &topic);
5,662✔
295
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
11,324✔
UNCOV
296
      return NULL;
×
297
    }
298
  }
299

300
  // reb new topics
301
  buf = taosDecodeFixedI32(buf, &sz);
607,081✔
302
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
607,081✔
303
  for (int32_t i = 0; i < sz; i++) {
893,154✔
304
    char *topic;
285,749✔
305
    buf = taosDecodeString(buf, &topic);
286,073✔
306
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
572,146✔
UNCOV
307
      return NULL;
×
308
    }
309
  }
310

311
  // reb removed topics
312
  buf = taosDecodeFixedI32(buf, &sz);
607,081✔
313
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
607,081✔
314
  for (int32_t i = 0; i < sz; i++) {
820,623✔
315
    char *topic;
213,218✔
316
    buf = taosDecodeString(buf, &topic);
213,542✔
317
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
427,084✔
UNCOV
318
      return NULL;
×
319
    }
320
  }
321

322
  // reb removed topics
323
  buf = taosDecodeFixedI32(buf, &sz);
607,081✔
324
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
607,081✔
325
  for (int32_t i = 0; i < sz; i++) {
755,929✔
326
    char *topic;
148,686✔
327
    buf = taosDecodeString(buf, &topic);
148,848✔
328
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
297,696✔
UNCOV
329
      return NULL;
×
330
    }
331
  }
332

333
  if (sver > 1) {
607,081✔
334
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
607,081✔
335
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
607,081✔
336
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
607,081✔
337
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
1,214,162✔
338
  }
339
  if (sver > 2) {
607,081✔
340
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
607,081✔
341
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
607,081✔
342
    buf = taosDecodeStringTo(buf, pConsumer->user);
607,081✔
343
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
1,214,162✔
344
  } else {
UNCOV
345
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
UNCOV
346
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
347
  }
348

349
  return (void *)buf;
607,081✔
350
}
351

352
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
1,241,462✔
353
  int32_t tlen = 0;
1,241,462✔
354
  int32_t szVgs = taosArrayGetSize(offsetRows);
1,241,462✔
355
  tlen += taosEncodeFixedI32(buf, szVgs);
1,241,462✔
356
  for (int32_t j = 0; j < szVgs; ++j) {
2,765,582✔
357
    OffsetRows *offRows = taosArrayGet(offsetRows, j);
1,524,120✔
358
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
1,524,120✔
359
    tlen += taosEncodeFixedI64(buf, offRows->rows);
1,524,120✔
360
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
1,524,120✔
361
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
1,524,120✔
362
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
73,482✔
363
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
146,964✔
364
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
1,450,638✔
365
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
2,590,816✔
366
    } else {
367
      // do nothing
368
    }
369
    tlen += taosEncodeFixedI64(buf, offRows->ever);
3,048,240✔
370
  }
371

372
  return tlen;
1,241,462✔
373
}
374

375
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
433,880✔
376
  int32_t tlen = 0;
433,880✔
377
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
433,880✔
378
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
433,880✔
379
  tlen += taosEncodeFixedI32(buf, sz);
433,880✔
380
  for (int32_t i = 0; i < sz; i++) {
1,397,238✔
381
    void* data = taosArrayGet(pConsumerEp->vgs, i);
963,358✔
382
    tlen += tEncodeSMqVgEp(buf, data);
963,358✔
383
  }
384

385
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
433,880✔
386
}
387

388
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
487,909✔
389
  int32_t szVgs = 0;
487,909✔
390
  buf = taosDecodeFixedI32(buf, &szVgs);
487,909✔
391
  if (szVgs > 0) {
487,909✔
392
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
211,241✔
393
    if (NULL == *offsetRows) return NULL;
211,241✔
394
    for (int32_t j = 0; j < szVgs; ++j) {
906,160✔
395
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
694,919✔
396
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
694,919✔
397
      buf = taosDecodeFixedI64(buf, &offRows->rows);
694,919✔
398
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
694,919✔
399
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
694,919✔
400
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
36,688✔
401
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
73,376✔
402
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
658,231✔
403
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
1,166,804✔
404
      } else {
405
        // do nothing
406
      }
407
      if (sver > 2) {
694,919✔
408
        buf = taosDecodeFixedI64(buf, &offRows->ever);
1,389,838✔
409
      }
410
    }
411
  }
412
  return (void *)buf;
487,909✔
413
}
414

415
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
158,439✔
416
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
158,439✔
417
  int32_t sz = 0;
158,439✔
418
  buf = taosDecodeFixedI32(buf, &sz);
158,439✔
419
  pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
158,439✔
420
  if (pConsumerEp->vgs == NULL) {
158,439✔
UNCOV
421
    return NULL;
×
422
  }
423
  for (int32_t i = 0; i < sz; i++) {
581,932✔
424
    SMqVgEp* vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
423,493✔
425
    if (vgEp != NULL)
423,493✔
426
      buf = tDecodeSMqVgEp(buf, vgEp, sver);
423,493✔
427
  }
428
  if (sver > 1) {
158,439✔
429
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
158,439✔
430
  }
431

432
  return (void *)buf;
158,439✔
433
}
434

435
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
114,544✔
436
  int32_t          code = 0;
114,544✔
437
  int32_t          lino = 0;
114,544✔
438
  PRINT_LOG_START
114,544✔
439
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
114,544✔
440
  MND_TMQ_NULL_CHECK(pSubObj);
114,544✔
441
  
442
  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
114,544✔
443
  taosInitRWLatch(&pSubObj->lock);
114,544✔
444
  pSubObj->vgNum = 0;
114,544✔
445
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
114,544✔
446
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
114,544✔
447
  taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);
114,544✔
448

449
  pSubObj->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
114,544✔
450
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
114,544✔
451
  *ppSub = pSubObj;
114,544✔
452
  pSubObj = NULL;
114,544✔
453

454
END:
114,544✔
455
  tDeleteSubscribeObj(pSubObj);
114,544✔
456
  PRINT_LOG_END
114,544✔
457
  return code;
114,544✔
458
}
459

460
int32_t tCloneSubscribeObj(SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
194,680✔
461
  int32_t          code = 0;
194,680✔
462
  int32_t          lino = 0;
194,680✔
463
  PRINT_LOG_START
194,680✔
464
  taosRLockLatch(&pSub->lock);
194,680✔
465

466
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
194,680✔
467
  MND_TMQ_NULL_CHECK(pSubNew);
194,680✔
468
  *ppSub = pSubNew;
194,680✔
469

470
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
194,680✔
471
  taosInitRWLatch(&pSubNew->lock);
194,680✔
472

473
  pSubNew->dbUid = pSub->dbUid;
194,680✔
474
  pSubNew->stbUid = pSub->stbUid;
194,680✔
475
  pSubNew->subType = pSub->subType;
194,680✔
476
  pSubNew->withMeta = pSub->withMeta;
194,680✔
477

478
  pSubNew->vgNum = pSub->vgNum;
194,680✔
479
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
194,680✔
480
  MND_TMQ_NULL_CHECK(pSubNew->consumerHash);
194,680✔
481
  taosHashSetFreeFp(pSubNew->consumerHash, freeSMqConsumerEp);
194,680✔
482

483
  void          *pIter = NULL;
194,680✔
484
  SMqConsumerEp *pConsumerEp = NULL;
194,680✔
485
  while (1) {
176,842✔
486
    pIter = taosHashIterate(pSub->consumerHash, pIter);
371,522✔
487
    if (pIter == NULL) break;
371,522✔
488
    pConsumerEp = (SMqConsumerEp *)pIter;
176,842✔
489
    SMqConsumerEp newEp = {
177,004✔
490
        .consumerId = pConsumerEp->consumerId,
176,842✔
491
        .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
176,842✔
492
    };
493
    MND_TMQ_RETURN_CHECK(taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)));
176,842✔
494
  }
495
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
194,680✔
496
  MND_TMQ_NULL_CHECK(pSubNew->unassignedVgs);
194,680✔
497
  
498
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
194,680✔
499
  MND_TMQ_CONDITION_CHECK(pSub->offsetRows == NULL || pSubNew->offsetRows != NULL, terrno);
194,680✔
500
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
194,680✔
501

502
END:
194,680✔
503
  taosRUnLockLatch(&pSub->lock);
194,680✔
504
  return code;
194,680✔
505
}
506

507
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
753,238✔
508
  if (pSub == NULL) return;
753,238✔
509
  taosHashCleanup(pSub->consumerHash);
638,694✔
510
  pSub->consumerHash = NULL;
638,694✔
511
  taosArrayDestroy(pSub->unassignedVgs);
638,694✔
512
  pSub->unassignedVgs = NULL;
638,694✔
513
  taosArrayDestroy(pSub->offsetRows);
638,694✔
514
  pSub->offsetRows = NULL;
638,694✔
515
}
516

517
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
807,582✔
518
  int32_t tlen = 0;
807,582✔
519
  tlen += taosEncodeString(buf, pSub->key);
807,582✔
520
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
807,582✔
521
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
807,582✔
522
  tlen += taosEncodeFixedI8(buf, pSub->subType);
807,582✔
523
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
807,582✔
524
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
807,582✔
525

526
  void   *pIter = NULL;
807,582✔
527
  int32_t sz = taosHashGetSize(pSub->consumerHash);
807,582✔
528
  tlen += taosEncodeFixedI32(buf, sz);
807,582✔
529

530
  int32_t cnt = 0;
807,582✔
531
  while (1) {
433,880✔
532
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,241,462✔
533
    if (pIter == NULL) break;
1,241,462✔
534
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
433,880✔
535
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
433,880✔
536
    cnt++;
433,880✔
537
  }
538
  if (cnt != sz) return -1;
807,582✔
539
  int32_t len = taosArrayGetSize(pSub->unassignedVgs);
807,582✔
540
  tlen += taosEncodeFixedI32(buf, len);
807,582✔
541
  for (int32_t i = 0; i < len; i++) {
2,024,774✔
542
    void* data = taosArrayGet(pSub->unassignedVgs, i);
1,217,192✔
543
    tlen += tEncodeSMqVgEp(buf, data);
1,217,192✔
544
  }
545
  tlen += taosEncodeString(buf, pSub->dbName);
807,582✔
546

547
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
807,582✔
548
  return tlen;
807,582✔
549
}
550

551
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
329,470✔
552
  //
553
  buf = taosDecodeStringTo(buf, pSub->key);
329,470✔
554
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
329,470✔
555
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
329,470✔
556
  buf = taosDecodeFixedI8(buf, &pSub->subType);
329,470✔
557
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
329,470✔
558
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);
329,794✔
559

560
  int32_t sz;
329,146✔
561
  buf = taosDecodeFixedI32(buf, &sz);
329,470✔
562

563
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
329,470✔
564
  if (pSub->consumerHash == NULL) {
329,470✔
UNCOV
565
    return NULL;
×
566
  }
567
  taosHashSetFreeFp(pSub->consumerHash, freeSMqConsumerEp);
329,470✔
568

569
  for (int32_t i = 0; i < sz; i++) {
487,909✔
570
    SMqConsumerEp consumerEp = {0};
158,439✔
571
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
158,439✔
572
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
158,439✔
573
        0)
UNCOV
574
      return NULL;
×
575
  }
576

577
  int32_t len = 0;
329,470✔
578
  buf = taosDecodeFixedI32(buf, &len);
329,470✔
579
  pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
329,470✔
580
  if (pSub->unassignedVgs == NULL) {
329,470✔
UNCOV
581
    return NULL;
×
582
  }
583
  for (int32_t i = 0; i < len; i++) {
906,369✔
584
    SMqVgEp* vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
576,899✔
585
    if (vgEp != NULL)
576,899✔
586
      buf = tDecodeSMqVgEp(buf, vgEp, sver);
576,899✔
587
  }
588

589
  buf = taosDecodeStringTo(buf, pSub->dbName);
329,470✔
590

591
  if (sver > 1) {
329,470✔
592
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
329,470✔
593
  }
594
  return (void *)buf;
329,470✔
595
}
596

597
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
30,976,400✔
598
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
30,976,400✔
599
  pObj->dtype = pItem->dtype;
30,976,400✔
600
  switch (pItem->dtype) {
30,976,400✔
UNCOV
601
    case CFG_DTYPE_NONE:
×
UNCOV
602
      break;
×
603
    case CFG_DTYPE_BOOL:
7,190,950✔
604
      pObj->bval = pItem->bval;
7,190,950✔
605
      break;
7,190,950✔
606
    case CFG_DTYPE_INT32:
16,317,925✔
607
      pObj->i32 = pItem->i32;
16,317,925✔
608
      break;
16,317,925✔
609
    case CFG_DTYPE_INT64:
1,659,450✔
610
      pObj->i64 = pItem->i64;
1,659,450✔
611
      break;
1,659,450✔
612
    case CFG_DTYPE_FLOAT:
553,150✔
613
    case CFG_DTYPE_DOUBLE:
614
      pObj->fval = pItem->fval;
553,150✔
615
      break;
553,150✔
616
    case CFG_DTYPE_STRING:
5,254,925✔
617
    case CFG_DTYPE_DIR:
618
    case CFG_DTYPE_LOCALE:
619
    case CFG_DTYPE_CHARSET:
620
    case CFG_DTYPE_TIMEZONE:
621
      pObj->str = taosStrdup(pItem->str);
5,254,925✔
622
      if (pObj->str == NULL) {
5,254,925✔
UNCOV
623
        taosMemoryFree(pObj);
×
UNCOV
624
        return TSDB_CODE_OUT_OF_MEMORY;
×
625
      }
626
      break;
5,254,925✔
627
  }
628
  return TSDB_CODE_SUCCESS;
30,976,400✔
629
}
630

631
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
9,182✔
632
  int32_t code = 0;
9,182✔
633
  switch (pObjNew->dtype) {
9,182✔
634
    case CFG_DTYPE_BOOL: {
1,631✔
635
      bool tmp = false;
1,631✔
636
      if (strcasecmp(value, "true") == 0) {
1,631✔
UNCOV
637
        tmp = true;
×
638
      }
639
      if (taosStr2Int32(value, NULL, 10) > 0) {
1,631✔
640
        tmp = true;
475✔
641
      }
642
      pObjNew->bval = tmp;
1,631✔
643
      break;
1,631✔
644
    }
645
    case CFG_DTYPE_INT32: {
6,989✔
646
      int32_t ival;
6,989✔
647
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &ival));
6,989✔
648
      pObjNew->i32 = ival;
6,989✔
649
      break;
6,989✔
650
    }
651
    case CFG_DTYPE_INT64: {
497✔
652
      int64_t ival;
497✔
653
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &ival));
497✔
654
      pObjNew->i64 = ival;
497✔
655
      break;
497✔
656
    }
UNCOV
657
    case CFG_DTYPE_FLOAT:
×
658
    case CFG_DTYPE_DOUBLE: {
UNCOV
659
      float dval = 0;
×
UNCOV
660
      TAOS_CHECK_RETURN(parseCfgReal(value, &dval));
×
UNCOV
661
      pObjNew->fval = dval;
×
UNCOV
662
      break;
×
663
    }
664
    case CFG_DTYPE_DIR:
65✔
665
    case CFG_DTYPE_TIMEZONE:
666
    case CFG_DTYPE_CHARSET:
667
    case CFG_DTYPE_LOCALE:
668
    case CFG_DTYPE_STRING: {
669
      pObjNew->str = taosStrdup(value);
65✔
670
      if (pObjNew->str == NULL) {
65✔
671
        code = terrno;
×
672
        return code;
×
673
      }
674
      break;
65✔
675
    }
UNCOV
676
    case CFG_DTYPE_NONE:
×
UNCOV
677
      break;
×
UNCOV
678
    default:
×
UNCOV
679
      code = TSDB_CODE_INVALID_CFG;
×
UNCOV
680
      break;
×
681
  }
682
  return code;
9,182✔
683
}
684

685
SConfigObj mndInitConfigVersion() {
276,575✔
686
  SConfigObj obj;
276,464✔
687
  memset(&obj, 0, sizeof(SConfigObj));
276,575✔
688

689
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
276,575✔
690
  obj.dtype = CFG_DTYPE_INT32;
276,575✔
691
  obj.i32 = 0;
276,575✔
692
  return obj;
276,575✔
693
}
694

695
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
228,101,950✔
696
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
228,101,950✔
697
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
456,203,900✔
698

699
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
456,203,900✔
700
  switch (pObj->dtype) {
228,101,950✔
701
    case CFG_DTYPE_BOOL:
52,474,064✔
702
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
104,948,128✔
703
      break;
52,474,064✔
704
    case CFG_DTYPE_INT32:
121,141,204✔
705
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
242,282,408✔
706
      break;
121,141,204✔
707
    case CFG_DTYPE_INT64:
12,109,640✔
708
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
24,219,280✔
709
      break;
12,109,640✔
710
    case CFG_DTYPE_FLOAT:
4,035,884✔
711
    case CFG_DTYPE_DOUBLE:
712
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
8,071,768✔
713
      break;
4,035,884✔
714
    case CFG_DTYPE_STRING:
38,341,158✔
715
    case CFG_DTYPE_DIR:
716
    case CFG_DTYPE_LOCALE:
717
    case CFG_DTYPE_CHARSET:
718
    case CFG_DTYPE_TIMEZONE:
719
      if (pObj->str != NULL) {
38,341,158✔
720
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
76,682,316✔
721
      } else {
UNCOV
722
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
×
723
      }
724
      break;
38,341,158✔
UNCOV
725
    default:
×
UNCOV
726
      break;
×
727
  }
728
  tEndEncode(pEncoder);
228,101,950✔
729
  return pEncoder->pos;
228,101,950✔
730
}
731

732
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
43,534,359✔
733
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
43,534,359✔
734
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
43,534,359✔
735
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
87,068,718✔
736
  switch (pObj->dtype) {
43,534,359✔
UNCOV
737
    case CFG_DTYPE_NONE:
×
UNCOV
738
      break;
×
739
    case CFG_DTYPE_BOOL:
10,003,304✔
740
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
10,003,304✔
741
      break;
10,003,304✔
742
    case CFG_DTYPE_INT32:
23,147,134✔
743
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
46,294,268✔
744
      break;
23,147,134✔
745
    case CFG_DTYPE_INT64:
2,309,760✔
746
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
4,619,520✔
747
      break;
2,309,760✔
748
    case CFG_DTYPE_FLOAT:
769,766✔
749
    case CFG_DTYPE_DOUBLE:
750
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
1,539,532✔
751
      break;
769,766✔
752
    case CFG_DTYPE_STRING:
7,304,395✔
753
    case CFG_DTYPE_DIR:
754
    case CFG_DTYPE_LOCALE:
755
    case CFG_DTYPE_CHARSET:
756
    case CFG_DTYPE_TIMEZONE:
757
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
14,608,790✔
758
      break;
7,304,395✔
759
  }
760
  tEndDecode(pDecoder);
43,534,359✔
761
  TAOS_RETURN(TSDB_CODE_SUCCESS);
43,534,359✔
762
}
763

764
void tFreeSConfigObj(SConfigObj *obj) {
74,800,387✔
765
  if (obj == NULL) {
74,800,387✔
UNCOV
766
    return;
×
767
  }
768
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
74,800,387✔
769
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
63,563,848✔
770
    taosMemoryFree(obj->str);
12,558,492✔
771
  }
772
}
773

774
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
775
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
776
//   if (pEntryNew == NULL) return NULL;
777
//   pEntryNew->epoch = pEntry->epoch;
778
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
779
//   return pEntryNew;
780
// }
781
//
782
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
783
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
784
// }
785

786
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
787
//   int32_t tlen = 0;
788
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
789
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
790
//   return tlen;
791
// }
792
//
793
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
794
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
795
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
796
//   return (void *)buf;
797
// }
798

799
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
800
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
801
//   if (pLogNew == NULL) return pLogNew;
802
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
803
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
804
//   return pLogNew;
805
// }
806
//
807
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
808
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
809
// }
810

811
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
812
//   int32_t tlen = 0;
813
//   tlen += taosEncodeString(buf, pLog->key);
814
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
815
//   return tlen;
816
// }
817
//
818
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
819
//   buf = taosDecodeStringTo(buf, pLog->key);
820
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
821
//   return (void *)buf;
822
// }
823
//
824
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
825
//   int32_t tlen = 0;
826
//   tlen += taosEncodeString(buf, pOffset->key);
827
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
828
//   return tlen;
829
// }
830
//
831
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
832
//   buf = taosDecodeStringTo(buf, pOffset->key);
833
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
834
//   return buf;
835
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc