• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3525

10 Nov 2024 03:50AM UTC coverage: 60.818% (-0.08%) from 60.898%
#3525

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

118634 of 249004 branches covered (47.64%)

Branch coverage included in aggregate %.

136 of 169 new or added lines in 23 files covered. (80.47%)

542 existing lines in 129 files now uncovered.

199071 of 273386 relevant lines covered (72.82%)

15691647.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.07
/source/libs/stream/src/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tstream.h"
19
#include "streamInt.h"
20

21
/**
 * Serialize one upstream endpoint descriptor.
 *
 * Fields are written in the order taskId, nodeId, childId, epSet, stage;
 * tDecodeStreamEpInfo reads them back in the same order. This routine does
 * not open or close an encode frame (no tStartEncode/tEndEncode call).
 *
 * @param pEncoder destination encoder
 * @param pInfo    endpoint descriptor to write
 * @return 0 once every field has been written. If a field encoder fails,
 *         TAOS_CHECK_RETURN leaves the function early (presumably with that
 *         call's error code -- the macro is defined outside this file).
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  /* identity of the upstream task */
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  /* where to reach it, and its stage */
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
29

30
/**
 * Deserialize one upstream endpoint descriptor.
 *
 * Reads taskId, nodeId, childId, epSet and stage, in that order, into *pInfo
 * (the mirror of tEncodeStreamEpInfo). No decode frame is opened or closed here.
 *
 * @param pDecoder source decoder
 * @param pInfo    descriptor that receives the decoded fields
 * @return 0 once every field has been read. If a field decoder fails,
 *         TAOS_CHECK_RETURN leaves the function early (presumably with that
 *         call's error code -- the macro is defined outside this file).
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  /* identity of the upstream task */
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  /* where to reach it, and its stage */
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
38

39
/**
 * Serialize a checkpoint-source request inside its own encode frame.
 *
 * Field order on the wire: streamId, checkpointId, taskId, nodeId, mgmtEps,
 * mnodeId, expireTime, transId, mndTrigger.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to write
 * @return pEncoder->pos after the frame is closed. If any step fails,
 *         TAOS_CHECK_RETURN leaves the function early without calling
 *         tEndEncode (return value is decided by that macro, which is defined
 *         outside this file).
 */
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  /* checkpoint and task identity */
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));

  /* originating mnode and transaction details */
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
53

54
/**
 * Deserialize a checkpoint-source request from its decode frame.
 *
 * Reads, in order: streamId, checkpointId, taskId, nodeId, mgmtEps, mnodeId,
 * expireTime, transId, mndTrigger.
 *
 * @param pDecoder source decoder
 * @param pReq     request that receives the decoded fields
 * @return 0 after the frame is closed. If any step fails, TAOS_CHECK_RETURN
 *         leaves the function early without calling tEndDecode (return value
 *         is decided by that macro, which is defined outside this file).
 */
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  /* checkpoint and task identity */
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));

  /* originating mnode and transaction details */
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));

  tEndDecode(pDecoder);
  return 0;
}
68

69
/**
 * Serialize a checkpoint-source response inside its own encode frame.
 *
 * Field order on the wire: streamId, checkpointId, taskId, nodeId,
 * expireTime, success.
 *
 * @param pEncoder destination encoder
 * @param pRsp     response to write
 * @return pEncoder->pos after the frame is closed. If any step fails,
 *         TAOS_CHECK_RETURN leaves the function early without calling
 *         tEndEncode (return value is decided by that macro, which is defined
 *         outside this file).
 */
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
80

81
/**
 * Serialize a task node-update message.
 *
 * Layout: streamId, taskId, number of node entries, then for each entry
 * nodeId / prevEp / newEp, and finally transId.
 *
 * @param pEncoder destination encoder
 * @param pMsg     message to write; pMsg->pNodeList holds SNodeUpdateInfo items
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  // entry count first, so the decoder knows how many triples follow
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));

  for (int32_t i = 0; i < size; i++) {
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
    if (pInfo == NULL) {
      // NOTE(review): relies on terrno being set by taosArrayGet so that the
      // macro actually jumps to _exit -- confirm.
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
  }

  // TODO: this newer attribute makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
113

114
/**
 * Deserialize a task node-update message.
 *
 * Reads streamId, taskId and the entry count, creates pMsg->pNodeList with
 * taosArrayInit, appends one SNodeUpdateInfo (nodeId, prevEp, newEp) per
 * entry, then reads transId.
 *
 * @param pDecoder source decoder
 * @param pMsg     message to fill; pMsg->pNodeList is (re)assigned here
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 *         On failure pMsg->pNodeList may already be allocated; this function
 *         does not release it.
 */
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));

  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
  if (pMsg->pNodeList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < size; i++) {
    SNodeUpdateInfo info = {0};

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));

    // the array stores a copy of the stack-local entry
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
145

146
/**
 * Serialize a downstream task-check request.
 *
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, stage.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to write
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
168

169
/**
 * Deserialize a downstream task-check request.
 *
 * Reads, in order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, stage.
 *
 * @param pDecoder source decoder
 * @param pReq     request that receives the decoded fields
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));

  tEndDecode(pDecoder);

_exit:
  return code;
}
187

188
/**
 * Serialize a task-check response.
 *
 * Field order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, oldStage, status.
 *
 * @param pEncoder destination encoder
 * @param pRsp     response to write
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
211

212
/**
 * Deserialize a task-check response.
 *
 * Reads, in order: reqId, streamId, upstreamNodeId, upstreamTaskId,
 * downstreamNodeId, downstreamTaskId, childId, oldStage, status.
 *
 * @param pDecoder source decoder
 * @param pRsp     response that receives the decoded fields
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));

  tEndDecode(pDecoder);

_exit:
  return code;
}
231

232
/**
 * Serialize a checkpoint-ready message.
 *
 * Field order: streamId, checkpointId, downstreamTaskId, downstreamNodeId,
 * upstreamTaskId, upstreamNodeId, childId.
 *
 * @param pEncoder destination encoder
 * @param pReq     message to write
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  // note: downstream ids precede upstream ids, and task id precedes node id
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
253

254
/**
 * Deserialize a checkpoint-ready message.
 *
 * Reads, in order: streamId, checkpointId, downstreamTaskId,
 * downstreamNodeId, upstreamTaskId, upstreamNodeId, childId.
 *
 * @param pDecoder source decoder
 * @param pRsp     message that receives the decoded fields
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));

  // note: downstream ids precede upstream ids, and task id precedes node id
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
271

272
/**
 * Serialize a stream dispatch request, including its data blocks.
 *
 * Header field order: stage, msgId, srcVgId, type, streamId, taskId, type
 * (written a second time), upstreamTaskId, upstreamChildId, upstreamNodeId,
 * upstreamRelTaskId, blockNum, totalLen. Each of the blockNum blocks follows
 * as an explicit int32 length plus a binary payload.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to write; pReq->data holds block pointers and
 *                 pReq->dataLen the matching int32 lengths
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT (TSDB_CODE_INVALID_MSG is requested when
 *         the array sizes disagree with blockNum).
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` appears twice in the wire layout; tDecodeStreamDispatchReq reads it
  // twice as well, so it must stay to keep the two sides in step.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  // both parallel arrays must hold exactly blockNum entries
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    stError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t i = 0; i < pReq->blockNum; ++i) {
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
    void*    data = taosArrayGetP(pReq->data, i);
    if (data == NULL || pLen == NULL) {
      // NOTE(review): relies on terrno being set so the macro jumps to _exit -- confirm.
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
314

315
/**
 * Deserialize a stream dispatch request, including its data blocks.
 *
 * Header field order: stage, msgId, srcVgId, type, streamId, taskId, type
 * (read a second time, mirroring the encoder), upstreamTaskId,
 * upstreamChildId, upstreamNodeId, upstreamRelTaskId, blockNum, totalLen.
 * Each block is an explicit int32 length followed by a binary payload that
 * tDecodeBinaryAlloc copies into a newly allocated buffer.
 *
 * Ownership: every buffer that has been pushed into pReq->data is released by
 * tCleanupStreamDispatchReq. A buffer that was allocated but could NOT be
 * handed over to pReq->data (length mismatch, or an array push failed) is
 * freed here before bailing out, so it is not leaked.
 *
 * @param pDecoder source decoder
 * @param pReq     request to fill; pReq->data and pReq->dataLen are created here
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT
 *         (TSDB_CODE_INVALID_MSG is requested when the explicit length and the
 *         binary length disagree). On failure the arrays may be partially
 *         filled; this function does not destroy them.
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // `type` is present twice on the wire (see tEncodeStreamDispatchReq)
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    // From here on `data` is owned by this loop iteration until it has been
    // pushed into pReq->data; release it on every early exit.

    // A negative len1 can never match the unsigned binary length.
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(terrno);
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
364

365
/**
 * Release the block arrays held by a dispatch request.
 *
 * Destroys pReq->data (freeing each element with taosMemoryFree) and
 * pReq->dataLen, then clears both pointers so that a repeated cleanup of the
 * same request does not touch freed arrays. A NULL request is ignored, in
 * line with tCleanupStreamHbMsg.
 *
 * @param pReq request to clean up; the struct itself is not freed
 */
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, taosMemoryFree);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
58,709✔
369

370
/**
 * Serialize a stream retrieve request.
 *
 * Field order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId,
 * followed by the pRetrieve payload written as a binary of retrieveLen bytes.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to write
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  // destination first, then source
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
391

392
/**
 * Deserialize a stream retrieve request.
 *
 * Reads streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId, then the
 * binary payload. tDecodeBinaryAlloc stores the payload pointer in
 * pReq->pRetrieve (released by tCleanupStreamRetrieveReq) and its 64-bit
 * length is narrowed to int32 for pReq->retrieveLen.
 *
 * @param pDecoder source decoder
 * @param pReq     request that receives the decoded fields
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  // destination first, then source
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  uint64_t len = 0;
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
  pReq->retrieveLen = (int32_t)len;

  tEndDecode(pDecoder);

_exit:
  return code;
}
411

412
/**
 * Release the payload buffer of a retrieve request.
 *
 * Frees pReq->pRetrieve with taosMemoryFree; the request struct itself is not
 * freed and the pointer field is left unchanged.
 *
 * @param pReq request whose payload is released (dereferenced unconditionally)
 */
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
}
536✔
413

414
/**
 * Serialize a task checkpoint request.
 *
 * Field order: streamId, taskId, nodeId.
 *
 * Unlike the other encoders in this file, the value returned is `code` even
 * on success, not pEncoder->pos.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to write
 * @return the value of `code`: 0 when nothing failed, otherwise whatever
 *         TAOS_CHECK_EXIT stored before jumping to _exit.
 */
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
427

428
/**
 * Deserialize a task checkpoint request.
 *
 * Reads, in order: streamId, taskId, nodeId.
 *
 * @param pDecoder source decoder
 * @param pReq     request that receives the decoded fields
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
441

442
/**
 * Serialize a stream heartbeat message.
 *
 * Layout: vgId, numOfTasks, then one fixed record per STaskStatusEntry taken
 * from pReq->pTaskStatus, then the number of entries in pReq->pUpdateNodes
 * followed by each int32 vgroup id, and finally msgId and ts.
 *
 * @param pEncoder destination encoder
 * @param pReq     heartbeat to write
 * @return pEncoder->pos on success, otherwise the non-zero value left in
 *         `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));

  for (int32_t i = 0; i < pReq->numOfTasks; i++) {
    STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
    if (ps == NULL) {
      // NOTE(review): relies on terrno being set so the macro jumps to _exit -- confirm.
      TAOS_CHECK_EXIT(terrno);
    }

    // task identity and state
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer));

    // checkpoint info
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs));

    // start-up info and related task id
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId));
  }

  // list of vgroup ids from pUpdateNodes, prefixed by its length
  int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs));

  for (int j = 0; j < numOfVgs; j++) {
    int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
    if (pVgId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }
  return pEncoder->pos;
}
511

512
/**
 * Deserialize a stream heartbeat message.
 *
 * Reads vgId and numOfTasks, creates pReq->pTaskStatus and appends one
 * STaskStatusEntry per task, then reads the vgroup-id count, creates
 * pReq->pUpdateNodes and appends each int32 id, and finally reads msgId and ts.
 *
 * @param pDecoder source decoder
 * @param pReq     heartbeat to fill; both arrays are (re)assigned here and can
 *                 be released with tCleanupStreamHbMsg
 * @return 0 on success, otherwise the value left in `code` by TAOS_CHECK_EXIT.
 *         On failure the arrays may be partially filled; this function does
 *         not destroy them.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;  // written by TAOS_CHECK_EXIT

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));

  if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->numOfTasks; i++) {
    int32_t          taskId = 0;  // decoded as int32, stored into entry.id.taskId below
    STaskStatusEntry entry = {0};

    // task identity and state
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId));

    // queue / throughput metrics
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer));

    // checkpoint info
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs));

    // start-up info and related task id
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId));

    entry.id.taskId = taskId;
    if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  int32_t numOfVgs = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs));

  if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int j = 0; j < numOfVgs; j++) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
    if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
  tEndDecode(pDecoder);

_exit:
  return code;
}
587

588
/*
 * Release the two arrays held by a stream heartbeat message and reset its
 * scalar fields to -1.
 *
 * A NULL pMsg is accepted and ignored. Each array pointer is set to NULL after
 * it has been destroyed, so calling this twice on the same message does not
 * destroy an array twice.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
  if (pMsg != NULL) {
    // list of updated nodes
    if (pMsg->pUpdateNodes != NULL) {
      taosArrayDestroy(pMsg->pUpdateNodes);
      pMsg->pUpdateNodes = NULL;
    }

    // per-task status entries
    if (pMsg->pTaskStatus != NULL) {
      taosArrayDestroy(pMsg->pTaskStatus);
      pMsg->pTaskStatus = NULL;
    }

    // mark the scalar fields as "unset"
    pMsg->msgId = -1;
    pMsg->vgId = -1;
    pMsg->numOfTasks = -1;
  }
}
607

608
/*
 * Serialize a stream task into pEncoder.
 *
 * The field order written here is the wire layout and must stay in step with
 * tDecodeStreamTask(). Which output-specific fields are written depends on
 * pTask->outputInfo.type; the query message (exec.qmsg) is written only for
 * tasks whose level is not TASK_LEVEL__SINK.
 *
 * Returns `code`: 0 when every step succeeds, otherwise the value stored by the
 * TAOS_CHECK_EXIT that jumped to _exit. tEndEncode() is not reached on that
 * error path.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* version, identity and basic attributes */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  /* status */
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  /* placement */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  /* checkpoint position and fill-history flag */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  /* related task ids; each task id goes on the wire as a 32-bit value */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  int32_t hTaskId = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hTaskId));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  int32_t relatedTaskId = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, relatedTaskId));

  /* data range: version range followed by time window */
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  /* upstream endpoints: count, then one record per entry */
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpstream));
  for (int32_t idx = 0; idx < numOfUpstream; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pUpstream));
  }

  /* the query message is written for every level except sink */
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  /* output-type specific payload; unknown types write nothing */
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    default:
      break;
  }

  /* trailing fields */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  tEndEncode(pEncoder);

_exit:
  return code;
}
681

682
/*
 * Deserialize a stream task from pDecoder into pTask.
 *
 * The field order read here mirrors tEncodeStreamTask(). The decoded version
 * must satisfy SSTREAM_TASK_INCOMPATIBLE_VER < ver <= SSTREAM_TASK_VER,
 * otherwise TSDB_CODE_INVALID_MSG is reported. subtableWithoutMd5 is read only
 * when ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER.
 *
 * Allocations made here and attached to pTask:
 *   - upstreamInfo.pList, holding one calloc'ed SStreamUpstreamEpInfo* per entry
 *   - outputInfo.tbSink.pSchemaWrapper (TASK_OUTPUT__TABLE only)
 *   - exec.qmsg via tDecodeCStrAlloc (non-sink tasks only)
 * On failure anything already attached to pTask is left in place.
 * NOTE(review): presumably the caller releases it when tearing down the task;
 * confirm.
 *
 * Returns `code`: 0 on success, otherwise the value stored on the failing step.
 * tEndDecode() is not reached on the error path.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  // task ids travel as 32-bit values; decode into a temporary, then assign
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  // Pass the decoder's own return code to the check. The previous
  // `tDecodeI32(...) < 0` form handed it a 0/1 comparison result instead.
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  if (epSz < 0) {
    // a negative element count can only come from a malformed message
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      // pInfo never made it into the list, so nobody else can free it.
      // Capture terrno first in case the free changes it.
      int32_t pushErr = terrno;
      taosMemoryFreeClear(pInfo);
      TAOS_CHECK_EXIT(pushErr);
    }
  }

  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  }
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  // messages older than SSTREAM_TASK_SUBTABLE_CHANGED_VER do not carry this flag
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  tEndDecode(pDecoder);

_exit:
  return code;
}
780

781
/*
 * Serialize a checkpoint report into pEncoder.
 *
 * Wire order: streamId, taskId, nodeId, checkpointId, checkpointVer,
 * checkpointTs, transId, dropHTask. tDecodeStreamTaskChkptReport() reads the
 * fields back in the same order.
 *
 * Returns `code`: 0 when every step succeeds, otherwise the value stored by the
 * TAOS_CHECK_EXIT that jumped to _exit. tEndEncode() is not reached on that
 * error path.
 */
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* who is reporting */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  /* which checkpoint */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));

  /* transaction id and the dropHTask flag */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));

  tEndEncode(pEncoder);

_exit:
  return code;
}
799

800
/*
 * Deserialize a checkpoint report from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeStreamTaskChkptReport() writes them:
 * streamId, taskId, nodeId, checkpointId, checkpointVer, checkpointTs,
 * transId, dropHTask.
 *
 * Returns `code`: 0 when every step succeeds, otherwise the value stored by the
 * TAOS_CHECK_EXIT that jumped to _exit. tEndDecode() is not reached on that
 * error path, and fields after the failing one are left untouched.
 */
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  /* who is reporting */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  /* which checkpoint */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));

  /* transaction id and the dropHTask flag */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));

  tEndDecode(pDecoder);

_exit:
  return code;
}
818

819
/*
 * Serialize a restore-checkpoint request into pEncoder.
 *
 * Wire order: startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Unlike the other encoders in this file, a successful call returns
 * pEncoder->pos rather than 0. On failure it returns `code` as stored by the
 * TAOS_CHECK_EXIT that jumped to _exit, and tEndEncode() is not reached.
 */
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* 64-bit fields */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  /* 32-bit fields */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  // non-zero code wins; otherwise report the encoder position
  return (code != 0) ? code : (int32_t)pEncoder->pos;
}
839

840
/*
 * Deserialize a restore-checkpoint request from pDecoder into pReq.
 *
 * Fields are read in the order tEncodeRestoreCheckpointInfo() writes them:
 * startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Returns `code`: 0 when every step succeeds, otherwise the value stored by the
 * TAOS_CHECK_EXIT that jumped to _exit. tEndDecode() is not reached on that
 * error path.
 */
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  /* 64-bit fields */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));

  /* 32-bit fields */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc