• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.2
/source/libs/stream/src/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "os.h"
18
#include "tstream.h"
19
#include "streamInt.h"
20

21
// Write one upstream endpoint descriptor into the encoder.
// Wire order: taskId, nodeId, childId, epSet, stage. tDecodeStreamEpInfo reads
// the fields back in exactly this order, so the two must be changed together.
// Returns 0 after all fields were written. Every step goes through
// TAOS_CHECK_RETURN, which presumably returns early with the failing call's
// error code (confirm against the macro definition).
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  // identifiers of the upstream task
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
29

30
// Read one upstream endpoint descriptor from the decoder into *pInfo.
// Fields are consumed in the order tEncodeStreamEpInfo wrote them:
// taskId, nodeId, childId, epSet, stage.
// Returns 0 after all fields were read. Every step goes through
// TAOS_CHECK_RETURN, which presumably returns early with the failing call's
// error code (confirm against the macro definition).
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  // identifiers of the upstream task
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  // endpoint set followed by the stage value
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
38

39
// Serialize a checkpoint-source request, bracketed by tStartEncode/tEndEncode.
// Wire order: streamId, checkpointId, taskId, nodeId, mgmtEps, mnodeId,
// expireTime, transId, mndTrigger (tDecodeStreamCheckpointSourceReq mirrors it).
// On success the function returns pEncoder->pos. A failing step returns early
// via TAOS_CHECK_RETURN, in which case tEndEncode is not reached.
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // stream / checkpoint / task identification
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId));

  // mnode related information
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId));

  // remaining scalar attributes
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
53

54
// Deserialize a checkpoint-source request into *pReq, bracketed by
// tStartDecode/tEndDecode.
// Fields are consumed in the order tEncodeStreamCheckpointSourceReq wrote them:
// streamId, checkpointId, taskId, nodeId, mgmtEps, mnodeId, expireTime,
// transId, mndTrigger.
// Returns 0 on success. A failing step returns early via TAOS_CHECK_RETURN, in
// which case tEndDecode is not reached.
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // stream / checkpoint / task identification
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId));

  // mnode related information
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId));

  // remaining scalar attributes
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger));

  tEndDecode(pDecoder);
  return 0;
}
68

69
// Serialize a checkpoint-source response, bracketed by tStartEncode/tEndEncode.
// Wire order: streamId, checkpointId, taskId, nodeId, expireTime, success.
// On success the function returns pEncoder->pos. A failing step returns early
// via TAOS_CHECK_RETURN, in which case tEndEncode is not reached.
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // stream / checkpoint / task identification
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId));

  // expiry and outcome flag
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
80

UNCOV
81
// Serialize a task node-update message.
// Wire order: streamId, taskId, number of node entries, then for every entry
// (nodeId, prevEp, newEp), and finally transId.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfNodes = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  // the entry count precedes the entries so that the decoder knows how many to read
  numOfNodes = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfNodes));

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo* pNode = taosArrayGet(pMsg->pNodeList, idx);
    if (pNode == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pNode->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->newEp));
  }

  // todo: this newly added attribute makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
113

UNCOV
114
// Deserialize a task node-update message into *pMsg.
// Reads streamId, taskId and the entry count, allocates pMsg->pNodeList, then
// appends one SNodeUpdateInfo (nodeId, prevEp, newEp) per entry and finally
// reads transId.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit. pMsg->pNodeList is not released here on failure;
// presumably the caller destroys it (confirm against callers).
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfNodes = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  if (pMsg->pNodeList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo node = {0};

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &node.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.newEp));

    // the array receives a copy of the stack-local entry
    if (taosArrayPush(pMsg->pNodeList, &node) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
145

146
// Serialize a task check request.
// Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, stage.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identification
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
168

169
// Deserialize a task check request into *pReq.
// Fields are consumed in the order tEncodeStreamTaskCheckReq wrote them:
// reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
// downstreamTaskId, childId, stage.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identification
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  tEndDecode(pDecoder);

_exit:
  return code;
}
187

188
// Serialize a task check response.
// Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, oldStage, status.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request / stream identification
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
211

212
// Deserialize a task check response into *pRsp.
// Fields are consumed in the order tEncodeStreamTaskCheckRsp wrote them:
// reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
// downstreamTaskId, childId, oldStage, status.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request / stream identification
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  // upstream side, then downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
  tEndDecode(pDecoder);

_exit:
  return code;
}
231

232
// Serialize a checkpoint-ready message.
// Wire order: streamId, checkpointId, downstreamTaskId, downstreamNodeId,
// upstreamTaskId, upstreamNodeId, childId.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  // note: the downstream pair is written before the upstream pair
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
253

254
// Deserialize a checkpoint-ready message into *pRsp.
// Fields are consumed in the order tEncodeStreamCheckpointReadyMsg wrote them:
// streamId, checkpointId, downstreamTaskId, downstreamNodeId, upstreamTaskId,
// upstreamNodeId, childId.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit.
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));

  // note: the downstream pair is read before the upstream pair
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  tEndDecode(pDecoder);

_exit:
  return code;
}
271

272
// Serialize a dispatch request: a fixed header followed by blockNum data blocks.
// Header wire order: stage, msgId, srcVgId, type, streamId, taskId, type (again),
// upstreamTaskId, upstreamChildId, upstreamNodeId, upstreamRelTaskId, blockNum,
// totalLen. Each block is written as an int32 length followed by the binary
// payload.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` is written a second time here; tDecodeStreamDispatchReq reads it
  // twice as well, so removing either occurrence would change the wire format.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  // both arrays must hold exactly blockNum elements, otherwise the request is malformed
  if (!(taosArrayGetSize(pReq->data) == pReq->blockNum && taosArrayGetSize(pReq->dataLen) == pReq->blockNum)) {
    stError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t idx = 0; idx < pReq->blockNum; idx++) {
    int32_t* pBlockLen = taosArrayGet(pReq->dataLen, idx);
    void*    pBlock = taosArrayGetP(pReq->data, idx);
    if (pBlock == NULL || pBlockLen == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // explicit length first, then the length-prefixed binary payload
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pBlockLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pBlock, *pBlockLen));
  }
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
314

315
// Deserialize a dispatch request into *pReq.
// Reads the fixed header (stage, msgId, srcVgId, type, streamId, taskId, type
// again, upstreamTaskId, upstreamChildId, upstreamNodeId, upstreamRelTaskId,
// blockNum, totalLen), allocates pReq->data / pReq->dataLen, then reads
// blockNum blocks, each stored as an int32 length plus a binary payload.
//
// Ownership: every payload is allocated by tDecodeBinaryAlloc. Once its pointer
// has been pushed into pReq->data it is released by tCleanupStreamDispatchReq.
// A payload that was decoded but not yet pushed is tracked in `pending` and
// released at _exit, so a length mismatch or a failed array push no longer
// leaks it.
//
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit. The arrays themselves are not destroyed here on
// failure; presumably the caller runs tCleanupStreamDispatchReq (confirm).
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;
  void*   pending = NULL;  // decoded payload not yet owned by pReq->data

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // `type` is present twice on the wire (see tEncodeStreamDispatchReq); keep both reads
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < pReq->blockNum; i++) {
    // initialised so the comparison below never reads an indeterminate value
    int32_t  len1 = 0;
    uint64_t len2 = 0;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &pending, &len2));

    // the explicit length and the binary length prefix must agree
    if (len1 != len2) {
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (taosArrayPush(pReq->data, &pending) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    } else {
      pending = NULL;  // ownership moved into pReq->data
    }
  }

  tEndDecode(pDecoder);
_exit:
  // release a payload that never made it into pReq->data
  if (pending != NULL) {
    taosMemoryFree(pending);
  }
  return code;
}
364

365
// Release the arrays owned by a decoded dispatch request.
// pReq->data is destroyed with taosMemoryFree applied to each stored payload
// pointer, and pReq->dataLen is destroyed as a plain array. Both members are
// reset to NULL afterwards so that a repeated cleanup does not operate on
// dangling pointers (tCleanupStreamHbMsg follows the same convention).
// A NULL pReq is ignored.
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosArrayDestroyP(pReq->data, taosMemoryFree);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
11,547✔
369

370
// Serialize a retrieve request.
// Wire order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId,
// followed by pRetrieve written as a binary blob of retrieveLen bytes.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  // destination pair first, then source pair
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  // payload
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
391

392
// Deserialize a retrieve request into *pReq.
// Fields are consumed in the order tEncodeStreamRetrieveReq wrote them:
// streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId, then the
// binary payload. The payload is allocated by tDecodeBinaryAlloc and stored in
// pReq->pRetrieve; tCleanupStreamRetrieveReq frees it.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit.
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t payloadLen = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  // destination pair first, then source pair
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  // payload; the 64-bit length reported by the decoder is narrowed to int32
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen));
  pReq->retrieveLen = (int32_t)payloadLen;
  tEndDecode(pDecoder);

_exit:
  return code;
}
411

412
// Release the payload buffer held by a decoded retrieve request.
// pReq->pRetrieve is freed with taosMemoryFree and then reset to NULL so that
// a repeated cleanup does not free a dangling pointer (tCleanupStreamHbMsg
// follows the same convention). A NULL pReq is ignored.
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  taosMemoryFree(pReq->pRetrieve);
  pReq->pRetrieve = NULL;
}
227✔
413

414
// Serialize a task checkpoint request.
// Wire order: streamId, taskId, nodeId.
// NOTE: unlike the other encoders in this file, this one returns `code` (0 on
// success) and not pEncoder->pos; callers may rely on that, so it is kept.
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
427

428
// Deserialize a task checkpoint request into *pReq.
// Fields are consumed in the order tEncodeStreamTaskCheckpointReq wrote them:
// streamId, taskId, nodeId.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit.
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
441

442
// Serialize a stream heartbeat message.
// Layout: vgId, numOfTasks, then one status record per task taken from
// pReq->pTaskStatus, then the number of entries in pReq->pUpdateNodes followed
// by each vgId, and finally msgId and ts.
// tDecodeStreamHbMsg reads the per-task record in exactly the order written
// here, so the two functions must be changed together.
// Returns pEncoder->pos on success, otherwise the error code left in `code` by
// the TAOS_CHECK_EXIT step that jumped to _exit.
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfVgs = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks));

  for (int32_t idx = 0; idx < pReq->numOfTasks; ++idx) {
    STaskStatusEntry* pEntry = taosArrayGet(pReq->pTaskStatus, idx);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // task identity and state
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->id.streamId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->id.taskId));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->status));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->stage));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->nodeId));

    // queue / throughput statistics
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputQUsed));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->inputRate));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->procsThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputTotal));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->outputThroughput));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkQuota));
    TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, pEntry->sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->processedVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.minVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->verRange.maxVer));

    // checkpoint information
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.failed));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pEntry->checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->checkpointInfo.consensusTs));

    // start-up information
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointId));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->startCheckpointVer));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->hTaskId));
  }

  // list of vgroup ids stored in pUpdateNodes, preceded by its length
  numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs));

  for (int k = 0; k < numOfVgs; ++k) {
    int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, k);
    if (pVgId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts));
  tEndEncode(pEncoder);

_exit:
  return (code != 0) ? code : pEncoder->pos;
}
511

512
// Deserialize a stream heartbeat message into *pReq.
// Reads vgId and numOfTasks, allocates pReq->pTaskStatus and appends one
// STaskStatusEntry per task, then reads the vgroup-id list into a newly
// allocated pReq->pUpdateNodes, and finally msgId and ts. The per-task record
// is consumed in exactly the order tEncodeStreamHbMsg wrote it.
// Returns the value left in `code`: 0 on success, or the error code of the
// step that jumped to _exit. Arrays allocated before a failure are not freed
// here; tCleanupStreamHbMsg destroys both of them.
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfVgs = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks));

  pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
  if (pReq->pTaskStatus == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < pReq->numOfTasks; ++idx) {
    int32_t          taskId = 0;
    STaskStatusEntry entry = {0};

    // task identity and state; the task id is read through an int32_t
    // temporary, presumably because entry.id.taskId has a different width
    // (confirm against the STaskStatusEntry definition)
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
    entry.id.taskId = taskId;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId));

    // queue / throughput statistics
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota));
    TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize));

    // version progress
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer));

    // checkpoint information
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs));

    // start-up information
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId));

    // the array receives a copy of the stack-local entry
    if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // list of vgroup ids, preceded by its length
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs));

  pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
  if (pReq->pUpdateNodes == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int k = 0; k < numOfVgs; ++k) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
    if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts));
  tEndDecode(pDecoder);

_exit:
  return code;
}
587

588
/*
 * Release the arrays owned by a heartbeat message and reset its counters.
 *
 * pMsg may be NULL, in which case nothing happens. Otherwise pUpdateNodes and
 * pTaskStatus are destroyed (when set) and cleared to NULL, and msgId, vgId
 * and numOfTasks are reset to -1. The pMsg object itself is not freed.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
  if (pMsg != NULL) {
    if (pMsg->pUpdateNodes != NULL) {
      taosArrayDestroy(pMsg->pUpdateNodes);
      pMsg->pUpdateNodes = NULL;
    }

    if (pMsg->pTaskStatus != NULL) {
      taosArrayDestroy(pMsg->pTaskStatus);
      pMsg->pTaskStatus = NULL;
    }

    /* mark the scalar fields as "not set" */
    pMsg->numOfTasks = -1;
    pMsg->vgId = -1;
    pMsg->msgId = -1;
  }
}
607

608
/*
 * Serialize a stream task into pEncoder.
 *
 * The field order below is the wire layout and must stay in step with
 * tDecodeStreamTask(). Every step goes through TAOS_CHECK_EXIT, which
 * presumably stores the step's result in `code` and jumps to _exit on
 * failure (macro defined elsewhere -- confirm).
 *
 * Returns `code`: 0 when nothing failed, otherwise the value left by the
 * failing step.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* identity and basic attributes */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));

  /* status */
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));

  /* placement */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));

  /* checkpoint and fill-history flag */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));

  /* the two related task ids are written as 32-bit values */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
  int32_t hTaskId32 = pTask->hTaskInfo.id.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hTaskId32));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
  int32_t streamTaskId32 = pTask->streamTaskId.taskId;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, streamTaskId32));

  /* data range */
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));

  /* upstream endpoints: a count followed by that many entries */
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfUpstream));
  for (int32_t idx = 0; idx < numOfUpstream; ++idx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pTask->upstreamInfo.pList, idx);
    TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pUpstream));
  }

  /* the query message is written for every level except the sink level */
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
  }

  /* output-type specific payload; unknown types write nothing */
  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__TABLE:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
      TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
      break;
    case TASK_OUTPUT__SMA:
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
      break;
    case TASK_OUTPUT__FETCH:
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
      break;
    case TASK_OUTPUT__FIXED_DISPATCH:
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
      TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
      break;
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
      TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
      break;
    default:
      break;
  }

  /* trailing fields */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));

  tEndEncode(pEncoder);

_exit:
  return code;
}
681

682
/*
 * Deserialize a stream task from pDecoder into pTask.
 *
 * Fields are read in the order tEncodeStreamTask() writes them. The version
 * is read first and the message is rejected with TSDB_CODE_INVALID_MSG when
 * it is <= SSTREAM_TASK_INCOMPATIBLE_VER or > SSTREAM_TASK_VER.
 *
 * Allocations made here and stored in pTask: upstreamInfo.pList together
 * with one SStreamUpstreamEpInfo per entry, exec.qmsg (non-sink levels) and
 * outputInfo.tbSink.pSchemaWrapper (table output).
 * NOTE(review): on failure, members already stored in pTask are left in
 * place; presumably the caller releases them -- confirm.
 *
 * Returns `code`: 0 when nothing failed, otherwise the failing step's value.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
  if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));

  /* the related task ids travel as 32-bit values; read into a temporary */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->hTaskInfo.id.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
  pTask->streamTaskId.taskId = taskId;

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));

  /*
   * Hand the decoder's own status to the macro. Passing `call < 0` would give
   * it a 0/1 boolean instead, and the real error code would be lost.
   */
  int32_t epSz = -1;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
  if (epSz < 0) {
    /* the encoder writes an array size here, so a negative count is malformed */
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < epSz; i++) {
    SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
    if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
      /* pInfo never made it into the list, so it must be released here */
      code = terrno;
      taosMemoryFreeClear(pInfo);
      goto _exit;
    }
  }

  /* the query message is present for every level except the sink level */
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
  }

  /* output-type specific payload; unknown types carry nothing */
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
    pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
    TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
  /* subtableWithoutMd5 is only read for versions that carry it */
  if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
  }
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));

  tEndDecode(pDecoder);

_exit:
  return code;
}
780

781
/*
 * Serialize a checkpoint report into pEncoder.
 *
 * Wire order: streamId, taskId, nodeId, checkpointId, checkpointVer,
 * checkpointTs, transId, dropHTask. Must match
 * tDecodeStreamTaskChkptReport().
 *
 * Returns `code`: 0 when nothing failed, otherwise the value left by the
 * failing step (TAOS_CHECK_EXIT presumably jumps to _exit -- confirm).
 */
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  /* which task is reporting */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  /* checkpoint details */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask));

  tEndEncode(pEncoder);

_exit:
  return code;
}
799

800
/*
 * Deserialize a checkpoint report from pDecoder into pReq.
 *
 * Reads, in order: streamId, taskId, nodeId, checkpointId, checkpointVer,
 * checkpointTs, transId, dropHTask -- the layout written by
 * tEncodeStreamTaskChkptReport().
 *
 * Returns `code`: 0 when nothing failed, otherwise the value left by the
 * failing step (TAOS_CHECK_EXIT presumably jumps to _exit -- confirm).
 */
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  /* which task is reporting */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  /* checkpoint details */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask));

  tEndDecode(pDecoder);

_exit:
  return code;
}
818

819
/*
 * Serialize a restore-checkpoint request into pEncoder.
 *
 * Wire order: startTs, streamId, checkpointId, transId, taskId, nodeId.
 *
 * Unlike the other encoders in this file, the success value is not 0: when
 * `code` is zero at exit the function returns pEncoder->pos; otherwise it
 * returns `code`.
 */
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }

  /* nothing failed: report the encoder position instead of a status */
  return pEncoder->pos;
}
839

840
/*
 * Deserialize a restore-checkpoint request from pDecoder into pReq.
 *
 * Reads, in order: startTs, streamId, checkpointId, transId, taskId, nodeId
 * -- the layout written by tEncodeRestoreCheckpointInfo().
 *
 * Returns `code`: 0 when nothing failed, otherwise the value left by the
 * failing step (TAOS_CHECK_EXIT presumably jumps to _exit -- confirm).
 */
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc