• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.48
/source/libs/stream/src/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tstream.h"
19
#include "streamInt.h"
20

21
/*
 * Write one upstream endpoint descriptor into pEncoder.
 *
 * Fields are emitted in this order: taskId, nodeId, childId (I32 each),
 * epSet, stage (I64). tDecodeStreamEpInfo consumes them in the same order.
 * No tStartEncode/tEndEncode pair is issued by this function.
 *
 * Returns 0 after all fields were written. Every step is wrapped in
 * TAOS_CHECK_RETURN (defined outside this file; presumably it returns the
 * failing code to the caller -- confirm against the macro definition).
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  // identifiers of the upstream task
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
29

30
/*
 * Read one upstream endpoint descriptor from pDecoder into pInfo.
 *
 * Fields are consumed in the order taskId, nodeId, childId (I32 each),
 * epSet, stage (I64), which is the order tEncodeStreamEpInfo writes them.
 * No tStartDecode/tEndDecode pair is issued by this function.
 *
 * Returns 0 after all fields were read. Every step is wrapped in
 * TAOS_CHECK_RETURN (defined outside this file; presumably it returns the
 * failing code to the caller -- confirm against the macro definition).
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  // identifiers of the upstream task
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
38

39
/*
 * Serialize a checkpoint-source request between tStartEncode and tEndEncode.
 *
 * Wire order: streamId, checkpointId (I64), taskId, nodeId (I32), mgmtEps,
 * mnodeId (I32), expireTime (I64), transId (I32), mndTrigger (I8).
 * tDecodeStreamCheckpointSourceReq reads the same sequence.
 *
 * On the success path the function returns pEncoder->pos. Each step goes
 * through TAOS_CHECK_RETURN (defined elsewhere; presumably an early return
 * with the failing code -- confirm against the macro definition).
 */
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // stream / checkpoint / task identity
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));

  // mnode-related fields
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));

  // remaining scalar fields
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
53

54
/*
 * Deserialize a checkpoint-source request between tStartDecode and tEndDecode.
 *
 * Read order: streamId, checkpointId (I64), taskId, nodeId (I32), mgmtEps,
 * mnodeId (I32), expireTime (I64), transId (I32), mndTrigger (I8) -- the
 * sequence written by tEncodeStreamCheckpointSourceReq.
 *
 * Returns 0 on the success path. Each step goes through TAOS_CHECK_RETURN
 * (defined elsewhere; presumably an early return with the failing code --
 * confirm against the macro definition).
 */
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // stream / checkpoint / task identity
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));

  // mnode-related fields
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));

  // remaining scalar fields
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));

  tEndDecode(pDecoder);
  return 0;
}
68

69
/*
 * Serialize a checkpoint-source response between tStartEncode and tEndEncode.
 *
 * Wire order: streamId, checkpointId (I64), taskId, nodeId (I32),
 * expireTime (I64), success (I8).
 *
 * On the success path the function returns pEncoder->pos. Each step goes
 * through TAOS_CHECK_RETURN (defined elsewhere; presumably an early return
 * with the failing code -- confirm against the macro definition).
 */
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // identity of the checkpoint being answered
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));

  // expiry and outcome flag
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
80

81
/*
 * Serialize a task node-update message.
 *
 * Wire order: streamId (I64), taskId (I32), number of node entries (I32),
 * then per entry nodeId (I32), prevEp, newEp; finally transId (I32).
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and the `_exit` label are kept under these exact names
 * because TAOS_CHECK_EXIT (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  // the entry count precedes the entries
  int32_t numOfNodes = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfNodes));

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo* pNode = taosArrayGet(pMsg->pNodeList, idx);
    if (pNode == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pNode->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->newEp));
  }

  // TODO: this newer attribute makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
113

114
/*
 * Deserialize a task node-update message into pMsg.
 *
 * Read order: streamId (I64), taskId (I32), entry count (I32), then per
 * entry nodeId (I32), prevEp, newEp; finally transId (I32).
 *
 * pMsg->pNodeList is created here with taosArrayInit and filled with one
 * SNodeUpdateInfo per decoded entry. This function never destroys that
 * array, including on the error path.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  int32_t numOfNodes = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  if (pMsg->pNodeList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo node = {0};

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &node.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.newEp));

    void* pPushed = taosArrayPush(pMsg->pNodeList, &node);
    if (pPushed == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
145

146
/*
 * Serialize a task-check request.
 *
 * Wire order: reqId, streamId (I64), upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId (I32), stage (I64).
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
168

169
/*
 * Deserialize a task-check request into pReq.
 *
 * Read order: reqId, streamId (I64), upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId (I32), stage (I64) -- the
 * sequence written by tEncodeStreamTaskCheckReq.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  tEndDecode(pDecoder);

_exit:
  return code;
}
187

188
/*
 * Serialize a task-check response.
 *
 * Wire order: reqId, streamId (I64), upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId (I32), oldStage (I64),
 * status (I8).
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
211

212
/*
 * Deserialize a task-check response into pRsp.
 *
 * Read order: reqId, streamId (I64), upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId (I32), oldStage (I64),
 * status (I8) -- the sequence written by tEncodeStreamTaskCheckRsp.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
  tEndDecode(pDecoder);

_exit:
  return code;
}
231

232
/*
 * Serialize a checkpoint-ready message.
 *
 * Wire order: streamId, checkpointId (I64), downstreamTaskId,
 * downstreamNodeId, upstreamTaskId, upstreamNodeId, childId (I32).
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // stream / checkpoint identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  // downstream side first, then upstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
253

254
/*
 * Deserialize a checkpoint-ready message into pRsp.
 *
 * Read order: streamId, checkpointId (I64), downstreamTaskId,
 * downstreamNodeId, upstreamTaskId, upstreamNodeId, childId (I32) -- the
 * sequence written by tEncodeStreamCheckpointReadyMsg.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // stream / checkpoint identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));

  // downstream side first, then upstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
271

272
/*
 * Serialize a dispatch request, including its data blocks.
 *
 * Wire order of the header: stage (I64), msgId, srcVgId, type (I32),
 * streamId (I64), taskId, type (again), upstreamTaskId, upstreamChildId,
 * upstreamNodeId, upstreamRelTaskId, blockNum (I32), totalLen (I64).
 * NOTE: `type` is written twice; tDecodeStreamDispatchReq reads it twice as
 * well, so removing either occurrence would change the wire format.
 *
 * After the header, each of the blockNum blocks is written as its length
 * (I32) followed by the block bytes (tEncodeBinary).
 *
 * If pReq->data or pReq->dataLen does not hold exactly blockNum elements the
 * request is rejected with TSDB_CODE_INVALID_MSG after logging an error.
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));  // second copy, part of the wire format
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  // both parallel arrays must match the advertised block count
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    stError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t blk = 0; blk < pReq->blockNum; blk++) {
    int32_t* pBlockLen = taosArrayGet(pReq->dataLen, blk);
    void*    pBlock = taosArrayGetP(pReq->data, blk);
    if (pBlock == NULL || pBlockLen == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // length prefix, then the payload itself
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pBlockLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pBlock, *pBlockLen));
  }
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
314

315
/*
 * Deserialize a dispatch request, including its data blocks, into pReq.
 *
 * Read order of the header: stage (I64), msgId, srcVgId, type (I32),
 * streamId (I64), taskId, type (again), upstreamTaskId, upstreamChildId,
 * upstreamNodeId, upstreamRelTaskId, blockNum (I32), totalLen (I64).
 * NOTE: `type` is read twice because tEncodeStreamDispatchReq writes it
 * twice; this is part of the wire format.
 *
 * pReq->data (array of void*) and pReq->dataLen (array of int32_t) are
 * created here. For every block the explicit length (I32) is read, then the
 * payload is read with tDecodeBinaryAlloc; a mismatch between the two
 * lengths yields TSDB_CODE_INVALID_MSG.
 *
 * Ownership: a payload buffer is owned by pReq->data only once it has been
 * pushed into that array. If a block is rejected or cannot be pushed, its
 * buffer is released here so it does not leak. Blocks pushed earlier and the
 * two arrays are NOT released by this function on failure;
 * tCleanupStreamDispatchReq destroys them.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));  // second copy, part of the wire format
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    // From here on `data` is a buffer we own until it lands in pReq->data.
    // Same comparison as the implicit int32_t -> uint64_t conversion.
    if ((uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      int32_t pushErr = terrno;  // capture before any further library call
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(pushErr);
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      int32_t pushErr = terrno;  // capture before any further library call
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(pushErr);
    }
  }

  tEndDecode(pDecoder);
_exit:
  return code;
}
364

365
/*
 * Release the block arrays held by a decoded dispatch request.
 *
 * pReq->data is destroyed with taosArrayDestroyP, passing taosMemoryFree as
 * the per-element release function; pReq->dataLen is destroyed with
 * taosArrayDestroy. Both members are then reset to NULL so the request no
 * longer holds dangling pointers, mirroring what tCleanupStreamHbMsg does for
 * its arrays. A NULL pReq is ignored. pReq itself is not freed.
 */
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, taosMemoryFree);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
18,719✔
369

370
/*
 * Serialize a retrieve request.
 *
 * Wire order: streamId, reqId (I64), dstNodeId, dstTaskId, srcNodeId,
 * srcTaskId (I32), then pReq->pRetrieve as a binary blob of
 * pReq->retrieveLen bytes.
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // stream / request identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  // destination, then source
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  // payload
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
391

392
/*
 * Deserialize a retrieve request into pReq.
 *
 * Read order: streamId, reqId (I64), dstNodeId, dstTaskId, srcNodeId,
 * srcTaskId (I32), then the binary payload. The payload is obtained through
 * tDecodeBinaryAlloc and stored in pReq->pRetrieve; its length is narrowed
 * to int32_t and stored in pReq->retrieveLen. tCleanupStreamRetrieveReq
 * releases pReq->pRetrieve.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // stream / request identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  // destination, then source
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  // payload
  uint64_t payloadLen = 0;
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen));
  pReq->retrieveLen = (int32_t)payloadLen;
  tEndDecode(pDecoder);

_exit:
  return code;
}
411

412
/*
 * Release the payload buffer held by a retrieve request.
 *
 * pReq->pRetrieve is passed to taosMemoryFree and then reset to NULL so the
 * request no longer holds a dangling pointer, mirroring how
 * tCleanupStreamHbMsg clears its members. A NULL pReq is ignored. pReq
 * itself is not freed and retrieveLen is left untouched.
 */
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosMemoryFree(pReq->pRetrieve);
  pReq->pRetrieve = NULL;
}
438✔
413

414
/*
 * Serialize a task checkpoint request.
 *
 * Wire order: streamId (I64), taskId (I32), nodeId (I32).
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * NOTE(review): unlike the other encoders in this file, the success path
 * returns `code` rather than pEncoder->pos -- confirm with the callers
 * whether that difference is intentional before changing it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // stream, task and node the request refers to
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
427

428
/*
 * Deserialize a task checkpoint request into pReq.
 *
 * Read order: streamId (I64), taskId (I32), nodeId (I32) -- the sequence
 * written by tEncodeStreamTaskCheckpointReq.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // stream, task and node the request refers to
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
441

442
/*
 * Serialize a stream heartbeat message.
 *
 * Layout: vgId (I32), numOfTasks (I32), then numOfTasks task status entries
 * taken from pReq->pTaskStatus, then the number of elements in
 * pReq->pUpdateNodes (I32) followed by each vgId (I32), then msgId (I32) and
 * ts (I64). The per-entry field order below is the wire format and matches
 * tDecodeStreamHbMsg; it must not be reordered.
 *
 * Returns pEncoder->pos when `code` is still 0 at the end, otherwise `code`.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));

  for (int32_t taskIdx = 0; taskIdx < pReq->numOfTasks; ++taskIdx) {
    STaskStatusEntry* pEntry = taosArrayGet(pReq->pTaskStatus, taskIdx);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // identity and state
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->id.streamId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->id.taskId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->status));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->stage));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputQUsed));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputRate));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkQuota));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->processedVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.minVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.maxVer));

    // checkpoint info
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.failed));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.consensusTs));

    // start-up info and related task id
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->hTaskId));
  }

  // list of vgIds in pUpdateNodes, prefixed by its length
  int32_t numOfUpdated = taosArrayGetSize(pReq->pUpdateNodes);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpdated));

  for (int vgIdx = 0; vgIdx < numOfUpdated; ++vgIdx) {
    int32_t* pUpdatedVgId = taosArrayGet(pReq->pUpdateNodes, vgIdx);
    if (pUpdatedVgId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pUpdatedVgId));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
511

512
/*
 * Deserialize a stream heartbeat message into pReq.
 *
 * Layout: vgId (I32), numOfTasks (I32), numOfTasks task status entries,
 * the count of updated vgIds (I32) followed by each vgId (I32), then
 * msgId (I32) and ts (I64). The per-entry field order below is the wire
 * format written by tEncodeStreamHbMsg and must not be reordered.
 *
 * pReq->pTaskStatus and pReq->pUpdateNodes are created here with
 * taosArrayInit and are not destroyed by this function, including on the
 * error path; tCleanupStreamHbMsg destroys both.
 *
 * Returns `code`, which is 0 unless a TAOS_CHECK_EXIT step changed it.
 * `code`, `lino` and `_exit` keep these exact names because TAOS_CHECK_EXIT
 * (defined elsewhere) appears to rely on them.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));

  pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
  if (pReq->pTaskStatus == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t taskIdx = 0; taskIdx < pReq->numOfTasks; ++taskIdx) {
    int32_t          wireTaskId = 0;  // task id is carried as I32 on the wire
    STaskStatusEntry status = {0};

    // identity and state
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.id.streamId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &wireTaskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.status));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.stage));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.inputQUsed));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.inputRate));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.procsTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.procsThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.outputTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.outputThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.sinkQuota));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.processedVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.verRange.minVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.verRange.maxVer));

    // checkpoint info
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.failed));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.consensusTs));

    // start-up info and related task id
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startCheckpointId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startCheckpointVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.hTaskId));

    // copy the I32 wire value into the entry's own taskId field
    status.id.taskId = wireTaskId;

    void* pPushed = taosArrayPush(pReq->pTaskStatus, &status);
    if (pPushed == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  int32_t numOfUpdated = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfUpdated));

  pReq->pUpdateNodes = taosArrayInit(numOfUpdated, sizeof(int32_t));
  if (pReq->pUpdateNodes == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int vgIdx = 0; vgIdx < numOfUpdated; ++vgIdx) {
    int32_t updatedVgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &updatedVgId));

    void* pPushed = taosArrayPush(pReq->pUpdateNodes, &updatedVgId);
    if (pPushed == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
  tEndDecode(pDecoder);

_exit:
  return code;
}
587

588
/*
 * Release the arrays held by a heartbeat message and reset its counters.
 *
 * pMsg may be NULL, in which case nothing happens. After the call both
 * pUpdateNodes and pTaskStatus are NULL and msgId, vgId and numOfTasks are -1.
 * The SStreamHbMsg structure itself is not freed here.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
  if (pMsg != NULL) {
    // Destroy each array only when it was actually allocated.
    if (pMsg->pUpdateNodes != NULL) {
      taosArrayDestroy(pMsg->pUpdateNodes);
    }
    pMsg->pUpdateNodes = NULL;

    if (pMsg->pTaskStatus != NULL) {
      taosArrayDestroy(pMsg->pTaskStatus);
    }
    pMsg->pTaskStatus = NULL;

    // Mark the scalar fields as "unset".
    pMsg->msgId = -1;
    pMsg->vgId = -1;
    pMsg->numOfTasks = -1;
  }
}
607

608
/*
 * Serialize a stream task into pEncoder.
 *
 * The field order written here is the wire format that tDecodeStreamTask
 * reads back, so the statements below must not be reordered.
 *
 * Returns the value left in `code` by the TAOS_CHECK_EXIT steps (0 when every
 * step succeeds).
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // identity and basic attributes
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  // run-time status
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  // placement
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  // checkpoint position and fill-history flag
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  // ids of the two related tasks; each task id goes on the wire as an int32
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  int32_t relatedTaskId = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, relatedTaskId));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  relatedTaskId = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, relatedTaskId));

  // data range: version span followed by time window
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  // upstream endpoints: count first, then one entry per endpoint
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpstream));
  for (int32_t idx = 0; idx < numOfUpstream; ++idx) {
    SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pEpInfo));
  }

  // the query plan string is written for every level except sink
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  // output-specific payload; unknown output types contribute nothing
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    default:
      break;
  }

  // trailing fields
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  tEndEncode(pEncoder);

_exit:
  return code;
}
681

682
/*
 * Deserialize a stream task from pDecoder into pTask.
 *
 * Fields are read in the same order tEncodeStreamTask writes them. A decoded
 * version outside (SSTREAM_TASK_INCOMPATIBLE_VER, SSTREAM_TASK_VER] or a
 * negative upstream count is rejected with TSDB_CODE_INVALID_MSG.
 *
 * Returns the value left in `code` by the decoding steps (0 when every step
 * succeeds). On failure pTask may be partially populated: the upstream list,
 * exec.qmsg and the table-sink schema wrapper allocated before the failing
 * step are NOT released here.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  // Related task ids travel as int32 on the wire; decode into a temporary
  // and then assign to the destination field.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  // Hand the decoder's own return code to TAOS_CHECK_EXIT. Wrapping the call
  // in "< 0" would pass the comparison result (0 or 1) instead, so the real
  // error code could never be propagated.
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  if (epSz < 0) {
    // The count comes from the message; never size an array from a negative value.
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      // pInfo never made it into the list, so nobody else can free it.
      // Capture terrno before releasing the entry.
      code = terrno;
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
  }

  // The query plan string is present for every level except sink.
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  // subtableWithoutMd5 is only read when the decoded version is at least
  // SSTREAM_TASK_SUBTABLE_CHANGED_VER.
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  tEndDecode(pDecoder);

_exit:
  return code;
}
780

781
/*
 * Serialize a checkpoint report into pEncoder.
 *
 * Wire order: streamId, taskId, nodeId, checkpointId, checkpointVer,
 * checkpointTs, transId, dropHTask. tDecodeStreamTaskChkptReport reads the
 * same sequence.
 *
 * Returns the value left in `code` by the TAOS_CHECK_EXIT steps (0 when every
 * step succeeds).
 */
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // which task is reporting
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  // what is being reported
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));

  tEndEncode(pEncoder);

_exit:
  return code;
}
799

800
/*
 * Deserialize a checkpoint report from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeStreamTaskChkptReport writes them:
 * streamId, taskId, nodeId, checkpointId, checkpointVer, checkpointTs,
 * transId, dropHTask.
 *
 * Returns the value left in `code` by the TAOS_CHECK_EXIT steps (0 when every
 * step succeeds).
 */
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // which task is reporting
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  // what is being reported
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));

  tEndDecode(pDecoder);

_exit:
  return code;
}
818

819
/*
 * Serialize a restore-checkpoint request into pEncoder.
 *
 * Wire order: startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Unlike the other encoders in this file, the success value is not 0: when
 * `code` is 0 at exit the function returns pEncoder->pos, otherwise it
 * returns `code`.
 */
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  if (code == 0) {
    // success: report the encoder position instead of a status code
    return pEncoder->pos;
  }
  return code;
}
839

840
/*
 * Deserialize a restore-checkpoint request from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeRestoreCheckpointInfo writes them:
 * startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Returns the value left in `code` by the TAOS_CHECK_EXIT steps (0 when every
 * step succeeds).
 */
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc