• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.5
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tcommon.h"
19

20
// Identifier of a stream task: the stream it belongs to plus the task's own id.
typedef struct STaskId {
21
  int64_t streamId;  // id of the owning stream
22
  int64_t taskId;    // id of the task
23
} STaskId;
24

25
// Checkpoint state of one stream task. All fields except activeTransId's neighbours are carried
// verbatim in the heartbeat message (see tEncodeStreamHbMsg / tDecodeStreamHbMsg).
typedef struct STaskCkptInfo {
26
  int64_t latestId;          // saved checkpoint id
27
  int64_t latestVer;         // saved checkpoint ver
28
  int64_t latestTime;        // latest checkpoint time
29
  int64_t latestSize;        // latest checkpoint size
30
  int8_t  remoteBackup;      // latest checkpoint backup done
31
  int64_t activeId;          // current active checkpoint id
32
  int32_t activeTransId;     // checkpoint trans id
33
  int8_t  failed;            // denote if the checkpoint is failed or not
34
  int8_t  consensusChkptId;  // required the consensus-checkpointId
35
  int64_t consensusTs;       // NOTE(review): presumably the timestamp tied to the consensus-checkpointId request - confirm
36
} STaskCkptInfo;
37

38
// Per-task status record. An array of these is the payload of SStreamHbMsg::pTaskStatus.
// Note: statusLastDuration is not written by tEncodeStreamHbMsg, so it does not travel on the wire.
typedef struct STaskStatusEntry {
39
  STaskId       id;
40
  int32_t       status;
41
  int32_t       statusLastDuration;  // to record the last duration of current status
42
  int64_t       stage;
43
  int32_t       nodeId;
44
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
45
  int64_t       processedVer;  // only valid for source task
46
  double        inputQUsed;    // in MiB
47
  double        inputRate;
48
  double        procsThroughput;   // duration between one element put into input queue and being processed.
49
  double        procsTotal;        // NOTE(review): original comment duplicated procsThroughput; presumably the cumulative counterpart - confirm
50
  double        outputThroughput;  // the size of dispatched result blocks in bytes
51
  double        outputTotal;       // NOTE(review): original comment duplicated outputThroughput; presumably the cumulative counterpart - confirm
52
  double        sinkQuota;         // existed quota size for sink task
53
  double        sinkDataSize;      // sink to dst data size
54
  int64_t       startTime;
55
  int64_t       startCheckpointId;
56
  int64_t       startCheckpointVer;
57
  int64_t       hTaskId;
58
  STaskCkptInfo checkpointInfo;
59
  STaskNotifyEventStat notifyEventStat;
60
} STaskStatusEntry;
61

62
// Writes an upstream endpoint descriptor to pEncoder.
// Field order on the wire: taskId, nodeId, childId, epSet, stage. tDecodeStreamEpInfo reads the
// same order. No tStartEncode/tEndEncode framing is performed here.
// Returns 0 when every field was written; a failing write leaves the function early through
// TAOS_CHECK_RETURN (macro defined outside this file).
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
197,359✔
63
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
394,718!
64
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
394,718!
65
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
394,718!
66
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
197,359!
67
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
394,754!
68
  return 0;
197,377✔
69
}
70

71
// Reads an upstream endpoint descriptor from pDecoder into pInfo.
// Field order: taskId, nodeId, childId, epSet, stage (the order tEncodeStreamEpInfo writes).
// No tStartDecode/tEndDecode framing is performed here.
// Returns 0 when every field was read; a failing read leaves the function early through
// TAOS_CHECK_RETURN, in which case pInfo may be partially filled.
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
68,259✔
72
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
136,538!
73
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
136,554!
74
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
136,550!
75
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
68,275!
76
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
136,613!
77
  return 0;
68,307✔
78
}
79

80
// Encodes a checkpoint-source request between tStartEncode/tEndEncode.
// Field order: streamId, checkpointId, taskId, nodeId, mgmtEps, mnodeId, expireTime, transId,
// mndTrigger.
// Returns pEncoder->pos on success; a failing write leaves the function early through
// TAOS_CHECK_RETURN before tEndEncode is reached.
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
6,594✔
81
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
6,594!
82
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
13,188!
83
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
13,188!
84
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
13,188!
85
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));
13,188!
86
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
6,594!
87
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));
13,188!
88
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
13,188!
89
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
13,188!
90
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));
13,188!
91
  tEndEncode(pEncoder);
6,594✔
92
  return pEncoder->pos;
6,594✔
93
}
94

95
// Decodes a checkpoint-source request between tStartDecode/tEndDecode.
// Field order mirrors tEncodeStreamCheckpointSourceReq: streamId, checkpointId, taskId, nodeId,
// mgmtEps, mnodeId, expireTime, transId, mndTrigger.
// Returns 0 on success; a failing read leaves the function early through TAOS_CHECK_RETURN,
// in which case pReq may be partially filled and tEndDecode is not called.
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
3,353✔
96
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
3,353!
97
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
6,713!
98
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
6,707!
99
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
6,704!
100
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));
6,701!
101
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
3,349!
102
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));
6,712!
103
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
6,710!
104
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
6,708!
105
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));
6,703!
106
  tEndDecode(pDecoder);
3,349✔
107
  return 0;
3,350✔
108
}
109

110
// Encodes a checkpoint-source response between tStartEncode/tEndEncode.
// Field order: streamId, checkpointId, taskId, nodeId, expireTime, success.
// Returns pEncoder->pos on success; a failing write leaves the function early through
// TAOS_CHECK_RETURN before tEndEncode is reached.
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
6,716✔
111
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
6,716✔
112
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
13,428!
113
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
13,428!
114
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
13,428!
115
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));
13,428!
116
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
13,428!
117
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));
13,428!
118
  tEndEncode(pEncoder);
6,714✔
119
  return pEncoder->pos;
6,714✔
120
}
121

122
// Encodes a task node-update message.
// Wire layout: streamId, taskId, node count, then per node (nodeId, prevEp, newEp), then transId.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure
// (in which case tEndEncode is skipped).
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
110✔
123
  int32_t code = 0;
110✔
124
  int32_t lino;
125

126
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
110!
127
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
220!
128
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));
220!
129

130
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
110✔
131
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
110!
132

133
  for (int32_t i = 0; i < size; ++i) {
324✔
134
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
214✔
135
    if (pInfo == NULL) {
214!
136
      TAOS_CHECK_EXIT(terrno);
×
137
    }
138

139
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
428!
140
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
214!
141
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
214!
142
  }
143

144
  // todo: this newer attribute makes the message incompatible with previous versions
145
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
220!
146
  tEndEncode(pEncoder);
110✔
147
_exit:
110✔
148
  if (code) {
110!
149
    return code;
×
150
  } else {
151
    return pEncoder->pos;
110✔
152
  }
153
}
154

155
// Decodes a task node-update message (layout written by tEncodeStreamTaskUpdateMsg).
// Allocates pMsg->pNodeList and appends one SNodeUpdateInfo per encoded node.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT. On failure the list
// allocated so far stays attached to pMsg; this function does not free it.
// NOTE(review): the node count comes straight from the message and is not range-checked here.
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
89✔
156
  int32_t code = 0;
89✔
157
  int32_t lino;
158

159
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
89!
160
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
177!
161
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));
178!
162

163
  int32_t size = 0;
89✔
164
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
89!
165
  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
89✔
166
  if (pMsg->pNodeList == NULL) {
89!
167
    TAOS_CHECK_EXIT(terrno);
×
168
  }
169
  for (int32_t i = 0; i < size; ++i) {
278✔
170
    SNodeUpdateInfo info = {0};
189✔
171
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
189!
172
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
189!
173
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));
189!
174

175
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
378!
176
      TAOS_CHECK_EXIT(terrno);
×
177
    }
178
  }
179

180
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
178!
181

182
  tEndDecode(pDecoder);
89✔
183
_exit:
89✔
184
  return code;
89✔
185
}
186

187
// Encodes a task check request.
// Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
// downstreamTaskId, childId, stage.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
46,715✔
188
  int32_t code = 0;
46,715✔
189
  int32_t lino;
190

191
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
46,715!
192
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
93,436!
193
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
93,436!
194
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
93,436!
195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
93,436!
196
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
93,436!
197
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
93,436!
198
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
93,436!
199
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
93,436!
200
  tEndEncode(pEncoder);
46,718✔
201

202
_exit:
46,716✔
203
  if (code) {
46,716!
204
    return code;
×
205
  } else {
206
    return pEncoder->pos;
46,716✔
207
  }
208
}
209

210
// Decodes a task check request; field order mirrors tEncodeStreamTaskCheckReq.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT; on failure pReq may be
// partially filled and tEndDecode is skipped.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
23,331✔
211
  int32_t code = 0;
23,331✔
212
  int32_t lino;
213

214
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
23,331!
215
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
46,662!
216
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
46,662!
217
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
46,662!
218
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
46,662!
219
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
46,662!
220
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
46,662!
221
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
46,662!
222
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
46,662!
223
  tEndDecode(pDecoder);
23,331✔
224

225
_exit:
23,331✔
226
  return code;
23,331✔
227
}
228

229
// Encodes a task check response.
// Field order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
// downstreamTaskId, childId, oldStage, status.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
46,661✔
230
  int32_t code = 0;
46,661✔
231
  int32_t lino;
232

233
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
46,661!
234
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
93,324!
235
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
93,324!
236
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
93,324!
237
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
93,324!
238
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
93,324!
239
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
93,324!
240
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
93,324!
241
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
93,324!
242
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
93,324!
243
  tEndEncode(pEncoder);
46,662✔
244

245
_exit:
46,662✔
246
  if (code) {
46,662!
247
    return code;
×
248
  } else {
249
    return pEncoder->pos;
46,662✔
250
  }
251
}
252

253
// Decodes a task check response; field order mirrors tEncodeStreamTaskCheckRsp.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT; on failure pRsp may be
// partially filled and tEndDecode is skipped.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
23,324✔
254
  int32_t code = 0;
23,324✔
255
  int32_t lino;
256

257
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
23,324!
258
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
46,645!
259
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
46,644!
260
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
46,644!
261
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
46,645!
262
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
46,645!
263
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
46,643!
264
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
46,642!
265
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
46,642!
266
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
46,642!
267
  tEndDecode(pDecoder);
23,321✔
268

269
_exit:
23,321✔
270
  return code;
23,321✔
271
}
272

273
// Encodes a checkpoint-ready message.
// Field order: streamId, checkpointId, downstreamTaskId, downstreamNodeId, upstreamTaskId,
// upstreamNodeId, childId.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure.
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
16,990✔
274
  int32_t code = 0;
16,990✔
275
  int32_t lino;
276

277
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
16,990!
278
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
33,980!
279
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
33,980!
280
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
33,980!
281
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
33,980!
282
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
33,980!
283
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
33,980!
284
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
33,980!
285
  tEndEncode(pEncoder);
16,990✔
286

287
_exit:
16,990✔
288
  if (code) {
16,990!
289
    return code;
×
290
  } else {
291
    return pEncoder->pos;
16,990✔
292
  }
293
}
294

295
// Decodes a checkpoint-ready message; field order mirrors tEncodeStreamCheckpointReadyMsg.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT; on failure pRsp may be
// partially filled and tEndDecode is skipped.
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
8,494✔
296
  int32_t code = 0;
8,494✔
297
  int32_t lino;
298

299
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
8,494!
300
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
16,988!
301
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
16,988!
302
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
16,988!
303
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
16,988!
304
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
16,988!
305
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
16,988!
306
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
16,988!
307
  tEndDecode(pDecoder);
8,494✔
308

309
_exit:
8,494✔
310
  return code;
8,494✔
311
}
312

313
// Encodes a stream dispatch request together with its data blocks.
//
// Wire layout: stage, msgId, srcVgId, type, streamId, taskId, type (second copy), upstreamTaskId,
// upstreamChildId, upstreamNodeId, upstreamRelTaskId, blockNum, totalLen, followed by blockNum
// pairs of (int32 length, binary payload).
//
// pReq->data and pReq->dataLen must each contain exactly pReq->blockNum elements, and no block
// pointer may be NULL; otherwise the request is rejected.
//
// Returns pEncoder->pos on success, or a non-zero error code on failure (tEndEncode is skipped).
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` is written a second time here. tDecodeStreamDispatchReq reads it at both positions, so
  // the duplicate has to stay for the message to remain wire-compatible.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    uError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
    void*    data = taosArrayGetP(pReq->data, i);
    if (data == NULL || pLen == NULL) {
      // A NULL here may come from a stored NULL block pointer, in which case terrno is not
      // guaranteed to be set. Always leave with a non-zero code so we never fall through and
      // dereference pLen or copy from a NULL payload below.
      TAOS_CHECK_EXIT(terrno != 0 ? terrno : TSDB_CODE_INVALID_MSG);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
  }
  tEndEncode(pEncoder);
_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
355

356
// Decodes a stream dispatch request (layout written by tEncodeStreamDispatchReq).
//
// Allocates pReq->data (array of payload pointers) and pReq->dataLen (array of int32 lengths) and
// fills them with one entry per encoded block. Each payload buffer is allocated by
// tDecodeBinaryAlloc; once pushed into pReq->data it is owned by the request and released by
// tCleanupStreamDispatchReq.
//
// Returns 0 on success or a non-zero error code. On failure the arrays allocated so far stay
// attached to pReq (this function does not free them), but a payload buffer that could not be
// handed over to pReq->data is freed here so it does not leak.
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // The encoder writes `type` twice; consume the second copy to stay aligned with the stream.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  // blockNum comes from the wire; a negative count is never valid and must not reach the
  // allocator as a huge unsigned capacity.
  if (pReq->blockNum < 0) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    // From here on `data` is owned by this function until it has been pushed into pReq->data, so
    // every early exit below has to free it.

    // Compare in the unsigned domain explicitly; a negative announced length is always invalid.
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      int32_t pushErr = terrno;  // read terrno before the free below can disturb it
      taosMemoryFree(data);
      // Fallback code guarantees we leave the loop: `data` is already freed and must not be pushed.
      TAOS_CHECK_EXIT(pushErr != 0 ? pushErr : TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      int32_t pushErr = terrno;
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(pushErr != 0 ? pushErr : TSDB_CODE_INVALID_MSG);
    }
  }

  tEndDecode(pDecoder);
_exit:
  return code;
}
405

406
// Releases the arrays attached to a dispatch request: pReq->data (destroyed through
// taosArrayDestroyP) and pReq->dataLen. The request struct itself is not freed.
// Safe to call with NULL and safe to call twice: both pointers are reset after destruction,
// matching the behaviour of tCleanupStreamHbMsg.
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, NULL);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
410

411
// Encodes a stream retrieve request.
// Field order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId, then pRetrieve as a
// binary blob of retrieveLen bytes.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
1,068✔
412
  int32_t code = 0;
1,068✔
413
  int32_t lino;
414

415
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
1,068!
416
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
2,136!
417
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
2,136!
418
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
2,136!
419
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
2,136!
420
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
2,136!
421
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
2,136!
422
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
2,136!
423
  tEndEncode(pEncoder);
1,068✔
424

425
_exit:
1,068✔
426
  if (code) {
1,068!
427
    return code;
×
428
  } else {
429
    return pEncoder->pos;
1,068✔
430
  }
431
}
432

433
// Decodes a stream retrieve request; field order mirrors tEncodeStreamRetrieveReq.
// The payload is read with tDecodeBinaryAlloc into pReq->pRetrieve; the buffer is released by
// tCleanupStreamRetrieveReq.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT.
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
534✔
434
  int32_t code = 0;
534✔
435
  int32_t lino;
436

437
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
534!
438
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
1,068!
439
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
1,068!
440
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
1,067!
441
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
1,066!
442
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
1,067!
443
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));
1,068!
444
  uint64_t len = 0;
534✔
445
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
1,067!
446
  // NOTE(review): the 64-bit length is narrowed to int32 without a range check.
  pReq->retrieveLen = (int32_t)len;
533✔
447
  tEndDecode(pDecoder);
533✔
448

449
_exit:
533✔
450
  return code;
533✔
451
}
452

453
// Frees the payload buffer attached to a retrieve request (the one tDecodeStreamRetrieveReq
// allocates into pReq->pRetrieve). The request struct itself is not freed.
// Safe to call with NULL and safe to call twice: the pointer is reset after the free.
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosMemoryFree(pReq->pRetrieve);
  pReq->pRetrieve = NULL;
}
454

455
// Encodes a task checkpoint request. Field order: streamId, taskId, nodeId.
// Returns `code`: 0 on success, or the non-zero code captured by TAOS_CHECK_EXIT.
// NOTE(review): unlike most encoders in this file this one does NOT return pEncoder->pos on
// success - confirm callers obtain the encoded length another way before changing it.
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
8,811✔
456
  int32_t code = 0;
8,811✔
457
  int32_t lino;
458

459
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
8,811!
460
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
17,626!
461
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
17,626!
462
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
17,626!
463
  tEndEncode(pEncoder);
8,813✔
464

465
_exit:
8,815✔
466
  return code;
8,815✔
467
}
468

469
// Decodes a task checkpoint request. Field order: streamId, taskId, nodeId.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT.
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
4,413✔
470
  int32_t code = 0;
4,413✔
471
  int32_t lino;
472

473
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
4,413!
474
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
8,826!
475
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
8,826!
476
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
8,826!
477
  tEndDecode(pDecoder);
4,413✔
478

479
_exit:
4,413✔
480
  return code;
4,413✔
481
}
482

483
// Encodes a stream heartbeat message.
// Wire layout: vgId, numOfTasks, then one STaskStatusEntry per task (fields in the exact order
// written in the loop below), then the number of updated nodes followed by their vgIds, then
// msgId and ts. tDecodeStreamHbMsg reads the same order; any field added here must be added
// there at the same position.
// Returns pEncoder->pos on success, or the non-zero code captured by TAOS_CHECK_EXIT on failure.
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
65,238✔
484
  int32_t code = 0;
65,238✔
485
  int32_t lino;
486

487
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
65,238!
488
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
130,476!
489
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));
130,476!
490

491
  for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
209,244✔
492
    STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
144,006✔
493
    if (ps == NULL) {
144,006!
494
      TAOS_CHECK_EXIT(terrno);
×
495
    }
496

497
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId));
288,012!
498
    // taskId is an int64 in STaskId but travels as int32 on the wire
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId));
288,012!
499
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status));
288,012!
500
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage));
288,012!
501
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId));
288,012!
502
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed));
288,012!
503
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate));
288,012!
504
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal));
288,012!
505
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput));
288,012!
506
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal));
288,012!
507
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput));
288,012!
508
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota));
288,012!
509
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize));
288,012!
510
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer));
288,012!
511
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer));
288,012!
512
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer));
288,012!
513
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId));
288,012!
514
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed));
288,012!
515
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId));
288,012!
516
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId));
288,012!
517
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer));
288,012!
518
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime));
288,012!
519
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize));
288,012!
520
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup));
288,012!
521
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId));
288,012!
522
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs));
288,012!
523
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime));
288,012!
524
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId));
288,012!
525
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer));
288,012!
526
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId));
288,012!
527
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventAddTimes));
288,012!
528
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventAddElems));
288,012!
529
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->notifyEventStat.notifyEventAddCostSec));
288,012!
530
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventPushTimes));
288,012!
531
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventPushElems));
288,012!
532
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->notifyEventStat.notifyEventPushCostSec));
288,012!
533
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventPackTimes));
288,012!
534
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventPackElems));
288,012!
535
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->notifyEventStat.notifyEventPackCostSec));
288,012!
536
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventSendTimes));
288,012!
537
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventSendElems));
288,012!
538
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->notifyEventStat.notifyEventSendCostSec));
288,012!
539
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->notifyEventStat.notifyEventHoldElems));
288,012!
540
  }
541

542
  int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
65,238✔
543
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs));
65,238!
544

545
  for (int j = 0; j < numOfVgs; ++j) {
65,290✔
546
    int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
52✔
547
    if (pVgId == NULL) {
52!
548
      TAOS_CHECK_EXIT(terrno);
×
549
    }
550

551
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
104!
552
  }
553

554
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
130,476!
555
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
130,476!
556
  tEndEncode(pEncoder);
65,238✔
557

558
_exit:
65,238✔
559
  if (code) {
65,238!
560
    return code;
×
561
  } else {
562
    return pEncoder->pos;
65,238✔
563
  }
564
}
565

566
// Decodes a stream heartbeat message (layout written by tEncodeStreamHbMsg).
// Allocates pReq->pTaskStatus (array of STaskStatusEntry) and pReq->pUpdateNodes (array of int32
// vgIds) and fills them from the message.
// Returns 0 on success or the non-zero code captured by TAOS_CHECK_EXIT. On failure the arrays
// allocated so far stay attached to pReq; this function does not free them
// (tCleanupStreamHbMsg destroys both arrays).
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
31,994✔
567
  int32_t code = 0;
31,994✔
568
  int32_t lino;
569

570
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
31,994!
571
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
63,985!
572
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));
63,969!
573

574
  if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) {
31,981!
575
    TAOS_CHECK_EXIT(terrno);
×
576
  }
577
  for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
102,320✔
578
    // taskId travels as int32 but STaskId::taskId is int64, hence the temporary
    int32_t          taskId = 0;
70,324✔
579
    STaskStatusEntry entry = {0};
70,324✔
580

581
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId));
70,363!
582
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
70,348!
583
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status));
70,342!
584
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage));
70,342!
585
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId));
70,336!
586
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed));
70,334!
587
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate));
70,336!
588
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal));
70,324!
589
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput));
70,320!
590
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal));
70,314!
591
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput));
70,318!
592
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota));
70,292!
593
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize));
70,293!
594
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer));
70,281!
595
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer));
70,281!
596
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer));
70,276!
597
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId));
70,285!
598
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed));
70,283!
599
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId));
70,284!
600

601
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId));
70,279!
602
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer));
70,288!
603
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime));
70,268!
604
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize));
70,280!
605
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup));
70,286!
606
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId));
70,278!
607
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs));
70,262!
608
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime));
70,275!
609
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId));
70,280!
610
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer));
70,288!
611
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId));
70,270!
612

613
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventAddTimes));
70,285!
614
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventAddElems));
70,293!
615
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.notifyEventStat.notifyEventAddCostSec));
70,292!
616
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventPushTimes));
70,288!
617
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventPushElems));
70,282!
618
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.notifyEventStat.notifyEventPushCostSec));
70,285!
619
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventPackTimes));
70,280!
620
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventPackElems));
70,274!
621
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.notifyEventStat.notifyEventPackCostSec));
70,289!
622
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventSendTimes));
70,280!
623
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventSendElems));
70,289!
624
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.notifyEventStat.notifyEventSendCostSec));
70,291!
625
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.notifyEventStat.notifyEventHoldElems));
70,273!
626

627
    entry.id.taskId = taskId;
70,273✔
628
    if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) {
140,590!
629
      TAOS_CHECK_EXIT(terrno);
×
630
    }
631
  }
632

633
  int32_t numOfVgs = 0;
31,996✔
634
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs));
32,005!
635

636
  if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) {
32,005!
637
    TAOS_CHECK_EXIT(terrno);
×
638
  }
639

640
  for (int j = 0; j < numOfVgs; ++j) {
32,030✔
641
    int32_t vgId = 0;
26✔
642
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
26!
643
    if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) {
52!
644
      TAOS_CHECK_EXIT(terrno);
×
645
    }
646
  }
647

648
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
63,992!
649
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
63,979!
650
  tEndDecode(pDecoder);
31,991✔
651

652
_exit:
31,994✔
653
  return code;
31,994✔
654
}
655

656
// Release the arrays held by a stream heartbeat message and reset its scalar
// fields. A NULL pMsg is ignored. On return pUpdateNodes and pTaskStatus are
// NULL and msgId, vgId and numOfTasks are all -1, so a second call on the same
// message does not destroy the arrays again.
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
  if (pMsg == NULL) {
    return;
  }

  if (pMsg->pUpdateNodes != NULL) {
    taosArrayDestroy(pMsg->pUpdateNodes);
    pMsg->pUpdateNodes = NULL;
  }

  if (pMsg->pTaskStatus != NULL) {
    taosArrayDestroy(pMsg->pTaskStatus);
    pMsg->pTaskStatus = NULL;
  }

  pMsg->msgId = -1;
  pMsg->vgId = -1;
  pMsg->numOfTasks = -1;
}
675

676
// Encode a stream heartbeat response.
// Wire order: msgId (i32), mndEpset (SEpSet).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pRsp->mndEpset));
  tEndEncode(pEncoder);

_exit:
  return code;
}
688

689
// Decode a stream heartbeat response into pRsp.
// Wire order (must mirror tEncodeStreamHbRsp): msgId (i32), mndEpset (SEpSet).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pRsp->mndEpset));
  tEndDecode(pDecoder);

_exit:
  return code;
}
701

702
// Encode a "retrieve checkpoint trigger" request.
// Wire order: streamId (i64), checkpointId (i64), upstreamNodeId (i32),
// upstreamTaskId (i32), downstreamNodeId (i32), downstreamTaskId (i64).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  // The downstream task id is written as 64 bits, unlike the upstream task id.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId));
  tEndEncode(pEncoder);

_exit:
  return code;
}
718

719
// Decode a "retrieve checkpoint trigger" request into pReq.
// Wire order (must mirror tEncodeRetrieveChkptTriggerReq): streamId (i64),
// checkpointId (i64), upstreamNodeId (i32), upstreamTaskId (i32),
// downstreamNodeId (i32), downstreamTaskId (i64).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  // The downstream task id is read as 64 bits, unlike the upstream task id.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
735

736
// Encode a checkpoint-trigger response.
// Wire order: streamId (i64), checkpointId (i64), upstreamTaskId (i32),
// taskId (i32), transId (i32), rspCode (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode));
  tEndEncode(pEncoder);

_exit:
  return code;
}
752

753
// Decode a checkpoint-trigger response into pRsp.
// Wire order (must mirror tEncodeCheckpointTriggerRsp): streamId (i64),
// checkpointId (i64), upstreamTaskId (i32), taskId (i32), transId (i32),
// rspCode (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode));
  tEndDecode(pDecoder);

_exit:
  return code;
}
769

770
// Encode a stream task checkpoint report.
// Wire order: streamId (i64), taskId (i32), nodeId (i32), checkpointId (i64),
// checkpointVer (i64), checkpointTs (i64), transId (i32), dropHTask (i8).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));
  tEndEncode(pEncoder);

_exit:
  return code;
}
788

789
// Decode a stream task checkpoint report into pReq.
// Wire order (must mirror tEncodeStreamTaskChkptReport): streamId (i64),
// taskId (i32), nodeId (i32), checkpointId (i64), checkpointVer (i64),
// checkpointTs (i64), transId (i32), dropHTask (i8).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));
  tEndDecode(pDecoder);

_exit:
  return code;
}
807

808
// Encode a restore-checkpoint-info message.
// Wire order: startTs (i64), streamId (i64), checkpointId (i64),
// transId (i32), taskId (i32), nodeId (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
  tEndEncode(pEncoder);

_exit:
  return code;
}
824

825
// Decode a restore-checkpoint-info message into pReq.
// Wire order (must mirror tEncodeRestoreCheckpointInfo): startTs (i64),
// streamId (i64), checkpointId (i64), transId (i32), taskId (i32),
// nodeId (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
841

842
// Encode a stream task run request.
// Wire order: streamId (i64), taskId (i32), reqType (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
  tEndEncode(pEncoder);

_exit:
  return code;
}
855

856
// Decode a stream task run request into pReq.
// Wire order (must mirror tEncodeStreamTaskRunReq): streamId (i64),
// taskId (i32), reqType (i32).
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
  tEndDecode(pDecoder);

_exit:
  return code;
}
869

870
// Encode a stream task stop request.
// Wire order: streamId (i64) only.
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndEncode() is skipped -- confirm.
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  tEndEncode(pEncoder);

_exit:
  return code;
}
881

882
// Decode a stream task stop request into pReq.
// Wire order (must mirror tEncodeStreamTaskStopReq): streamId (i64) only.
// Returns `code`, which starts at 0.
// NOTE(review): TAOS_CHECK_EXIT is defined elsewhere; presumably it stores the
// step's result in `code` (and a line number in `lino`) and jumps to `_exit`
// on failure, in which case tEndDecode() is skipped -- confirm.
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc