• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4865

26 Nov 2025 05:46AM UTC coverage: 64.495% (-0.05%) from 64.548%
#4865

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

767 of 945 new or added lines in 33 files covered. (81.16%)

3088 existing lines in 119 files now uncovered.

158096 of 245129 relevant lines covered (64.5%)

111671620.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.8
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "mndStream.h"
19
#include "taoserror.h"
20
#include "tunit.h"
21

22
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
812,438✔
23
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
812,438✔
24

25
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
1,624,876✔
26
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));
812,438✔
27

28
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
1,624,876✔
29
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
1,624,876✔
30
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
1,624,876✔
31
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
1,624,876✔
32

33
  tEndEncode(pEncoder);
812,438✔
34
  return pEncoder->pos;
812,438✔
35
}
36

37
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
222,457✔
38
  int32_t code = 0;
222,457✔
39
  int32_t lino = 0;
222,457✔
40
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
222,457✔
41

42
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
222,457✔
43
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
222,457✔
44
  if (NULL == pObj) {
222,457✔
UNCOV
45
    TAOS_CHECK_EXIT(terrno);
×
46
  }
47
  
48
  if (MND_STREAM_VER_NUMBER == sver) {
222,457✔
49
    TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
222,457✔
50
  } else {
NEW
51
    TAOS_CHECK_RETURN(
×
52
      tDeserializeSCMCreateStreamReqImplOld(pDecoder, pObj->pCreate, 21));
53
  }
54

55
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
444,914✔
56
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
444,914✔
57
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
444,914✔
58
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
444,914✔
59

60
_exit:
222,457✔
61

62
  tEndDecode(pDecoder);
222,457✔
63
  tDecoderClear(pDecoder);  
222,457✔
64
  
65
  TAOS_RETURN(code);
222,457✔
66
}
67

68
void tFreeStreamObj(SStreamObj *pStream) {
445,967✔
69
  tFreeSCMCreateStreamReq(pStream->pCreate);
445,967✔
70
  taosMemoryFreeClear(pStream->pCreate);
445,967✔
71
}
445,967✔
72

73
void freeSMqConsumerEp(void* data) {
172,282✔
74
  if (data == NULL) return;
172,282✔
75
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
172,282✔
76
  taosArrayDestroy(pConsumerEp->vgs);
172,282✔
77
  pConsumerEp->vgs = NULL;
172,282✔
78
  taosArrayDestroy(pConsumerEp->offsetRows);
172,282✔
79
  pConsumerEp->offsetRows = NULL;
172,282✔
80
}
81

82
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
1,000,928✔
83
  int32_t tlen = 0;
1,000,928✔
84
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
1,000,928✔
85
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
1,000,928✔
86
  return tlen;
1,000,928✔
87
}
88

89
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
459,393✔
90
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
459,393✔
91
  if (sver == 1) {
459,393✔
UNCOV
92
    uint64_t size = 0;
×
UNCOV
93
    buf = taosDecodeVariantU64(buf, &size);
×
UNCOV
94
    buf = POINTER_SHIFT(buf, size);
×
95
  }
96
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
459,393✔
97
  return (void *)buf;
459,393✔
98
}
99

100
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
55,310✔
101

102
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
240,805✔
103
                           SMqConsumerObj **ppConsumer) {
104
  int32_t         code = 0;
240,805✔
105
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
240,805✔
106
  if (pConsumer == NULL) {
240,805✔
UNCOV
107
    code = terrno;
×
UNCOV
108
    goto END;
×
109
  }
110

111
  pConsumer->consumerId = consumerId;
240,805✔
112
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
240,805✔
113

114
  pConsumer->epoch = 0;
240,805✔
115
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
240,805✔
116
  pConsumer->hbStatus = 0;
240,805✔
117
  pConsumer->pollStatus = 0;
240,805✔
118

119
  taosInitRWLatch(&pConsumer->lock);
240,805✔
120
  pConsumer->createTime = taosGetTimestampMs();
240,805✔
121
  pConsumer->updateType = updateType;
240,805✔
122

123
  if (updateType == CONSUMER_ADD_REB) {
240,805✔
124
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
55,835✔
125
    if (pConsumer->rebNewTopics == NULL) {
55,835✔
UNCOV
126
      code = terrno;
×
UNCOV
127
      goto END;
×
128
    }
129

130
    char *topicTmp = taosStrdup(topic);
55,835✔
131
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
111,670✔
132
      code = terrno;
×
133
      goto END;
×
134
    }
135
  } else if (updateType == CONSUMER_REMOVE_REB) {
184,970✔
136
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
43,822✔
137
    if (pConsumer->rebRemovedTopics == NULL) {
43,822✔
138
      code = terrno;
×
139
      goto END;
×
140
    }
141
    char *topicTmp = taosStrdup(topic);
43,822✔
142
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
87,644✔
UNCOV
143
      code = terrno;
×
144
      goto END;
×
145
    }
146
  } else if (updateType == CONSUMER_INSERT_SUB) {
141,148✔
147
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
54,721✔
148
    pConsumer->withTbName = subscribe->withTbName;
54,721✔
149
    pConsumer->autoCommit = subscribe->autoCommit;
54,721✔
150
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
54,721✔
151
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
54,721✔
152
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
54,721✔
153
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
54,721✔
154
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
54,721✔
155
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
54,721✔
156

157
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
54,721✔
158
    if (pConsumer->rebNewTopics == NULL) {
54,721✔
UNCOV
159
      code = terrno;
×
UNCOV
160
      goto END;
×
161
    }
162
    pConsumer->assignedTopics = subscribe->topicNames;
54,721✔
163
    subscribe->topicNames = NULL;
54,721✔
164
  } else if (updateType == CONSUMER_UPDATE_SUB) {
86,427✔
165
    pConsumer->assignedTopics = subscribe->topicNames;
42,608✔
166
    subscribe->topicNames = NULL;
42,608✔
167
  }
168

169
  *ppConsumer = pConsumer;
240,805✔
170
  return 0;
240,805✔
171

UNCOV
172
END:
×
UNCOV
173
  tDeleteSMqConsumerObj(pConsumer);
×
UNCOV
174
  return code;
×
175
}
176

177
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
842,285✔
178
  if (pConsumer == NULL) return;
842,285✔
179
  taosArrayDestroyP(pConsumer->currentTopics, NULL);
482,477✔
180
  taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
482,477✔
181
  taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
482,477✔
182
  taosArrayDestroyP(pConsumer->assignedTopics, NULL);
482,477✔
183
}
184

185
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
600,613✔
186
  tClearSMqConsumerObj(pConsumer);
600,613✔
187
  taosMemoryFree(pConsumer);
600,613✔
188
}
600,613✔
189

190
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
558,634✔
191
  int32_t tlen = 0;
558,634✔
192
  int32_t sz;
193
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
558,634✔
194
  tlen += taosEncodeString(buf, pConsumer->clientId);
558,634✔
195
  tlen += taosEncodeString(buf, pConsumer->cgroup);
558,634✔
196
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
558,634✔
197
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
558,634✔
198
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
558,634✔
199

200
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
558,634✔
201
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
558,634✔
202
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
558,634✔
203
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
558,634✔
204
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
558,634✔
205

206
  // current topics
207
  if (pConsumer->currentTopics) {
558,634✔
208
    sz = taosArrayGetSize(pConsumer->currentTopics);
77,944✔
209
    tlen += taosEncodeFixedI32(buf, sz);
77,944✔
210
    for (int32_t i = 0; i < sz; i++) {
121,778✔
211
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
43,834✔
212
      tlen += taosEncodeString(buf, topic);
43,834✔
213
    }
214
  } else {
215
    tlen += taosEncodeFixedI32(buf, 0);
480,690✔
216
  }
217

218
  // reb new topics
219
  if (pConsumer->rebNewTopics) {
558,634✔
220
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
383,352✔
221
    tlen += taosEncodeFixedI32(buf, sz);
383,352✔
222
    for (int32_t i = 0; i < sz; i++) {
630,422✔
223
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
247,070✔
224
      tlen += taosEncodeString(buf, topic);
247,070✔
225
    }
226
  } else {
227
    tlen += taosEncodeFixedI32(buf, 0);
175,282✔
228
  }
229

230
  // reb removed topics
231
  if (pConsumer->rebRemovedTopics) {
558,634✔
232
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
249,884✔
233
    tlen += taosEncodeFixedI32(buf, sz);
249,884✔
234
    for (int32_t i = 0; i < sz; i++) {
422,000✔
235
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
172,116✔
236
      tlen += taosEncodeString(buf, topic);
172,116✔
237
    }
238
  } else {
239
    tlen += taosEncodeFixedI32(buf, 0);
308,750✔
240
  }
241

242
  // lost topics
243
  if (pConsumer->assignedTopics) {
558,634✔
244
    sz = taosArrayGetSize(pConsumer->assignedTopics);
271,682✔
245
    tlen += taosEncodeFixedI32(buf, sz);
271,682✔
246
    for (int32_t i = 0; i < sz; i++) {
450,868✔
247
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
179,186✔
248
      tlen += taosEncodeString(buf, topic);
179,186✔
249
    }
250
  } else {
251
    tlen += taosEncodeFixedI32(buf, 0);
286,952✔
252
  }
253

254
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
558,634✔
255
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
558,634✔
256
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
558,634✔
257
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
558,634✔
258
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
558,634✔
259
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
558,634✔
260
  tlen += taosEncodeString(buf, pConsumer->user);
558,634✔
261
  tlen += taosEncodeString(buf, pConsumer->fqdn);
558,634✔
262
  return tlen;
558,634✔
263
}
264

265
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
241,672✔
266
  int32_t sz;
238,844✔
267
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
241,672✔
268
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
241,672✔
269
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
241,672✔
270
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
241,672✔
271
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
241,672✔
272
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
241,672✔
273

274
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
241,672✔
275
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
241,672✔
276
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
241,672✔
277
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
241,672✔
278
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
483,344✔
279

280
  // current topics
281
  buf = taosDecodeFixedI32(buf, &sz);
241,672✔
282
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
241,672✔
283
  if (pConsumer->currentTopics == NULL) {
241,672✔
UNCOV
284
    return NULL;
×
285
  }
286
  for (int32_t i = 0; i < sz; i++) {
242,506✔
287
    char *topic;
834✔
288
    buf = taosDecodeString(buf, &topic);
834✔
289
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
1,668✔
290
      return NULL;
×
291
    }
292
  }
293

294
  // reb new topics
295
  buf = taosDecodeFixedI32(buf, &sz);
241,672✔
296
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
241,672✔
297
  for (int32_t i = 0; i < sz; i++) {
353,662✔
298
    char *topic;
110,778✔
299
    buf = taosDecodeString(buf, &topic);
111,990✔
300
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
223,980✔
UNCOV
301
      return NULL;
×
302
    }
303
  }
304

305
  // reb removed topics
306
  buf = taosDecodeFixedI32(buf, &sz);
241,672✔
307
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
241,672✔
308
  for (int32_t i = 0; i < sz; i++) {
327,894✔
309
    char *topic;
85,010✔
310
    buf = taosDecodeString(buf, &topic);
86,222✔
311
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
172,444✔
UNCOV
312
      return NULL;
×
313
    }
314
  }
315

316
  // reb removed topics
317
  buf = taosDecodeFixedI32(buf, &sz);
241,672✔
318
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
241,672✔
319
  for (int32_t i = 0; i < sz; i++) {
298,501✔
320
    char *topic;
56,223✔
321
    buf = taosDecodeString(buf, &topic);
56,829✔
322
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
113,658✔
UNCOV
323
      return NULL;
×
324
    }
325
  }
326

327
  if (sver > 1) {
241,672✔
328
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
241,672✔
329
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
241,672✔
330
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
241,672✔
331
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
483,344✔
332
  }
333
  if (sver > 2) {
241,672✔
334
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
241,672✔
335
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
241,672✔
336
    buf = taosDecodeStringTo(buf, pConsumer->user);
241,672✔
337
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
483,344✔
338
  } else {
UNCOV
339
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
UNCOV
340
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
341
  }
342

343
  return (void *)buf;
241,672✔
344
}
345

346
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
463,974✔
347
  int32_t tlen = 0;
463,974✔
348
  int32_t szVgs = taosArrayGetSize(offsetRows);
463,974✔
349
  tlen += taosEncodeFixedI32(buf, szVgs);
463,974✔
350
  for (int32_t j = 0; j < szVgs; ++j) {
1,189,770✔
351
    OffsetRows *offRows = taosArrayGet(offsetRows, j);
725,796✔
352
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
725,796✔
353
    tlen += taosEncodeFixedI64(buf, offRows->rows);
725,796✔
354
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
725,796✔
355
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
725,796✔
356
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
23,118✔
357
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
46,236✔
358
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
702,678✔
359
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
1,238,436✔
360
    } else {
361
      // do nothing
362
    }
363
    tlen += taosEncodeFixedI64(buf, offRows->ever);
1,451,592✔
364
  }
365

366
  return tlen;
463,974✔
367
}
368

369
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
167,468✔
370
  int32_t tlen = 0;
167,468✔
371
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
167,468✔
372
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
167,468✔
373
  tlen += taosEncodeFixedI32(buf, sz);
167,468✔
374
  for (int32_t i = 0; i < sz; i++) {
609,442✔
375
    void* data = taosArrayGet(pConsumerEp->vgs, i);
441,974✔
376
    tlen += tEncodeSMqVgEp(buf, data);
441,974✔
377
  }
378

379
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
167,468✔
380
}
381

382
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
183,035✔
383
  int32_t szVgs = 0;
183,035✔
384
  buf = taosDecodeFixedI32(buf, &szVgs);
183,035✔
385
  if (szVgs > 0) {
183,035✔
386
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
78,656✔
387
    if (NULL == *offsetRows) return NULL;
78,656✔
388
    for (int32_t j = 0; j < szVgs; ++j) {
409,559✔
389
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
330,903✔
390
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
330,903✔
391
      buf = taosDecodeFixedI64(buf, &offRows->rows);
330,903✔
392
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
330,903✔
393
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
330,903✔
394
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
10,539✔
395
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
21,078✔
396
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
320,364✔
397
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
564,952✔
398
      } else {
399
        // do nothing
400
      }
401
      if (sver > 2) {
330,903✔
402
        buf = taosDecodeFixedI64(buf, &offRows->ever);
661,806✔
403
      }
404
    }
405
  }
406
  return (void *)buf;
183,035✔
407
}
408

409
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
62,811✔
410
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
62,811✔
411
  int32_t sz = 0;
62,811✔
412
  buf = taosDecodeFixedI32(buf, &sz);
62,811✔
413
  pConsumerEp->vgs = taosArrayInit(sz, sizeof(SMqVgEp));
62,811✔
414
  if (pConsumerEp->vgs == NULL) {
62,811✔
UNCOV
415
    return NULL;
×
416
  }
417
  for (int32_t i = 0; i < sz; i++) {
262,675✔
418
    SMqVgEp* vgEp = taosArrayReserve(pConsumerEp->vgs, 1);
199,864✔
419
    if (vgEp != NULL)
199,864✔
420
      buf = tDecodeSMqVgEp(buf, vgEp, sver);
199,864✔
421
  }
422
  if (sver > 1) {
62,811✔
423
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
62,811✔
424
  }
425

426
  return (void *)buf;
62,811✔
427
}
428

429
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
40,078✔
430
  int32_t          code = 0;
40,078✔
431
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
40,078✔
432
  MND_TMQ_NULL_CHECK(pSubObj);
40,078✔
433
  *ppSub = pSubObj;
40,078✔
434
  
435
  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
40,078✔
436
  taosInitRWLatch(&pSubObj->lock);
40,078✔
437
  pSubObj->vgNum = 0;
40,078✔
438
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
40,078✔
439
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
40,078✔
440
  taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);
40,078✔
441

442
  pSubObj->unassignedVgs = taosArrayInit(0, sizeof(SMqVgEp));
40,078✔
443
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
40,078✔
444

445
END:
40,078✔
446
  return code;
40,078✔
447
}
448

449
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
61,107✔
450
  int32_t          code = 0;
61,107✔
451
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
61,107✔
452
  if (pSubNew == NULL) {
61,107✔
UNCOV
453
    code = terrno;
×
UNCOV
454
    goto END;
×
455
  }
456
  *ppSub = pSubNew;
61,107✔
457

458
  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
61,107✔
459
  taosInitRWLatch(&pSubNew->lock);
61,107✔
460

461
  pSubNew->dbUid = pSub->dbUid;
61,107✔
462
  pSubNew->stbUid = pSub->stbUid;
61,107✔
463
  pSubNew->subType = pSub->subType;
61,107✔
464
  pSubNew->withMeta = pSub->withMeta;
61,107✔
465

466
  pSubNew->vgNum = pSub->vgNum;
61,107✔
467
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
61,107✔
468
  if (pSubNew->consumerHash == NULL) {
61,107✔
UNCOV
469
    code = terrno;
×
UNCOV
470
    goto END;
×
471
  }
472
  taosHashSetFreeFp(pSubNew->consumerHash, freeSMqConsumerEp);
61,107✔
473

474
  void          *pIter = NULL;
61,107✔
475
  SMqConsumerEp *pConsumerEp = NULL;
61,107✔
476
  while (1) {
53,636✔
477
    pIter = taosHashIterate(pSub->consumerHash, pIter);
114,743✔
478
    if (pIter == NULL) break;
114,743✔
479
    pConsumerEp = (SMqConsumerEp *)pIter;
53,636✔
480
    SMqConsumerEp newEp = {
54,242✔
481
        .consumerId = pConsumerEp->consumerId,
53,636✔
482
        .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
53,636✔
483
    };
484
    if ((code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp,
53,636✔
485
                            sizeof(SMqConsumerEp))) != 0)
UNCOV
486
      goto END;
×
487
  }
488
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
61,107✔
489
  if (pSubNew->unassignedVgs == NULL) {
61,107✔
UNCOV
490
    code = terrno;
×
UNCOV
491
    goto END;
×
492
  }
493
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
61,107✔
494
  if (pSub->offsetRows != NULL && pSubNew->offsetRows == NULL) {
61,107✔
UNCOV
495
    code = terrno;
×
496
    goto END;
×
497
  }
498
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
61,107✔
499
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
61,107✔
500
  if (pSubNew->qmsg == NULL) {
61,107✔
501
    code = terrno;
×
502
    goto END;
×
503
  }
504

505
END:
61,107✔
506
  return code;
61,107✔
507
}
508

509
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
221,409✔
510
  if (pSub == NULL) return;
221,409✔
511
  taosHashCleanup(pSub->consumerHash);
221,409✔
512
  pSub->consumerHash = NULL;
221,409✔
513
  taosArrayDestroy(pSub->unassignedVgs);
221,409✔
514
  pSub->unassignedVgs = NULL;
221,409✔
515
  taosMemoryFreeClear(pSub->qmsg);
221,409✔
516
  taosArrayDestroy(pSub->offsetRows);
221,409✔
517
  pSub->offsetRows = NULL;
221,409✔
518
}
519

520
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
296,506✔
521
  int32_t tlen = 0;
296,506✔
522
  tlen += taosEncodeString(buf, pSub->key);
296,506✔
523
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
296,506✔
524
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
296,506✔
525
  tlen += taosEncodeFixedI8(buf, pSub->subType);
296,506✔
526
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
296,506✔
527
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
296,506✔
528

529
  void   *pIter = NULL;
296,506✔
530
  int32_t sz = taosHashGetSize(pSub->consumerHash);
296,506✔
531
  tlen += taosEncodeFixedI32(buf, sz);
296,506✔
532

533
  int32_t cnt = 0;
296,506✔
534
  while (1) {
167,468✔
535
    pIter = taosHashIterate(pSub->consumerHash, pIter);
463,974✔
536
    if (pIter == NULL) break;
463,974✔
537
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
167,468✔
538
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
167,468✔
539
    cnt++;
167,468✔
540
  }
541
  if (cnt != sz) return -1;
296,506✔
542
  int32_t len = taosArrayGetSize(pSub->unassignedVgs);
296,506✔
543
  tlen += taosEncodeFixedI32(buf, len);
296,506✔
544
  for (int32_t i = 0; i < len; i++) {
855,460✔
545
    void* data = taosArrayGet(pSub->unassignedVgs, i);
558,954✔
546
    tlen += tEncodeSMqVgEp(buf, data);
558,954✔
547
  }
548
  tlen += taosEncodeString(buf, pSub->dbName);
296,506✔
549

550
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
296,506✔
551
  tlen += taosEncodeString(buf, pSub->qmsg);
296,506✔
552
  return tlen;
296,506✔
553
}
554

555
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
120,224✔
556
  //
557
  buf = taosDecodeStringTo(buf, pSub->key);
120,224✔
558
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
120,224✔
559
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
120,224✔
560
  buf = taosDecodeFixedI8(buf, &pSub->subType);
120,224✔
561
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
120,224✔
562
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);
121,436✔
563

564
  int32_t sz;
119,012✔
565
  buf = taosDecodeFixedI32(buf, &sz);
120,224✔
566

567
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
120,224✔
568
  if (pSub->consumerHash == NULL) {
120,224✔
UNCOV
569
    return NULL;
×
570
  }
571
  taosHashSetFreeFp(pSub->consumerHash, freeSMqConsumerEp);
120,224✔
572

573
  for (int32_t i = 0; i < sz; i++) {
183,035✔
574
    SMqConsumerEp consumerEp = {0};
62,811✔
575
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
62,811✔
576
    if (taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
62,811✔
577
        0)
UNCOV
578
      return NULL;
×
579
  }
580

581
  int32_t len = 0;
120,224✔
582
  buf = taosDecodeFixedI32(buf, &len);
120,224✔
583
  pSub->unassignedVgs = taosArrayInit(len, sizeof(SMqVgEp));
120,224✔
584
  if (pSub->unassignedVgs == NULL) {
120,224✔
UNCOV
585
    return NULL;
×
586
  }
587
  for (int32_t i = 0; i < len; i++) {
379,753✔
588
    SMqVgEp* vgEp = taosArrayReserve(pSub->unassignedVgs, 1);
259,529✔
589
    if (vgEp != NULL)
259,529✔
590
      buf = tDecodeSMqVgEp(buf, vgEp, sver);
259,529✔
591
  }
592

593
  buf = taosDecodeStringTo(buf, pSub->dbName);
120,224✔
594

595
  if (sver > 1) {
120,224✔
596
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
120,224✔
597
    buf = taosDecodeString(buf, &pSub->qmsg);
240,448✔
598
  } else {
UNCOV
599
    pSub->qmsg = taosStrdup("");
×
600
  }
601
  return (void *)buf;
120,224✔
602
}
603

604
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
33,916,184✔
605
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
33,916,184✔
606
  pObj->dtype = pItem->dtype;
33,916,184✔
607
  switch (pItem->dtype) {
33,916,184✔
UNCOV
608
    case CFG_DTYPE_NONE:
×
UNCOV
609
      break;
×
610
    case CFG_DTYPE_BOOL:
6,719,244✔
611
      pObj->bval = pItem->bval;
6,719,244✔
612
      break;
6,719,244✔
613
    case CFG_DTYPE_INT32:
18,557,912✔
614
      pObj->i32 = pItem->i32;
18,557,912✔
615
      break;
18,557,912✔
616
    case CFG_DTYPE_INT64:
1,919,784✔
617
      pObj->i64 = pItem->i64;
1,919,784✔
618
      break;
1,919,784✔
619
    case CFG_DTYPE_FLOAT:
639,928✔
620
    case CFG_DTYPE_DOUBLE:
621
      pObj->fval = pItem->fval;
639,928✔
622
      break;
639,928✔
623
    case CFG_DTYPE_STRING:
6,079,316✔
624
    case CFG_DTYPE_DIR:
625
    case CFG_DTYPE_LOCALE:
626
    case CFG_DTYPE_CHARSET:
627
    case CFG_DTYPE_TIMEZONE:
628
      pObj->str = taosStrdup(pItem->str);
6,079,316✔
629
      if (pObj->str == NULL) {
6,079,316✔
UNCOV
630
        taosMemoryFree(pObj);
×
UNCOV
631
        return TSDB_CODE_OUT_OF_MEMORY;
×
632
      }
633
      break;
6,079,316✔
634
  }
635
  return TSDB_CODE_SUCCESS;
33,916,184✔
636
}
637

638
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
18,770✔
639
  int32_t code = 0;
18,770✔
640
  switch (pObjNew->dtype) {
18,770✔
641
    case CFG_DTYPE_BOOL: {
3,185✔
642
      bool tmp = false;
3,185✔
643
      if (strcasecmp(value, "true") == 0) {
3,185✔
UNCOV
644
        tmp = true;
×
645
      }
646
      if (taosStr2Int32(value, NULL, 10) > 0) {
3,185✔
647
        tmp = true;
1,112✔
648
      }
649
      pObjNew->bval = tmp;
3,185✔
650
      break;
3,185✔
651
    }
652
    case CFG_DTYPE_INT32: {
14,178✔
653
      int32_t ival;
14,178✔
654
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &ival));
14,178✔
655
      pObjNew->i32 = ival;
14,178✔
656
      break;
14,178✔
657
    }
658
    case CFG_DTYPE_INT64: {
1,120✔
659
      int64_t ival;
1,120✔
660
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &ival));
1,120✔
661
      pObjNew->i64 = ival;
1,120✔
662
      break;
1,120✔
663
    }
UNCOV
664
    case CFG_DTYPE_FLOAT:
×
665
    case CFG_DTYPE_DOUBLE: {
UNCOV
666
      float dval = 0;
×
UNCOV
667
      TAOS_CHECK_RETURN(parseCfgReal(value, &dval));
×
UNCOV
668
      pObjNew->fval = dval;
×
UNCOV
669
      break;
×
670
    }
671
    case CFG_DTYPE_DIR:
287✔
672
    case CFG_DTYPE_TIMEZONE:
673
    case CFG_DTYPE_CHARSET:
674
    case CFG_DTYPE_LOCALE:
675
    case CFG_DTYPE_STRING: {
676
      pObjNew->str = taosStrdup(value);
287✔
677
      if (pObjNew->str == NULL) {
287✔
UNCOV
678
        code = terrno;
×
UNCOV
679
        return code;
×
680
      }
681
      break;
287✔
682
    }
UNCOV
683
    case CFG_DTYPE_NONE:
×
684
      break;
×
685
    default:
×
UNCOV
686
      code = TSDB_CODE_INVALID_CFG;
×
UNCOV
687
      break;
×
688
  }
689
  return code;
18,770✔
690
}
691

692
SConfigObj mndInitConfigVersion() {
319,964✔
693
  SConfigObj obj;
318,463✔
694
  memset(&obj, 0, sizeof(SConfigObj));
319,964✔
695

696
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
319,964✔
697
  obj.dtype = CFG_DTYPE_INT32;
319,964✔
698
  obj.i32 = 0;
319,964✔
699
  return obj;
319,964✔
700
}
701

702
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
262,037,070✔
703
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
262,037,070✔
704
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
524,074,140✔
705

706
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
524,074,140✔
707
  switch (pObj->dtype) {
262,037,070✔
708
    case CFG_DTYPE_BOOL:
51,412,070✔
709
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
102,824,140✔
710
      break;
51,412,070✔
711
    case CFG_DTYPE_INT32:
144,536,062✔
712
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
289,072,124✔
713
      break;
144,536,062✔
714
    case CFG_DTYPE_INT64:
14,689,660✔
715
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
29,379,320✔
716
      break;
14,689,660✔
717
    case CFG_DTYPE_FLOAT:
4,895,060✔
718
    case CFG_DTYPE_DOUBLE:
719
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
9,790,120✔
720
      break;
4,895,060✔
721
    case CFG_DTYPE_STRING:
46,504,218✔
722
    case CFG_DTYPE_DIR:
723
    case CFG_DTYPE_LOCALE:
724
    case CFG_DTYPE_CHARSET:
725
    case CFG_DTYPE_TIMEZONE:
726
      if (pObj->str != NULL) {
46,504,218✔
727
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
93,008,436✔
728
      } else {
UNCOV
729
        TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
×
730
      }
731
      break;
46,504,218✔
UNCOV
732
    default:
×
UNCOV
733
      break;
×
734
  }
735
  tEndEncode(pEncoder);
262,037,070✔
736
  return pEncoder->pos;
262,037,070✔
737
}
738

739
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
53,032,931✔
740
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
53,032,931✔
741
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
53,032,931✔
742
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
106,065,862✔
743
  switch (pObj->dtype) {
53,032,931✔
UNCOV
744
    case CFG_DTYPE_NONE:
×
UNCOV
745
      break;
×
746
    case CFG_DTYPE_BOOL:
10,391,090✔
747
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
10,391,090✔
748
      break;
10,391,090✔
749
    case CFG_DTYPE_INT32:
29,292,291✔
750
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
58,584,582✔
751
      break;
29,292,291✔
752
    case CFG_DTYPE_INT64:
2,970,574✔
753
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
5,941,148✔
754
      break;
2,970,574✔
755
    case CFG_DTYPE_FLOAT:
990,202✔
756
    case CFG_DTYPE_DOUBLE:
757
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
1,980,404✔
758
      break;
990,202✔
759
    case CFG_DTYPE_STRING:
9,388,774✔
760
    case CFG_DTYPE_DIR:
761
    case CFG_DTYPE_LOCALE:
762
    case CFG_DTYPE_CHARSET:
763
    case CFG_DTYPE_TIMEZONE:
764
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
18,777,548✔
765
      break;
9,388,774✔
766
  }
767
  tEndDecode(pDecoder);
53,032,931✔
768
  TAOS_RETURN(TSDB_CODE_SUCCESS);
53,032,931✔
769
}
770

771
void tFreeSConfigObj(SConfigObj *obj) {
87,247,769✔
772
  if (obj == NULL) {
87,247,769✔
UNCOV
773
    return;
×
774
  }
775
  if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
87,247,769✔
776
      obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
73,417,142✔
777
    taosMemoryFree(obj->str);
15,457,927✔
778
  }
779
}
780

781
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
782
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
783
//   if (pEntryNew == NULL) return NULL;
784
//   pEntryNew->epoch = pEntry->epoch;
785
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
786
//   return pEntryNew;
787
// }
788
//
789
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
790
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
791
// }
792

793
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
794
//   int32_t tlen = 0;
795
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
796
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
797
//   return tlen;
798
// }
799
//
800
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
801
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
802
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
803
//   return (void *)buf;
804
// }
805

806
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
807
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
808
//   if (pLogNew == NULL) return pLogNew;
809
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
810
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
811
//   return pLogNew;
812
// }
813
//
814
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
815
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
816
// }
817

818
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
819
//   int32_t tlen = 0;
820
//   tlen += taosEncodeString(buf, pLog->key);
821
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
822
//   return tlen;
823
// }
824
//
825
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
826
//   buf = taosDecodeStringTo(buf, pLog->key);
827
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
828
//   return (void *)buf;
829
// }
830
//
831
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
832
//   int32_t tlen = 0;
833
//   tlen += taosEncodeString(buf, pOffset->key);
834
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
835
//   return tlen;
836
// }
837
//
838
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
839
//   buf = taosDecodeStringTo(buf, pOffset->key);
840
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
841
//   return buf;
842
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc