• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.48
/source/libs/stream/src/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tstream.h"
19
#include "streamInt.h"
20

21
/*
 * Write one upstream endpoint descriptor to pEncoder.
 *
 * Wire order: taskId (i32), nodeId (i32), childId (i32), epSet, stage (i64).
 * Every step is wrapped in TAOS_CHECK_RETURN; 0 is returned once the last
 * field has been written.
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  // identity of the upstream task
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
29

30
/*
 * Read one upstream endpoint descriptor from pDecoder into pInfo.
 *
 * Fields are consumed in the same order tEncodeStreamEpInfo writes them:
 * taskId, nodeId, childId, epSet, stage. Every step is wrapped in
 * TAOS_CHECK_RETURN; 0 is returned once the last field has been read.
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  // identity of the upstream task
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
38

39
/*
 * Serialize a checkpoint-source request.
 *
 * Wire order: streamId, checkpointId, taskId, nodeId, mgmtEps, mnodeId,
 * expireTime, transId, mndTrigger. Each step is wrapped in TAOS_CHECK_RETURN.
 * When every field has been written the function returns pEncoder->pos.
 */
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // stream / checkpoint / task identity
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));

  // mnode-side information
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
53

54
/*
 * Deserialize a checkpoint-source request into pReq.
 *
 * Fields are read in the order tEncodeStreamCheckpointSourceReq writes them.
 * Each step is wrapped in TAOS_CHECK_RETURN; 0 is returned after the last
 * field has been read and tEndDecode has been called.
 */
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // stream / checkpoint / task identity
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));

  // mnode-side information
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));

  tEndDecode(pDecoder);
  return 0;
}
68

69
/*
 * Serialize a checkpoint-source response.
 *
 * Wire order: streamId, checkpointId, taskId, nodeId, expireTime, success.
 * Each step is wrapped in TAOS_CHECK_RETURN. When every field has been
 * written the function returns pEncoder->pos.
 */
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
80

81
/*
 * Serialize a task node-update message.
 *
 * Wire order: streamId, taskId, number of node entries, then for every entry
 * {nodeId, prevEp, newEp}, and finally transId.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  // the entry count precedes the entries themselves
  int32_t numOfNodes = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfNodes));

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo* pNode = taosArrayGet(pMsg->pNodeList, idx);
    if (pNode == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pNode->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->newEp));
  }

  // todo: this newer attribute makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
113

114
/*
 * Deserialize a task node-update message into pMsg.
 *
 * Reads streamId, taskId and the entry count, creates pMsg->pNodeList with
 * taosArrayInit and appends one SNodeUpdateInfo per encoded entry, then reads
 * transId. pMsg->pNodeList is assigned before the entries are decoded, so it
 * may be non-NULL even when a non-zero code is returned.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  int32_t numOfNodes = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  if (pMsg->pNodeList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo node = {0};

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &node.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.newEp));

    // the array receives a copy of the local entry
    if (taosArrayPush(pMsg->pNodeList, &node) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
145

146
/*
 * Serialize a task check request.
 *
 * Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, stage.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // upstream then downstream endpoints
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
168

169
/*
 * Deserialize a task check request into pReq.
 *
 * Fields are read in the order tEncodeStreamTaskCheckReq writes them: reqId,
 * streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, stage.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // upstream then downstream endpoints
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  tEndDecode(pDecoder);

_exit:
  return code;
}
187

188
/*
 * Serialize a task check response.
 *
 * Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, oldStage, status.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  // upstream then downstream endpoints
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
211

212
/*
 * Deserialize a task check response into pRsp.
 *
 * Fields are read in the order tEncodeStreamTaskCheckRsp writes them: reqId,
 * streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, oldStage, status.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  // upstream then downstream endpoints
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
  tEndDecode(pDecoder);

_exit:
  return code;
}
231

232
/*
 * Serialize a checkpoint-ready message.
 *
 * Wire order: streamId, checkpointId, downstreamTaskId, downstreamNodeId,
 * upstreamTaskId, upstreamNodeId, childId.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  // downstream endpoint first, then upstream (task id before node id)
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
253

254
/*
 * Deserialize a checkpoint-ready message into pRsp.
 *
 * Fields are read in the order tEncodeStreamCheckpointReadyMsg writes them:
 * streamId, checkpointId, downstreamTaskId, downstreamNodeId, upstreamTaskId,
 * upstreamNodeId, childId.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));

  // downstream endpoint first, then upstream (task id before node id)
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
271

272
/*
 * Serialize a stream dispatch request.
 *
 * Wire order: stage, msgId, srcVgId, type, streamId, taskId, type (again),
 * upstreamTaskId, upstreamChildId, upstreamNodeId, upstreamRelTaskId,
 * blockNum, totalLen, followed by blockNum pairs of {length (i32), payload}.
 *
 * pReq->data and pReq->dataLen must both hold exactly pReq->blockNum
 * elements; otherwise TSDB_CODE_INVALID_MSG is used as the result code.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` is written a second time; tDecodeStreamDispatchReq reads it twice
  // as well, so the duplicate has to stay to keep the wire format unchanged.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    stError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
    void*    data = taosArrayGetP(pReq->data, i);
    if (data == NULL || pLen == NULL) {
      // Leave unconditionally. A NULL pointer stored in the array is not
      // guaranteed to have set terrno, so relying on TAOS_CHECK_EXIT(terrno)
      // to take the jump could fall through to the dereferences below.
      code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
      goto _exit;
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
  }
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
314

315
/*
 * Deserialize a stream dispatch request into pReq.
 *
 * Reads the header fields in the order tEncodeStreamDispatchReq writes them
 * (including the duplicated `type`), then creates pReq->data and
 * pReq->dataLen and fills them with blockNum {length, payload} pairs. Each
 * payload buffer is allocated by tDecodeBinaryAlloc; once pushed into
 * pReq->data it is owned by the request (tCleanupStreamDispatchReq frees the
 * array elements with taosMemoryFree).
 *
 * A payload that was decoded but could not be handed over to pReq->data
 * (length mismatch or a failed array push) is released here before
 * returning, so error paths do not leak it. pReq->data / pReq->dataLen may be
 * non-NULL and partially filled when a non-zero code is returned.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;
  void*   pPending = NULL;  // payload decoded but not yet owned by pReq->data

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // the encoder writes `type` twice; consume the second copy as well
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    // Initialized so the checks below never read an indeterminate value,
    // whatever the decoder does with its output arguments.
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));
    pPending = data;

    // The explicitly encoded length must match the payload length; a negative
    // length can never be valid.
    if (len1 < 0 || (uint64_t)len1 != len2) {
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // ownership of the payload has moved to pReq->data
    pPending = NULL;
  }

  tEndDecode(pDecoder);

_exit:
  if (pPending != NULL) {
    taosMemoryFree(pPending);
  }
  return code;
}
364

365
/*
 * Release the arrays held by a dispatch request.
 *
 * pReq->data is destroyed with taosArrayDestroyP, which is given
 * taosMemoryFree to release each stored payload pointer; pReq->dataLen is
 * destroyed with taosArrayDestroy. Both members are reset to NULL afterwards
 * so that calling this function again on the same request does not touch
 * freed memory. A NULL pReq is ignored, matching tCleanupStreamHbMsg.
 */
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, taosMemoryFree);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
19,837✔
369

370
/*
 * Serialize a stream retrieve request.
 *
 * Wire order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId,
 * then pReq->pRetrieve as a binary blob of pReq->retrieveLen bytes.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  // destination endpoint, then source endpoint
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  // payload
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
391

392
/*
 * Deserialize a stream retrieve request into pReq.
 *
 * Reads streamId, reqId, dstNodeId, dstTaskId, srcNodeId and srcTaskId, then
 * the payload via tDecodeBinaryAlloc, which stores the buffer pointer in
 * pReq->pRetrieve; the decoded length is narrowed to int32_t and stored in
 * pReq->retrieveLen. tCleanupStreamRetrieveReq releases pReq->pRetrieve.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  // destination endpoint, then source endpoint
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  // payload
  uint64_t payloadLen = 0;
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen));
  pReq->retrieveLen = (int32_t)payloadLen;
  tEndDecode(pDecoder);

_exit:
  return code;
}
411

412
/*
 * Release the payload buffer of a retrieve request.
 *
 * Frees pReq->pRetrieve with taosMemoryFree and resets the member to NULL so
 * that a repeated cleanup of the same request does not free it twice. A NULL
 * pReq is ignored, matching tCleanupStreamHbMsg.
 */
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosMemoryFree(pReq->pRetrieve);
  pReq->pRetrieve = NULL;
}
450✔
413

414
/*
 * Serialize a task checkpoint request.
 *
 * Wire order: streamId, taskId, nodeId.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label.
 *
 * NOTE(review): unlike the other encoders in this file, this one returns
 * `code` on every path rather than pEncoder->pos — confirm against callers
 * before changing it.
 */
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
427

428
/*
 * Deserialize a task checkpoint request into pReq.
 *
 * Fields are read in the order tEncodeStreamTaskCheckpointReq writes them:
 * streamId, taskId, nodeId.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
441

442
/*
 * Serialize a stream heartbeat message.
 *
 * Layout: vgId, numOfTasks, then one status record per task taken from
 * pReq->pTaskStatus, then the number of entries in pReq->pUpdateNodes
 * followed by each vgroup id, and finally msgId and ts.
 *
 * The per-task record must stay in exactly this field order because
 * tDecodeStreamHbMsg reads it back positionally.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code` when it
 * is non-zero, otherwise pEncoder->pos.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));

  for (int32_t idx = 0; idx < pReq->numOfTasks; ++idx) {
    STaskStatusEntry* pEntry = taosArrayGet(pReq->pTaskStatus, idx);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // task identity and state
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->id.streamId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->id.taskId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->status));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->stage));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputQUsed));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputRate));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkQuota));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->processedVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.minVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.maxVer));

    // checkpoint information
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.failed));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.consensusTs));

    // start-up information and related task id
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->hTaskId));
  }

  // list of vgroup ids from pUpdateNodes, preceded by its length
  int32_t numOfUpdated = taosArrayGetSize(pReq->pUpdateNodes);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpdated));

  for (int k = 0; k < numOfUpdated; ++k) {
    int32_t* pId = taosArrayGet(pReq->pUpdateNodes, k);
    if (pId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pId));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
511

512
/*
 * Deserialize a stream heartbeat message into pReq.
 *
 * Reads vgId and numOfTasks, creates pReq->pTaskStatus and appends one
 * STaskStatusEntry per encoded task, then reads the vgroup-id list into a
 * newly created pReq->pUpdateNodes, and finally msgId and ts. The arrays are
 * assigned to pReq as soon as they are created, so they may be non-NULL even
 * when a non-zero code is returned; tCleanupStreamHbMsg destroys them.
 *
 * Each step goes through TAOS_CHECK_EXIT (defined elsewhere), which uses the
 * local `code`/`lino` variables and the `_exit` label. Returns `code`.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));

  pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
  if (pReq->pTaskStatus == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < pReq->numOfTasks; ++idx) {
    // The task id travels as a 32-bit value; it is decoded into a temporary
    // and copied into the entry once the whole record has been read.
    int32_t          wireTaskId = 0;
    STaskStatusEntry status = {0};

    // task identity and state
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.id.streamId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &wireTaskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.status));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.stage));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.inputQUsed));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.inputRate));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.procsTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.procsThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.outputTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.outputThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.sinkQuota));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &status.sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.processedVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.verRange.minVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.verRange.maxVer));

    // checkpoint information
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.failed));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &status.checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &status.checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.checkpointInfo.consensusTs));

    // start-up information and related task id
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startCheckpointId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.startCheckpointVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &status.hTaskId));

    status.id.taskId = wireTaskId;
    if (taosArrayPush(pReq->pTaskStatus, &status) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // list of vgroup ids, preceded by its length
  int32_t numOfUpdated = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfUpdated));

  pReq->pUpdateNodes = taosArrayInit(numOfUpdated, sizeof(int32_t));
  if (pReq->pUpdateNodes == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int k = 0; k < numOfUpdated; ++k) {
    int32_t updatedVgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &updatedVgId));
    if (taosArrayPush(pReq->pUpdateNodes, &updatedVgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
  tEndDecode(pDecoder);

_exit:
  return code;
}
587

588
/*
 * Release the arrays held by a stream heartbeat message and reset its
 * scalar identifiers.
 *
 * pMsg may be NULL, in which case nothing happens. After the call both
 * pUpdateNodes and pTaskStatus are NULL and msgId, vgId and numOfTasks
 * are all -1. The message object itself is not freed here.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
  if (pMsg == NULL) {
    return;
  }

  // The scalar resets do not depend on the array teardown below.
  pMsg->numOfTasks = -1;
  pMsg->vgId = -1;
  pMsg->msgId = -1;

  if (pMsg->pTaskStatus != NULL) {
    taosArrayDestroy(pMsg->pTaskStatus);
    pMsg->pTaskStatus = NULL;
  }

  if (pMsg->pUpdateNodes != NULL) {
    taosArrayDestroy(pMsg->pUpdateNodes);
    pMsg->pUpdateNodes = NULL;
  }
}
607

608
/*
 * Serialize a stream task into pEncoder.
 *
 * The field order written here is the wire layout that tDecodeStreamTask
 * reads back, so the two functions must be changed together.
 *
 * Returns the value left in `code`: 0 when every field was written, or the
 * code of the first failing encode step (TAOS_CHECK_EXIT jumps to _exit).
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // identity and basic attributes
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  // status
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  // placement
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  // checkpoint position and fill-history flag
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  // hTaskInfo id: the task id goes through an int32_t local before encoding
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  const int32_t hTaskId = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hTaskId));

  // streamTaskId, handled the same way
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  const int32_t relatedTaskId = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, relatedTaskId));

  // data range
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  // upstream endpoints: element count followed by each entry
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpstream));
  for (int32_t idx = 0; idx < numOfUpstream; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pUpstream));
  }

  // exec.qmsg is written for every level except sink tasks
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  // output-specific payload; an unrecognized type writes nothing
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    default:
      break;
  }

  // trailing fields
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  tEndEncode(pEncoder);

_exit:
  return code;
}
681

682
/*
 * Deserialize a stream task from pDecoder into pTask.
 *
 * Fields are read in the order tEncodeStreamTask writes them. The task
 * version is read first and must lie in
 * (SSTREAM_TASK_INCOMPATIBLE_VER, SSTREAM_TASK_VER]; anything else is
 * rejected with TSDB_CODE_INVALID_MSG.
 *
 * Memory allocated here and stored in pTask (the upstream endpoint list and
 * its entries, exec.qmsg, the table-sink schema wrapper) is NOT released on
 * failure; it stays attached to pTask for the caller to clean up.
 *
 * Returns the value left in `code`: 0 on success, otherwise the code of the
 * first failing step (TAOS_CHECK_EXIT jumps to _exit).
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  // The two task ids below are read into an int32_t local and then assigned
  // to the destination field.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  // Pass the decoder's return value straight to TAOS_CHECK_EXIT, as every
  // other call in this function does. Comparing it with `< 0` first would
  // collapse it to 0/1 and hide a failed read of the element count.
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  if (epSz < 0) {
    // A negative element count can only come from a corrupt message.
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      // The entry never made it into the list, so nothing else owns it:
      // capture the error first, then free the entry before bailing out.
      code = terrno;
      taosMemoryFreeClear(pInfo);
      TAOS_CHECK_EXIT(code);
    }
  }

  // exec.qmsg is present for every level except sink tasks.
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  // Output-specific payload; an unrecognized type reads nothing.
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  // subtableWithoutMd5 is read only from SSTREAM_TASK_SUBTABLE_CHANGED_VER
  // onwards; for older versions the field is left untouched.
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  tEndDecode(pDecoder);

_exit:
  return code;
}
780

781
/*
 * Serialize a checkpoint report into pEncoder.
 *
 * Wire order: streamId, taskId, nodeId, checkpointId, checkpointVer,
 * checkpointTs, transId, dropHTask. tDecodeStreamTaskChkptReport reads the
 * same order.
 *
 * Returns the value left in `code`: 0 on success, otherwise the code of the
 * first failing encode step.
 */
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // which task is reporting
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  // checkpoint fields
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));

  tEndEncode(pEncoder);

_exit:
  return code;
}
799

800
/*
 * Deserialize a checkpoint report from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeStreamTaskChkptReport writes them:
 * streamId, taskId, nodeId, checkpointId, checkpointVer, checkpointTs,
 * transId, dropHTask.
 *
 * Returns the value left in `code`: 0 on success, otherwise the code of the
 * first failing decode step. On failure pReq may be partially filled.
 */
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // which task is reporting
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  // checkpoint fields
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));

  tEndDecode(pDecoder);

_exit:
  return code;
}
818

819
/*
 * Serialize a restore-checkpoint request into pEncoder.
 *
 * Wire order: startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Unlike the other encoders in this file, success is NOT reported as 0:
 * the function returns pEncoder->pos when every field was written, and the
 * non-zero `code` of the failing step otherwise.
 */
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }

  // success: report the encoder position
  return pEncoder->pos;
}
839

840
/*
 * Deserialize a restore-checkpoint request from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeRestoreCheckpointInfo writes them:
 * startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Returns the value left in `code`: 0 on success, otherwise the code of the
 * first failing decode step. On failure pReq may be partially filled.
 */
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc