• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.14
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "tdatablock.h"
18
#include "tmsg.h"
19
#include "os.h"
20
#include "tcommon.h"
21

22
typedef struct STaskId {
23
  int64_t streamId;
24
  int64_t taskId;
25
} STaskId;
26

27
typedef enum EWindowType {
28
  WINDOW_TYPE_INTERVAL = 1,
29
  WINDOW_TYPE_SESSION,
30
  WINDOW_TYPE_STATE,
31
  WINDOW_TYPE_EVENT,
32
  WINDOW_TYPE_COUNT,
33
  WINDOW_TYPE_ANOMALY,
34
  WINDOW_TYPE_EXTERNAL,
35
  WINDOW_TYPE_PERIOD
36
} EWindowType;
37

38
typedef struct STaskCkptInfo {
39
  int64_t latestId;          // saved checkpoint id
40
  int64_t latestVer;         // saved checkpoint ver
41
  int64_t latestTime;        // latest checkpoint time
42
  int64_t latestSize;        // latest checkpoint size
43
  int8_t  remoteBackup;      // latest checkpoint backup done
44
  int64_t activeId;          // current active checkpoint id
45
  int32_t activeTransId;     // checkpoint trans id
46
  int8_t  failed;            // denote if the checkpoint is failed or not
47
  int8_t  consensusChkptId;  // required the consensus-checkpointId
48
  int64_t consensusTs;       //
49
} STaskCkptInfo;
50

51
typedef struct STaskStatusEntry {
52
  STaskId       id;
53
  int32_t       status;
54
  int32_t       statusLastDuration;  // to record the last duration of current status
55
  int64_t       stage;
56
  int32_t       nodeId;
57
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
58
  int64_t       processedVer;  // only valid for source task
59
  double        inputQUsed;    // in MiB
60
  double        inputRate;
61
  double        procsThroughput;   // duration between one element put into input queue and being processed.
62
  double        procsTotal;        // duration between one element put into input queue and being processed.
63
  double        outputThroughput;  // the size of dispatched result blocks in bytes
64
  double        outputTotal;       // the size of dispatched result blocks in bytes
65
  double        sinkQuota;         // existed quota size for sink task
66
  double        sinkDataSize;      // sink to dst data size
67
  int64_t       startTime;
68
  int64_t       startCheckpointId;
69
  int64_t       startCheckpointVer;
70
  int64_t       hTaskId;
71
  STaskCkptInfo checkpointInfo;
72
  STaskNotifyEventStat notifyEventStat;
73
} STaskStatusEntry;
74

75
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
×
76
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
×
77
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
×
78
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
×
79
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
×
80
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
×
81
  return 0;
×
82
}
83

84
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
×
85
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
×
86
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
×
87
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
×
88
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
×
89
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
×
90
  return 0;
×
91
}
92

93
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
×
94
  int32_t code = 0;
×
95
  int32_t lino;
96

97
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
98
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
×
99
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));
×
100

101
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
×
102
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
×
103

104
  for (int32_t i = 0; i < size; ++i) {
×
105
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
×
106
    if (pInfo == NULL) {
×
107
      TAOS_CHECK_EXIT(terrno);
×
108
    }
109

110
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
×
111
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
×
112
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
×
113
  }
114

115
  // todo this new attribute will be result in being incompatible with previous version
116
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
×
117

118
  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
×
119
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));
×
120

121
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
122
    int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
×
123
    if (pId == NULL) {
×
124
      TAOS_CHECK_EXIT(terrno);
×
125
    }
126
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)pId));
×
127
  }
128

129
  tEndEncode(pEncoder);
×
130
_exit:
×
131
  if (code) {
×
132
    return code;
×
133
  } else {
134
    return pEncoder->pos;
×
135
  }
136
}
137

138
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
×
139
  int32_t code = 0;
×
140
  int32_t lino;
141

142
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
143
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
×
144
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));
×
145

146
  int32_t size = 0;
×
147
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
148

149
  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
×
150
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);
×
151

152
  for (int32_t i = 0; i < size; ++i) {
×
153
    SNodeUpdateInfo info = {0};
×
154
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
×
155
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
×
156
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));
×
157

158
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
×
159
      TAOS_CHECK_EXIT(terrno);
×
160
    }
161
  }
162

163
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
×
164

165
  // number of tasks
166
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
167
  pMsg->pTaskList = taosArrayInit(size, sizeof(int32_t));
×
168
  if (pMsg->pTaskList == NULL) {
×
169
    TAOS_CHECK_EXIT(terrno);
×
170
  }
171

172
  for (int32_t i = 0; i < size; ++i) {
×
173
    int32_t id = 0;
×
174
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
×
175
    if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
×
176
      TAOS_CHECK_EXIT(terrno);
×
177
    }
178
  }
179

180
  tEndDecode(pDecoder);
×
181
_exit:
×
182
  return code;
×
183
}
184

185
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
×
186
  taosArrayDestroy(pMsg->pNodeList);
×
187
  taosArrayDestroy(pMsg->pTaskList);
×
188
  pMsg->pNodeList = NULL;
×
189
  pMsg->pTaskList = NULL;
×
190
}
×
191

192
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
×
193
  int32_t code = 0;
×
194
  int32_t lino;
195

196
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
197
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
198
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
199
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
×
202
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
×
203
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
×
204
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
205
  tEndEncode(pEncoder);
×
206

207
_exit:
×
208
  if (code) {
×
209
    return code;
×
210
  } else {
211
    return pEncoder->pos;
×
212
  }
213
}
214

215
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
×
216
  int32_t code = 0;
×
217
  int32_t lino;
218

219
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
220
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
221
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
222
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
×
225
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
×
226
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
×
227
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
228
  tEndDecode(pDecoder);
×
229

230
_exit:
×
231
  return code;
×
232
}
233

234
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
×
235
  int32_t code = 0;
×
236
  int32_t lino;
237

238
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
×
240
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
241
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
×
242
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
×
243
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
×
244
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
×
245
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
×
246
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
×
247
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
×
248
  tEndEncode(pEncoder);
×
249

250
_exit:
×
251
  if (code) {
×
252
    return code;
×
253
  } else {
254
    return pEncoder->pos;
×
255
  }
256
}
257

258
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
×
259
  int32_t code = 0;
×
260
  int32_t lino;
261

262
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
263
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
×
264
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
265
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
×
266
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
×
267
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
×
268
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
×
269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
×
270
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
×
271
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
×
272
  tEndDecode(pDecoder);
×
273

274
_exit:
×
275
  return code;
×
276
}
277

278
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
×
279
  int32_t code = 0;
×
280
  int32_t lino;
281

282
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
283
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
284
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
×
285
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
×
286
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
287
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
288
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
289
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
290
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
291
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
×
292
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
293
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
×
294
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
×
295
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
×
296

297
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
×
298
    uError("invalid dispatch req msg");
×
299
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
300
  }
301

302
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
303
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
×
304
    void*    data = taosArrayGetP(pReq->data, i);
×
305
    if (data == NULL || pLen == NULL) {
×
306
      TAOS_CHECK_EXIT(terrno);
×
307
    }
308

309
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
×
310
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
×
311
  }
312
  tEndEncode(pEncoder);
×
313
_exit:
×
314
  if (code) {
×
315
    return code;
×
316
  } else {
317
    return pEncoder->pos;
×
318
  }
319
}
320

321
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
×
322
  int32_t code = 0;
×
323
  int32_t lino;
324

325
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
326
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
327
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
×
328
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
×
329
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
330
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
331
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
332
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
333
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
334
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
×
335
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
336
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
×
337
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
×
338
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));
×
339

340
  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
×
341
    TAOS_CHECK_EXIT(terrno);
×
342
  }
343
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
×
344
    TAOS_CHECK_EXIT(terrno);
×
345
  }
346
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
347
    int32_t  len1;
348
    uint64_t len2;
349
    void*    data;
350
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
×
351
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));
×
352

353
    if (len1 != len2) {
×
354
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
355
    }
356

357
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
×
358
      TAOS_CHECK_EXIT(terrno);
×
359
    }
360

361
    if (taosArrayPush(pReq->data, &data) == NULL) {
×
362
      TAOS_CHECK_EXIT(terrno);
×
363
    }
364
  }
365

366
  tEndDecode(pDecoder);
×
367
_exit:
×
368
  return code;
×
369
}
370

371
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
×
372
  taosArrayDestroyP(pReq->data, NULL);
×
373
  taosArrayDestroy(pReq->dataLen);
×
374
}
×
375

376
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
×
377
  int32_t code = 0;
×
378
  int32_t lino;
379

380
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
381
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
382
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
383
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
×
384
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
×
385
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
×
386
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
×
387
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
×
388
  tEndEncode(pEncoder);
×
389

390
_exit:
×
391
  if (code) {
×
392
    return code;
×
393
  } else {
394
    return pEncoder->pos;
×
395
  }
396
}
397

398
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
×
399
  int32_t code = 0;
×
400
  int32_t lino;
401

402
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
403
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
404
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
×
406
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
×
407
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
×
408
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));
×
409
  uint64_t len = 0;
×
410
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
×
411
  pReq->retrieveLen = (int32_t)len;
×
412
  tEndDecode(pDecoder);
×
413

414
_exit:
×
415
  return code;
×
416
}
417

418
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
×
419

420

421
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
258✔
422
  int32_t code = 0;
258✔
423
  int32_t lino = 0;
258✔
424
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
516!
425
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
516!
426
  switch (pReq->type) {
258!
427
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
258✔
428
      if (pReq->cont.fullTableNames) {
258!
429
        int32_t num = taosArrayGetSize(pReq->cont.fullTableNames);
258✔
430
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
258!
431
        for (int32_t i = 0; i < num; ++i) {
912✔
432
          SStreamDbTableName* pName = taosArrayGet(pReq->cont.fullTableNames, i);
654✔
433
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->dbFName, strlen(pName->dbFName) + 1));
1,308!
434
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->tbName, strlen(pName->tbName) + 1));
1,308!
435
        }
436
      } else {
437
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
×
438
      }
439
      break;
258✔
440
    }
441
    default:
×
442
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
443
      break;
×
444
  }
445

446
_exit:
258✔
447

448
  return code;
258✔
449
}
450

451
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
387✔
452
  if (NULL == pReq) {
387✔
453
    return;
129✔
454
  }
455

456
  taosArrayDestroy(pReq->cont.fullTableNames);
258✔
457
}
458

459

460
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
129✔
461
  *ppDst = NULL;
129✔
462
  
463
  if (NULL == pSrc) {
129!
464
    return TSDB_CODE_SUCCESS;
×
465
  }
466

467
  int32_t code = 0, lino = 0;
129✔
468
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
129!
469
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
129!
470

471
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
129✔
472
  if (pSrc->cont.fullTableNames) {
129!
473
    (*ppDst)->cont.fullTableNames = taosArrayDup(pSrc->cont.fullTableNames, NULL);
129✔
474
    TSDB_CHECK_NULL((*ppDst)->cont.fullTableNames, code, lino, _exit, terrno);
129!
475
  }
476
  
477
_exit:
129✔
478

479
  if (code) {
129!
480
    tFreeSStreamMgmtReq(*ppDst);
×
481
    taosMemoryFreeClear(*ppDst);
×
482
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
483
  }
484
  
485
  return code;
129✔
486
}
487

488

489
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
129✔
490
  int32_t code = 0;
129✔
491
  int32_t lino = 0;
129✔
492

493
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
258!
494
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
258!
495
  switch (pReq->type) {
129!
496
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
129✔
497
      int32_t num = 0;
129✔
498
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
129!
499
      if (num > 0) {
129!
500
        pReq->cont.fullTableNames = taosArrayInit(num, sizeof(SStreamDbTableName));
129✔
501
        TSDB_CHECK_NULL(pReq->cont.fullTableNames, code, lino, _exit, terrno);
129!
502
        for (int32_t i = 0; i < num; ++i) {
456✔
503
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.fullTableNames, 1);
327✔
504
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
327!
505
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
327!
506
        }
507
      }
508
      break;
129✔
509
    }
510
    default:
×
511
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
512
      break;
×
513
  }
514

515
_exit:
129✔
516

517
  return code;  
129✔
518
}
519

520
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
63,580✔
521
  int32_t code = 0;
63,580✔
522
  int32_t lino;
523

524
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
127,160!
525
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
127,160!
526
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));
127,160!
527

528
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
127,160!
529
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
127,160!
530
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
127,160!
531
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
127,160!
532
  // SKIP SESSIONID
533
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
127,160!
534
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
127,160!
535
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
127,160!
536
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));
127,160!
537
  if (pTask->pMgmtReq) {
63,580✔
538
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
258!
539
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
258!
540
  } else {
541
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
63,322!
542
  }
543

544
_exit:
63,322✔
545

546
  return code;
63,580✔
547
}
548

549

550
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
30,419✔
551
  int32_t code = 0;
30,419✔
552
  int32_t lino;
553

554
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
60,838!
555
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
60,838!
556
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));
60,838!
557
  
558
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
60,838!
559
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
60,838!
560
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
60,838!
561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
60,838!
562
  // SKIP SESSIONID
563
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
60,838!
564
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
60,838!
565
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
60,838!
566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));
60,838!
567
  int32_t req = 0;
30,419✔
568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
30,419!
569
  if (req) {
30,419✔
570
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
129!
571
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
129!
572
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
129!
573
  }
574

575
_exit:
30,419✔
576

577
  return code;
30,419✔
578
}
579

580
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
×
581
  int32_t code = 0;
×
582
  int32_t lino;
583

584
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
×
585
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
×
586
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
×
587
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));
×
588

589
_exit:
×
590

591
  return code;
×
592
}
593

594
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
×
595
  int32_t code = 0;
×
596
  int32_t lino;
597

598
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
×
599
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
×
600
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
×
601
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));
×
602

603
_exit:
×
604

605
  return code;
×
606
}
607

608

609
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
4,774✔
610
  int32_t code = 0;
4,774✔
611
  int32_t lino;
612

613
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
9,548!
614
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
9,548!
615
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
9,548!
616
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
9,548!
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));
9,548!
618

619
  int32_t recalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
4,774✔
620
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
4,774!
621
  for (int32_t i = 0; i < recalcNum; ++i) {
4,774!
622
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
623
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pProgress));
×
624
  }
625

626
_exit:
4,774✔
627

628
  return code;
4,774✔
629
}
630

631
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
2,261✔
632
  int32_t code = 0;
2,261✔
633
  int32_t lino;
634

635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
4,522!
636
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
4,522!
637
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
4,522!
638
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
4,522!
639
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));
4,522!
640

641
  int32_t recalcNum = 0;
2,261✔
642
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
2,261!
643
  if (recalcNum > 0) {
2,261!
644
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
×
645
    if (NULL == pStatus->userRecalcs) {
×
646
      code = terrno;
×
647
      goto _exit;
×
648
    }
649
  }
650

651
  for (int32_t i = 0; i < recalcNum; ++i) {
2,261!
652
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
653
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
×
654
  }
655

656
_exit:
2,261✔
657

658
  return code;
2,261✔
659
}
660

661

662
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
72,434✔
663
  int32_t code = 0;
72,434✔
664
  int32_t lino;
665

666
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
72,434!
667
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
144,868!
668
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
144,868!
669
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
144,868!
670
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));
144,868!
671

672
  int32_t vgLeaderNum = taosArrayGetSize(pReq->pVgLeaders);
72,434✔
673
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgLeaderNum));
72,434!
674
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
263,904✔
675
    int32_t* vgId = taosArrayGet(pReq->pVgLeaders, i);
191,470✔
676
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
382,940!
677
  }
678
  
679
  int32_t statusNum = taosArrayGetSize(pReq->pStreamStatus);
72,434✔
680
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, statusNum));
72,434!
681
  for (int32_t i = 0; i < statusNum; ++i) {
130,038✔
682
    SStmTaskStatusMsg* pStatus = taosArrayGet(pReq->pStreamStatus, i);
57,604✔
683
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pStatus));
57,604!
684
  }
685

686
  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
72,434✔
687
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqNum));
72,434!
688
  for (int32_t i = 0; i < reqNum; ++i) {
72,692✔
689
    int32_t* idx = taosArrayGet(pReq->pStreamReq, i);
258✔
690
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *idx));
516!
691
  }
692

693
  int32_t triggerNum = taosArrayGetSize(pReq->pTriggerStatus);
72,434✔
694
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, triggerNum));
72,434!
695
  for (int32_t i = 0; i < triggerNum; ++i) {
77,208✔
696
    SSTriggerRuntimeStatus* pTrigger = taosArrayGet(pReq->pTriggerStatus, i);
4,774✔
697
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigger));
4,774!
698
  }
699
  
700
  tEndEncode(pEncoder);
72,434✔
701

702
_exit:
72,434✔
703
  if (code) {
72,434!
704
    return code;
×
705
  } else {
706
    return pEncoder->pos;
72,434✔
707
  }
708
}
709

710
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
34,454✔
711
  int32_t code = 0;
34,454✔
712
  int32_t lino;
713

714
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
34,454!
715
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
68,908!
716
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
68,908!
717
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
68,908!
718
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));
68,908!
719

720
  int32_t vgLearderNum = 0;
34,454✔
721
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLearderNum));
34,454!
722
  if (vgLearderNum > 0) {
34,454✔
723
    pReq->pVgLeaders = taosArrayInit(vgLearderNum, sizeof(int32_t));
26,315✔
724
    if (NULL == pReq->pVgLeaders) {
26,315!
725
      code = terrno;
×
726
      goto _exit;
×
727
    }
728
  }
729
  for (int32_t i = 0; i < vgLearderNum; ++i) {
126,141✔
730
    int32_t vgId = 0;
91,687✔
731
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
91,687!
732
    if (NULL == taosArrayPush(pReq->pVgLeaders, &vgId)) {
183,374!
733
      code = terrno;
×
734
      goto _exit;
×
735
    }
736
  }
737

738

739
  int32_t statusNum = 0;
34,454✔
740
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
34,454!
741
  if (statusNum > 0) {
34,454✔
742
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
1,848✔
743
    if (NULL == pReq->pStreamStatus) {
1,848!
744
      code = terrno;
×
745
      goto _exit;
×
746
    }
747
  }
748
  for (int32_t i = 0; i < statusNum; ++i) {
61,885✔
749
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
27,431✔
750
    if (NULL == pTask) {
27,431!
751
      code = terrno;
×
752
      goto _exit;
×
753
    }
754
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTask));
27,431!
755
  }
756

757

758
  int32_t reqNum = 0;
34,454✔
759
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
34,454!
760
  if (reqNum > 0) {
34,454✔
761
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
32✔
762
    if (NULL == pReq->pStreamReq) {
32!
763
      code = terrno;
×
764
      goto _exit;
×
765
    }
766
  }
767
  for (int32_t i = 0; i < reqNum; ++i) {
34,583✔
768
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, i);
129✔
769
    if (NULL == pIdx) {
129!
770
      code = terrno;
×
771
      goto _exit;
×
772
    }
773
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pIdx));
129!
774
  }
775

776

777
  int32_t triggerNum = 0;
34,454✔
778
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
34,454!
779
  if (triggerNum > 0) {
34,454✔
780
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
718✔
781
    if (NULL == pReq->pTriggerStatus) {
718!
782
      code = terrno;
×
783
      goto _exit;
×
784
    }
785
  }
786
  for (int32_t i = 0; i < triggerNum; ++i) {
36,715✔
787
    SSTriggerRuntimeStatus* pStatus = taosArrayGet(pReq->pTriggerStatus, i);
2,261✔
788
    if (NULL == pStatus) {
2,261!
789
      code = terrno;
×
790
      goto _exit;
×
791
    }
792
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pStatus));
2,261!
793
  }
794

795
  
796
  tEndDecode(pDecoder);
34,454✔
797

798
_exit:
34,454✔
799
  return code;
34,454✔
800
}
801

802
void tFreeSSTriggerRuntimeStatus(void* param) {
4,648✔
803
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
4,648✔
804
  if (NULL == pStatus) {
4,648!
805
    return;
×
806
  }
807
  taosArrayDestroy(pStatus->userRecalcs);
4,648✔
808
}
809

810
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
214,400✔
811
  if (pMsg == NULL) {
214,400!
812
    return;
×
813
  }
814

815
  taosArrayDestroy(pMsg->pVgLeaders);
214,400✔
816
  if (deepClean) {
214,400!
817
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
214,400✔
818
    for (int32_t i = 0; i < reqNum; ++i) {
214,658✔
819
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
258✔
820
      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
258✔
821
      if (NULL == pTask) {
258!
822
        continue;
×
823
      }
824

825
      tFreeSStreamMgmtReq(pTask->pMgmtReq);
258✔
826
      taosMemoryFree(pTask->pMgmtReq);
258!
827
    }
828
  }
829
  taosArrayDestroy(pMsg->pStreamReq);
214,400✔
830
  taosArrayDestroy(pMsg->pStreamStatus);
214,400✔
831
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
214,400✔
832
}
833

834
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
1,104✔
835
  int32_t code = 0;
1,104✔
836
  int32_t lino;
837

838
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, pMsg->triggerTblName == NULL ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1));
2,208!
839
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
2,208!
840
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
2,208!
841
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
2,208!
842
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
2,208!
843
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
2,208!
844
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, pMsg->triggerCols == NULL ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1));
2,208!
845
  //TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, pMsg->triggerPrevFilter == NULL ? 0 : (int32_t)strlen(pMsg->triggerPrevFilter) + 1));
846
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1));
2,208!
847
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1));
2,208!
848

849
_exit:
1,104✔
850

851
  return code;
1,104✔
852
}
853

854
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
400✔
855
  int32_t code = 0;
400✔
856
  int32_t lino;
857

858
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
800!
859
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, pMsg->calcScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1));
800!
860

861
_exit:
400✔
862

863
  return code;
400✔
864
}
865

866

867
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
1,504✔
868
  int32_t code = 0;
1,504✔
869
  int32_t lino;
870

871
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));
3,008!
872
  if (pMsg->triggerReader) {
1,504✔
873
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
1,104!
874
  } else {
875
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
400!
876
  }
877
  
878
_exit:
400✔
879

880
  return code;
1,504✔
881
}
882

883
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
3,186✔
884
  int32_t code = 0;
3,186✔
885
  int32_t lino;
886

887
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
6,372!
888
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
6,372!
889
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));
3,186!
890

891
_exit:
3,186✔
892

893
  return code;
3,186✔
894
}
895

896
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
2,100✔
897
  int32_t code = 0;
2,100✔
898
  int32_t lino;
899

900
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
2,100!
901
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
4,200!
902

903
_exit:
2,100✔
904

905
  return code;
2,100✔
906
}
907

908

909
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
710✔
910
  int32_t code = 0;
710✔
911
  int32_t lino;
912

913
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
1,420!
914
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
1,420!
915
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
1,420!
916
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
1,420!
917
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
1,420!
918
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
1,420!
919
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->hasPartitionBy));
1,420!
920
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
1,420!
921
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
1,420!
922

923
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
710✔
924
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
710!
925
  for (int32_t i = 0; i < addrSize; ++i) {
888✔
926
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
178✔
927
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
356!
928
  }
929
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
1,420!
930
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
1,420!
931
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
1,420!
932

933
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
1,420!
934
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
1,420!
935
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
1,420!
936
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
1,420!
937

938
  switch (pMsg->triggerType) {
710!
939
    case WINDOW_TYPE_SESSION: {
26✔
940
      // session trigger
941
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
52!
942
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
52!
943
      break;
26✔
944
    }
945
    case WINDOW_TYPE_STATE: {
256✔
946
      // state trigger
947
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
512!
948
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
512!
949
      break;
256✔
950
    }
951
    case WINDOW_TYPE_INTERVAL: {
298✔
952
      // slide trigger
953
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
596!
954
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
596!
955
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
596!
956
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
596!
957
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
596!
958
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
596!
959
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
596!
960
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
596!
961
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
596!
962
      break;
298✔
963
    }
964
    case WINDOW_TYPE_EVENT: {
72✔
965
      // event trigger
966
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
72!
967
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
72!
968

969
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
144!
970
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
144!
971

972
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
144!
973
      break;
72✔
974
    }
975
    case WINDOW_TYPE_COUNT: {
32✔
976
      // count trigger
977
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
32!
978
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
64!
979

980
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
64!
981
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
64!
982
      break;
32✔
983
    }
984
    case WINDOW_TYPE_PERIOD: {
26✔
985
      // period trigger
986
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
52!
987
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
52!
988
      break;
26✔
989
    }
990
    default:
×
991
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
992
      break;
×
993
  }
994

995
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
1,420!
996
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
1,420!
997
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
1,420!
998
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
1,420!
999
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
710✔
1000
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
1,420!
1001
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
710✔
1002
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
1,420!
1003
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
710✔
1004
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
1,420!
1005

1006
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
710✔
1007
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
710!
1008
  for (int32_t i = 0; i < readerNum; ++i) {
1,604✔
1009
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
894✔
1010
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
894!
1011
  }
1012

1013
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
710✔
1014
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
710!
1015
  for (int32_t i = 0; i < runnerNum; ++i) {
2,810✔
1016
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
2,100✔
1017
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
2,100!
1018
  }
1019

1020
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
1,420!
1021
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
1,420!
1022

1023
_exit:
710✔
1024

1025
  return code;
710✔
1026
}
1027

1028

1029
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
19,906✔
1030
  int32_t code = 0;
19,906✔
1031
  int32_t lino;
1032

1033
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
39,812!
1034
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
39,812!
1035
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
39,812!
1036
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
39,812!
1037
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
39,812!
1038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));
39,812!
1039

1040
_exit:
19,906✔
1041

1042
  return code;
19,906✔
1043
}
1044

1045

1046
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
2,222✔
1047
  int32_t code = 0;
2,222✔
1048
  int32_t lino;
1049

1050
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
4,444!
1051
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
4,444!
1052
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
4,444!
1053
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
4,444!
1054
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
4,444!
1055
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
4,444!
1056
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
4,444!
1057
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));
4,444!
1058

1059
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
2,222✔
1060
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
2,222!
1061
  for (int32_t i = 0; i < addrSize; ++i) {
2,726✔
1062
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
504✔
1063
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
1,008!
1064
  }
1065
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
4,444!
1066

1067
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
2,222✔
1068
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
2,222!
1069
  for (int32_t i = 0; i < outColNum; ++i) {
11,364✔
1070
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
9,142✔
1071
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
9,142!
1072
  }
1073

1074
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
2,222✔
1075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
2,222!
1076
  for (int32_t i = 0; i < outTagNum; ++i) {
4,022✔
1077
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
1,800✔
1078
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
1,800!
1079
  }
1080

1081
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
4,444!
1082
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));
4,444!
1083

1084
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
4,444!
1085
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));
4,444!
1086

1087
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
2,222✔
1088
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
2,222!
1089
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
2,630✔
1090
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
408✔
1091
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
408!
1092

1093
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
816!
1094
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
816!
1095
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
816!
1096
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
816!
1097
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
816!
1098
  }
1099

1100
_exit:
2,222✔
1101

1102
  return code;
2,222✔
1103
}
1104

1105

1106
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
4,436✔
1107
  int32_t code = 0;
4,436✔
1108
  int32_t lino;
1109

1110
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
4,436!
1111
  switch (pTask->task.type) {
4,436!
1112
    case STREAM_READER_TASK:
1,504✔
1113
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
1,504!
1114
      break;
1,504✔
1115
    case STREAM_TRIGGER_TASK:
710✔
1116
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
710!
1117
      break;
710✔
1118
    case STREAM_RUNNER_TASK:
2,222✔
1119
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
2,222!
1120
      break;
2,222✔
1121
    default:
×
1122
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1123
      break;
×
1124
  }
1125
  
1126
_exit:
4,436✔
1127

1128
  return code;
4,436✔
1129
}
1130

1131

1132
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
1,000✔
1133
  int32_t code = 0;
1,000✔
1134
  int32_t lino;
1135

1136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));
2,000!
1137

1138
  int32_t readerNum = taosArrayGetSize(pStream->readerTasks);
1,000✔
1139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
1,000!
1140
  for (int32_t i = 0; i < readerNum; ++i) {
2,504✔
1141
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->readerTasks, i);
1,504✔
1142
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
1,504!
1143
  }
1144

1145
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStream->triggerTask ? 1 : 0));
2,000!
1146
  if (pStream->triggerTask) {
1,000✔
1147
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
710!
1148
  }
1149
  
1150
  int32_t runnerNum = taosArrayGetSize(pStream->runnerTasks);
1,000✔
1151
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
1,000!
1152
  for (int32_t i = 0; i < runnerNum; ++i) {
3,222✔
1153
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->runnerTasks, i);
2,222✔
1154
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
2,222!
1155
  }
1156

1157
_exit:
1,000✔
1158

1159
  return code;
1,000✔
1160
}
1161

1162
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
1,540✔
1163
  int32_t code = 0;
1,540✔
1164
  int32_t lino = 0;
1,540✔
1165

1166
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));
3,080!
1167

1168
_exit:
1,540✔
1169
  return code;
1,540✔
1170
}
1171

1172
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
770✔
1173
  int32_t code = 0;
770✔
1174
  int32_t lino;
1175

1176
  int32_t type = 0;
770✔
1177
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &type));
770!
1178
  pMsg->msgType = type;
770✔
1179

1180
_exit:
770✔
1181
  return code;
770✔
1182
}
1183

1184
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
942✔
1185
  int32_t code = 0;
942✔
1186
  int32_t lino;
1187

1188
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));
942!
1189

1190
_exit:
942✔
1191

1192
  return code;
942✔
1193
}
1194

1195
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
942✔
1196
  int32_t code = 0;
942✔
1197
  int32_t lino;
1198

1199
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
942!
1200
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));
942!
1201

1202
_exit:
942✔
1203

1204
  return code;
942✔
1205
}
1206

1207
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
340✔
1208
  int32_t code = 0;
340✔
1209
  int32_t lino;
1210

1211
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));
340!
1212
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
680!
1213
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));
680!
1214

1215
_exit:
340✔
1216

1217
  return code;
340✔
1218
}
1219

1220
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
340✔
1221
  int32_t code = 0;
340✔
1222
  int32_t lino;
1223

1224
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
340!
1225
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
340!
1226

1227
_exit:
340✔
1228

1229
  return code;
340✔
1230
}
1231

1232

1233
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
×
1234
  int32_t code = 0;
×
1235
  int32_t lino;
1236

1237
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));
×
1238
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
×
1239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));
×
1240

1241
_exit:
×
1242

1243
  return code;
×
1244
}
1245

1246
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
258✔
1247
  int32_t code = 0;
258✔
1248
  int32_t lino;
1249

1250
  switch (msgType) {
258!
1251
    case STREAM_MSG_ORIGTBL_READER_INFO: {
258✔
1252
      int32_t vgNum = taosArrayGetSize(pRsp->vgIds);
258✔
1253
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
258!
1254

1255
      for (int32_t i = 0; i < vgNum; ++i) {
912✔
1256
        int32_t* vgId = taosArrayGet(pRsp->vgIds, i);
654✔
1257
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
1,308!
1258
      }
1259

1260
      int32_t readerNum = taosArrayGetSize(pRsp->readerList);
258✔
1261
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
258!
1262
      
1263
      for (int32_t i = 0; i < readerNum; ++i) {
450✔
1264
        SStreamTaskAddr* addr = taosArrayGet(pRsp->readerList, i);
192✔
1265
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, addr));
192!
1266
      }
1267
      break;
258✔
1268
    }
1269
    case STREAM_MSG_UPDATE_RUNNER: {
×
1270
      int32_t runnerNum = taosArrayGetSize(pRsp->runnerList);
×
1271
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
×
1272
      
1273
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1274
        SStreamRunnerTarget* target = taosArrayGet(pRsp->runnerList, i);
×
1275
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, target));
×
1276
      }
1277
      break;
×
1278
    }
1279
    case STREAM_MSG_USER_RECALC:{
×
1280
      int32_t recalcNum = taosArrayGetSize(pRsp->recalcList);
×
1281
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
×
1282
      
1283
      for (int32_t i = 0; i < recalcNum; ++i) {
×
1284
        SStreamRecalcReq* recalc = taosArrayGet(pRsp->recalcList, i);
×
1285
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, recalc));
×
1286
      }
1287
      break;
×
1288
    }
1289
    default:
×
1290
      break;
×
1291
  }
1292

1293
_exit:
258✔
1294

1295
  return code;
258✔
1296
}
1297

1298
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
258✔
1299
  int32_t code = 0;
258✔
1300
  int32_t lino;
1301

1302
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
258!
1303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
516!
1304
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
516!
1305
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));
258!
1306
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));
258!
1307

1308
_exit:
258✔
1309

1310
  return code;
258✔
1311
}
1312

1313

1314
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
68,308✔
1315
  int32_t code = 0;
68,308✔
1316
  int32_t lino;
1317

1318
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
68,308!
1319
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));
136,616!
1320
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
68,308✔
1321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
68,308!
1322
  for (int32_t i = 0; i < deployNum; ++i) {
69,308✔
1323
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
1,000✔
1324
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
1,000!
1325
  }
1326

1327
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
68,308✔
1328
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
68,308!
1329
  for (int32_t i = 0; i < startNum; ++i) {
69,250✔
1330
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
942✔
1331
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
942!
1332
  }
1333

1334
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
136,616!
1335
  if (!pRsp->undeploy.undeployAll) {
68,308!
1336
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
68,308✔
1337
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
68,308!
1338
    for (int32_t i = 0; i < undeployNum; ++i) {
68,648✔
1339
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
340✔
1340
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
340!
1341
    }
1342
  }
1343

1344
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
68,308✔
1345
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
68,308!
1346
  for (int32_t i = 0; i < rspNum; ++i) {
68,566✔
1347
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
258✔
1348
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
258!
1349
  }
1350
  
1351
_exit:
68,308✔
1352

1353
  tEndEncode(pEncoder);
68,308✔
1354

1355
  return code;
68,308✔
1356
}
1357

1358
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
552✔
1359
  int32_t code = 0;
552✔
1360
  int32_t lino;
1361

1362
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
1,104!
1363
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
1,104!
1364
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
1,104!
1365
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
1,104!
1366
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
1,104!
1367
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
1,104!
1368
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
1,104!
1369
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
1,104!
1370
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
1,104!
1371

1372
_exit:
552✔
1373

1374
  return code;
552✔
1375
}
1376

1377

1378
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
200✔
1379
  int32_t code = 0;
200✔
1380
  int32_t lino;
1381

1382
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
400!
1383
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));
400!
1384

1385
_exit:
200✔
1386

1387
  return code;
200✔
1388
}
1389

1390

1391
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
752✔
1392
  int32_t code = 0;
752✔
1393
  int32_t lino;
1394

1395
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));
1,504!
1396
  if (pMsg->triggerReader) {
752✔
1397
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
552!
1398
  } else {
1399
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
200!
1400
  }
1401
  
1402
_exit:
200✔
1403

1404
  return code;
752✔
1405
}
1406

1407

1408
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
1,593✔
1409
  int32_t code = 0;
1,593✔
1410
  int32_t lino;
1411

1412
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
3,186!
1413
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
3,186!
1414
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));
1,593!
1415

1416
_exit:
1,593✔
1417

1418
  return code;
1,593✔
1419
}
1420

1421

1422
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
1,050✔
1423
  int32_t code = 0;
1,050✔
1424
  int32_t lino;
1425

1426
  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
1,050!
1427
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
2,100!
1428

1429
_exit:
1,050✔
1430

1431
  return code;
1,050✔
1432
}
1433

1434

1435
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
355✔
1436
  int32_t code = 0;
355✔
1437
  int32_t lino;
1438

1439
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
710!
1440
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
710!
1441
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
710!
1442
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
710!
1443
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
710!
1444
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
710!
1445
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->hasPartitionBy));
710!
1446
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
710!
1447
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
710!
1448

1449
  int32_t addrSize = 0;
355✔
1450
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
355!
1451
  if (addrSize > 0) {
355✔
1452
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
89✔
1453
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
89!
1454
  }
1455
  for (int32_t i = 0; i < addrSize; ++i) {
444✔
1456
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
89✔
1457
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
89!
1458
  }
1459
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
710!
1460
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
710!
1461
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));
710!
1462

1463
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
710!
1464
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
710!
1465
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
710!
1466
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
710!
1467

1468
  switch (pMsg->triggerType) {
355!
1469
    case WINDOW_TYPE_SESSION:
13✔
1470
      // session trigger
1471
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
26!
1472
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
26!
1473
      break;
13✔
1474
    case WINDOW_TYPE_STATE:
128✔
1475
      // state trigger
1476
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
256!
1477
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
256!
1478
      break;
128✔
1479
    
1480
    case WINDOW_TYPE_INTERVAL:
149✔
1481
      // slide trigger
1482
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
298!
1483
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
298!
1484
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
298!
1485
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
298!
1486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
298!
1487
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
298!
1488
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
298!
1489
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
298!
1490
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
298!
1491
      break;
149✔
1492
    
1493
    case WINDOW_TYPE_EVENT:
36✔
1494
      // event trigger
1495
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
72!
1496
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
72!
1497
      
1498
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
72!
1499
      break;
36✔
1500
    
1501
    case WINDOW_TYPE_COUNT:
16✔
1502
      // count trigger
1503
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
32!
1504
      
1505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
32!
1506
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
32!
1507
      break;
16✔
1508
    
1509
    case WINDOW_TYPE_PERIOD:
13✔
1510
      // period trigger
1511
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
26!
1512
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
26!
1513
      break;
13✔
1514
    default:
×
1515
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1516
      break;
×
1517
  }
1518

1519
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
710!
1520
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
710!
1521
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
710!
1522
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
710!
1523
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
710!
1524
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
710!
1525
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
710!
1526

1527
  int32_t readerNum = 0;
355✔
1528
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
355!
1529
  if (readerNum > 0) {
355✔
1530
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
353✔
1531
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
353!
1532
  }
1533
  for (int32_t i = 0; i < readerNum; ++i) {
802✔
1534
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
447✔
1535
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
447!
1536
  }
1537

1538
  int32_t runnerNum = 0;
355✔
1539
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
355!
1540
  if (runnerNum > 0) {
355✔
1541
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
350✔
1542
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
350!
1543
  }
1544
  for (int32_t i = 0; i < runnerNum; ++i) {
1,405✔
1545
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,050✔
1546
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
1,050!
1547
  }
1548

1549
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
710!
1550
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
710!
1551

1552
_exit:
355✔
1553

1554
  return code;
355✔
1555
}
1556

1557

1558

1559
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
9,479✔
1560
  int32_t code = 0;
9,479✔
1561
  int32_t lino;
1562

1563
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
9,479!
1564
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
18,958!
1565
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
18,958!
1566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
18,958!
1567
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
18,958!
1568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));
18,958!
1569

1570
_exit:
9,479✔
1571

1572
  return code;
9,479✔
1573
}
1574

1575
void destroySStreamOutCols(void* p){
204✔
1576
  if (p == NULL) return;
204!
1577
  SStreamOutCol* col = (SStreamOutCol*)p;
204✔
1578
  taosMemoryFreeClear(col->expr);
204!
1579
}
1580

1581
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
1,111✔
1582
  int32_t code = 0;
1,111✔
1583
  int32_t lino;
1584

1585
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
2,222!
1586
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
2,222!
1587
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
2,222!
1588
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
2,222!
1589
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
2,222!
1590
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
2,222!
1591
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
2,222!
1592
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));
2,222!
1593

1594
  int32_t addrSize = 0;
1,111✔
1595
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1,111!
1596
  if (addrSize > 0) {
1,111✔
1597
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
252✔
1598
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
252!
1599
  }
1600
  for (int32_t i = 0; i < addrSize; ++i) {
1,363✔
1601
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
252✔
1602
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
252!
1603
  }
1604
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
2,222!
1605

1606
  int32_t outColNum = 0;
1,111✔
1607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
1,111!
1608
  if (outColNum > 0) {
1,111!
1609
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
1,111✔
1610
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
1,111!
1611
  }
1612
  for (int32_t i = 0; i < outColNum; ++i) {
5,682✔
1613
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
4,571✔
1614
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
4,571!
1615
  }
1616

1617
  int32_t outTagNum = 0;
1,111✔
1618
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
1,111!
1619
  if (outTagNum > 0) {
1,111✔
1620
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
543✔
1621
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
543!
1622
  }
1623
  for (int32_t i = 0; i < outTagNum; ++i) {
2,011✔
1624
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
900✔
1625
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
900!
1626
  }
1627

1628
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
2,222!
1629
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
2,222!
1630

1631
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
2,222!
1632
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));
2,222!
1633

1634
  int32_t forceOutColsSize = 0;
1,111✔
1635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
1,111!
1636
  if (forceOutColsSize > 0) {
1,111✔
1637
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
33✔
1638
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
33!
1639
  }
1640
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
1,315✔
1641
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
204✔
1642

1643
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
408!
1644
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
408!
1645
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
408!
1646
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
408!
1647
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
408!
1648
  }
1649

1650
_exit:
1,111✔
1651

1652
  return code;
1,111✔
1653
}
1654

1655
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
2,218✔
1656
  int32_t code = 0;
2,218✔
1657
  int32_t lino;
1658

1659
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
2,218!
1660
  switch (pTask->task.type) {
2,218!
1661
    case STREAM_READER_TASK:
752✔
1662
      TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
752!
1663
      break;
752✔
1664
    case STREAM_TRIGGER_TASK:
355✔
1665
      TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
355!
1666
      break;
355✔
1667
    case STREAM_RUNNER_TASK:
1,111✔
1668
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
1,111!
1669
      break;
1,111✔
1670
    default:
×
1671
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1672
      break;
×
1673
  }
1674
  
1675
_exit:
2,218✔
1676

1677
  return code;
2,218✔
1678
}
1679

1680

1681
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
500✔
1682
  int32_t code = 0;
500✔
1683
  int32_t lino;
1684

1685
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));
1,000!
1686

1687
  int32_t readerNum = 0;
500✔
1688
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
500!
1689
  if (readerNum > 0) {
500✔
1690
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), readerNum);
449✔
1691
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
449!
1692
  }
1693
  for (int32_t i = 0; i < readerNum; ++i) {
1,252✔
1694
    SStmTaskDeploy* pTask = taosArrayGet(pStream->readerTasks, i);
752✔
1695
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
752!
1696
  }
1697

1698
  int32_t triggerTask = 0;
500✔
1699
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerTask));
500!
1700
  if (triggerTask) {
500✔
1701
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
355!
1702
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
355!
1703
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
355!
1704
  }
1705
  
1706
  int32_t runnerNum = 0;
500✔
1707
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
500!
1708
  if (runnerNum > 0) {
500✔
1709
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), runnerNum);
386✔
1710
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
386!
1711
  }
1712
  for (int32_t i = 0; i < runnerNum; ++i) {
1,611✔
1713
    SStmTaskDeploy* pTask = taosArrayGet(pStream->runnerTasks, i);
1,111✔
1714
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
1,111!
1715
  }
1716

1717
_exit:
500✔
1718

1719
  return code;
500✔
1720
}
1721

1722

1723
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
471✔
1724
  int32_t code = 0;
471✔
1725
  int32_t lino;
1726

1727
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));
471!
1728

1729
_exit:
471✔
1730

1731
  return code;
471✔
1732
}
1733

1734

1735
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
471✔
1736
  int32_t code = 0;
471✔
1737
  int32_t lino;
1738

1739
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
471!
1740
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));
471!
1741

1742
_exit:
471✔
1743

1744
  return code;
471✔
1745
}
1746

1747

1748
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
170✔
1749
  int32_t code = 0;
170✔
1750
  int32_t lino;
1751

1752
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));
170!
1753
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
340!
1754
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));
340!
1755

1756
_exit:
170✔
1757

1758
  return code;
170✔
1759
}
1760

1761

1762
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
170✔
1763
  int32_t code = 0;
170✔
1764
  int32_t lino;
1765

1766
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
170!
1767
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
170!
1768

1769
_exit:
170✔
1770

1771
  return code;
170✔
1772
}
1773

1774
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
×
1775
  int32_t code = 0;
×
1776
  int32_t lino;
1777

1778
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));
×
1779
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
×
1780
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));
×
1781

1782
_exit:
×
1783

1784
  return code;
×
1785
}
1786

1787
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
129✔
1788
  int32_t code = 0;
129✔
1789
  int32_t lino;
1790

1791
  switch (msgType) {
129!
1792
    case STREAM_MSG_ORIGTBL_READER_INFO: {
129✔
1793
      int32_t vgNum = 0;
129✔
1794
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgNum));  
129!
1795
      if (vgNum > 0) {
129!
1796
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), vgNum);
129✔
1797
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
129!
1798
      }
1799
      for (int32_t i = 0; i < vgNum; ++i) {
456✔
1800
        int32_t *vgId = taosArrayGet(pCont->vgIds, i);
327✔
1801
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));  
327!
1802
      }
1803

1804
      int32_t readerNum = 0;
129✔
1805
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));  
129!
1806
      if (readerNum > 0) {
129✔
1807
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
85✔
1808
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
85!
1809
      }
1810
      for (int32_t i = 0; i < readerNum; ++i) {
225✔
1811
        SStreamTaskAddr *addr = taosArrayGet(pCont->readerList, i);
96✔
1812
        TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, addr));  
96!
1813
      }
1814
      break;
129✔
1815
    }
1816
    case STREAM_MSG_UPDATE_RUNNER: {
×
1817
      int32_t runnerNum = 0;
×
1818
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));  
×
1819
      if (runnerNum > 0) {
×
1820
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
×
1821
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
×
1822
      }
1823
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1824
        SStreamRunnerTarget *target = taosArrayGet(pCont->runnerList, i);
×
1825
        TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, target));  
×
1826
      }
1827
      break;
×
1828
    }
1829
    case STREAM_MSG_USER_RECALC: {
×
1830
      int32_t recalcNum = 0;
×
1831
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));  
×
1832
      if (recalcNum > 0) {
×
1833
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), recalcNum);
×
1834
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
×
1835
      }
1836
      for (int32_t i = 0; i < recalcNum; ++i) {
×
1837
        SStreamRecalcReq *recalc = taosArrayGet(pCont->recalcList, i);
×
1838
        TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, recalc));  
×
1839
      }
1840
      break;
×
1841
    }
1842
    default:
×
1843
      break;
×
1844
  }
1845

1846
_exit:
129✔
1847

1848
  return code;
129✔
1849
}
1850

1851

1852
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
129✔
1853
  int32_t code = 0;
129✔
1854
  int32_t lino;
1855

1856
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
129!
1857
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
258!
1858
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
258!
1859
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));
129!
1860
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));
129!
1861

1862
_exit:
129✔
1863

1864
  return code;
129✔
1865
}
1866

1867
void tFreeSStreamMgmtRsp(void* param) {
258✔
1868
  if (NULL == param) {
258!
1869
    return;
×
1870
  }
1871
  
1872
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
258✔
1873

1874
  taosArrayDestroy(pRsp->cont.vgIds);
258✔
1875
  taosArrayDestroy(pRsp->cont.readerList);
258✔
1876
  taosArrayDestroy(pRsp->cont.runnerList);
258✔
1877
  taosArrayDestroy(pRsp->cont.recalcList);
258✔
1878
}
1879

1880
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
752✔
1881
  if (NULL == pReader) {
752!
1882
    return;
×
1883
  }
1884
  
1885
  if (pReader->triggerReader) {
752✔
1886
    SStreamReaderDeployFromTrigger* pMsg = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
552✔
1887
    taosMemoryFree(pMsg->triggerTblName);
552!
1888
    taosMemoryFree(pMsg->partitionCols);
552!
1889
    taosMemoryFree(pMsg->triggerCols);
552!
1890
    taosMemoryFree(pMsg->triggerScanPlan);
552!
1891
    taosMemoryFree(pMsg->calcCacheScanPlan);
552!
1892
  } else {
1893
    SStreamReaderDeployFromCalc* pMsg = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
200✔
1894
    taosMemoryFree(pMsg->calcScanPlan);
200!
1895
  }
1896
}
1897

1898
void tFreeStreamNotifyUrl(void* param) {
×
1899
  if (NULL == param) {
×
1900
    return;
×
1901
  }
1902

1903
  taosMemoryFree(*(void**)param);
×
1904
}
1905

1906
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
355✔
1907
  if (NULL == pTrigger) {
355!
1908
    return;
×
1909
  }
1910
  
1911
  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);
355✔
1912
  switch (pTrigger->triggerType) {
355✔
1913
    case WINDOW_TYPE_EVENT:
36✔
1914
      taosMemoryFree(pTrigger->trigger.event.startCond);
36!
1915
      taosMemoryFree(pTrigger->trigger.event.endCond);
36!
1916
      break;
36✔
1917
    case WINDOW_TYPE_COUNT:
16✔
1918
      taosMemoryFree(pTrigger->trigger.count.condCols);  
16!
1919
      break;
16✔
1920
    default:
303✔
1921
      break;
303✔
1922
  }
1923

1924
  taosMemoryFree(pTrigger->triggerPrevFilter);
355!
1925
  taosMemoryFree(pTrigger->triggerScanPlan);
355!
1926
  taosMemoryFree(pTrigger->calcCacheScanPlan);
355!
1927

1928
  taosArrayDestroy(pTrigger->readerList);
355✔
1929
  taosArrayDestroy(pTrigger->runnerList);
355✔
1930
  taosMemoryFree(pTrigger->streamName);
355!
1931
}
1932

1933
void tFreeSStreamOutCol(void* param) {
×
1934
  if (NULL == param) {
×
1935
    return;
×
1936
  }
1937

1938
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1939
  taosMemoryFree(pOut->expr);
×
1940
}
1941

1942
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
1,111✔
1943
  if (NULL == pRunner) {
1,111!
1944
    return;
×
1945
  }
1946

1947
  taosMemoryFree(pRunner->streamName);
1,111!
1948
  taosMemoryFree(pRunner->pPlan);
1,111!
1949
  taosMemoryFree(pRunner->outDBFName);
1,111!
1950
  taosMemoryFree(pRunner->outTblName);
1,111!
1951

1952
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
1,111✔
1953
  taosArrayDestroy(pRunner->outCols);
1,111✔
1954
  taosArrayDestroy(pRunner->outTags);
1,111✔
1955

1956
  taosMemoryFree(pRunner->subTblNameExpr);
1,111!
1957
  taosMemoryFree(pRunner->tagValueExpr);
1,111!
1958
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
1,111✔
1959
}
1960

1961
void tFreeSStmTaskDeploy(void* param) {
2,863✔
1962
  if (NULL == param) {
2,863✔
1963
    return;
645✔
1964
  }
1965

1966
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
2,218✔
1967
  switch (pTask->task.type)  {
2,218!
1968
    case STREAM_READER_TASK:
752✔
1969
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
752✔
1970
      break;
752✔
1971
    case STREAM_TRIGGER_TASK:
355✔
1972
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
355✔
1973
      break;
355✔
1974
    case STREAM_RUNNER_TASK:
1,111✔
1975
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
1,111✔
1976
      break;
1,111✔
1977
    default:
×
1978
      break;
×
1979
  }
1980
}
1981

1982
void tFreeSStmStreamDeploy(void* param) {
500✔
1983
  if (NULL == param) {
500!
1984
    return;
×
1985
  }
1986
  
1987
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
500✔
1988
  taosArrayDestroy(pDeploy->readerTasks);
500✔
1989
  if (pDeploy->triggerTask) {
500✔
1990
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
355✔
1991
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
355✔
1992
    taosMemoryFree(pDeploy->triggerTask);
355!
1993
  }
1994

1995
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
500✔
1996
  for (int32_t i = 0; i < runnerNum; ++i) {
1,611✔
1997
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
1,111✔
1998
    taosMemoryFree(pRunner->msg.runner.pPlan);
1,111!
1999
  }
2000
  taosArrayDestroy(pDeploy->runnerTasks);
500✔
2001
}
2002

2003
void tDeepFreeSStmStreamDeploy(void* param) {
1,000✔
2004
  if (NULL == param) {
1,000!
2005
    return;
×
2006
  }
2007
  
2008
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
1,000✔
2009
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
1,000✔
2010
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
1,000✔
2011
  taosMemoryFree(pDeploy->triggerTask);
1,000!
2012
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
1,000✔
2013
}
2014

2015

2016
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
68,908✔
2017
  if (NULL == pRsp) {
68,908!
2018
    return;
×
2019
  }
2020
  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
68,908✔
2021
  taosArrayDestroy(pRsp->start.taskList);
68,908✔
2022
  taosArrayDestroy(pRsp->undeploy.taskList);
68,908✔
2023
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
68,908✔
2024
}
2025

2026
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
34,118✔
2027
  if (NULL == pRsp) {
34,118!
2028
    return;
×
2029
  }
2030
  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
34,118✔
2031
  taosArrayDestroy(pRsp->start.taskList);
34,118✔
2032
  taosArrayDestroy(pRsp->undeploy.taskList);
34,118✔
2033
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
34,118✔
2034
}
2035

2036

2037

2038
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
34,118✔
2039
  int32_t code = 0;
34,118✔
2040
  int32_t lino;
2041

2042
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
34,118!
2043
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));
68,236!
2044
  int32_t deployNum = 0;
34,118✔
2045
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &deployNum));
34,118!
2046
  if (deployNum > 0) {
34,118✔
2047
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), deployNum);
110✔
2048
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
110!
2049
  }
2050
  for (int32_t i = 0; i < deployNum; ++i) {
34,618✔
2051
    SStmStreamDeploy* pStream = taosArrayGet(pRsp->deploy.streamList, i);
500✔
2052
    TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pStream));
500!
2053
  }
2054

2055
  int32_t startNum = 0;
34,118✔
2056
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &startNum));
34,118!
2057
  if (startNum > 0) {
34,118✔
2058
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), startNum);
173✔
2059
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
173!
2060
  }
2061
  for (int32_t i = 0; i < startNum; ++i) {
34,589✔
2062
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
471✔
2063
    TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pTask));
471!
2064
  }
2065

2066
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
68,236!
2067
  if (!pRsp->undeploy.undeployAll) {
34,118!
2068
    int32_t undeployNum = 0;
34,118✔
2069
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &undeployNum));
34,118!
2070
    if (undeployNum > 0) {
34,118✔
2071
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), undeployNum);
58✔
2072
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
58!
2073
    }
2074
    for (int32_t i = 0; i < undeployNum; ++i) {
34,288✔
2075
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
170✔
2076
      TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pTask));
170!
2077
    }
2078
  }  
2079

2080
  int32_t rspNum = 0;
34,118✔
2081
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspNum));
34,118!
2082
  if (rspNum > 0) {
34,118✔
2083
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), rspNum);
32✔
2084
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
32!
2085
    for (int32_t i = 0; i < rspNum; ++i) {
161✔
2086
      SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
129✔
2087
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmtRsp));
129!
2088
    }
2089
  }
2090

2091
  tEndDecode(pDecoder);
34,118✔
2092

2093
_exit:
34,118✔
2094
  return code;
34,118✔
2095
}
2096

2097
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
×
2098
  int32_t code = 0;
×
2099
  int32_t lino;
2100

2101
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2102
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2103
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
2104
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
×
2105
  tEndEncode(pEncoder);
×
2106

2107
_exit:
×
2108
  return code;
×
2109
}
2110

2111
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
×
2112
  int32_t code = 0;
×
2113
  int32_t lino;
2114

2115
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2116
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2117
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
2118
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
×
2119
  tEndDecode(pDecoder);
×
2120

2121
_exit:
×
2122
  return code;
×
2123
}
2124

2125
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
×
2126
  int32_t code = 0;
×
2127
  int32_t lino;
2128

2129
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2130
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2131
  tEndEncode(pEncoder);
×
2132

2133
_exit:
×
2134
  return code;
×
2135
}
2136

2137
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
×
2138
  int32_t code = 0;
×
2139
  int32_t lino;
2140

2141
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2142
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2143
  tEndDecode(pDecoder);
×
2144

2145
_exit:
×
2146
  return code;
×
2147

2148
}
2149

2150

2151
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
2,842✔
2152
  int32_t code = 0;
2,842✔
2153
  int32_t lino;
2154

2155
  // name part
2156
  int32_t sqlLen = pReq->sql == NULL ? 0 : (int32_t)strlen(pReq->sql) + 1;
2,842!
2157
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2,842!
2158
  int32_t outDbLen = pReq->outDB == NULL ? 0 : (int32_t)strlen(pReq->outDB) + 1;
2,842✔
2159
  int32_t streamDBLen = pReq->streamDB == NULL ? 0 : (int32_t)strlen(pReq->streamDB) + 1;
2,842!
2160
  int32_t triggerDBLen = pReq->triggerDB == NULL ? 0 : (int32_t)strlen(pReq->triggerDB) + 1;
2,842✔
2161
  int32_t triggerTblNameLen = pReq->triggerTblName == NULL ? 0 : (int32_t)strlen(pReq->triggerTblName) + 1;
2,842✔
2162
  int32_t outTblNameLen = pReq->outTblName == NULL ? 0 : (int32_t)strlen(pReq->outTblName) + 1;
2,842✔
2163

2164
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
5,684!
2165

2166
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, nameLen));
5,684!
2167
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, sqlLen));
5,684!
2168
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, outDbLen));
5,684!
2169
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, streamDBLen));
5,684!
2170
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, triggerDBLen));
5,684!
2171
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, triggerTblNameLen));
5,684!
2172
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, outTblNameLen));
5,684!
2173

2174
  int32_t calcDbSize = (int32_t)taosArrayGetSize(pReq->calcDB);
2,842✔
2175
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcDbSize));
2,842!
2176
  for (int32_t i = 0; i < calcDbSize; ++i) {
5,664✔
2177
    const char *dbName = taosArrayGetP(pReq->calcDB, i);
2,822✔
2178
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, dbName)));
2,822!
2179
  }
2180

2181
  // trigger control part
2182
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
5,684!
2183
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
5,684!
2184
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
5,684!
2185
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
5,684!
2186
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
5,684!
2187
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
5,684!
2188
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
5,684!
2189
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
5,684!
2190
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
5,684!
2191
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));
5,684!
2192

2193
  // notify part
2194
  int32_t addrSize = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
2,842✔
2195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
2,842!
2196
  for (int32_t i = 0; i < addrSize; ++i) {
3,330✔
2197
    const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
488✔
2198
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, url)));
488!
2199
  }
2200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
5,684!
2201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyErrorHandle));
5,684!
2202
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));
5,684!
2203

2204
  // out table part
2205

2206
  // trigger cols and partition cols
2207
  int32_t filterColsLen = pReq->triggerFilterCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerFilterCols) + 1;
2,842✔
2208
  int32_t triggerColsLen = pReq->triggerCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerCols) + 1;
2,842✔
2209
  int32_t partitionColsLen = pReq->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pReq->partitionCols) + 1;
2,842✔
2210
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, filterColsLen));
5,684!
2211
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, triggerColsLen));
5,684!
2212
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, partitionColsLen));
5,684!
2213

2214
  // out col
2215
  int32_t outColSize = (int32_t )taosArrayGetSize(pReq->outCols);
2,842✔
2216
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColSize));
2,842!
2217
  for (int32_t i = 0; i < outColSize; ++i) {
11,806✔
2218
    SFieldWithOptions *pField = taosArrayGet(pReq->outCols, i);
8,964✔
2219
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pField));
8,964!
2220
  }
2221

2222
  // out tag
2223
  int32_t outTagSize = (int32_t )taosArrayGetSize(pReq->outTags);
2,842✔
2224
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagSize));
2,842!
2225
  for (int32_t i = 0; i < outTagSize; ++i) {
4,478✔
2226
    SField *pField = taosArrayGet(pReq->outTags, i);
1,636✔
2227
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->type));
3,272!
2228
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
3,272!
2229
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
3,272!
2230
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
3,272!
2231
  }
2232

2233
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
5,684!
2234
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
5,684!
2235
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
5,684!
2236
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));
5,684!
2237

2238
  switch (pReq->triggerType) {
2,842!
2239
    case WINDOW_TYPE_SESSION: {
100✔
2240
      // session trigger
2241
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
200!
2242
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
200!
2243
      break;
100✔
2244
    }
2245
    case WINDOW_TYPE_STATE: {
592✔
2246
      // state trigger
2247
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
1,184!
2248
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
1,184!
2249
      break;
592✔
2250
    }
2251
    case WINDOW_TYPE_INTERVAL: {
1,614✔
2252
      // slide trigger
2253
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
3,228!
2254
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
3,228!
2255
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
3,228!
2256
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
3,228!
2257
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
3,228!
2258
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
3,228!
2259
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
3,228!
2260
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
3,228!
2261
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
3,228!
2262
      break;
1,614✔
2263
    }
2264
    case WINDOW_TYPE_EVENT: {
192✔
2265
      // event trigger
2266
      int32_t eventWindowStartCondLen = pReq->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.startCond) + 1;
192!
2267
      int32_t eventWindowEndCondLen = pReq->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.endCond) + 1;
192!
2268

2269
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond, eventWindowStartCondLen));
384!
2270
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond, eventWindowEndCondLen));
384!
2271

2272
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
384!
2273
      break;
192✔
2274
    }
2275
    case WINDOW_TYPE_COUNT: {
160✔
2276
      // count trigger
2277
      int32_t countWindowCondColsLen = pReq->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.count.condCols) + 1;
160!
2278
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols, countWindowCondColsLen));
320!
2279

2280
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
320!
2281
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
320!
2282
      break;
160✔
2283
    }
2284
    case WINDOW_TYPE_PERIOD: {
184✔
2285
      // period trigger
2286
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
368!
2287
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
368!
2288
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
368!
2289
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
368!
2290
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
368!
2291
      break;
184✔
2292
    }
2293
  }
2294

2295
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
5,684!
2296
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
5,684!
2297
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
5,684!
2298
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
5,684!
2299
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
5,684!
2300
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
5,684!
2301
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
5,684!
2302
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
5,684!
2303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
5,684!
2304
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
5,684!
2305
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
5,684!
2306
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
5,684!
2307
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
5,684!
2308
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));
5,684!
2309

2310
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
5,684!
2311
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));
5,684!
2312

2313
  int32_t triggerScanPlanLen = pReq->triggerScanPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerScanPlan) + 1;
2,842✔
2314
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, triggerScanPlanLen));
5,684!
2315

2316
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerHasPF));
5,684!
2317
  int32_t triggerFilterLen = pReq->triggerPrevFilter == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerPrevFilter) + 1;
2,842✔
2318
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, triggerFilterLen));
5,684!
2319

2320
  int32_t calcScanPlanListSize = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
2,842✔
2321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcScanPlanListSize));
2,842!
2322
  for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
5,820✔
2323
    SStreamCalcScan* pCalcScanPlan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, i);
2,978✔
2324
    int32_t          vgListSize = (int32_t)taosArrayGetSize(pCalcScanPlan->vgList);
2,978✔
2325
    int32_t          scanPlanLen = pCalcScanPlan->scanPlan == NULL ? 0 : (int32_t)strlen((char*)pCalcScanPlan->scanPlan) + 1;
2,978!
2326
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgListSize));
2,978!
2327
    for (int32_t j = 0; j < vgListSize; ++j) {
5,956✔
2328
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pCalcScanPlan->vgList, j)));
5,956!
2329
    }
2330
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pCalcScanPlan->readFromCache));
5,956!
2331
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCalcScanPlan->scanPlan, scanPlanLen));
5,956!
2332
  }
2333

2334
  int32_t calcPlanLen = pReq->calcPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->calcPlan) + 1;
2,842✔
2335
  int32_t subTblNameExprLen = pReq->subTblNameExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->subTblNameExpr) + 1;
2,842✔
2336
  int32_t tagValueExprLen = pReq->tagValueExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->tagValueExpr) + 1;
2,842✔
2337

2338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
5,684!
2339
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, calcPlanLen));
5,684!
2340
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, subTblNameExprLen));
5,684!
2341
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, tagValueExprLen));
5,684!
2342

2343
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pReq->forceOutCols);
2,842✔
2344
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
2,842!
2345
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
3,114✔
2346
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, i);
272✔
2347
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
272!
2348

2349
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
544!
2350
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
544!
2351
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
544!
2352
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
544!
2353
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
544!
2354
  }
2355

2356
_exit:
2,842✔
2357

2358
  if (code) {
2,842!
2359
    return code;
×
2360
  }
2361
  
2362
  return 0;
2,842✔
2363
}
2364

2365
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
1,452✔
2366
  SEncoder encoder = {0};
1,452✔
2367
  tEncoderInit(&encoder, buf, bufLen);
1,452✔
2368
  int32_t code = 0;
1,452✔
2369
  int32_t lino;
2370

2371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,452!
2372

2373
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
1,452!
2374

2375
  tEndEncode(&encoder);
1,452✔
2376

2377
_exit:
1,452✔
2378
  if (code) {
1,452!
2379
    tEncoderClear(&encoder);
×
2380
    return code;
×
2381
  } else {
2382
    int32_t tlen = encoder.pos;
1,452✔
2383
    tEncoderClear(&encoder);
1,452✔
2384
    return tlen;
1,452✔
2385
  }
2386
  return 0;
2387
}
2388

2389

2390
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
1,145✔
2391
  int32_t code = 0;
1,145✔
2392
  int32_t lino;
2393

2394
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
2,290!
2395

2396
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
2,290!
2397
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
2,290!
2398
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
2,290!
2399
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
2,290!
2400
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
2,290!
2401
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
2,290!
2402
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
2,290!
2403

2404
  int32_t calcDbSize = 0;
1,145✔
2405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
1,145!
2406
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
1,145✔
2407
  if (pReq->calcDB == NULL) {
1,145!
2408
    TAOS_CHECK_EXIT(terrno);
×
2409
  }
2410
  for (int32_t i = 0; i < calcDbSize; ++i) {
2,280✔
2411
    char *calcDb = NULL;
1,135✔
2412
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
1,135!
2413
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
1,135!
2414
    if (calcDb == NULL) {
1,135!
2415
      TAOS_CHECK_EXIT(terrno);
×
2416
    }
2417
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
2,270!
2418
      taosMemoryFree(calcDb);
×
2419
      TAOS_CHECK_EXIT(terrno);
×
2420
    }
2421
  }
2422

2423
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
2,290!
2424
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
2,290!
2425
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
2,290!
2426
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
2,290!
2427
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
2,290!
2428
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
2,290!
2429
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
2,290!
2430
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
2,290!
2431
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
2,290!
2432
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
2,290!
2433

2434
  int32_t addrSize = 0;
1,145✔
2435
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1,145!
2436
  if (addrSize > 0) {
1,145✔
2437
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
196✔
2438
    if (pReq->pNotifyAddrUrls == NULL) {
196!
2439
      TAOS_CHECK_EXIT(terrno);
×
2440
    }
2441
  }
2442
  for (int32_t i = 0; i < addrSize; ++i) {
1,356✔
2443
    char *url = NULL;
211✔
2444
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
211!
2445
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
211!
2446
    if (url == NULL) {
211!
2447
      TAOS_CHECK_EXIT(terrno);
×
2448
    }
2449
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
422!
2450
      taosMemoryFree(url);
×
2451
      TAOS_CHECK_EXIT(terrno);
×
2452
    }
2453
  }
2454
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
2,290!
2455
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyErrorHandle));
2,290!
2456
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
2,290!
2457

2458
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
2,290!
2459
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
2,290!
2460
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
2,290!
2461

2462
  int32_t outColSize = 0;
1,145✔
2463
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
1,145!
2464
  if (outColSize > 0) {
1,145✔
2465
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
1,135✔
2466
    if (pReq->outCols == NULL) {
1,135!
2467
      TAOS_CHECK_EXIT(terrno);
×
2468
    }
2469

2470
    for (int32_t i = 0; i < outColSize; ++i) {
5,143✔
2471
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
4,008✔
2472
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
4,008!
2473
    }
2474
  }
2475

2476
  int32_t outTagSize = 0;
1,145✔
2477
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
1,145!
2478
  if (outTagSize > 0) {
1,145✔
2479
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
459✔
2480
    if (pReq->outTags == NULL) {
459!
2481
      TAOS_CHECK_EXIT(terrno);
×
2482
    }
2483

2484
    for (int32_t i = 0; i < outTagSize; ++i) {
1,218✔
2485
      SFieldWithOptions field = {0};
759✔
2486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
759!
2487
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
759!
2488
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
759!
2489
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
759!
2490
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
1,518!
2491
        TAOS_CHECK_EXIT(terrno);
×
2492
      }
2493
    }
2494
  }
2495

2496
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
2,290!
2497
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
2,290!
2498
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
2,290!
2499
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
2,290!
2500

2501
  switch (pReq->triggerType) {
1,145!
2502
    case WINDOW_TYPE_SESSION: {
38✔
2503
      // session trigger
2504
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
76!
2505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
76!
2506
      break;
38✔
2507
    }
2508
      case WINDOW_TYPE_STATE: {
279✔
2509
        // state trigger
2510
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
558!
2511
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
558!
2512
        break;
279✔
2513
      }
2514
      case WINDOW_TYPE_INTERVAL: {
628✔
2515
        // slide trigger
2516
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
1,256!
2517
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
1,256!
2518
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
1,256!
2519
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
1,256!
2520
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
1,256!
2521
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
1,256!
2522
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
1,256!
2523
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
1,256!
2524
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
1,256!
2525
        break;
628✔
2526
      }
2527
      case WINDOW_TYPE_EVENT: {
84✔
2528
        // event trigger
2529
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
168!
2530
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
168!
2531
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
168!
2532
        break;
84✔
2533
      }
2534
      case WINDOW_TYPE_COUNT: {
56✔
2535
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
112!
2536

2537
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
112!
2538
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
112!
2539
        break;
56✔
2540
      }
2541
      case WINDOW_TYPE_PERIOD: {
60✔
2542
        // period trigger
2543
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
120!
2544
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
120!
2545
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
120!
2546
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
120!
2547
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
120!
2548
        break;
60✔
2549
      }
2550
      default:
×
2551
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2552
  }
2553

2554
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
2,290!
2555
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
2,290!
2556
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
2,290!
2557
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
2,290!
2558
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
2,290!
2559
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
2,290!
2560
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
2,290!
2561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
2,290!
2562
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
2,290!
2563
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
2,290!
2564
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
2,290!
2565
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
2,290!
2566
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
2,290!
2567
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
2,290!
2568

2569
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
2,290!
2570
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
2,290!
2571

2572
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
2,290!
2573

2574
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
2,290!
2575
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
2,290!
2576

2577
  int32_t calcScanPlanListSize = 0;
1,145✔
2578
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
1,145!
2579
  if (calcScanPlanListSize > 0) {
1,145✔
2580
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
1,135✔
2581
    if (pReq->calcScanPlanList == NULL) {
1,135!
2582
      TAOS_CHECK_EXIT(terrno);
×
2583
    }
2584
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
2,368✔
2585
      SStreamCalcScan calcScan = {0};
1,233✔
2586
      int32_t         vgListSize = 0;
1,233✔
2587
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
1,233!
2588
      if (vgListSize > 0) {
1,233!
2589
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
1,233✔
2590
        if (calcScan.vgList == NULL) {
1,233!
2591
          TAOS_CHECK_EXIT(terrno);
×
2592
        }
2593
        for (int32_t j = 0; j < vgListSize; ++j) {
2,466✔
2594
          int32_t vgId = 0;
1,233✔
2595
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
1,233!
2596
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
2,466!
2597
            TAOS_CHECK_EXIT(terrno);
×
2598
          }
2599
        }
2600
      }
2601
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
1,233!
2602
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
1,233!
2603
      taosArrayPush(pReq->calcScanPlanList, &calcScan);
1,233✔
2604
    }
2605
  }
2606

2607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
2,290!
2608
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
2,290!
2609
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
2,290!
2610
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
2,290!
2611

2612
  int32_t forceOutColsSize = 0;
1,145✔
2613
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
1,145!
2614
  if (forceOutColsSize > 0) {
1,145✔
2615
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
22✔
2616
    if (pReq->forceOutCols == NULL) {
22!
2617
      TAOS_CHECK_EXIT(terrno);
×
2618
    }
2619
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
158✔
2620
      SStreamOutCol outCol = {0};
136✔
2621
      int64_t       exprLen = 0;
136✔
2622
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
136!
2623
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
136!
2624
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
136!
2625
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
136!
2626
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
136!
2627
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
272!
2628
        TAOS_CHECK_EXIT(terrno);
×
2629
      }
2630
    }
2631
  }
2632

2633
_exit:
1,145✔
2634

2635
  return code;
1,145✔
2636
}
2637

2638

2639
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
764✔
2640
  SDecoder decoder = {0};
764✔
2641
  tDecoderInit(&decoder, buf, bufLen);
764✔
2642
  int32_t code = 0;
764✔
2643
  int32_t lino;
2644

2645
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
764!
2646
  
2647
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&decoder, pReq));
764!
2648

2649
  tEndDecode(&decoder);
764✔
2650

2651
_exit:
764✔
2652

2653
  tDecoderClear(&decoder);
764✔
2654
  return code;
764✔
2655
}
2656

2657

2658
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
×
2659
  int32_t  code = 0;
×
2660
  int32_t  lino;
2661
  int32_t  tlen;
2662
  SEncoder encoder = {0};
×
2663
  tEncoderInit(&encoder, buf, bufLen);
×
2664

2665
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2666

2667
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2668
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2669
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2670

2671
  tEndEncode(&encoder);
×
2672

2673
_exit:
×
2674
  if (code) {
×
2675
    tlen = code;
×
2676
  } else {
2677
    tlen = encoder.pos;
×
2678
  }
2679
  tEncoderClear(&encoder);
×
2680
  return tlen;
×
2681
}
2682

2683
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
29✔
2684
  SDecoder decoder = {0};
29✔
2685
  int32_t  code = 0;
29✔
2686
  int32_t  lino;
2687
  tDecoderInit(&decoder, buf, bufLen);
29✔
2688

2689
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
29!
2690
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
58!
2691
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
58!
2692

2693
  tEndDecode(&decoder);
29✔
2694

2695
_exit:
29✔
2696
  tDecoderClear(&decoder);
29✔
2697
  return code;
29✔
2698
}
2699

2700
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
29✔
2701
  taosMemoryFreeClear(pReq->name);
29!
2702
}
29✔
2703

2704
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,422✔
2705
  if (pScan == NULL) {
2,422!
2706
    return;
×
2707
  }
2708
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,422✔
2709
  taosArrayDestroy(pCalcScan->vgList);
2,422✔
2710
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,422!
2711
}
2712

2713
void tFreeStreamOutCol(void* pCol) {
204✔
2714
  if (pCol == NULL) {
204!
2715
    return;
×
2716
  }
2717
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
204✔
2718
  taosMemoryFreeClear(pOutCol->expr);
204!
2719
}
2720

2721

2722

2723
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
3,204✔
2724
  if (NULL == pReq) {
3,204✔
2725
    return;
401✔
2726
  }
2727
  taosMemoryFreeClear(pReq->name);
2,803!
2728
  taosMemoryFreeClear(pReq->sql);
2,803!
2729
  taosMemoryFreeClear(pReq->streamDB);
2,803!
2730
  taosMemoryFreeClear(pReq->triggerDB);
2,803!
2731
  taosMemoryFreeClear(pReq->outDB);
2,803!
2732
  taosMemoryFreeClear(pReq->triggerTblName);
2,803!
2733
  taosMemoryFreeClear(pReq->outTblName);
2,803!
2734

2735
  taosArrayDestroyP(pReq->calcDB, NULL);
2,803✔
2736
  pReq->calcDB = NULL;
2,803✔
2737
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
2,803✔
2738
  pReq->pNotifyAddrUrls = NULL;
2,803✔
2739

2740
  taosMemoryFreeClear(pReq->triggerFilterCols);
2,803!
2741
  taosMemoryFreeClear(pReq->triggerCols);
2,803!
2742
  taosMemoryFreeClear(pReq->partitionCols);
2,803!
2743

2744
  taosArrayDestroy(pReq->outTags);
2,803✔
2745
  pReq->outTags = NULL;
2,803✔
2746
  taosArrayDestroy(pReq->outCols);
2,803✔
2747
  pReq->outCols = NULL;
2,803✔
2748

2749
  switch (pReq->triggerType) {
2,803✔
2750
    case WINDOW_TYPE_EVENT:
156✔
2751
      taosMemoryFreeClear(pReq->trigger.event.startCond);
156!
2752
      taosMemoryFreeClear(pReq->trigger.event.endCond);
156!
2753
      break;
156✔
2754
    default:
2,647✔
2755
      break;
2,647✔
2756
  }
2757

2758
  taosMemoryFreeClear(pReq->triggerScanPlan);
2,803!
2759
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
2,803✔
2760
  pReq->calcScanPlanList = NULL;
2,803✔
2761
  taosMemoryFreeClear(pReq->triggerPrevFilter);
2,803!
2762

2763
  taosMemoryFreeClear(pReq->calcPlan);
2,803!
2764
  taosMemoryFreeClear(pReq->subTblNameExpr);
2,803!
2765
  taosMemoryFreeClear(pReq->tagValueExpr);
2,803!
2766
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
2,803✔
2767
  pReq->forceOutCols = NULL;
2,803✔
2768
}
2769

2770
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
329✔
2771
  int32_t code = 0, lino = 0;
329✔
2772
  if (NULL == pSrc) {
329!
2773
    return code;
×
2774
  } 
2775

2776
  void* p = NULL;
329✔
2777
  int32_t num = 0;
329✔
2778
  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
329!
2779
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
329!
2780

2781
  SCMCreateStreamReq* pDst = *ppDst;
329✔
2782

2783
  if (pSrc->outDB) {
329✔
2784
    pDst->outDB = COPY_STR(pSrc->outDB);
324!
2785
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
324!
2786
  }
2787
  
2788
  if (pSrc->triggerTblName) {
329✔
2789
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
327!
2790
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
327!
2791
  }
2792
  
2793
  if (pSrc->outTblName) {
329✔
2794
    pDst->outTblName = COPY_STR(pSrc->outTblName);
324!
2795
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
324!
2796
  }
2797
  
2798
  if (pSrc->pNotifyAddrUrls) {
329✔
2799
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
89✔
2800
    if (num > 0) {
89!
2801
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
89✔
2802
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
89!
2803
    }
2804
    for (int32_t i = 0; i < num; ++i) {
178✔
2805
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
89!
2806
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
89!
2807
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
178!
2808
    }
2809
  }
2810
  
2811
  if (pSrc->triggerFilterCols) {
329✔
2812
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
53!
2813
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
53!
2814
  }
2815
  
2816
  if (pSrc->triggerCols) {
329✔
2817
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
316!
2818
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
316!
2819
  }
2820
  
2821
  if (pSrc->partitionCols) {
329✔
2822
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
155!
2823
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
155!
2824
  }
2825
  
2826
  if (pSrc->outCols) {
329✔
2827
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
324✔
2828
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
324!
2829
  }
2830
  
2831
  if (pSrc->outTags) {
329✔
2832
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
155✔
2833
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
155!
2834
  }
2835

2836
  pDst->triggerType = pSrc->triggerType;
329✔
2837
  
2838
  switch (pSrc->triggerType) {
329✔
2839
    case WINDOW_TYPE_EVENT:
36✔
2840
      if (pSrc->trigger.event.startCond) {
36!
2841
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
36!
2842
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
36!
2843
      }
2844
      
2845
      if (pSrc->trigger.event.endCond) {
36!
2846
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
36!
2847
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
36!
2848
      }
2849
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
36✔
2850
      break;
36✔
2851
    default:
293✔
2852
      pDst->trigger = pSrc->trigger;
293✔
2853
      break;
293✔
2854
  }
2855

2856

2857
  if (pSrc->triggerScanPlan) {
329✔
2858
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
327!
2859
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
327!
2860
  }
2861
  
2862
  if (pSrc->calcScanPlanList) {
329✔
2863
    num = taosArrayGetSize(pSrc->calcScanPlanList);
324✔
2864
    if (num > 0) {
324!
2865
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
324✔
2866
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
324!
2867
    }
2868
    for (int32_t i = 0; i < num; ++i) {
682✔
2869
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
358✔
2870
      SStreamCalcScan  dscan = {.readFromCache = sscan->readFromCache};
358✔
2871

2872
      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
358✔
2873
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
358!
2874

2875
      dscan.scanPlan = COPY_STR(sscan->scanPlan);
358!
2876
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
358!
2877
      
2878
      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
716!
2879
    }
2880
  }
2881
  
2882
  if (pSrc->triggerPrevFilter) {
329✔
2883
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
53!
2884
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
53!
2885
  }
2886
  
2887
  if (pSrc->calcPlan) {
329✔
2888
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
324!
2889
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
324!
2890
  }
2891
  
2892
  if (pSrc->subTblNameExpr) {
329✔
2893
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
155!
2894
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
155!
2895
  }
2896
  
2897
  if (pSrc->tagValueExpr) {
329✔
2898
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
155!
2899
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
155!
2900
  }
2901
  
2902
  if (pSrc->forceOutCols) {
329✔
2903
    num = taosArrayGetSize(pSrc->forceOutCols);
11✔
2904
    if (num > 0) {
11!
2905
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
11✔
2906
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
11!
2907
    }
2908
    for (int32_t i = 0; i < num; ++i) {
79✔
2909
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
68✔
2910
      SStreamOutCol  dcol = {.type = scol->type};
68✔
2911

2912
      dcol.expr = COPY_STR(scol->expr);
68!
2913
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
68!
2914
      
2915
      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
136!
2916
    }
2917
  }
2918

2919
  pDst->triggerTblUid = pSrc->triggerTblUid;
329✔
2920
  pDst->triggerTblType = pSrc->triggerTblType;
329✔
2921
  pDst->deleteReCalc = pSrc->deleteReCalc;
329✔
2922
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
329✔
2923
  
2924
_exit:
329✔
2925

2926
  if (code) {
329!
2927
    tFreeSCMCreateStreamReq(pDst);
×
2928
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
2929
  }
2930

2931
  return code;
329✔
2932
}
2933

2934

2935
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
×
2936
  int32_t  code = 0;
×
2937
  int32_t  lino;
2938
  int32_t  tlen;
2939
  SEncoder encoder = {0};
×
2940
  tEncoderInit(&encoder, buf, bufLen);
×
2941
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2942

2943
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2944
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2945
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2946
  tEndEncode(&encoder);
×
2947

2948
_exit:
×
2949
  if (code) {
×
2950
    tlen = code;
×
2951
  } else {
2952
    tlen = encoder.pos;
×
2953
  }
2954
  tEncoderClear(&encoder);
×
2955
  return tlen;
×
2956
}
2957

2958
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
21✔
2959
  SDecoder decoder = {0};
21✔
2960
  int32_t  code = 0;
21✔
2961
  int32_t  lino;
2962

2963
  tDecoderInit(&decoder, buf, bufLen);
21✔
2964
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
2965
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
2966
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
42!
2967
  tEndDecode(&decoder);
21✔
2968

2969
_exit:
21✔
2970
  tDecoderClear(&decoder);
21✔
2971
  return code;
21✔
2972
}
2973

2974
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
×
2975
  taosMemoryFreeClear(pReq->name);
×
2976
}
×
2977

2978
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
×
2979
  SEncoder encoder = {0};
×
2980
  int32_t  code = 0;
×
2981
  int32_t  lino;
2982
  int32_t  tlen;
2983
  tEncoderInit(&encoder, buf, bufLen);
×
2984
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2985
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2986
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2987
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2988
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
×
2989
  tEndEncode(&encoder);
×
2990

2991
_exit:
×
2992
  if (code) {
×
2993
    tlen = code;
×
2994
  } else {
2995
    tlen = encoder.pos;
×
2996
  }
2997
  tEncoderClear(&encoder);
×
2998
  return tlen;
×
2999
}
3000

3001
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
21✔
3002
  SDecoder decoder = {0};
21✔
3003
  int32_t  code = 0;
21✔
3004
  int32_t  lino;
3005

3006
  tDecoderInit(&decoder, buf, bufLen);
21✔
3007
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
3008
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
3009
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
42!
3010
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igUntreated));
42!
3011
  tEndDecode(&decoder);
21✔
3012

3013
_exit:
21✔
3014
  tDecoderClear(&decoder);
21✔
3015
  return code;
21✔
3016
}
3017

3018
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
×
3019
  taosMemoryFreeClear(pReq->name);
×
3020
}
×
3021

3022
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
×
3023
  SEncoder encoder = {0};
×
3024
  int32_t  code = 0;
×
3025
  int32_t  lino;
3026
  int32_t  tlen;
3027
  tEncoderInit(&encoder, buf, bufLen);
×
3028
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3029
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
3030
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
3031
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
×
3032
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
×
3033
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
×
3034
  tEndEncode(&encoder);
×
3035

3036
_exit:
×
3037
  if (code) {
×
3038
    tlen = code;
×
3039
  } else {
3040
    tlen = encoder.pos;
×
3041
  }
3042
  tEncoderClear(&encoder);
×
3043
  return tlen;
×
3044
}
3045

3046
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
21✔
3047
  SDecoder decoder = {0};
21✔
3048
  int32_t  code = 0;
21✔
3049
  int32_t  lino;
3050

3051
  tDecoderInit(&decoder, buf, bufLen);
21✔
3052
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
3053

3054
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
3055
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->calcAll));
42!
3056
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.skey));
42!
3057
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.ekey));
42!
3058
  tEndDecode(&decoder);
21✔
3059

3060
_exit:
21✔
3061
  tDecoderClear(&decoder);
21✔
3062
  return code;
21✔
3063
}
3064

3065
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
21✔
3066
  taosMemoryFreeClear(pReq->name);
21!
3067
}
21✔
3068

3069
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
×
3070
  int32_t code = 0;
×
3071
  int32_t lino;
3072

3073
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
3074
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
×
3075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));
×
3076

3077
_exit:
×
3078
  return code;
×
3079
}
3080

3081
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
×
3082
  SEncoder encoder = {0};
×
3083
  int32_t  code = 0;
×
3084
  int32_t  lino;
3085
  int32_t  tlen;
3086
  tEncoderInit(&encoder, buf, bufLen);
×
3087

3088
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3089
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
×
3090

3091
  tEndEncode(&encoder);
×
3092

3093
_exit:
×
3094
  if (code) {
×
3095
    tlen = code;
×
3096
  } else {
3097
    tlen = encoder.pos;
×
3098
  }
3099
  tEncoderClear(&encoder);
×
3100
  return tlen;
×
3101
}
3102

3103
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
×
3104
  int32_t code = 0;
×
3105
  int32_t lino;
3106

3107
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
3108
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
×
3109
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));
×
3110

3111
_exit:
×
3112
  return code;
×
3113
}
3114

3115
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
×
3116
  SDecoder decoder = {0};
×
3117
  int32_t  code = 0;
×
3118
  int32_t  lino;
3119

3120
  tDecoderInit(&decoder, (char *)buf, bufLen);
×
3121

3122
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3123
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq));
×
3124

3125
  tEndDecode(&decoder);
×
3126

3127
_exit:
×
3128
  tDecoderClear(&decoder);
×
3129
  return code;
×
3130
}
3131

3132
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
×
3133
  int32_t code = 0;
×
3134
  int32_t lino;
3135

3136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
3137
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
×
3138
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
×
3139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));
×
3140

3141
_exit:
×
3142
  return code;
×
3143
}
3144

3145
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
×
3146
  SEncoder encoder = {0};
×
3147
  int32_t  code = 0;
×
3148
  int32_t  lino;
3149
  int32_t  tlen;
3150
  tEncoderInit(&encoder, buf, bufLen);
×
3151

3152
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3153
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
×
3154

3155
  tEndEncode(&encoder);
×
3156

3157
_exit:
×
3158
  if (code) {
×
3159
    tlen = code;
×
3160
  } else {
3161
    tlen = encoder.pos;
×
3162
  }
3163
  tEncoderClear(&encoder);
×
3164
  return tlen;
×
3165
}
3166

3167
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
×
3168
  int32_t code = 0;
×
3169
  int32_t lino;
3170

3171
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
3172
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
×
3173
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
×
3174
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));
×
3175

3176
_exit:
×
3177
  return code;
×
3178
}
3179

3180
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
×
3181
  SDecoder decoder = {0};
×
3182
  int32_t  code = 0;
×
3183
  int32_t  lino;
3184

3185
  tDecoderInit(&decoder, buf, bufLen);
×
3186

3187
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3188
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
×
3189

3190
  tEndDecode(&decoder);
×
3191

3192
_exit:
×
3193
  tDecoderClear(&decoder);
×
3194
  return code;
×
3195
}
3196

3197
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
279✔
3198
  SEncoder encoder = {0};
279✔
3199
  int32_t  code = TSDB_CODE_SUCCESS;
279✔
3200
  int32_t  lino = 0;
279✔
3201
  int32_t  tlen = 0;
279✔
3202

3203
  tEncoderInit(&encoder, buf, bufLen);
279✔
3204
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
279!
3205

3206
  int32_t size = taosArrayGetSize(pRsp->cols);
279✔
3207
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
279!
3208
  for (int32_t i = 0; i < size; ++i) {
2,638✔
3209
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
2,358✔
3210
    if (oInfo == NULL) {
2,359!
3211
      uError("col id is NULL at index %d", i);
×
3212
      code = TSDB_CODE_INVALID_PARA;
×
3213
      goto _exit;
×
3214
    }
3215
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
4,718!
3216
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
4,718!
3217
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
4,718!
3218
  }
3219

3220
  tEndEncode(&encoder);
280✔
3221

3222
_exit:
280✔
3223
  if (code != TSDB_CODE_SUCCESS) {
280!
3224
    tlen = code;
×
3225
  } else {
3226
    tlen = encoder.pos;
280✔
3227
  }
3228
  tEncoderClear(&encoder);
280✔
3229
  return tlen;
280✔
3230
}
3231

3232
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
140✔
3233
  SDecoder decoder = {0};
140✔
3234
  int32_t  code = TSDB_CODE_SUCCESS;
140✔
3235
  int32_t  lino = 0;
140✔
3236

3237
  tDecoderInit(&decoder, buf, bufLen);
140✔
3238
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
140!
3239

3240
  int32_t size = 0;
140✔
3241
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
140!
3242
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
140✔
3243
  if (pRsp->cols == NULL) {
140!
3244
    code = terrno;
×
3245
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3246
    goto _exit;
×
3247
  }
3248
  for (int32_t i = 0; i < size; ++i) {
1,319✔
3249
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
1,180✔
3250
    if (oInfo == NULL) {
1,180!
3251
      code = terrno;
×
3252
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3253
      goto _exit;
×
3254
    }
3255
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->suid));
2,360!
3256
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->uid));
2,360!
3257
    TAOS_CHECK_RETURN(tDecodeI16(&decoder, &oInfo->cid));
2,359!
3258
  }
3259

3260
  tEndDecode(&decoder);
139✔
3261

3262
_exit:
140✔
3263
  tDecoderClear(&decoder);
140✔
3264
  return code;
140✔
3265
}
3266

3267
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
30,170✔
3268
  taosArrayDestroy(pRsp->cols);
30,170✔
3269
}
30,175✔
3270

3271
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
61,260✔
3272
  if (pReq == NULL) return;
61,260!
3273
  if (pReq->base.type == STRIGGER_PULL_WAL_DATA) {
61,260✔
3274
    SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
10,451✔
3275
    if (pRequest->cids != NULL) {
10,451!
3276
      taosArrayDestroy(pRequest->cids);
10,451✔
3277
      pRequest->cids = NULL;
10,451✔
3278
    }
3279
  } else if (pReq->base.type == STRIGGER_PULL_TSDB_DATA) {
50,809✔
3280
    SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
353✔
3281
    if (pRequest->cids != NULL) {
353!
3282
      taosArrayDestroy(pRequest->cids);
353✔
3283
      pRequest->cids = NULL;
353✔
3284
    }
3285
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_INFO) {
50,456✔
3286
    SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
547✔
3287
    if (pRequest->cids != NULL) {
547!
3288
      taosArrayDestroy(pRequest->cids);
547✔
3289
      pRequest->cids = NULL;
548✔
3290
    }
3291
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_PSEUDO_COL) {
49,909✔
3292
    SSTriggerVirTablePseudoColRequest *pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
2,141✔
3293
    if (pRequest->cids != NULL) {
2,141!
3294
      taosArrayDestroy(pRequest->cids);
2,141✔
3295
      pRequest->cids = NULL;
2,141✔
3296
    }
3297
  } else if (pReq->base.type == STRIGGER_PULL_OTABLE_INFO) {
47,768✔
3298
    SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
140✔
3299
    if (pRequest->cols != NULL) {
140!
3300
      taosArrayDestroy(pRequest->cols);
140✔
3301
      pRequest->cols = NULL;
140✔
3302
    }
3303
  } else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
47,628✔
3304
    SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
140✔
3305
    if (pRequest->uids != NULL) {
140!
3306
      taosArrayDestroy(pRequest->uids);
×
3307
      pRequest->uids = NULL;
×
3308
    }
3309
  }
3310
}
3311

3312
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
38,320✔
3313
  int32_t  code = TSDB_CODE_SUCCESS;
38,320✔
3314
  int32_t  lino = 0;
38,320✔
3315
  int32_t size = taosArrayGetSize(cids);
38,320✔
3316
  TAOS_CHECK_EXIT(tEncodeI32(encoder, size));
38,319!
3317
  for (int32_t i = 0; i < size; ++i) {
156,584✔
3318
    col_id_t* pColId = taosArrayGet(cids, i);
118,266✔
3319
    if (pColId == NULL) {
118,265!
3320
      uError("col id is NULL at index %d", i);
×
3321
      code = TSDB_CODE_INVALID_PARA;
×
3322
      goto _exit;
×
3323
    }
3324
    TAOS_CHECK_EXIT(tEncodeI16(encoder, *pColId));
236,530!
3325
  }
3326
  _exit:
38,318✔
3327

3328
  return code;
38,318✔
3329
}
3330

3331
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
19,157✔
3332
  int32_t code = TSDB_CODE_SUCCESS;
19,157✔
3333
  int32_t lino = 0;
19,157✔
3334
  int32_t size = 0;
19,157✔
3335

3336
  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
19,160!
3337
  if (size > 0){
19,160✔
3338
    *cids = taosArrayInit(size, sizeof(col_id_t));
13,489✔
3339
    if (*cids == NULL) {
13,491!
3340
      code = terrno;
×
3341
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3342
      goto _exit;
×
3343
    }
3344
  
3345
    for (int32_t i = 0; i < size; ++i) {
72,624✔
3346
      col_id_t* pColId = taosArrayReserve(*cids, 1);
59,132✔
3347
      if (pColId == NULL) {
59,130!
3348
        code = terrno;
×
3349
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
×
3350
        goto _exit;
×
3351
      }
3352
      TAOS_CHECK_RETURN(tDecodeI16(decoder, pColId));
59,133!
3353
    }  
3354
  }
3355
  
3356
_exit:
19,163✔
3357
  if (code != TSDB_CODE_SUCCESS) {
19,163!
3358
    taosArrayDestroy(*cids);
×
3359
    *cids = NULL;
×
3360
  }
3361
  return code;
19,163✔
3362
}
3363

3364
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
123,128✔
3365
  SEncoder encoder = {0};
123,128✔
3366
  int32_t  code = TSDB_CODE_SUCCESS;
123,128✔
3367
  int32_t  lino = 0;
123,128✔
3368
  int32_t  tlen = 0;
123,128✔
3369

3370
  tEncoderInit(&encoder, buf, bufLen);
123,128✔
3371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
123,113!
3372

3373
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
246,154!
3374
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
246,154!
3375
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
246,154!
3376
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
246,154!
3377

3378
  switch (pReq->type) {
123,077!
3379
    case STRIGGER_PULL_SET_TABLE: {
280✔
3380
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
280✔
3381
      int32_t size = taosArrayGetSize(pRequest->uids);
280✔
3382
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
280!
3383
      for (int32_t i = 0; i < size; ++i) {
934✔
3384
        int64_t* uids = taosArrayGet(pRequest->uids, i);
654✔
3385
        if (uids == NULL) {
654!
3386
          uError("uid is NULL at index %d", i);
×
3387
          code = TSDB_CODE_INVALID_PARA;
×
3388
          goto _exit;
×
3389
        }
3390
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[0]));
1,308!
3391
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[1]));
1,308!
3392
      }
3393
      break;
280✔
3394
    }
3395
    case STRIGGER_PULL_LAST_TS: {
849✔
3396
      break;
849✔
3397
    }
3398
    case STRIGGER_PULL_FIRST_TS: {
791✔
3399
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
791✔
3400
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,582!
3401
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,582!
3402
      break;
791✔
3403
    }
3404
    case STRIGGER_PULL_TSDB_META: {
1,252✔
3405
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,252✔
3406
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,504!
3407
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,504!
3408
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,504!
3409
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,504!
3410
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,504!
3411
      break;
1,252✔
3412
    }
3413
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3414
      break;
×
3415
    }
3416
    case STRIGGER_PULL_TSDB_TS_DATA: {
82✔
3417
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
82✔
3418
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
164!
3419
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
164!
3420
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
164!
3421
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
164!
3422
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
164!
3423
      break;
82✔
3424
    }
3425
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
328✔
3426
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
328✔
3427
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
656!
3428
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
656!
3429
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
656!
3430
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
656!
3431
      break;
328✔
3432
    }
3433
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
352✔
3434
      break;
352✔
3435
    }
3436
    case STRIGGER_PULL_TSDB_CALC_DATA: {
57,652✔
3437
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
57,652✔
3438
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
115,304!
3439
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
115,304!
3440
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
115,304!
3441
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
115,304!
3442
      break;
57,652✔
3443
    }
3444
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3445
      break;
×
3446
    }
3447
    case STRIGGER_PULL_TSDB_DATA: {
706✔
3448
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
706✔
3449
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,412!
3450
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,412!
3451
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,412!
3452
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,412!
3453
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
706!
3454
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,412!
3455
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,412!
3456
      break;
706✔
3457
    }
3458
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3459
      break;
×
3460
    }
3461
    case STRIGGER_PULL_WAL_META: {
21,593✔
3462
      SSTriggerWalMetaRequest* pRequest = (SSTriggerWalMetaRequest*)pReq;
21,593✔
3463
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
43,186!
3464
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
43,186!
3465
      break;
21,593✔
3466
    }
3467
    case STRIGGER_PULL_WAL_TS_DATA:
32,235✔
3468
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3469
    case STRIGGER_PULL_WAL_CALC_DATA:
3470
    case STRIGGER_PULL_WAL_DATA: {
3471
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
32,235✔
3472
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
64,470!
3473
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
64,470!
3474
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
64,470!
3475
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
64,470!
3476
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
32,235!
3477
      break;
32,236✔
3478
    }
3479
    case STRIGGER_PULL_GROUP_COL_VALUE: {
1,350✔
3480
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
1,350✔
3481
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,700!
3482
      break;
1,350✔
3483
    }
3484
    case STRIGGER_PULL_VTABLE_INFO: {
1,096✔
3485
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
1,096✔
3486
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
1,096!
3487
      break;
1,096✔
3488
    }
3489
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
4,281✔
3490
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
4,281✔
3491
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
8,562!
3492
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
4,281!
3493
      break;
4,282✔
3494
    }
3495
    case STRIGGER_PULL_OTABLE_INFO: {
280✔
3496
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
280✔
3497
      int32_t size = taosArrayGetSize(pRequest->cols);
280✔
3498
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
280!
3499
      for (int32_t i = 0; i < size; ++i) {
2,640✔
3500
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
2,360✔
3501
        if (oInfo == NULL) {
2,360!
3502
          uError("col id is NULL at index %d", i);
×
3503
          code = TSDB_CODE_INVALID_PARA;
×
3504
          goto _exit;
×
3505
        }
3506
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
4,720!
3507
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
4,720!
3508
      }
3509
      break; 
280✔
3510
    }
3511
    default: {
×
3512
      uError("unknown pull type %d", pReq->type);
×
3513
      code = TSDB_CODE_INVALID_PARA;
×
3514
      break;
×
3515
    }
3516
  }
3517

3518
  tEndEncode(&encoder);
123,129✔
3519

3520
_exit:
123,110✔
3521
  if (code != TSDB_CODE_SUCCESS) {
123,110!
3522
    tlen = code;
×
3523
  } else {
3524
    tlen = encoder.pos;
123,110✔
3525
  }
3526
  tEncoderClear(&encoder);
123,110✔
3527
  return tlen;
123,124✔
3528
}
3529

3530

3531
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
61,249✔
3532
  SDecoder decoder = {0};
61,249✔
3533
  int32_t  code = TSDB_CODE_SUCCESS;
61,249✔
3534
  int32_t  lino = 0;
61,249✔
3535

3536
  tDecoderInit(&decoder, buf, bufLen);
61,249✔
3537
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
61,245!
3538

3539
  int32_t type = 0;
61,261✔
3540
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
61,260!
3541
  SSTriggerPullRequest* pBase = &(pReq->base);
61,260✔
3542
  pBase->type = type;
61,260✔
3543
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
122,522!
3544
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
122,516!
3545
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
122,499!
3546

3547
  switch (type) {
61,245!
3548
    case STRIGGER_PULL_SET_TABLE: {
140✔
3549
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
140✔
3550
      int32_t size = 0;
140✔
3551
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
140!
3552
      pRequest->uids = taosArrayInit(size, 2 * sizeof(int64_t));
140✔
3553
      if (pRequest->uids == NULL) {
140!
3554
        code = terrno;
×
3555
        uError("failed to allocate memory for uids, size: %d, errno: %d", size, code);
×
3556
        goto _exit;
×
3557
      }
3558
      for (int32_t i = 0; i < size; ++i) {
467✔
3559
        int64_t* uid = taosArrayReserve(pRequest->uids, 1);
327✔
3560
        if (uid == NULL) {
327!
3561
          code = terrno;
×
3562
          uError("failed to reserve memory for uid, size: %d, errno: %d", size, code);
×
3563
          goto _exit;
×
3564
        }
3565
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid));
327!
3566
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid + 1));
654!
3567
      }
3568
      break;
140✔
3569
    }
3570
    case STRIGGER_PULL_LAST_TS: {
425✔
3571
      break;
425✔
3572
    }
3573
    case STRIGGER_PULL_FIRST_TS: {
390✔
3574
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
390✔
3575
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
780!
3576
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
780!
3577
      break;
390✔
3578
    }
3579
    case STRIGGER_PULL_TSDB_META: {
626✔
3580
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
626✔
3581
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
1,252!
3582
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
1,252!
3583
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,252!
3584
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,252!
3585
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,252!
3586
      break;
626✔
3587
    }
3588
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3589
      break;
×
3590
    }
3591
    case STRIGGER_PULL_TSDB_TS_DATA: {
41✔
3592
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
41✔
3593
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
82!
3594
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
82!
3595
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
82!
3596
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
82!
3597
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
82!
3598
      break;
41✔
3599
    }
3600
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
164✔
3601
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
164✔
3602
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
328!
3603
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
328!
3604
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
328!
3605
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
328!
3606
      break;
164✔
3607
    }
3608
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
176✔
3609
      break;
176✔
3610
    }
3611
    case STRIGGER_PULL_TSDB_CALC_DATA: {
28,822✔
3612
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
28,822✔
3613
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
57,644!
3614
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
57,643!
3615
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
57,641!
3616
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
57,639!
3617
      break;
28,819✔
3618
    }
3619
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3620
      break;
×
3621
    }
3622
    case STRIGGER_PULL_TSDB_DATA: {
353✔
3623
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
353✔
3624
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
706!
3625
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
706!
3626
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
706!
3627
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
706!
3628
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
353!
3629
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
706!
3630
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
706!
3631
      break;
353✔
3632
    }
3633
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3634
      break;
×
3635
    }
3636
    case STRIGGER_PULL_WAL_META: {
10,477✔
3637
      SSTriggerWalMetaRequest* pRequest = &(pReq->walMetaReq);
10,477✔
3638
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
20,952!
3639
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
20,961!
3640
      break;
10,486✔
3641
    }
3642
    case STRIGGER_PULL_WAL_TS_DATA:
16,121✔
3643
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3644
    case STRIGGER_PULL_WAL_CALC_DATA:
3645
    case STRIGGER_PULL_WAL_DATA: {
3646
      SSTriggerWalDataRequest* pRequest = &(pReq->walDataReq);
16,121✔
3647
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
32,240!
3648
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
32,239!
3649
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
32,241!
3650
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
32,243!
3651
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
16,122!
3652
      break;
16,119✔
3653
    }
3654
    case STRIGGER_PULL_GROUP_COL_VALUE: {
675✔
3655
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
675✔
3656
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,350!
3657
      break;
675✔
3658
    }
3659
    case STRIGGER_PULL_VTABLE_INFO: {
547✔
3660
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
547✔
3661
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
547!
3662
      break;
547✔
3663
    }
3664
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
2,141✔
3665
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
2,141✔
3666
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
4,282!
3667
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
2,141!
3668
      break;
2,141✔
3669
    }
3670
    case STRIGGER_PULL_OTABLE_INFO: {
140✔
3671
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
140✔
3672
      int32_t size = 0;
140✔
3673
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
140!
3674
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
140✔
3675
      if (pRequest->cols == NULL) {
140!
3676
        code = terrno;
×
3677
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3678
        goto _exit;
×
3679
      }
3680
      for (int32_t i = 0; i < size; ++i) {
1,320✔
3681
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
1,180✔
3682
        if (oInfo == NULL) {
1,180!
3683
          code = terrno;
×
3684
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3685
          goto _exit;
×
3686
        }
3687
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
1,180!
3688
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
1,180!
3689
      }
3690
      break;
140✔
3691
    }
3692
    default: {
7✔
3693
      uError("unknown pull type %d", type);
7!
3694
      code = TSDB_CODE_INVALID_PARA;
×
3695
      break;
×
3696
    }
3697
  }
3698

3699
  tEndDecode(&decoder);
61,242✔
3700

3701
_exit:
61,229✔
3702
  tDecoderClear(&decoder);
61,229✔
3703
  return code;
61,260✔
3704
}
3705

3706
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo) {
59,908✔
3707
  int32_t size = taosArrayGetSize(pParams);
59,908✔
3708
  int32_t code = 0;
59,909✔
3709
  int32_t lino = 0;
59,909✔
3710
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
59,909!
3711
  for (int32_t i = 0; i < size; ++i) {
44,717,603✔
3712
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
44,660,064✔
3713
    if (param == NULL) {
44,659,667✔
3714
      TAOS_CHECK_EXIT(terrno);
1,973!
3715
    }
3716

3717
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevTs));
89,315,388!
3718
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->currentTs));
89,315,388!
3719
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextTs));
89,315,388!
3720

3721
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wstart));
89,315,388!
3722
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wend));
89,315,388!
3723
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wduration));
89,315,388!
3724
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wrownum));
89,315,388!
3725

3726
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevLocalTime));
89,315,388!
3727
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextLocalTime));
89,315,388!
3728

3729
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->triggerTime));
89,315,388!
3730
    if (!ignoreNotificationInfo) {
44,657,694✔
3731
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
1,146,540!
3732
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
573,270✔
3733
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
1,146,540!
3734
    }
3735
  }
3736
_exit:
57,539✔
3737
  return code;
57,539✔
3738
}
3739

3740
void tDestroySSTriggerCalcParam(void* ptr) {
22,625,715✔
3741
  SSTriggerCalcParam* pParam = ptr;
22,625,715✔
3742
  if (pParam && pParam->extraNotifyContent != NULL) {
22,625,715!
3743
    taosMemoryFreeClear(pParam->extraNotifyContent);
358!
3744
  }
3745
  if (pParam && pParam->resultNotifyContent != NULL) {
22,625,715!
3746
    taosMemoryFreeClear(pParam->resultNotifyContent);
55!
3747
  }
3748
}
22,625,715✔
3749

3750
void tDestroySStreamGroupValue(void* ptr) {
56,220✔
3751
  SStreamGroupValue* pValue = ptr;
56,220✔
3752
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
56,220!
3753
    taosMemoryFreeClear(pValue->data.pData);
46,509!
3754
    pValue->data.nData = 0;
46,514✔
3755
  }
3756
}
56,225✔
3757

3758
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
29,954✔
3759
  int32_t size = 0, code = 0, lino = 0;
29,954✔
3760
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
29,954!
3761
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
29,954✔
3762
  if (*ppParams == NULL) {
29,951!
3763
    TAOS_CHECK_EXIT(terrno);
×
3764
  }
3765
  for (int32_t i = 0; i < size; ++i) {
22,363,871✔
3766
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
22,333,920✔
3767
    if (param == NULL) {
22,333,924!
3768
      TAOS_CHECK_EXIT(terrno);
×
3769
    }
3770
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevTs));
44,667,849!
3771
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->currentTs));
44,667,856!
3772
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextTs));
44,667,856!
3773

3774
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wstart));
44,667,852!
3775
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wend));
44,667,854!
3776
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wduration));
44,667,847!
3777
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wrownum));
44,667,838!
3778

3779
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevLocalTime));
44,667,835!
3780
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextLocalTime));
44,667,835!
3781

3782
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->triggerTime));
44,667,838!
3783
    if (!ignoreNotificationInfo) {
22,333,920✔
3784
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
573,268!
3785
      uint64_t len = 0;
286,634✔
3786
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
573,268!
3787
    }
3788
  }
3789

3790
_exit:
29,951✔
3791
  return code;
29,951✔
3792
}
3793

3794
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
61,263✔
3795
  int32_t code = TSDB_CODE_SUCCESS;
61,263✔
3796
  int32_t lino = 0;
61,263✔
3797

3798
  int32_t size = taosArrayGetSize(pGroupColVals);
61,263✔
3799
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
61,261!
3800
  for (int32_t i = 0; i < size; ++i) {
133,444✔
3801
    SStreamGroupValue* pValue = taosArrayGet(pGroupColVals, i);
72,184✔
3802
    if (pValue == NULL) {
72,184!
3803
      TAOS_CHECK_EXIT(terrno);
×
3804
    }
3805
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isNull));
72,184!
3806
    if (pValue->isNull) {
72,184!
3807
      continue;
×
3808
    }
3809
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isTbname));
72,184!
3810
    if (pValue->isTbname) {
72,183✔
3811
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->uid));
67,362!
3812
      if (vgId != -1) { pValue->vgId = vgId; }
33,681✔
3813
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pValue->vgId));
67,362!
3814
    }
3815
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pValue->data.type));
144,366!
3816
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
72,183!
3817
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pValue->data.pData, pValue->data.nData));
133,112!
3818
    } else {
3819
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->data.val));
11,254!
3820
    }
3821
  }
3822

3823
_exit:
61,260✔
3824
  return code;
61,260✔
3825
}
3826

3827
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
30,627✔
3828
  int32_t code = TSDB_CODE_SUCCESS;
30,627✔
3829
  int32_t lino = 0;
30,627✔
3830
  int32_t size = 0;
30,627✔
3831

3832
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
30,627!
3833
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);
30,627✔
3834
  if (size > 0) {
30,626✔
3835
    if (*ppGroupColVals == NULL) {
21,727✔
3836
      *ppGroupColVals = taosArrayInit(size, sizeof(SStreamGroupValue));
21,052✔
3837
      if (*ppGroupColVals == NULL) {
21,051!
3838
        TAOS_CHECK_EXIT(terrno);
×
3839
      }
3840
    } else {
3841
      TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, size));
675!
3842
    }
3843
  }
3844
  for (int32_t i = 0; i < size; ++i) {
66,714✔
3845
    SStreamGroupValue* pValue = taosArrayReserve(*ppGroupColVals, 1);
36,087✔
3846
    if (pValue == NULL) {
36,087!
3847
      TAOS_CHECK_EXIT(terrno);
×
3848
    }
3849
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isNull));
36,087!
3850
    if (pValue->isNull) {
36,087!
3851
      continue;
×
3852
    }
3853
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isTbname));
36,087!
3854
    if (pValue->isTbname) {
36,087✔
3855
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->uid));
33,676!
3856
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pValue->vgId));
33,676!
3857
    }
3858
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pValue->data.type));
72,173!
3859
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
36,086!
3860
      uint64_t len = 0;
33,273✔
3861
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pValue->data.pData, &len));
66,548!
3862
      pValue->data.nData = len;
33,275✔
3863
    } else {
3864
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->data.val));
5,626!
3865
    }
3866
  }
3867
_exit:
30,627✔
3868
  return code;
30,627✔
3869
}
3870

3871
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
1,350✔
3872
  SEncoder encoder = {0};
1,350✔
3873
  int32_t  code = TSDB_CODE_SUCCESS;
1,350✔
3874
  int32_t  lino = 0;
1,350✔
3875
  int32_t  tlen = 0;
1,350✔
3876

3877
  tEncoderInit(&encoder, buf, bufLen);
1,350✔
3878
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,350!
3879

3880
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
1,350!
3881

3882
  tEndEncode(&encoder);
1,350✔
3883

3884
_exit:
1,350✔
3885
  if (code != TSDB_CODE_SUCCESS) {
1,350!
3886
    tlen = code;
×
3887
  } else {
3888
    tlen = encoder.pos;
1,350✔
3889
  }
3890
  tEncoderClear(&encoder);
1,350✔
3891
  return tlen;
1,350✔
3892
}
3893

3894
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
675✔
3895
  SDecoder decoder = {0};
675✔
3896
  int32_t  code = TSDB_CODE_SUCCESS;
675✔
3897
  int32_t  lino = 0;
675✔
3898
  int32_t  size = 0;
675✔
3899

3900
  tDecoderInit(&decoder, buf, bufLen);
675✔
3901
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
675!
3902

3903
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));
675!
3904

3905
  tEndDecode(&decoder);
675✔
3906

3907
_exit:
675✔
3908
  tDecoderClear(&decoder);
675✔
3909
  return code;
675✔
3910
}
3911

3912
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
2,938✔
3913
  SEncoder encoder = {0};
2,938✔
3914
  int32_t  code = TSDB_CODE_SUCCESS;
2,938✔
3915
  int32_t  lino = 0;
2,938✔
3916
  int32_t  tlen = 0;
2,938✔
3917

3918
  tEncoderInit(&encoder, buf, bufLen);
2,938✔
3919
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,938!
3920

3921
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
5,876!
3922
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
5,876!
3923
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
5,876!
3924
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
5,876!
3925
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
5,876!
3926

3927
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false));
2,938!
3928
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
2,938!
3929
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
5,876!
3930

3931
  tEndEncode(&encoder);
2,938✔
3932

3933
_exit:
2,938✔
3934
  if (code != TSDB_CODE_SUCCESS) {
2,938!
3935
    tlen = code;
×
3936
  } else {
3937
    tlen = encoder.pos;
2,938✔
3938
  }
3939
  tEncoderClear(&encoder);
2,938✔
3940
  return tlen;
2,938✔
3941
}
3942

3943
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
1,468✔
3944
  SDecoder decoder = {0};
1,468✔
3945
  int32_t  code = TSDB_CODE_SUCCESS;
1,468✔
3946
  int32_t  lino = 0;
1,468✔
3947

3948
  tDecoderInit(&decoder, buf, bufLen);
1,468✔
3949
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,468!
3950

3951
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
2,936!
3952
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
2,936!
3953
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
2,936!
3954
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
2,936!
3955
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
2,936!
3956

3957
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
1,468!
3958
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
1,468!
3959
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));
2,936!
3960

3961
  tEndDecode(&decoder);
1,468✔
3962

3963
_exit:
1,468✔
3964
  tDecoderClear(&decoder);
1,468✔
3965
  return code;
1,468✔
3966
}
3967

3968
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
4,629✔
3969
  if (pReq != NULL) {
4,629!
3970
    if (pReq->params != NULL) {
4,629✔
3971
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
1,555✔
3972
      pReq->params = NULL;
1,555✔
3973
    }
3974
    if (pReq->groupColVals != NULL) {
4,629✔
3975
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
1,217✔
3976
      pReq->groupColVals = NULL;
1,217✔
3977
    }
3978
    blockDataDestroy(pReq->pOutBlock);
4,629✔
3979
  }
3980
}
4,629✔
3981

3982
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
40,518✔
3983
  SEncoder encoder = {0};
40,518✔
3984
  int32_t  code = TSDB_CODE_SUCCESS;
40,518✔
3985
  int32_t  lino = 0;
40,518✔
3986
  int32_t  tlen = 0;
40,518✔
3987

3988
  tEncoderInit(&encoder, buf, bufLen);
40,518✔
3989
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
40,518!
3990

3991
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
81,036!
3992
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
81,036!
3993
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
81,036!
3994
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
81,036!
3995

3996
  tEndEncode(&encoder);
40,518✔
3997

3998
_exit:
40,518✔
3999
  if (code != TSDB_CODE_SUCCESS) {
40,518!
4000
    tlen = code;
×
4001
  } else {
4002
    tlen = encoder.pos;
40,518✔
4003
  }
4004
  tEncoderClear(&encoder);
40,518✔
4005
  return tlen;
40,518✔
4006
}
4007

4008
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
60,056✔
4009
  SDecoder decoder = {0};
60,056✔
4010
  int32_t  code = TSDB_CODE_SUCCESS;
60,056✔
4011
  int32_t  lino = 0;
60,056✔
4012

4013
  tDecoderInit(&decoder, buf, bufLen);
60,056✔
4014
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
60,023!
4015

4016
  int32_t type = 0;
60,523✔
4017
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
60,473!
4018
  pReq->type = type;
60,473✔
4019
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
120,896!
4020
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
120,703!
4021
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
120,501!
4022

4023
  tEndDecode(&decoder);
60,221✔
4024

4025
_exit:
60,038✔
4026
  tDecoderClear(&decoder);
60,038✔
4027
  return code;
60,374✔
4028
}
4029

4030
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo) {
56,970✔
4031
  int32_t code = 0, lino = 0;
56,970✔
4032
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true));
56,970!
4033
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));
56,974!
4034
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
113,950!
4035
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
113,950!
4036
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
113,950!
4037
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
56,975!
4038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
113,948!
4039
_exit:
56,974✔
4040
  return code;
56,974✔
4041
}
4042

4043
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
28,486✔
4044
  int32_t code = 0, lino = 0;
28,486✔
4045
  int32_t size = 0;
28,486✔
4046
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
28,486!
4047
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
28,484!
4048
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
56,971!
4049
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
56,971!
4050
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
56,971!
4051
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
28,486!
4052
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
56,972!
4053
_exit:
28,486✔
4054
  return code;
28,486✔
4055
}
4056

4057
int32_t tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
33,618✔
4058
  if (pInfo == NULL) return TSDB_CODE_SUCCESS;
33,618!
4059
  if (pInfo->pStreamPesudoFuncVals != NULL) {
33,618✔
4060
    taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
29,396✔
4061
    pInfo->pStreamPesudoFuncVals = NULL;
29,394✔
4062
  }
4063
  if (pInfo->pStreamPartColVals != NULL) {
33,616✔
4064
    taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
20,833✔
4065
    pInfo->pStreamPartColVals = NULL;
20,832✔
4066
  }
4067
  return TSDB_CODE_SUCCESS;
33,615✔
4068
}
4069

4070
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
1,095✔
4071
  SEncoder encoder = {0};
1,095✔
4072
  int32_t  code = TSDB_CODE_SUCCESS;
1,095✔
4073
  int32_t  lino = 0;
1,095✔
4074
  int32_t  tlen = 0;
1,095✔
4075

4076
  tEncoderInit(&encoder, buf, bufLen);
1,095✔
4077
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,095!
4078

4079
  int32_t size = taosArrayGetSize(pRsp->infos);
1,095✔
4080
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,095!
4081
  for (int32_t i = 0; i < size; ++i) {
3,517✔
4082
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
2,423✔
4083
    if (info == NULL) {
2,422!
4084
      TAOS_CHECK_EXIT(terrno);
×
4085
    }
4086
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
4,844!
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
4,844!
4088
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
2,422!
4089
  }
4090

4091
  tEndEncode(&encoder);
1,094✔
4092

4093
_exit:
1,096✔
4094
  if (code != TSDB_CODE_SUCCESS) {
1,096!
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
1,096✔
4098
  }
4099
  tEncoderClear(&encoder);
1,096✔
4100
  return tlen;
1,096✔
4101
}
4102

4103
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
548✔
4104
  SDecoder decoder = {0};
548✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
548✔
4106
  int32_t  lino = 0;
548✔
4107
  int32_t  size = 0;
548✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
548✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
548!
4111

4112
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
548!
4113
  vTableInfo->infos = taosArrayInit(size, sizeof(VTableInfo));
548✔
4114
  if (vTableInfo->infos == NULL) {
548!
4115
    TAOS_CHECK_EXIT(terrno);
×
4116
  }
4117
  for (int32_t i = 0; i < size; ++i) {
1,761✔
4118
    VTableInfo* info = taosArrayReserve(vTableInfo->infos, 1);
1,213✔
4119
    if (info == NULL) {
1,213!
4120
      TAOS_CHECK_EXIT(terrno);
×
4121
    }
4122
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->gId));
2,426!
4123
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->uid));
2,426!
4124
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &info->cols, false));
1,213!
4125
  }
4126

4127
  tEndDecode(&decoder);
548✔
4128

4129
_exit:
548✔
4130
  tDecoderClear(&decoder);
548✔
4131
  return code;
548✔
4132
}
4133

4134

4135
void tDestroyVTableInfo(void *ptr) {
2,425✔
4136
  if (NULL == ptr) {
2,425!
4137
    return;
×
4138
  }
4139
  VTableInfo* pTable = (VTableInfo*)ptr;
2,425✔
4140
  taosMemoryFree(pTable->cols.pColRef);
2,425!
4141
}
4142

4143
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
30,585✔
4144
  if (ptr == NULL) return;
30,585!
4145
  taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
30,585✔
4146
  ptr->infos = NULL;
30,588✔
4147
}
4148

4149
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,629✔
4150
  SEncoder encoder = {0};
1,629✔
4151
  int32_t  code = TSDB_CODE_SUCCESS;
1,629✔
4152
  int32_t  lino = 0;
1,629✔
4153
  int32_t  tlen = 0;
1,629✔
4154

4155
  tEncoderInit(&encoder, buf, bufLen);
1,629✔
4156
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,630!
4157

4158
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
3,256!
4159
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,628✔
4160
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,629!
4161
  for (int32_t i = 0; i < size; ++i) {
3,905✔
4162
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
2,276✔
4163
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
4,552!
4164
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
4,552!
4165
  }
4166

4167
  tEndEncode(&encoder);
1,629✔
4168

4169
_exit:
1,628✔
4170
  if (code != TSDB_CODE_SUCCESS) {
1,628!
4171
    tlen = code;
×
4172
  } else {
4173
    tlen = encoder.pos;
1,628✔
4174
  }
4175
  tEncoderClear(&encoder);
1,628✔
4176
  return tlen;
1,628✔
4177
}
4178

4179
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
815✔
4180
  SDecoder decoder = {0};
815✔
4181
  int32_t  code = TSDB_CODE_SUCCESS;
815✔
4182
  int32_t  lino = 0;
815✔
4183
  SSDataBlock *pResBlock = pBlock;
815✔
4184

4185
  tDecoderInit(&decoder, buf, bufLen);
815✔
4186
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
815!
4187

4188
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,630!
4189
  int32_t numOfCols = 2;
815✔
4190
  if (pResBlock->pDataBlock == NULL) {
815!
4191
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
815✔
4192
    if (pResBlock->pDataBlock == NULL) {
815!
4193
      TAOS_CHECK_EXIT(terrno);
×
4194
    }
4195
    for (int32_t i = 0; i< numOfCols; ++i) {
2,445✔
4196
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,630✔
4197
      if (pColInfoData == NULL) {
1,630!
4198
        TAOS_CHECK_EXIT(terrno);
×
4199
      }
4200
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,630✔
4201
      pColInfoData->info.bytes = sizeof(int64_t);
1,630✔
4202
    }
4203
  }
4204
  int32_t numOfRows = 0;
815✔
4205
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
815!
4206
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
815!
4207
  for (int32_t i = 0; i < numOfRows; ++i) {
1,953✔
4208
    for (int32_t j = 0; j < numOfCols; ++j) {
3,413✔
4209
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
2,275✔
4210
      if (pColInfoData == NULL) {
2,275!
4211
        TAOS_CHECK_EXIT(terrno);
×
4212
      }
4213
      int64_t value = 0;
2,275✔
4214
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
2,276!
4215
      colDataSetInt64(pColInfoData, i, &value);
2,276✔
4216
    }
4217
  }
4218

4219
  pResBlock->info.dataLoad = 1;
816✔
4220
  pResBlock->info.rows = numOfRows;
816✔
4221

4222
  tEndDecode(&decoder);
816✔
4223

4224
_exit:
815✔
4225
  tDecoderClear(&decoder);
815✔
4226
  return code;
815✔
4227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc