• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3528

13 Nov 2024 02:14AM UTC coverage: 60.905% (+0.09%) from 60.819%
#3528

push

travis-ci

web-flow
Merge pull request #28748 from taosdata/test/chr-3.0-TD14758

test:add docs ci in jenkinsfile2

118800 of 249004 branches covered (47.71%)

Branch coverage included in aggregate %.

199361 of 273386 relevant lines covered (72.92%)

14738389.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.9
/source/libs/stream/src/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tstream.h"
19
#include "streamInt.h"
20

21
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
183,162✔
22
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
366,324!
23
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
366,324!
24
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
366,324!
25
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
183,162!
26
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
366,366!
27
  return 0;
183,183✔
28
}
29

30
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
63,313✔
31
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
126,640!
32
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
126,654!
33
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
126,654!
34
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
63,327!
35
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
126,667!
36
  return 0;
63,333✔
37
}
38

39
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
6,638✔
40
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
6,638!
41
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
13,276!
42
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
13,276!
43
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
13,276!
44
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));
13,276!
45
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
6,638!
46
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));
13,276!
47
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
13,276!
48
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
13,276!
49
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));
13,276!
50
  tEndEncode(pEncoder);
6,638✔
51
  return pEncoder->pos;
6,638✔
52
}
53

54
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
4,282✔
55
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));
4,282!
56
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
8,581!
57
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
8,573!
58
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
8,561!
59
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));
8,553!
60
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
4,277!
61
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));
8,561!
62
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
8,563!
63
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
8,556!
64
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));
8,550!
65
  tEndDecode(pDecoder);
4,275✔
66
  return 0;
4,277✔
67
}
68

69
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
8,573✔
70
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
8,573!
71
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
17,162!
72
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
17,162!
73
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
17,162!
74
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));
17,162!
75
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
17,162!
76
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));
17,162!
77
  tEndEncode(pEncoder);
8,581✔
78
  return pEncoder->pos;
8,575✔
79
}
80

81
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
118✔
82
  int32_t code = 0;
118✔
83
  int32_t lino;
84

85
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
118!
86
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
236!
87
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));
236!
88

89
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
118✔
90
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
118!
91

92
  for (int32_t i = 0; i < size; ++i) {
284✔
93
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
166✔
94
    if (pInfo == NULL) {
166!
95
      TAOS_CHECK_EXIT(terrno);
×
96
    }
97

98
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
332!
99
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
166!
100
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
166!
101
  }
102

103
  // todo this new attribute will be result in being incompatible with previous version
104
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
236!
105
  tEndEncode(pEncoder);
118✔
106
_exit:
118✔
107
  if (code) {
118!
108
    return code;
×
109
  } else {
110
    return pEncoder->pos;
118✔
111
  }
112
}
113

114
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
83✔
115
  int32_t code = 0;
83✔
116
  int32_t lino;
117

118
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
83!
119
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
166!
120
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));
166!
121

122
  int32_t size = 0;
83✔
123
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
83!
124
  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
83✔
125
  if (pMsg->pNodeList == NULL) {
83!
126
    TAOS_CHECK_EXIT(terrno);
×
127
  }
128
  for (int32_t i = 0; i < size; ++i) {
178✔
129
    SNodeUpdateInfo info = {0};
95✔
130
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
95!
131
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
95!
132
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));
95!
133

134
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
190!
135
      TAOS_CHECK_EXIT(terrno);
×
136
    }
137
  }
138

139
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
166!
140

141
  tEndDecode(pDecoder);
83✔
142
_exit:
83✔
143
  return code;
83✔
144
}
145

146
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
42,617✔
147
  int32_t code = 0;
42,617✔
148
  int32_t lino;
149

150
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
42,617!
151
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
85,246!
152
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
85,246!
153
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
85,246!
154
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
85,246!
155
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
85,246!
156
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
85,246!
157
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
85,246!
158
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
85,246!
159
  tEndEncode(pEncoder);
42,623✔
160

161
_exit:
42,621✔
162
  if (code) {
42,621!
163
    return code;
×
164
  } else {
165
    return pEncoder->pos;
42,621✔
166
  }
167
}
168

169
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
21,231✔
170
  int32_t code = 0;
21,231✔
171
  int32_t lino;
172

173
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
21,231!
174
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
42,485!
175
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
42,480!
176
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
42,474!
177
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
42,461!
178
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
42,453!
179
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
42,448!
180
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
42,443!
181
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
42,439!
182
  tEndDecode(pDecoder);
21,218✔
183

184
_exit:
21,219✔
185
  return code;
21,219✔
186
}
187

188
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
42,417✔
189
  int32_t code = 0;
42,417✔
190
  int32_t lino;
191

192
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
42,417!
193
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
84,886!
194
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
84,886!
195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
84,886!
196
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
84,886!
197
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
84,886!
198
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
84,886!
199
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
84,886!
200
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
84,886!
201
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
84,886!
202
  tEndEncode(pEncoder);
42,443✔
203

204
_exit:
42,437✔
205
  if (code) {
42,437!
206
    return code;
×
207
  } else {
208
    return pEncoder->pos;
42,437✔
209
  }
210
}
211

212
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
21,233✔
213
  int32_t code = 0;
21,233✔
214
  int32_t lino;
215

216
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
21,233!
217
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
42,469!
218
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
42,457!
219
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
42,450!
220
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
42,443!
221
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
42,432!
222
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
42,424!
223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
42,423!
224
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
42,421!
225
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
42,419!
226
  tEndDecode(pDecoder);
21,209✔
227

228
_exit:
21,209✔
229
  return code;
21,209✔
230
}
231

232
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
17,216✔
233
  int32_t code = 0;
17,216✔
234
  int32_t lino;
235

236
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
17,216!
237
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
34,482!
238
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
34,482!
239
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
34,482!
240
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
34,482!
241
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
34,482!
242
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
34,482!
243
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
34,482!
244
  tEndEncode(pEncoder);
17,241✔
245

246
_exit:
17,219✔
247
  if (code) {
17,219!
248
    return code;
×
249
  } else {
250
    return pEncoder->pos;
17,219✔
251
  }
252
}
253

254
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
8,617✔
255
  int32_t code = 0;
8,617✔
256
  int32_t lino;
257

258
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
8,617!
259
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
17,253!
260
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
17,245!
261
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
17,243!
262
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
17,237!
263
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
17,229!
264
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
17,216!
265
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
17,213!
266
  tEndDecode(pDecoder);
8,609✔
267

268
_exit:
8,606✔
269
  return code;
8,606✔
270
}
271

272
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
70,073✔
273
  int32_t code = 0;
70,073✔
274
  int32_t lino;
275

276
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
70,073!
277
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
140,190!
278
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
140,190!
279
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
140,190!
280
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
140,190!
281
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
140,190!
282
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
140,190!
283
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
140,190!
284
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
140,190!
285
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
140,190!
286
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
140,190!
287
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
140,190!
288
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
140,190!
289
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
140,190!
290

291
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
70,095!
292
    stError("invalid dispatch req msg");
×
293
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
294
  }
295

296
  for (int32_t i = 0; i < pReq->blockNum; i++) {
247,844✔
297
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
177,782✔
298
    void*    data = taosArrayGetP(pReq->data, i);
177,771✔
299
    if (data == NULL || pLen == NULL) {
177,761!
300
      TAOS_CHECK_EXIT(terrno);
×
301
    }
302

303
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
355,524!
304
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
355,524!
305
  }
306
  tEndEncode(pEncoder);
70,062✔
307
_exit:
70,090✔
308
  if (code) {
70,090!
309
    return code;
×
310
  } else {
311
    return pEncoder->pos;
70,090✔
312
  }
313
}
314

315
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
35,035✔
316
  int32_t code = 0;
35,035✔
317
  int32_t lino;
318

319
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
35,035!
320
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
70,088!
321
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
70,083!
322
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
70,076!
323
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
70,064!
324
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
70,059!
325
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
70,056!
326
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
70,052!
327
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
70,050!
328
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
70,049!
329
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
70,047!
330
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
70,044!
331
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
70,043!
332
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));
70,039!
333

334
  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
35,018!
335
    TAOS_CHECK_EXIT(terrno);
×
336
  }
337
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
35,032✔
338
    TAOS_CHECK_EXIT(terrno);
2!
339
  }
340
  for (int32_t i = 0; i < pReq->blockNum; i++) {
123,518✔
341
    int32_t  len1;
342
    uint64_t len2;
343
    void*    data;
344
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
88,522!
345
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));
88,491!
346

347
    if (len1 != len2) {
88,491!
348
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
349
    }
350

351
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
176,981!
352
      TAOS_CHECK_EXIT(terrno);
×
353
    }
354

355
    if (taosArrayPush(pReq->data, &data) == NULL) {
176,974!
356
      TAOS_CHECK_EXIT(terrno);
×
357
    }
358
  }
359

360
  tEndDecode(pDecoder);
34,999✔
361
_exit:
35,003✔
362
  return code;
35,003✔
363
}
364

365
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
35,027✔
366
  taosArrayDestroyP(pReq->data, taosMemoryFree);
35,027✔
367
  taosArrayDestroy(pReq->dataLen);
35,029✔
368
}
35,026✔
369

370
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
1,084✔
371
  int32_t code = 0;
1,084✔
372
  int32_t lino;
373

374
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
1,084!
375
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
2,168!
376
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
2,168!
377
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
2,168!
378
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
2,168!
379
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
2,168!
380
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
2,168!
381
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
2,168!
382
  tEndEncode(pEncoder);
1,084✔
383

384
_exit:
1,084✔
385
  if (code) {
1,084!
386
    return code;
×
387
  } else {
388
    return pEncoder->pos;
1,084✔
389
  }
390
}
391

392
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
542✔
393
  int32_t code = 0;
542✔
394
  int32_t lino;
395

396
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
542!
397
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
1,084!
398
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
1,084!
399
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
1,084!
400
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
1,084!
401
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
1,084!
402
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));
1,084!
403
  uint64_t len = 0;
542✔
404
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
1,084!
405
  pReq->retrieveLen = (int32_t)len;
542✔
406
  tEndDecode(pDecoder);
542✔
407

408
_exit:
542✔
409
  return code;
542✔
410
}
411

412
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
542✔
413

414
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
9,431✔
415
  int32_t code = 0;
9,431✔
416
  int32_t lino;
417

418
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
9,431!
419
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
18,864!
420
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
18,864!
421
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
18,864!
422
  tEndEncode(pEncoder);
9,432✔
423

424
_exit:
9,428✔
425
  return code;
9,428✔
426
}
427

428
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
4,726✔
429
  int32_t code = 0;
4,726✔
430
  int32_t lino;
431

432
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
4,726!
433
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
9,452!
434
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
9,452!
435
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
9,452!
436
  tEndDecode(pDecoder);
4,726✔
437

438
_exit:
4,726✔
439
  return code;
4,726✔
440
}
441

442
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
52,332✔
443
  int32_t code = 0;
52,332✔
444
  int32_t lino;
445

446
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
52,332!
447
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
104,664!
448
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));
104,664!
449

450
  for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
160,902✔
451
    STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
108,570✔
452
    if (ps == NULL) {
108,570!
453
      TAOS_CHECK_EXIT(terrno);
×
454
    }
455

456
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId));
217,140!
457
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId));
217,140!
458
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status));
217,140!
459
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage));
217,140!
460
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId));
217,140!
461
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed));
217,140!
462
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate));
217,140!
463
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal));
217,140!
464
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput));
217,140!
465
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal));
217,140!
466
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput));
217,140!
467
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota));
217,140!
468
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize));
217,140!
469
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer));
217,140!
470
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer));
217,140!
471
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer));
217,140!
472
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId));
217,140!
473
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed));
217,140!
474
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId));
217,140!
475
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId));
217,140!
476
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer));
217,140!
477
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime));
217,140!
478
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize));
217,140!
479
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup));
217,140!
480
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId));
217,140!
481
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs));
217,140!
482
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime));
217,140!
483
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId));
217,140!
484
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer));
217,140!
485
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId));
217,140!
486
  }
487

488
  int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
52,332✔
489
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs));
52,332!
490

491
  for (int j = 0; j < numOfVgs; ++j) {
52,360✔
492
    int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
28✔
493
    if (pVgId == NULL) {
28!
494
      TAOS_CHECK_EXIT(terrno);
×
495
    }
496

497
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
56!
498
  }
499

500
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
104,664!
501
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
104,664!
502
  tEndEncode(pEncoder);
52,332✔
503

504
_exit:
52,332✔
505
  if (code) {
52,332!
506
    return code;
×
507
  } else {
508
    return pEncoder->pos;
52,332✔
509
  }
510
}
511

512
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
25,934✔
513
  int32_t code = 0;
25,934✔
514
  int32_t lino;
515

516
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
25,934!
517
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
51,893!
518
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));
51,876!
519

520
  if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) {
25,935!
521
    TAOS_CHECK_EXIT(terrno);
×
522
  }
523
  for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
79,677✔
524
    int32_t          taskId = 0;
53,754✔
525
    STaskStatusEntry entry = {0};
53,754✔
526

527
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId));
53,782!
528
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
53,774!
529
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status));
53,764!
530
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage));
53,756!
531
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId));
53,747!
532
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed));
53,750!
533
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate));
53,754!
534
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal));
53,746!
535
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput));
53,735!
536
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal));
53,736!
537
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput));
53,741!
538
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota));
53,731!
539
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize));
53,722!
540
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer));
53,717!
541
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer));
53,704!
542
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer));
53,699!
543
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId));
53,703!
544
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed));
53,713!
545
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId));
53,708!
546

547
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId));
53,715!
548
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer));
53,705!
549
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime));
53,705!
550
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize));
53,698!
551
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup));
53,706!
552
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId));
53,709!
553
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs));
53,706!
554
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime));
53,714!
555
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId));
53,719!
556
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer));
53,717!
557
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId));
53,715!
558

559
    entry.id.taskId = taskId;
53,715✔
560
    if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) {
107,450!
561
      TAOS_CHECK_EXIT(terrno);
×
562
    }
563
  }
564

565
  int32_t numOfVgs = 0;
25,923✔
566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs));
25,940!
567

568
  if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) {
25,940✔
569
    TAOS_CHECK_EXIT(terrno);
9!
570
  }
571

572
  for (int j = 0; j < numOfVgs; ++j) {
25,953✔
573
    int32_t vgId = 0;
14✔
574
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
14!
575
    if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) {
28!
576
      TAOS_CHECK_EXIT(terrno);
×
577
    }
578
  }
579

580
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
51,878!
581
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
51,885!
582
  tEndDecode(pDecoder);
25,946✔
583

584
_exit:
25,944✔
585
  return code;
25,944✔
586
}
587

588
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
232,837✔
589
  if (pMsg == NULL) {
232,837!
590
    return;
×
591
  }
592

593
  if (pMsg->pUpdateNodes != NULL) {
232,837✔
594
    taosArrayDestroy(pMsg->pUpdateNodes);
219,554✔
595
    pMsg->pUpdateNodes = NULL;
219,561✔
596
  }
597

598
  if (pMsg->pTaskStatus != NULL) {
232,844✔
599
    taosArrayDestroy(pMsg->pTaskStatus);
219,561✔
600
    pMsg->pTaskStatus = NULL;
219,561✔
601
  }
602

603
  pMsg->msgId = -1;
232,844✔
604
  pMsg->vgId = -1;
232,844✔
605
  pMsg->numOfTasks = -1;
232,844✔
606
}
607

608
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
132,407✔
609
  int32_t code = 0;
132,407✔
610
  int32_t lino;
611

612
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
132,407!
613
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
264,840!
614
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
264,840!
615
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
264,840!
616
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
264,840!
617
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
264,840!
618
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
264,840!
619
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
264,840!
620

621
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
264,840!
622
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
264,840!
623

624
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
264,840!
625
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
264,840!
626
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
132,420!
627
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
132,413!
628

629
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
264,844!
630
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
264,844!
631
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
264,844!
632

633
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
264,844!
634
  int32_t taskId = pTask->hTaskInfo.id.taskId;
132,422✔
635
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
132,422!
636

637
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
264,844!
638
  taskId = pTask->streamTaskId.taskId;
132,422✔
639
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
132,422!
640

641
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
264,844!
642
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
264,844!
643
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
264,844!
644
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
264,844!
645

646
  int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
132,422✔
647
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
132,418!
648
  for (int32_t i = 0; i < epSz; i++) {
315,595✔
649
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
183,191✔
650
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
183,170!
651
  }
652

653
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
132,404✔
654
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
140,156!
655
  }
656

657
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
132,404✔
658
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
129,448!
659
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
129,448!
660
    TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
129,448!
661
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
67,680✔
662
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
840!
663
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
67,260!
664
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
×
665
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
67,260✔
666
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
20,282!
667
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
20,282!
668
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
10,141!
669
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
57,119✔
670
    TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
57,076!
671
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
114,144!
672
  }
673
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
264,802!
674
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
264,802!
675
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
264,802!
676

677
  tEndEncode(pEncoder);
132,401✔
678
_exit:
132,411✔
679
  return code;
132,411✔
680
}
681

682
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
45,880✔
683
  int32_t taskId = 0;
45,880✔
684
  int32_t code = 0;
45,880✔
685
  int32_t lino;
686

687
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
45,880!
688
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
91,784!
689
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
45,890!
690
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
2!
691
  }
692

693
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
91,781!
694
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
91,778!
695
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
91,769!
696
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
91,766!
697
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
91,765!
698
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
91,761!
699

700
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
91,752!
701
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
91,752!
702

703
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
91,761!
704
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
91,759!
705
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
45,878!
706
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
45,890!
707

708
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
91,794!
709
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
91,794!
710
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
91,780!
711

712
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
91,770!
713
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
45,893!
714
  pTask->hTaskInfo.id.taskId = taskId;
45,893✔
715

716
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
91,784!
717
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
45,885!
718
  pTask->streamTaskId.taskId = taskId;
45,885✔
719

720
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
91,770!
721
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
91,771!
722
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
91,772!
723
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
91,766!
724

725
  int32_t epSz = -1;
45,880✔
726
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
45,884!
727

728
  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
45,884!
729
    TAOS_CHECK_EXIT(terrno);
×
730
  }
731
  for (int32_t i = 0; i < epSz; i++) {
109,213✔
732
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
63,323✔
733
    if (pInfo == NULL) {
63,310!
734
      TAOS_CHECK_EXIT(terrno);
×
735
    }
736
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
63,310!
737
      taosMemoryFreeClear(pInfo);
×
738
      goto _exit;
×
739
    }
740
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
126,650!
741
      TAOS_CHECK_EXIT(terrno);
×
742
    }
743
  }
744

745
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
45,890✔
746
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
48,304!
747
  }
748

749
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
45,892✔
750
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
44,897!
751
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
22,449!
752
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
22,449✔
753
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
22,453✔
754
      TAOS_CHECK_EXIT(terrno);
1!
755
    }
756
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
44,871!
757
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
23,444✔
758
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
298!
759
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
23,295!
760
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
×
761
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
23,295✔
762
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
6,488!
763
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
6,488!
764
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
3,244!
765
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
20,051✔
766
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
20,025!
767
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
20,029!
768
  }
769
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
91,754!
770
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
45,888!
771
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
91,784!
772
  }
773
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
45,892!
774

775
  tEndDecode(pDecoder);
45,890✔
776

777
_exit:
45,892✔
778
  return code;
45,892✔
779
}
780

781
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
12,943✔
782
  int32_t code = 0;
12,943✔
783
  int32_t lino;
784

785
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
12,943!
786
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
25,888!
787
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
25,888!
788
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
25,888!
789
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
25,888!
790
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
25,888!
791
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));
25,888!
792
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
25,888!
793
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));
25,888!
794
  tEndEncode(pEncoder);
12,944✔
795

796
_exit:
12,940✔
797
  return code;
12,940✔
798
}
799

800
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
6,475✔
801
  int32_t code = 0;
6,475✔
802
  int32_t lino;
803

804
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
6,475!
805
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
12,950!
806
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
12,950!
807
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
12,950!
808
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
12,950!
809
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
12,950!
810
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));
12,950!
811
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
12,950!
812
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));
12,950!
813
  tEndDecode(pDecoder);
6,475✔
814

815
_exit:
6,475✔
816
  return code;
6,475✔
817
}
818

819
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
232✔
820
  int32_t code = 0;
232✔
821
  int32_t lino;
822

823
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
232!
824
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
464!
825
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
464!
826
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
464!
827
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
464!
828
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
464!
829
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));
464!
830
  tEndEncode(pEncoder);
232✔
831

832
_exit:
232✔
833
  if (code) {
232!
834
    return code;
×
835
  } else {
836
    return pEncoder->pos;
232✔
837
  }
838
}
839

840
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
114✔
841
  int32_t code = 0;
114✔
842
  int32_t lino;
843

844
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
114!
845
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
228!
846
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
228!
847
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
228!
848
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
228!
849
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
228!
850
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));
228!
851
  tEndDecode(pDecoder);
114✔
852

853
_exit:
114✔
854
  return code;
114✔
855
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc