• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.03
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "tdatablock.h"
18
#include "tmsg.h"
19
#include "os.h"
20
#include "tcommon.h"
21

22
typedef struct STaskId {
23
  int64_t streamId;
24
  int64_t taskId;
25
} STaskId;
26

27
typedef enum EWindowType {
28
  WINDOW_TYPE_INTERVAL = 1,
29
  WINDOW_TYPE_SESSION,
30
  WINDOW_TYPE_STATE,
31
  WINDOW_TYPE_EVENT,
32
  WINDOW_TYPE_COUNT,
33
  WINDOW_TYPE_ANOMALY,
34
  WINDOW_TYPE_EXTERNAL,
35
  WINDOW_TYPE_PERIOD
36
} EWindowType;
37

38
typedef struct STaskCkptInfo {
39
  int64_t latestId;          // saved checkpoint id
40
  int64_t latestVer;         // saved checkpoint ver
41
  int64_t latestTime;        // latest checkpoint time
42
  int64_t latestSize;        // latest checkpoint size
43
  int8_t  remoteBackup;      // latest checkpoint backup done
44
  int64_t activeId;          // current active checkpoint id
45
  int32_t activeTransId;     // checkpoint trans id
46
  int8_t  failed;            // denote if the checkpoint is failed or not
47
  int8_t  consensusChkptId;  // required the consensus-checkpointId
48
  int64_t consensusTs;       //
49
} STaskCkptInfo;
50

51
typedef struct STaskStatusEntry {
52
  STaskId       id;
53
  int32_t       status;
54
  int32_t       statusLastDuration;  // to record the last duration of current status
55
  int64_t       stage;
56
  int32_t       nodeId;
57
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
58
  int64_t       processedVer;  // only valid for source task
59
  double        inputQUsed;    // in MiB
60
  double        inputRate;
61
  double        procsThroughput;   // duration between one element put into input queue and being processed.
62
  double        procsTotal;        // duration between one element put into input queue and being processed.
63
  double        outputThroughput;  // the size of dispatched result blocks in bytes
64
  double        outputTotal;       // the size of dispatched result blocks in bytes
65
  double        sinkQuota;         // existed quota size for sink task
66
  double        sinkDataSize;      // sink to dst data size
67
  int64_t       startTime;
68
  int64_t       startCheckpointId;
69
  int64_t       startCheckpointVer;
70
  int64_t       hTaskId;
71
  STaskCkptInfo checkpointInfo;
72
  STaskNotifyEventStat notifyEventStat;
73
} STaskStatusEntry;
74

75
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
×
76
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
×
77
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
×
78
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
×
79
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
×
80
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
×
81
  return 0;
×
82
}
83

84
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
×
85
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
×
86
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
×
87
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
×
88
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
×
89
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
×
90
  return 0;
×
91
}
92

93
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
×
94
  int32_t code = 0;
×
95
  int32_t lino;
96

97
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
98
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
×
99
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));
×
100

101
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
×
102
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
×
103

104
  for (int32_t i = 0; i < size; ++i) {
×
105
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
×
106
    if (pInfo == NULL) {
×
107
      TAOS_CHECK_EXIT(terrno);
×
108
    }
109

110
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
×
111
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
×
112
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
×
113
  }
114

115
  // todo this new attribute will be result in being incompatible with previous version
116
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
×
117

118
  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
×
119
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));
×
120

121
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
122
    int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
×
123
    if (pId == NULL) {
×
124
      TAOS_CHECK_EXIT(terrno);
×
125
    }
126
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)pId));
×
127
  }
128

129
  tEndEncode(pEncoder);
×
130
_exit:
×
131
  if (code) {
×
132
    return code;
×
133
  } else {
134
    return pEncoder->pos;
×
135
  }
136
}
137

138
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
×
139
  int32_t code = 0;
×
140
  int32_t lino;
141

142
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
143
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
×
144
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));
×
145

146
  int32_t size = 0;
×
147
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
148

149
  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
×
150
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);
×
151

152
  for (int32_t i = 0; i < size; ++i) {
×
153
    SNodeUpdateInfo info = {0};
×
154
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
×
155
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
×
156
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));
×
157

158
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
×
159
      TAOS_CHECK_EXIT(terrno);
×
160
    }
161
  }
162

163
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
×
164

165
  // number of tasks
166
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
167
  pMsg->pTaskList = taosArrayInit(size, sizeof(int32_t));
×
168
  if (pMsg->pTaskList == NULL) {
×
169
    TAOS_CHECK_EXIT(terrno);
×
170
  }
171

172
  for (int32_t i = 0; i < size; ++i) {
×
173
    int32_t id = 0;
×
174
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
×
175
    if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
×
176
      TAOS_CHECK_EXIT(terrno);
×
177
    }
178
  }
179

180
  tEndDecode(pDecoder);
×
181
_exit:
×
182
  return code;
×
183
}
184

185
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
×
186
  taosArrayDestroy(pMsg->pNodeList);
×
187
  taosArrayDestroy(pMsg->pTaskList);
×
188
  pMsg->pNodeList = NULL;
×
189
  pMsg->pTaskList = NULL;
×
190
}
×
191

192
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
×
193
  int32_t code = 0;
×
194
  int32_t lino;
195

196
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
197
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
198
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
199
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
×
202
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
×
203
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
×
204
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
205
  tEndEncode(pEncoder);
×
206

207
_exit:
×
208
  if (code) {
×
209
    return code;
×
210
  } else {
211
    return pEncoder->pos;
×
212
  }
213
}
214

215
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
×
216
  int32_t code = 0;
×
217
  int32_t lino;
218

219
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
220
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
221
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
222
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
×
225
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
×
226
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
×
227
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
228
  tEndDecode(pDecoder);
×
229

230
_exit:
×
231
  return code;
×
232
}
233

234
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
×
235
  int32_t code = 0;
×
236
  int32_t lino;
237

238
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
×
240
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
241
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
×
242
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
×
243
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
×
244
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
×
245
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
×
246
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
×
247
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
×
248
  tEndEncode(pEncoder);
×
249

250
_exit:
×
251
  if (code) {
×
252
    return code;
×
253
  } else {
254
    return pEncoder->pos;
×
255
  }
256
}
257

258
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
×
259
  int32_t code = 0;
×
260
  int32_t lino;
261

262
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
263
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
×
264
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
265
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
×
266
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
×
267
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
×
268
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
×
269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
×
270
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
×
271
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
×
272
  tEndDecode(pDecoder);
×
273

274
_exit:
×
275
  return code;
×
276
}
277

278
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
×
279
  int32_t code = 0;
×
280
  int32_t lino;
281

282
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
283
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
284
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
×
285
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
×
286
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
287
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
288
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
289
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
290
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
291
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
×
292
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
293
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
×
294
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
×
295
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
×
296

297
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
×
298
    uError("invalid dispatch req msg");
×
299
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
300
  }
301

302
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
303
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
×
304
    void*    data = taosArrayGetP(pReq->data, i);
×
305
    if (data == NULL || pLen == NULL) {
×
306
      TAOS_CHECK_EXIT(terrno);
×
307
    }
308

309
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
×
310
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
×
311
  }
312
  tEndEncode(pEncoder);
×
313
_exit:
×
314
  if (code) {
×
315
    return code;
×
316
  } else {
317
    return pEncoder->pos;
×
318
  }
319
}
320

321
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
×
322
  int32_t code = 0;
×
323
  int32_t lino;
324

325
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
326
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
327
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
×
328
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
×
329
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
330
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
331
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
332
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
333
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
334
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
×
335
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
336
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
×
337
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
×
338
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));
×
339

340
  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
×
341
    TAOS_CHECK_EXIT(terrno);
×
342
  }
343
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
×
344
    TAOS_CHECK_EXIT(terrno);
×
345
  }
346
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
347
    int32_t  len1;
348
    uint64_t len2;
349
    void*    data;
350
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
×
351
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));
×
352

353
    if (len1 != len2) {
×
354
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
355
    }
356

357
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
×
358
      TAOS_CHECK_EXIT(terrno);
×
359
    }
360

361
    if (taosArrayPush(pReq->data, &data) == NULL) {
×
362
      TAOS_CHECK_EXIT(terrno);
×
363
    }
364
  }
365

366
  tEndDecode(pDecoder);
×
367
_exit:
×
368
  return code;
×
369
}
370

371
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
×
372
  taosArrayDestroyP(pReq->data, NULL);
×
373
  taosArrayDestroy(pReq->dataLen);
×
374
}
×
375

376
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
×
377
  int32_t code = 0;
×
378
  int32_t lino;
379

380
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
381
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
382
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
383
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
×
384
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
×
385
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
×
386
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
×
387
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
×
388
  tEndEncode(pEncoder);
×
389

390
_exit:
×
391
  if (code) {
×
392
    return code;
×
393
  } else {
394
    return pEncoder->pos;
×
395
  }
396
}
397

398
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
×
399
  int32_t code = 0;
×
400
  int32_t lino;
401

402
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
403
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
404
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
×
406
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
×
407
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
×
408
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));
×
409
  uint64_t len = 0;
×
410
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
×
411
  pReq->retrieveLen = (int32_t)len;
×
412
  tEndDecode(pDecoder);
×
413

414
_exit:
×
415
  return code;
×
416
}
417

418
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
×
419

420

421
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
120✔
422
  int32_t code = 0;
120✔
423
  int32_t lino = 0;
120✔
424
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
240!
425
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
240!
426
  switch (pReq->type) {
120!
427
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
120✔
428
      if (pReq->cont.fullTableNames) {
120!
429
        int32_t num = taosArrayGetSize(pReq->cont.fullTableNames);
120✔
430
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
120!
431
        for (int32_t i = 0; i < num; ++i) {
368✔
432
          SStreamDbTableName* pName = taosArrayGet(pReq->cont.fullTableNames, i);
248✔
433
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->dbFName, strlen(pName->dbFName) + 1));
496!
434
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->tbName, strlen(pName->tbName) + 1));
496!
435
        }
436
      } else {
437
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
×
438
      }
439
      break;
120✔
440
    }
441
    default:
×
442
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
443
      break;
×
444
  }
445

446
_exit:
120✔
447

448
  return code;
120✔
449
}
450

451
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
178✔
452
  if (NULL == pReq) {
178✔
453
    return;
59✔
454
  }
455

456
  taosArrayDestroy(pReq->cont.fullTableNames);
119✔
457
}
458

459

460
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
60✔
461
  *ppDst = NULL;
60✔
462
  
463
  if (NULL == pSrc) {
60!
464
    return TSDB_CODE_SUCCESS;
×
465
  }
466

467
  int32_t code = 0, lino = 0;
60✔
468
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
60!
469
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
60!
470

471
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
60✔
472
  if (pSrc->cont.fullTableNames) {
60!
473
    (*ppDst)->cont.fullTableNames = taosArrayDup(pSrc->cont.fullTableNames, NULL);
60✔
474
    TSDB_CHECK_NULL((*ppDst)->cont.fullTableNames, code, lino, _exit, terrno);
60!
475
  }
476
  
477
_exit:
60✔
478

479
  if (code) {
60!
480
    tFreeSStreamMgmtReq(*ppDst);
×
481
    taosMemoryFreeClear(*ppDst);
×
482
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
483
  }
484
  
485
  return code;
60✔
486
}
487

488

489
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
59✔
490
  int32_t code = 0;
59✔
491
  int32_t lino = 0;
59✔
492

493
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
118!
494
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
118!
495
  switch (pReq->type) {
59!
496
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
59✔
497
      int32_t num = 0;
59✔
498
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
59!
499
      if (num > 0) {
59!
500
        pReq->cont.fullTableNames = taosArrayInit(num, sizeof(SStreamDbTableName));
59✔
501
        TSDB_CHECK_NULL(pReq->cont.fullTableNames, code, lino, _exit, terrno);
59!
502
        for (int32_t i = 0; i < num; ++i) {
181✔
503
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.fullTableNames, 1);
122✔
504
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
122!
505
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
122!
506
        }
507
      }
508
      break;
59✔
509
    }
510
    default:
×
511
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
512
      break;
×
513
  }
514

515
_exit:
59✔
516

517
  return code;  
59✔
518
}
519

520
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
47,646✔
521
  int32_t code = 0;
47,646✔
522
  int32_t lino;
523

524
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
95,292!
525
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
95,292!
526
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));
95,292!
527

528
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
95,292!
529
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
95,292!
530
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
95,292!
531
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
95,292!
532
  // SKIP SESSIONID
533
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
95,292!
534
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
95,292!
535
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
95,292!
536
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));
95,292!
537
  if (pTask->pMgmtReq) {
47,646✔
538
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
120!
539
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
120!
540
  } else {
541
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
47,526!
542
  }
543

544
_exit:
47,526✔
545

546
  return code;
47,646✔
547
}
548

549

550
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
22,712✔
551
  int32_t code = 0;
22,712✔
552
  int32_t lino;
553

554
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
45,424!
555
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
45,424!
556
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));
45,424!
557
  
558
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
45,424!
559
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
45,424!
560
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
45,424!
561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
45,424!
562
  // SKIP SESSIONID
563
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
45,424!
564
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
45,424!
565
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
45,424!
566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));
45,424!
567
  int32_t req = 0;
22,712✔
568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
22,712!
569
  if (req) {
22,712✔
570
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
59!
571
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
59!
572
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
59!
573
  }
574

575
_exit:
22,712✔
576

577
  return code;
22,712✔
578
}
579

580
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
×
581
  int32_t code = 0;
×
582
  int32_t lino;
583

584
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
×
585
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
×
586
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
×
587
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));
×
588

589
_exit:
×
590

591
  return code;
×
592
}
593

594
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
×
595
  int32_t code = 0;
×
596
  int32_t lino;
597

598
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
×
599
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
×
600
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
×
601
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));
×
602

603
_exit:
×
604

605
  return code;
×
606
}
607

608

609
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
3,682✔
610
  int32_t code = 0;
3,682✔
611
  int32_t lino;
612

613
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
7,364!
614
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
7,364!
615
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
7,364!
616
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
7,364!
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));
7,364!
618

619
  int32_t recalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
3,682✔
620
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
3,682!
621
  for (int32_t i = 0; i < recalcNum; ++i) {
3,682!
622
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
623
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pProgress));
×
624
  }
625

626
_exit:
3,682✔
627

628
  return code;
3,682✔
629
}
630

631
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
1,703✔
632
  int32_t code = 0;
1,703✔
633
  int32_t lino;
634

635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
3,406!
636
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
3,406!
637
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
3,406!
638
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
3,406!
639
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));
3,406!
640

641
  int32_t recalcNum = 0;
1,703✔
642
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
1,703!
643
  if (recalcNum > 0) {
1,703!
644
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
×
645
    if (NULL == pStatus->userRecalcs) {
×
646
      code = terrno;
×
647
      goto _exit;
×
648
    }
649
  }
650

651
  for (int32_t i = 0; i < recalcNum; ++i) {
1,703!
652
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
653
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
×
654
  }
655

656
_exit:
1,703✔
657

658
  return code;
1,703✔
659
}
660

661

662
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
71,902✔
663
  int32_t code = 0;
71,902✔
664
  int32_t lino;
665

666
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
71,902!
667
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
143,804!
668
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
143,804!
669
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
143,804!
670
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));
143,804!
671

672
  int32_t vgLeaderNum = taosArrayGetSize(pReq->pVgLeaders);
71,902✔
673
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgLeaderNum));
71,902!
674
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
260,266✔
675
    int32_t* vgId = taosArrayGet(pReq->pVgLeaders, i);
188,364✔
676
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
376,728!
677
  }
678
  
679
  int32_t statusNum = taosArrayGetSize(pReq->pStreamStatus);
71,902✔
680
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, statusNum));
71,902!
681
  for (int32_t i = 0; i < statusNum; ++i) {
114,822✔
682
    SStmTaskStatusMsg* pStatus = taosArrayGet(pReq->pStreamStatus, i);
42,920✔
683
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pStatus));
42,920!
684
  }
685

686
  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
71,902✔
687
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqNum));
71,902!
688
  for (int32_t i = 0; i < reqNum; ++i) {
72,022✔
689
    int32_t* idx = taosArrayGet(pReq->pStreamReq, i);
120✔
690
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *idx));
240!
691
  }
692

693
  int32_t triggerNum = taosArrayGetSize(pReq->pTriggerStatus);
71,902✔
694
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, triggerNum));
71,902!
695
  for (int32_t i = 0; i < triggerNum; ++i) {
75,584✔
696
    SSTriggerRuntimeStatus* pTrigger = taosArrayGet(pReq->pTriggerStatus, i);
3,682✔
697
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigger));
3,682!
698
  }
699
  
700
  tEndEncode(pEncoder);
71,902✔
701

702
_exit:
71,902✔
703
  if (code) {
71,902!
704
    return code;
×
705
  } else {
706
    return pEncoder->pos;
71,902✔
707
  }
708
}
709

710
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
34,163✔
711
  int32_t code = 0;
34,163✔
712
  int32_t lino;
713

714
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
34,163!
715
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
68,326!
716
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
68,326!
717
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
68,326!
718
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));
68,326!
719

720
  int32_t vgLearderNum = 0;
34,163✔
721
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLearderNum));
34,163!
722
  if (vgLearderNum > 0) {
34,163✔
723
    pReq->pVgLeaders = taosArrayInit(vgLearderNum, sizeof(int32_t));
25,711✔
724
    if (NULL == pReq->pVgLeaders) {
25,711!
725
      code = terrno;
×
726
      goto _exit;
×
727
    }
728
  }
729
  for (int32_t i = 0; i < vgLearderNum; ++i) {
124,177✔
730
    int32_t vgId = 0;
90,014✔
731
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
90,014!
732
    if (NULL == taosArrayPush(pReq->pVgLeaders, &vgId)) {
180,028!
733
      code = terrno;
×
734
      goto _exit;
×
735
    }
736
  }
737

738

739
  int32_t statusNum = 0;
34,163✔
740
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
34,163!
741
  if (statusNum > 0) {
34,163✔
742
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
1,698✔
743
    if (NULL == pReq->pStreamStatus) {
1,698!
744
      code = terrno;
×
745
      goto _exit;
×
746
    }
747
  }
748
  for (int32_t i = 0; i < statusNum; ++i) {
54,512✔
749
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
20,349✔
750
    if (NULL == pTask) {
20,349!
751
      code = terrno;
×
752
      goto _exit;
×
753
    }
754
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTask));
20,349!
755
  }
756

757

758
  int32_t reqNum = 0;
34,163✔
759
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
34,163!
760
  if (reqNum > 0) {
34,163✔
761
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
28✔
762
    if (NULL == pReq->pStreamReq) {
28!
763
      code = terrno;
×
764
      goto _exit;
×
765
    }
766
  }
767
  for (int32_t i = 0; i < reqNum; ++i) {
34,222✔
768
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, i);
59✔
769
    if (NULL == pIdx) {
59!
770
      code = terrno;
×
771
      goto _exit;
×
772
    }
773
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pIdx));
59!
774
  }
775

776

777
  int32_t triggerNum = 0;
34,163✔
778
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
34,163!
779
  if (triggerNum > 0) {
34,163✔
780
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
694✔
781
    if (NULL == pReq->pTriggerStatus) {
694!
782
      code = terrno;
×
783
      goto _exit;
×
784
    }
785
  }
786
  for (int32_t i = 0; i < triggerNum; ++i) {
35,866✔
787
    SSTriggerRuntimeStatus* pStatus = taosArrayGet(pReq->pTriggerStatus, i);
1,703✔
788
    if (NULL == pStatus) {
1,703!
789
      code = terrno;
×
790
      goto _exit;
×
791
    }
792
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pStatus));
1,703!
793
  }
794

795
  
796
  tEndDecode(pDecoder);
34,163✔
797

798
_exit:
34,163✔
799
  return code;
34,163✔
800
}
801

802
void tFreeSSTriggerRuntimeStatus(void* param) {
3,544✔
803
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
3,544✔
804
  if (NULL == pStatus) {
3,544!
805
    return;
×
806
  }
807
  taosArrayDestroy(pStatus->userRecalcs);
3,544✔
808
}
809

810
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
212,948✔
811
  if (pMsg == NULL) {
212,948!
812
    return;
×
813
  }
814

815
  taosArrayDestroy(pMsg->pVgLeaders);
212,948✔
816
  if (deepClean) {
212,948!
817
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
212,948✔
818
    for (int32_t i = 0; i < reqNum; ++i) {
213,067✔
819
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
119✔
820
      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
119✔
821
      if (NULL == pTask) {
119!
822
        continue;
×
823
      }
824

825
      tFreeSStreamMgmtReq(pTask->pMgmtReq);
119✔
826
      taosMemoryFree(pTask->pMgmtReq);
119!
827
    }
828
  }
829
  taosArrayDestroy(pMsg->pStreamReq);
212,948✔
830
  taosArrayDestroy(pMsg->pStreamStatus);
212,948✔
831
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
212,948✔
832
}
833

834
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
790✔
835
  int32_t code = 0;
790✔
836
  int32_t lino;
837

838
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, pMsg->triggerTblName == NULL ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1));
1,580!
839
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
1,580!
840
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
1,580!
841
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
1,580!
842
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
1,580!
843
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
1,580!
844
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, pMsg->triggerCols == NULL ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1));
1,580!
845
  //TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, pMsg->triggerPrevFilter == NULL ? 0 : (int32_t)strlen(pMsg->triggerPrevFilter) + 1));
846
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1));
1,580!
847
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1));
1,580!
848

849
_exit:
790✔
850

851
  return code;
790✔
852
}
853

854
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
370✔
855
  int32_t code = 0;
370✔
856
  int32_t lino;
857

858
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
740!
859
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, pMsg->calcScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1));
740!
860

861
_exit:
370✔
862

863
  return code;
370✔
864
}
865

866

867
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
1,160✔
868
  int32_t code = 0;
1,160✔
869
  int32_t lino;
870

871
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));
2,320!
872
  if (pMsg->triggerReader) {
1,160✔
873
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
790!
874
  } else {
875
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
370!
876
  }
877
  
878
_exit:
370✔
879

880
  return code;
1,160✔
881
}
882

883
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
2,458✔
884
  int32_t code = 0;
2,458✔
885
  int32_t lino;
886

887
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
4,916!
888
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
4,916!
889
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));
2,458!
890

891
_exit:
2,458✔
892

893
  return code;
2,458✔
894
}
895

896
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
1,704✔
897
  int32_t code = 0;
1,704✔
898
  int32_t lino;
899

900
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
1,704!
901
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
3,408!
902

903
_exit:
1,704✔
904

905
  return code;
1,704✔
906
}
907

908

909
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
570✔
910
  int32_t code = 0;
570✔
911
  int32_t lino;
912

913
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
1,140!
914
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
1,140!
915
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
1,140!
916
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
1,140!
917
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
1,140!
918
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
1,140!
919
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->hasPartitionBy));
1,140!
920
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
1,140!
921
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
1,140!
922

923
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
570✔
924
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
570!
925
  for (int32_t i = 0; i < addrSize; ++i) {
594✔
926
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
24✔
927
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
48!
928
  }
929
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
1,140!
930
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
1,140!
931
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
1,140!
932

933
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
1,140!
934
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
1,140!
935
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
1,140!
936
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
1,140!
937

938
  switch (pMsg->triggerType) {
570!
939
    case WINDOW_TYPE_SESSION: {
12✔
940
      // session trigger
941
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
24!
942
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
24!
943
      break;
12✔
944
    }
945
    case WINDOW_TYPE_STATE: {
248✔
946
      // state trigger
947
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
496!
948
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
496!
949
      break;
248✔
950
    }
951
    case WINDOW_TYPE_INTERVAL: {
230✔
952
      // slide trigger
953
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
460!
954
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
460!
955
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
460!
956
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
460!
957
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
460!
958
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
460!
959
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
460!
960
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
460!
961
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
460!
962
      break;
230✔
963
    }
964
    case WINDOW_TYPE_EVENT: {
38✔
965
      // event trigger
966
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
38!
967
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
38!
968

969
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
76!
970
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
76!
971

972
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
76!
973
      break;
38✔
974
    }
975
    case WINDOW_TYPE_COUNT: {
26✔
976
      // count trigger
977
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
26!
978
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
52!
979

980
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
52!
981
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
52!
982
      break;
26✔
983
    }
984
    case WINDOW_TYPE_PERIOD: {
16✔
985
      // period trigger
986
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
32!
987
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
32!
988
      break;
16✔
989
    }
990
    default:
×
991
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
992
      break;
×
993
  }
994

995
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
1,140!
996
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
1,140!
997
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
1,140!
998
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
1,140!
999
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
570✔
1000
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
1,140!
1001
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
570✔
1002
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
1,140!
1003
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
570✔
1004
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
1,140!
1005

1006
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
570✔
1007
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
570!
1008
  for (int32_t i = 0; i < readerNum; ++i) {
1,310✔
1009
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
740✔
1010
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
740!
1011
  }
1012

1013
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
570✔
1014
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
570!
1015
  for (int32_t i = 0; i < runnerNum; ++i) {
2,274✔
1016
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,704✔
1017
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
1,704!
1018
  }
1019

1020
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
1,140!
1021
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
1,140!
1022

1023
_exit:
570✔
1024

1025
  return code;
570✔
1026
}
1027

1028

1029
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
16,782✔
1030
  int32_t code = 0;
16,782✔
1031
  int32_t lino;
1032

1033
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
33,564!
1034
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
33,564!
1035
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
33,564!
1036
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
33,564!
1037
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
33,564!
1038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));
33,564!
1039

1040
_exit:
16,782✔
1041

1042
  return code;
16,782✔
1043
}
1044

1045

1046
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
1,842✔
1047
  int32_t code = 0;
1,842✔
1048
  int32_t lino;
1049

1050
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
3,684!
1051
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
3,684!
1052
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
3,684!
1053
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
3,684!
1054
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
3,684!
1055
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
3,684!
1056
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
3,684!
1057
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));
3,684!
1058

1059
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
1,842✔
1060
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
1,842!
1061
  for (int32_t i = 0; i < addrSize; ++i) {
1,908✔
1062
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
66✔
1063
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
132!
1064
  }
1065
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
3,684!
1066

1067
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
1,842✔
1068
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
1,842!
1069
  for (int32_t i = 0; i < outColNum; ++i) {
9,164✔
1070
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
7,322✔
1071
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
7,322!
1072
  }
1073

1074
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
1,842✔
1075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
1,842!
1076
  for (int32_t i = 0; i < outTagNum; ++i) {
3,614✔
1077
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
1,772✔
1078
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
1,772!
1079
  }
1080

1081
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
3,684!
1082
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));
3,684!
1083

1084
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
3,684!
1085
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));
3,684!
1086

1087
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
1,842✔
1088
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
1,842!
1089
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
2,274✔
1090
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
432✔
1091
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
432!
1092

1093
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
864!
1094
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
864!
1095
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
864!
1096
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
864!
1097
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
864!
1098
  }
1099

1100
_exit:
1,842✔
1101

1102
  return code;
1,842✔
1103
}
1104

1105

1106
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
3,572✔
1107
  int32_t code = 0;
3,572✔
1108
  int32_t lino;
1109

1110
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
3,572!
1111
  switch (pTask->task.type) {
3,572!
1112
    case STREAM_READER_TASK:
1,160✔
1113
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
1,160!
1114
      break;
1,160✔
1115
    case STREAM_TRIGGER_TASK:
570✔
1116
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
570!
1117
      break;
570✔
1118
    case STREAM_RUNNER_TASK:
1,842✔
1119
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
1,842!
1120
      break;
1,842✔
1121
    default:
×
1122
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1123
      break;
×
1124
  }
1125
  
1126
_exit:
3,572✔
1127

1128
  return code;
3,572✔
1129
}
1130

1131

1132
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
716✔
1133
  int32_t code = 0;
716✔
1134
  int32_t lino;
1135

1136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));
1,432!
1137

1138
  int32_t readerNum = taosArrayGetSize(pStream->readerTasks);
716✔
1139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
716!
1140
  for (int32_t i = 0; i < readerNum; ++i) {
1,876✔
1141
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->readerTasks, i);
1,160✔
1142
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
1,160!
1143
  }
1144

1145
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStream->triggerTask ? 1 : 0));
1,432!
1146
  if (pStream->triggerTask) {
716✔
1147
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
570!
1148
  }
1149
  
1150
  int32_t runnerNum = taosArrayGetSize(pStream->runnerTasks);
716✔
1151
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
716!
1152
  for (int32_t i = 0; i < runnerNum; ++i) {
2,558✔
1153
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->runnerTasks, i);
1,842✔
1154
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
1,842!
1155
  }
1156

1157
_exit:
716✔
1158

1159
  return code;
716✔
1160
}
1161

1162
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
1,154✔
1163
  int32_t code = 0;
1,154✔
1164
  int32_t lino = 0;
1,154✔
1165

1166
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));
2,308!
1167

1168
_exit:
1,154✔
1169
  return code;
1,154✔
1170
}
1171

1172
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
577✔
1173
  int32_t code = 0;
577✔
1174
  int32_t lino;
1175

1176
  int32_t type = 0;
577✔
1177
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &type));
577!
1178
  pMsg->msgType = type;
577✔
1179

1180
_exit:
577✔
1181
  return code;
577✔
1182
}
1183

1184
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
672✔
1185
  int32_t code = 0;
672✔
1186
  int32_t lino;
1187

1188
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));
672!
1189

1190
_exit:
672✔
1191

1192
  return code;
672✔
1193
}
1194

1195
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
672✔
1196
  int32_t code = 0;
672✔
1197
  int32_t lino;
1198

1199
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
672!
1200
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));
672!
1201

1202
_exit:
672✔
1203

1204
  return code;
672✔
1205
}
1206

1207
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
360✔
1208
  int32_t code = 0;
360✔
1209
  int32_t lino;
1210

1211
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));
360!
1212
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
720!
1213
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));
720!
1214

1215
_exit:
360✔
1216

1217
  return code;
360✔
1218
}
1219

1220
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
360✔
1221
  int32_t code = 0;
360✔
1222
  int32_t lino;
1223

1224
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
360!
1225
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
360!
1226

1227
_exit:
360✔
1228

1229
  return code;
360✔
1230
}
1231

1232

1233
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
4✔
1234
  int32_t code = 0;
4✔
1235
  int32_t lino;
1236

1237
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));
8!
1238
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
8!
1239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));
8!
1240

1241
_exit:
4✔
1242

1243
  return code;
4✔
1244
}
1245

1246
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
122✔
1247
  int32_t code = 0;
122✔
1248
  int32_t lino;
1249

1250
  switch (msgType) {
122!
1251
    case STREAM_MSG_ORIGTBL_READER_INFO: {
118✔
1252
      int32_t vgNum = taosArrayGetSize(pRsp->vgIds);
118✔
1253
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
118!
1254

1255
      for (int32_t i = 0; i < vgNum; ++i) {
362✔
1256
        int32_t* vgId = taosArrayGet(pRsp->vgIds, i);
244✔
1257
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
488!
1258
      }
1259

1260
      int32_t readerNum = taosArrayGetSize(pRsp->readerList);
118✔
1261
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
118!
1262
      
1263
      for (int32_t i = 0; i < readerNum; ++i) {
132✔
1264
        SStreamTaskAddr* addr = taosArrayGet(pRsp->readerList, i);
14✔
1265
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, addr));
14!
1266
      }
1267
      break;
118✔
1268
    }
1269
    case STREAM_MSG_UPDATE_RUNNER: {
×
1270
      int32_t runnerNum = taosArrayGetSize(pRsp->runnerList);
×
1271
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
×
1272
      
1273
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1274
        SStreamRunnerTarget* target = taosArrayGet(pRsp->runnerList, i);
×
1275
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, target));
×
1276
      }
1277
      break;
×
1278
    }
1279
    case STREAM_MSG_USER_RECALC:{
4✔
1280
      int32_t recalcNum = taosArrayGetSize(pRsp->recalcList);
4✔
1281
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
4!
1282
      
1283
      for (int32_t i = 0; i < recalcNum; ++i) {
8✔
1284
        SStreamRecalcReq* recalc = taosArrayGet(pRsp->recalcList, i);
4✔
1285
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, recalc));
4!
1286
      }
1287
      break;
4✔
1288
    }
1289
    default:
×
1290
      break;
×
1291
  }
1292

1293
_exit:
122✔
1294

1295
  return code;
122✔
1296
}
1297

1298
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
122✔
1299
  int32_t code = 0;
122✔
1300
  int32_t lino;
1301

1302
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
122!
1303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
244!
1304
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
244!
1305
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));
122!
1306
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));
122!
1307

1308
_exit:
122✔
1309

1310
  return code;
122✔
1311
}
1312

1313

1314
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
67,662✔
1315
  int32_t code = 0;
67,662✔
1316
  int32_t lino;
1317

1318
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
67,662!
1319
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));
135,324!
1320
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
67,662✔
1321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
67,662!
1322
  for (int32_t i = 0; i < deployNum; ++i) {
68,378✔
1323
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
716✔
1324
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
716!
1325
  }
1326

1327
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
67,662✔
1328
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
67,662!
1329
  for (int32_t i = 0; i < startNum; ++i) {
68,334✔
1330
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
672✔
1331
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
672!
1332
  }
1333

1334
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
135,324!
1335
  if (!pRsp->undeploy.undeployAll) {
67,662!
1336
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
67,662✔
1337
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
67,662!
1338
    for (int32_t i = 0; i < undeployNum; ++i) {
68,022✔
1339
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
360✔
1340
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
360!
1341
    }
1342
  }
1343

1344
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
67,662✔
1345
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
67,662!
1346
  for (int32_t i = 0; i < rspNum; ++i) {
67,784✔
1347
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
122✔
1348
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
122!
1349
  }
1350
  
1351
_exit:
67,662✔
1352

1353
  tEndEncode(pEncoder);
67,662✔
1354

1355
  return code;
67,662✔
1356
}
1357

1358
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
395✔
1359
  int32_t code = 0;
395✔
1360
  int32_t lino;
1361

1362
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
790!
1363
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
790!
1364
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
790!
1365
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
790!
1366
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
790!
1367
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
790!
1368
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
790!
1369
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
790!
1370
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
790!
1371

1372
_exit:
395✔
1373

1374
  return code;
395✔
1375
}
1376

1377

1378
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
185✔
1379
  int32_t code = 0;
185✔
1380
  int32_t lino;
1381

1382
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
370!
1383
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));
370!
1384

1385
_exit:
185✔
1386

1387
  return code;
185✔
1388
}
1389

1390

1391
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
580✔
1392
  int32_t code = 0;
580✔
1393
  int32_t lino;
1394

1395
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));
1,160!
1396
  if (pMsg->triggerReader) {
580✔
1397
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
395!
1398
  } else {
1399
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
185!
1400
  }
1401
  
1402
_exit:
185✔
1403

1404
  return code;
580✔
1405
}
1406

1407

1408
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
1,229✔
1409
  int32_t code = 0;
1,229✔
1410
  int32_t lino;
1411

1412
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
2,458!
1413
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
2,458!
1414
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));
1,229!
1415

1416
_exit:
1,229✔
1417

1418
  return code;
1,229✔
1419
}
1420

1421

1422
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
852✔
1423
  int32_t code = 0;
852✔
1424
  int32_t lino;
1425

1426
  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
852!
1427
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1,704!
1428

1429
_exit:
852✔
1430

1431
  return code;
852✔
1432
}
1433

1434

1435
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
285✔
1436
  int32_t code = 0;
285✔
1437
  int32_t lino;
1438

1439
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
570!
1440
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
570!
1441
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
570!
1442
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
570!
1443
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
570!
1444
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
570!
1445
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->hasPartitionBy));
570!
1446
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
570!
1447
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
570!
1448

1449
  int32_t addrSize = 0;
285✔
1450
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
285!
1451
  if (addrSize > 0) {
285✔
1452
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
12✔
1453
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
12!
1454
  }
1455
  for (int32_t i = 0; i < addrSize; ++i) {
297✔
1456
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
12✔
1457
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
12!
1458
  }
1459
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
570!
1460
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
570!
1461
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));
570!
1462

1463
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
570!
1464
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
570!
1465
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
570!
1466
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
570!
1467

1468
  switch (pMsg->triggerType) {
285!
1469
    case WINDOW_TYPE_SESSION:
6✔
1470
      // session trigger
1471
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
12!
1472
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
12!
1473
      break;
6✔
1474
    case WINDOW_TYPE_STATE:
124✔
1475
      // state trigger
1476
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
248!
1477
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
248!
1478
      break;
124✔
1479
    
1480
    case WINDOW_TYPE_INTERVAL:
115✔
1481
      // slide trigger
1482
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
230!
1483
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
230!
1484
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
230!
1485
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
230!
1486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
230!
1487
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
230!
1488
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
230!
1489
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
230!
1490
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
230!
1491
      break;
115✔
1492
    
1493
    case WINDOW_TYPE_EVENT:
19✔
1494
      // event trigger
1495
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
38!
1496
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
38!
1497
      
1498
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
38!
1499
      break;
19✔
1500
    
1501
    case WINDOW_TYPE_COUNT:
13✔
1502
      // count trigger
1503
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
26!
1504
      
1505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
26!
1506
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
26!
1507
      break;
13✔
1508
    
1509
    case WINDOW_TYPE_PERIOD:
8✔
1510
      // period trigger
1511
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
16!
1512
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
16!
1513
      break;
8✔
1514
    default:
×
1515
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1516
      break;
×
1517
  }
1518

1519
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
570!
1520
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
570!
1521
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
570!
1522
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
570!
1523
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
570!
1524
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
570!
1525
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
570!
1526

1527
  int32_t readerNum = 0;
285✔
1528
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
285!
1529
  if (readerNum > 0) {
285✔
1530
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
284✔
1531
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
284!
1532
  }
1533
  for (int32_t i = 0; i < readerNum; ++i) {
655✔
1534
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
370✔
1535
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
370!
1536
  }
1537

1538
  int32_t runnerNum = 0;
285✔
1539
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
285!
1540
  if (runnerNum > 0) {
285✔
1541
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
284✔
1542
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
284!
1543
  }
1544
  for (int32_t i = 0; i < runnerNum; ++i) {
1,137✔
1545
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
852✔
1546
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
852!
1547
  }
1548

1549
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
570!
1550
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
570!
1551

1552
_exit:
285✔
1553

1554
  return code;
285✔
1555
}
1556

1557

1558

1559
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
7,923✔
1560
  int32_t code = 0;
7,923✔
1561
  int32_t lino;
1562

1563
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
7,923!
1564
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
15,846!
1565
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
15,846!
1566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
15,846!
1567
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
15,846!
1568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));
15,846!
1569

1570
_exit:
7,923✔
1571

1572
  return code;
7,923✔
1573
}
1574

1575
void destroySStreamOutCols(void* p){
216✔
1576
  if (p == NULL) return;
216!
1577
  SStreamOutCol* col = (SStreamOutCol*)p;
216✔
1578
  taosMemoryFreeClear(col->expr);
216!
1579
}
1580

1581
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
921✔
1582
  int32_t code = 0;
921✔
1583
  int32_t lino;
1584

1585
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1,842!
1586
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
1,842!
1587
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
1,842!
1588
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
1,842!
1589
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
1,842!
1590
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
1,842!
1591
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
1,842!
1592
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));
1,842!
1593

1594
  int32_t addrSize = 0;
921✔
1595
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
921!
1596
  if (addrSize > 0) {
921✔
1597
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
33✔
1598
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
33!
1599
  }
1600
  for (int32_t i = 0; i < addrSize; ++i) {
954✔
1601
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
33✔
1602
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
33!
1603
  }
1604
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
1,842!
1605

1606
  int32_t outColNum = 0;
921✔
1607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
921!
1608
  if (outColNum > 0) {
921!
1609
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
921✔
1610
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
921!
1611
  }
1612
  for (int32_t i = 0; i < outColNum; ++i) {
4,582✔
1613
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
3,661✔
1614
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
3,661!
1615
  }
1616

1617
  int32_t outTagNum = 0;
921✔
1618
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
921!
1619
  if (outTagNum > 0) {
921✔
1620
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
541✔
1621
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
541!
1622
  }
1623
  for (int32_t i = 0; i < outTagNum; ++i) {
1,807✔
1624
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
886✔
1625
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
886!
1626
  }
1627

1628
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
1,842!
1629
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
1,842!
1630

1631
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
1,842!
1632
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));
1,842!
1633

1634
  int32_t forceOutColsSize = 0;
921✔
1635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
921!
1636
  if (forceOutColsSize > 0) {
921✔
1637
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
33✔
1638
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
33!
1639
  }
1640
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
1,137✔
1641
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
216✔
1642

1643
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
432!
1644
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
432!
1645
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
432!
1646
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
432!
1647
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
432!
1648
  }
1649

1650
_exit:
921✔
1651

1652
  return code;
921✔
1653
}
1654

1655
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
1,786✔
1656
  int32_t code = 0;
1,786✔
1657
  int32_t lino;
1658

1659
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
1,786!
1660
  switch (pTask->task.type) {
1,786!
1661
    case STREAM_READER_TASK:
580✔
1662
      TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
580!
1663
      break;
580✔
1664
    case STREAM_TRIGGER_TASK:
285✔
1665
      TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
285!
1666
      break;
285✔
1667
    case STREAM_RUNNER_TASK:
921✔
1668
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
921!
1669
      break;
921✔
1670
    default:
×
1671
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1672
      break;
×
1673
  }
1674
  
1675
_exit:
1,786✔
1676

1677
  return code;
1,786✔
1678
}
1679

1680

1681
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
358✔
1682
  int32_t code = 0;
358✔
1683
  int32_t lino;
1684

1685
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));
716!
1686

1687
  int32_t readerNum = 0;
358✔
1688
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
358!
1689
  if (readerNum > 0) {
358✔
1690
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), readerNum);
310✔
1691
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
310!
1692
  }
1693
  for (int32_t i = 0; i < readerNum; ++i) {
938✔
1694
    SStmTaskDeploy* pTask = taosArrayGet(pStream->readerTasks, i);
580✔
1695
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
580!
1696
  }
1697

1698
  int32_t triggerTask = 0;
358✔
1699
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerTask));
358!
1700
  if (triggerTask) {
358✔
1701
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
285!
1702
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
285!
1703
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
285!
1704
  }
1705
  
1706
  int32_t runnerNum = 0;
358✔
1707
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
358!
1708
  if (runnerNum > 0) {
358✔
1709
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), runnerNum);
318✔
1710
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
318!
1711
  }
1712
  for (int32_t i = 0; i < runnerNum; ++i) {
1,279✔
1713
    SStmTaskDeploy* pTask = taosArrayGet(pStream->runnerTasks, i);
921✔
1714
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
921!
1715
  }
1716

1717
_exit:
358✔
1718

1719
  return code;
358✔
1720
}
1721

1722

1723
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
336✔
1724
  int32_t code = 0;
336✔
1725
  int32_t lino;
1726

1727
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));
336!
1728

1729
_exit:
336✔
1730

1731
  return code;
336✔
1732
}
1733

1734

1735
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
336✔
1736
  int32_t code = 0;
336✔
1737
  int32_t lino;
1738

1739
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
336!
1740
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));
336!
1741

1742
_exit:
336✔
1743

1744
  return code;
336✔
1745
}
1746

1747

1748
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
180✔
1749
  int32_t code = 0;
180✔
1750
  int32_t lino;
1751

1752
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));
180!
1753
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
360!
1754
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));
360!
1755

1756
_exit:
180✔
1757

1758
  return code;
180✔
1759
}
1760

1761

1762
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
180✔
1763
  int32_t code = 0;
180✔
1764
  int32_t lino;
1765

1766
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
180!
1767
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
180!
1768

1769
_exit:
180✔
1770

1771
  return code;
180✔
1772
}
1773

1774
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
2✔
1775
  int32_t code = 0;
2✔
1776
  int32_t lino;
1777

1778
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));
4!
1779
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
4!
1780
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));
4!
1781

1782
_exit:
2✔
1783

1784
  return code;
2✔
1785
}
1786

1787
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
61✔
1788
  int32_t code = 0;
61✔
1789
  int32_t lino;
1790

1791
  switch (msgType) {
61!
1792
    case STREAM_MSG_ORIGTBL_READER_INFO: {
59✔
1793
      int32_t vgNum = 0;
59✔
1794
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgNum));  
59!
1795
      if (vgNum > 0) {
59!
1796
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), vgNum);
59✔
1797
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
59!
1798
      }
1799
      for (int32_t i = 0; i < vgNum; ++i) {
181✔
1800
        int32_t *vgId = taosArrayGet(pCont->vgIds, i);
122✔
1801
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));  
122!
1802
      }
1803

1804
      int32_t readerNum = 0;
59✔
1805
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));  
59!
1806
      if (readerNum > 0) {
59✔
1807
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
7✔
1808
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
7!
1809
      }
1810
      for (int32_t i = 0; i < readerNum; ++i) {
66✔
1811
        SStreamTaskAddr *addr = taosArrayGet(pCont->readerList, i);
7✔
1812
        TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, addr));  
7!
1813
      }
1814
      break;
59✔
1815
    }
1816
    case STREAM_MSG_UPDATE_RUNNER: {
×
1817
      int32_t runnerNum = 0;
×
1818
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));  
×
1819
      if (runnerNum > 0) {
×
1820
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
×
1821
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
×
1822
      }
1823
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1824
        SStreamRunnerTarget *target = taosArrayGet(pCont->runnerList, i);
×
1825
        TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, target));  
×
1826
      }
1827
      break;
×
1828
    }
1829
    case STREAM_MSG_USER_RECALC: {
2✔
1830
      int32_t recalcNum = 0;
2✔
1831
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));  
2!
1832
      if (recalcNum > 0) {
2!
1833
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), recalcNum);
2✔
1834
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
2!
1835
      }
1836
      for (int32_t i = 0; i < recalcNum; ++i) {
4✔
1837
        SStreamRecalcReq *recalc = taosArrayGet(pCont->recalcList, i);
2✔
1838
        TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, recalc));  
2!
1839
      }
1840
      break;
2✔
1841
    }
1842
    default:
×
1843
      break;
×
1844
  }
1845

1846
_exit:
61✔
1847

1848
  return code;
61✔
1849
}
1850

1851

1852
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
61✔
1853
  int32_t code = 0;
61✔
1854
  int32_t lino;
1855

1856
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
61!
1857
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
122!
1858
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
122!
1859
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));
61!
1860
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));
61!
1861

1862
_exit:
61✔
1863

1864
  return code;
61✔
1865
}
1866

1867
void tFreeSStreamMgmtRsp(void* param) {
122✔
1868
  if (NULL == param) {
122!
1869
    return;
×
1870
  }
1871
  
1872
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
122✔
1873

1874
  taosArrayDestroy(pRsp->cont.vgIds);
122✔
1875
  taosArrayDestroy(pRsp->cont.readerList);
122✔
1876
  taosArrayDestroy(pRsp->cont.runnerList);
122✔
1877
  taosArrayDestroy(pRsp->cont.recalcList);
122✔
1878
}
1879

1880
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
580✔
1881
  if (NULL == pReader) {
580!
1882
    return;
×
1883
  }
1884
  
1885
  if (pReader->triggerReader) {
580✔
1886
    SStreamReaderDeployFromTrigger* pMsg = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
395✔
1887
    taosMemoryFree(pMsg->triggerTblName);
395!
1888
    taosMemoryFree(pMsg->partitionCols);
395!
1889
    taosMemoryFree(pMsg->triggerCols);
395!
1890
    taosMemoryFree(pMsg->triggerScanPlan);
395!
1891
    taosMemoryFree(pMsg->calcCacheScanPlan);
395!
1892
  } else {
1893
    SStreamReaderDeployFromCalc* pMsg = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
185✔
1894
    taosMemoryFree(pMsg->calcScanPlan);
185!
1895
  }
1896
}
1897

1898
void tFreeStreamNotifyUrl(void* param) {
×
1899
  if (NULL == param) {
×
1900
    return;
×
1901
  }
1902

1903
  taosMemoryFree(*(void**)param);
×
1904
}
1905

1906
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
285✔
1907
  if (NULL == pTrigger) {
285!
1908
    return;
×
1909
  }
1910
  
1911
  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);
285✔
1912
  switch (pTrigger->triggerType) {
285✔
1913
    case WINDOW_TYPE_EVENT:
19✔
1914
      taosMemoryFree(pTrigger->trigger.event.startCond);
19!
1915
      taosMemoryFree(pTrigger->trigger.event.endCond);
19!
1916
      break;
19✔
1917
    case WINDOW_TYPE_COUNT:
13✔
1918
      taosMemoryFree(pTrigger->trigger.count.condCols);  
13!
1919
      break;
13✔
1920
    default:
253✔
1921
      break;
253✔
1922
  }
1923

1924
  taosMemoryFree(pTrigger->triggerPrevFilter);
285!
1925
  taosMemoryFree(pTrigger->triggerScanPlan);
285!
1926
  taosMemoryFree(pTrigger->calcCacheScanPlan);
285!
1927

1928
  taosArrayDestroy(pTrigger->readerList);
285✔
1929
  taosArrayDestroy(pTrigger->runnerList);
285✔
1930
  taosMemoryFree(pTrigger->streamName);
285!
1931
}
1932

1933
void tFreeSStreamOutCol(void* param) {
×
1934
  if (NULL == param) {
×
1935
    return;
×
1936
  }
1937

1938
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1939
  taosMemoryFree(pOut->expr);
×
1940
}
1941

1942
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
921✔
1943
  if (NULL == pRunner) {
921!
1944
    return;
×
1945
  }
1946

1947
  taosMemoryFree(pRunner->streamName);
921!
1948
  taosMemoryFree(pRunner->pPlan);
921!
1949
  taosMemoryFree(pRunner->outDBFName);
921!
1950
  taosMemoryFree(pRunner->outTblName);
921!
1951

1952
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
921✔
1953
  taosArrayDestroy(pRunner->outCols);
921✔
1954
  taosArrayDestroy(pRunner->outTags);
921✔
1955

1956
  taosMemoryFree(pRunner->subTblNameExpr);
921!
1957
  taosMemoryFree(pRunner->tagValueExpr);
921!
1958
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
921✔
1959
}
1960

1961
void tFreeSStmTaskDeploy(void* param) {
2,217✔
1962
  if (NULL == param) {
2,217✔
1963
    return;
431✔
1964
  }
1965

1966
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,786✔
1967
  switch (pTask->task.type)  {
1,786!
1968
    case STREAM_READER_TASK:
580✔
1969
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
580✔
1970
      break;
580✔
1971
    case STREAM_TRIGGER_TASK:
285✔
1972
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
285✔
1973
      break;
285✔
1974
    case STREAM_RUNNER_TASK:
921✔
1975
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
921✔
1976
      break;
921✔
1977
    default:
×
1978
      break;
×
1979
  }
1980
}
1981

1982
void tFreeSStmStreamDeploy(void* param) {
358✔
1983
  if (NULL == param) {
358!
1984
    return;
×
1985
  }
1986
  
1987
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
358✔
1988
  taosArrayDestroy(pDeploy->readerTasks);
358✔
1989
  if (pDeploy->triggerTask) {
358✔
1990
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
285✔
1991
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
285✔
1992
    taosMemoryFree(pDeploy->triggerTask);
285!
1993
  }
1994

1995
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
358✔
1996
  for (int32_t i = 0; i < runnerNum; ++i) {
1,279✔
1997
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
921✔
1998
    taosMemoryFree(pRunner->msg.runner.pPlan);
921!
1999
  }
2000
  taosArrayDestroy(pDeploy->runnerTasks);
358✔
2001
}
2002

2003
void tDeepFreeSStmStreamDeploy(void* param) {
716✔
2004
  if (NULL == param) {
716!
2005
    return;
×
2006
  }
2007
  
2008
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
716✔
2009
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
716✔
2010
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
716✔
2011
  taosMemoryFree(pDeploy->triggerTask);
716!
2012
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
716✔
2013
}
2014

2015

2016
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
68,326✔
2017
  if (NULL == pRsp) {
68,326!
2018
    return;
×
2019
  }
2020
  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
68,326✔
2021
  taosArrayDestroy(pRsp->start.taskList);
68,326✔
2022
  taosArrayDestroy(pRsp->undeploy.taskList);
68,326✔
2023
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
68,326✔
2024
}
2025

2026
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
33,791✔
2027
  if (NULL == pRsp) {
33,791!
2028
    return;
×
2029
  }
2030
  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
33,791✔
2031
  taosArrayDestroy(pRsp->start.taskList);
33,791✔
2032
  taosArrayDestroy(pRsp->undeploy.taskList);
33,791✔
2033
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
33,791✔
2034
}
2035

2036

2037

2038
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
33,791✔
2039
  int32_t code = 0;
33,791✔
2040
  int32_t lino;
2041

2042
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
33,791!
2043
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));
67,582!
2044
  int32_t deployNum = 0;
33,791✔
2045
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &deployNum));
33,791!
2046
  if (deployNum > 0) {
33,791✔
2047
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), deployNum);
107✔
2048
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
107!
2049
  }
2050
  for (int32_t i = 0; i < deployNum; ++i) {
34,149✔
2051
    SStmStreamDeploy* pStream = taosArrayGet(pRsp->deploy.streamList, i);
358✔
2052
    TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pStream));
358!
2053
  }
2054

2055
  int32_t startNum = 0;
33,791✔
2056
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &startNum));
33,791!
2057
  if (startNum > 0) {
33,791✔
2058
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), startNum);
171✔
2059
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
171!
2060
  }
2061
  for (int32_t i = 0; i < startNum; ++i) {
34,127✔
2062
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
336✔
2063
    TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pTask));
336!
2064
  }
2065

2066
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
67,582!
2067
  if (!pRsp->undeploy.undeployAll) {
33,791!
2068
    int32_t undeployNum = 0;
33,791✔
2069
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &undeployNum));
33,791!
2070
    if (undeployNum > 0) {
33,791✔
2071
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), undeployNum);
66✔
2072
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
66!
2073
    }
2074
    for (int32_t i = 0; i < undeployNum; ++i) {
33,971✔
2075
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
180✔
2076
      TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pTask));
180!
2077
    }
2078
  }  
2079

2080
  int32_t rspNum = 0;
33,791✔
2081
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspNum));
33,791!
2082
  if (rspNum > 0) {
33,791✔
2083
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), rspNum);
30✔
2084
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
30!
2085
    for (int32_t i = 0; i < rspNum; ++i) {
91✔
2086
      SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
61✔
2087
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmtRsp));
61!
2088
    }
2089
  }
2090

2091
  tEndDecode(pDecoder);
33,791✔
2092

2093
_exit:
33,791✔
2094
  return code;
33,791✔
2095
}
2096

2097
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
×
2098
  int32_t code = 0;
×
2099
  int32_t lino;
2100

2101
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2102
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2103
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
2104
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
×
2105
  tEndEncode(pEncoder);
×
2106

2107
_exit:
×
2108
  return code;
×
2109
}
2110

2111
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
×
2112
  int32_t code = 0;
×
2113
  int32_t lino;
2114

2115
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2116
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2117
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
2118
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
×
2119
  tEndDecode(pDecoder);
×
2120

2121
_exit:
×
2122
  return code;
×
2123
}
2124

2125
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
×
2126
  int32_t code = 0;
×
2127
  int32_t lino;
2128

2129
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2130
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2131
  tEndEncode(pEncoder);
×
2132

2133
_exit:
×
2134
  return code;
×
2135
}
2136

2137
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
×
2138
  int32_t code = 0;
×
2139
  int32_t lino;
2140

2141
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2142
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2143
  tEndDecode(pDecoder);
×
2144

2145
_exit:
×
2146
  return code;
×
2147

2148
}
2149

2150

2151
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
2,558✔
2152
  int32_t code = 0;
2,558✔
2153
  int32_t lino;
2154

2155
  // name part
2156
  int32_t sqlLen = pReq->sql == NULL ? 0 : (int32_t)strlen(pReq->sql) + 1;
2,558!
2157
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2,558!
2158
  int32_t outDbLen = pReq->outDB == NULL ? 0 : (int32_t)strlen(pReq->outDB) + 1;
2,558✔
2159
  int32_t streamDBLen = pReq->streamDB == NULL ? 0 : (int32_t)strlen(pReq->streamDB) + 1;
2,558!
2160
  int32_t triggerDBLen = pReq->triggerDB == NULL ? 0 : (int32_t)strlen(pReq->triggerDB) + 1;
2,558✔
2161
  int32_t triggerTblNameLen = pReq->triggerTblName == NULL ? 0 : (int32_t)strlen(pReq->triggerTblName) + 1;
2,558✔
2162
  int32_t outTblNameLen = pReq->outTblName == NULL ? 0 : (int32_t)strlen(pReq->outTblName) + 1;
2,558✔
2163

2164
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
5,116!
2165

2166
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, nameLen));
5,116!
2167
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, sqlLen));
5,116!
2168
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, outDbLen));
5,116!
2169
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, streamDBLen));
5,116!
2170
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, triggerDBLen));
5,116!
2171
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, triggerTblNameLen));
5,116!
2172
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, outTblNameLen));
5,116!
2173

2174
  int32_t calcDbSize = (int32_t)taosArrayGetSize(pReq->calcDB);
2,558✔
2175
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcDbSize));
2,558!
2176
  for (int32_t i = 0; i < calcDbSize; ++i) {
5,112✔
2177
    const char *dbName = taosArrayGetP(pReq->calcDB, i);
2,554✔
2178
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, dbName)));
2,554!
2179
  }
2180

2181
  // trigger control part
2182
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
5,116!
2183
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
5,116!
2184
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
5,116!
2185
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
5,116!
2186
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
5,116!
2187
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
5,116!
2188
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
5,116!
2189
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
5,116!
2190
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
5,116!
2191
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));
5,116!
2192

2193
  // notify part
2194
  int32_t addrSize = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
2,558✔
2195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
2,558!
2196
  for (int32_t i = 0; i < addrSize; ++i) {
2,738✔
2197
    const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
180✔
2198
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, url)));
180!
2199
  }
2200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
5,116!
2201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyErrorHandle));
5,116!
2202
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));
5,116!
2203

2204
  // out table part
2205

2206
  // trigger cols and partition cols
2207
  int32_t filterColsLen = pReq->triggerFilterCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerFilterCols) + 1;
2,558✔
2208
  int32_t triggerColsLen = pReq->triggerCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerCols) + 1;
2,558✔
2209
  int32_t partitionColsLen = pReq->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pReq->partitionCols) + 1;
2,558✔
2210
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, filterColsLen));
5,116!
2211
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, triggerColsLen));
5,116!
2212
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, partitionColsLen));
5,116!
2213

2214
  // out col
2215
  int32_t outColSize = (int32_t )taosArrayGetSize(pReq->outCols);
2,558✔
2216
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColSize));
2,558!
2217
  for (int32_t i = 0; i < outColSize; ++i) {
10,246✔
2218
    SFieldWithOptions *pField = taosArrayGet(pReq->outCols, i);
7,688✔
2219
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pField));
7,688!
2220
  }
2221

2222
  // out tag
2223
  int32_t outTagSize = (int32_t )taosArrayGetSize(pReq->outTags);
2,558✔
2224
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagSize));
2,558!
2225
  for (int32_t i = 0; i < outTagSize; ++i) {
4,154✔
2226
    SField *pField = taosArrayGet(pReq->outTags, i);
1,596✔
2227
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->type));
3,192!
2228
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
3,192!
2229
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
3,192!
2230
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
3,192!
2231
  }
2232

2233
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
5,116!
2234
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
5,116!
2235
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
5,116!
2236
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));
5,116!
2237

2238
  switch (pReq->triggerType) {
2,558!
2239
    case WINDOW_TYPE_SESSION: {
72✔
2240
      // session trigger
2241
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
144!
2242
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
144!
2243
      break;
72✔
2244
    }
2245
    case WINDOW_TYPE_STATE: {
572✔
2246
      // state trigger
2247
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
1,144!
2248
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
1,144!
2249
      break;
572✔
2250
    }
2251
    case WINDOW_TYPE_INTERVAL: {
1,478✔
2252
      // slide trigger
2253
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
2,956!
2254
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
2,956!
2255
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
2,956!
2256
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
2,956!
2257
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
2,956!
2258
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
2,956!
2259
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
2,956!
2260
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
2,956!
2261
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
2,956!
2262
      break;
1,478✔
2263
    }
2264
    case WINDOW_TYPE_EVENT: {
124✔
2265
      // event trigger
2266
      int32_t eventWindowStartCondLen = pReq->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.startCond) + 1;
124!
2267
      int32_t eventWindowEndCondLen = pReq->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.endCond) + 1;
124!
2268

2269
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond, eventWindowStartCondLen));
248!
2270
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond, eventWindowEndCondLen));
248!
2271

2272
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
248!
2273
      break;
124✔
2274
    }
2275
    case WINDOW_TYPE_COUNT: {
148✔
2276
      // count trigger
2277
      int32_t countWindowCondColsLen = pReq->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.count.condCols) + 1;
148!
2278
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols, countWindowCondColsLen));
296!
2279

2280
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
296!
2281
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
296!
2282
      break;
148✔
2283
    }
2284
    case WINDOW_TYPE_PERIOD: {
164✔
2285
      // period trigger
2286
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
328!
2287
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
328!
2288
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
328!
2289
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
328!
2290
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
328!
2291
      break;
164✔
2292
    }
2293
  }
2294

2295
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
5,116!
2296
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
5,116!
2297
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
5,116!
2298
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
5,116!
2299
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
5,116!
2300
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
5,116!
2301
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
5,116!
2302
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
5,116!
2303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
5,116!
2304
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
5,116!
2305
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
5,116!
2306
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
5,116!
2307
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
5,116!
2308
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));
5,116!
2309

2310
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
5,116!
2311
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));
5,116!
2312

2313
  int32_t triggerScanPlanLen = pReq->triggerScanPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerScanPlan) + 1;
2,558✔
2314
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, triggerScanPlanLen));
5,116!
2315

2316
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerHasPF));
5,116!
2317
  int32_t triggerFilterLen = pReq->triggerPrevFilter == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerPrevFilter) + 1;
2,558✔
2318
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, triggerFilterLen));
5,116!
2319

2320
  int32_t calcScanPlanListSize = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
2,558✔
2321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcScanPlanListSize));
2,558!
2322
  for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
5,272✔
2323
    SStreamCalcScan* pCalcScanPlan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, i);
2,714✔
2324
    int32_t          vgListSize = (int32_t)taosArrayGetSize(pCalcScanPlan->vgList);
2,714✔
2325
    int32_t          scanPlanLen = pCalcScanPlan->scanPlan == NULL ? 0 : (int32_t)strlen((char*)pCalcScanPlan->scanPlan) + 1;
2,714!
2326
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgListSize));
2,714!
2327
    for (int32_t j = 0; j < vgListSize; ++j) {
5,428✔
2328
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pCalcScanPlan->vgList, j)));
5,428!
2329
    }
2330
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pCalcScanPlan->readFromCache));
5,428!
2331
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCalcScanPlan->scanPlan, scanPlanLen));
5,428!
2332
  }
2333

2334
  int32_t calcPlanLen = pReq->calcPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->calcPlan) + 1;
2,558✔
2335
  int32_t subTblNameExprLen = pReq->subTblNameExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->subTblNameExpr) + 1;
2,558✔
2336
  int32_t tagValueExprLen = pReq->tagValueExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->tagValueExpr) + 1;
2,558✔
2337

2338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
5,116!
2339
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, calcPlanLen));
5,116!
2340
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, subTblNameExprLen));
5,116!
2341
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, tagValueExprLen));
5,116!
2342

2343
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pReq->forceOutCols);
2,558✔
2344
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
2,558!
2345
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
2,818✔
2346
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, i);
260✔
2347
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
260!
2348

2349
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
520!
2350
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
520!
2351
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
520!
2352
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
520!
2353
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
520!
2354
  }
2355

2356
_exit:
2,558✔
2357

2358
  if (code) {
2,558!
2359
    return code;
×
2360
  }
2361
  
2362
  return 0;
2,558✔
2363
}
2364

2365
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
1,452✔
2366
  SEncoder encoder = {0};
1,452✔
2367
  tEncoderInit(&encoder, buf, bufLen);
1,452✔
2368
  int32_t code = 0;
1,452✔
2369
  int32_t lino;
2370

2371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,452!
2372

2373
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
1,452!
2374

2375
  tEndEncode(&encoder);
1,452✔
2376

2377
_exit:
1,452✔
2378
  if (code) {
1,452!
2379
    tEncoderClear(&encoder);
×
2380
    return code;
×
2381
  } else {
2382
    int32_t tlen = encoder.pos;
1,452✔
2383
    tEncoderClear(&encoder);
1,452✔
2384
    return tlen;
1,452✔
2385
  }
2386
  return 0;
2387
}
2388

2389

2390
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
1,005✔
2391
  int32_t code = 0;
1,005✔
2392
  int32_t lino;
2393

2394
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
2,010!
2395

2396
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
2,010!
2397
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
2,010!
2398
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
2,010!
2399
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
2,010!
2400
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
2,010!
2401
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
2,010!
2402
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
2,010!
2403

2404
  int32_t calcDbSize = 0;
1,005✔
2405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
1,005!
2406
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
1,005✔
2407
  if (pReq->calcDB == NULL) {
1,005!
2408
    TAOS_CHECK_EXIT(terrno);
×
2409
  }
2410
  for (int32_t i = 0; i < calcDbSize; ++i) {
2,008✔
2411
    char *calcDb = NULL;
1,003✔
2412
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
1,003!
2413
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
1,003!
2414
    if (calcDb == NULL) {
1,003!
2415
      TAOS_CHECK_EXIT(terrno);
×
2416
    }
2417
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
2,006!
2418
      taosMemoryFree(calcDb);
×
2419
      TAOS_CHECK_EXIT(terrno);
×
2420
    }
2421
  }
2422

2423
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
2,010!
2424
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
2,010!
2425
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
2,010!
2426
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
2,010!
2427
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
2,010!
2428
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
2,010!
2429
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
2,010!
2430
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
2,010!
2431
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
2,010!
2432
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
2,010!
2433

2434
  int32_t addrSize = 0;
1,005✔
2435
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1,005!
2436
  if (addrSize > 0) {
1,005✔
2437
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
42✔
2438
    if (pReq->pNotifyAddrUrls == NULL) {
42!
2439
      TAOS_CHECK_EXIT(terrno);
×
2440
    }
2441
  }
2442
  for (int32_t i = 0; i < addrSize; ++i) {
1,062✔
2443
    char *url = NULL;
57✔
2444
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
57!
2445
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
57!
2446
    if (url == NULL) {
57!
2447
      TAOS_CHECK_EXIT(terrno);
×
2448
    }
2449
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
114!
2450
      taosMemoryFree(url);
×
2451
      TAOS_CHECK_EXIT(terrno);
×
2452
    }
2453
  }
2454
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
2,010!
2455
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyErrorHandle));
2,010!
2456
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
2,010!
2457

2458
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
2,010!
2459
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
2,010!
2460
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
2,010!
2461

2462
  int32_t outColSize = 0;
1,005✔
2463
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
1,005!
2464
  if (outColSize > 0) {
1,005✔
2465
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
1,003✔
2466
    if (pReq->outCols == NULL) {
1,003!
2467
      TAOS_CHECK_EXIT(terrno);
×
2468
    }
2469

2470
    for (int32_t i = 0; i < outColSize; ++i) {
4,379✔
2471
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
3,376✔
2472
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
3,376!
2473
    }
2474
  }
2475

2476
  int32_t outTagSize = 0;
1,005✔
2477
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
1,005!
2478
  if (outTagSize > 0) {
1,005✔
2479
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
450✔
2480
    if (pReq->outTags == NULL) {
450!
2481
      TAOS_CHECK_EXIT(terrno);
×
2482
    }
2483

2484
    for (int32_t i = 0; i < outTagSize; ++i) {
1,190✔
2485
      SFieldWithOptions field = {0};
740✔
2486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
740!
2487
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
740!
2488
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
740!
2489
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
740!
2490
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
1,480!
2491
        TAOS_CHECK_EXIT(terrno);
×
2492
      }
2493
    }
2494
  }
2495

2496
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
2,010!
2497
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
2,010!
2498
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
2,010!
2499
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
2,010!
2500

2501
  switch (pReq->triggerType) {
1,005!
2502
    case WINDOW_TYPE_SESSION: {
24✔
2503
      // session trigger
2504
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
48!
2505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
48!
2506
      break;
24✔
2507
    }
2508
      case WINDOW_TYPE_STATE: {
269✔
2509
        // state trigger
2510
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
538!
2511
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
538!
2512
        break;
269✔
2513
      }
2514
      case WINDOW_TYPE_INTERVAL: {
561✔
2515
        // slide trigger
2516
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
1,122!
2517
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
1,122!
2518
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
1,122!
2519
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
1,122!
2520
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
1,122!
2521
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
1,122!
2522
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
1,122!
2523
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
1,122!
2524
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
1,122!
2525
        break;
561✔
2526
      }
2527
      case WINDOW_TYPE_EVENT: {
50✔
2528
        // event trigger
2529
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
100!
2530
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
100!
2531
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
100!
2532
        break;
50✔
2533
      }
2534
      case WINDOW_TYPE_COUNT: {
51✔
2535
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
102!
2536

2537
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
102!
2538
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
102!
2539
        break;
51✔
2540
      }
2541
      case WINDOW_TYPE_PERIOD: {
50✔
2542
        // period trigger
2543
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
100!
2544
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
100!
2545
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
100!
2546
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
100!
2547
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
100!
2548
        break;
50✔
2549
      }
2550
      default:
×
2551
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2552
  }
2553

2554
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
2,010!
2555
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
2,010!
2556
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
2,010!
2557
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
2,010!
2558
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
2,010!
2559
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
2,010!
2560
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
2,010!
2561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
2,010!
2562
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
2,010!
2563
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
2,010!
2564
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
2,010!
2565
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
2,010!
2566
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
2,010!
2567
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
2,010!
2568

2569
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
2,010!
2570
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
2,010!
2571

2572
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
2,010!
2573

2574
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
2,010!
2575
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
2,010!
2576

2577
  int32_t calcScanPlanListSize = 0;
1,005✔
2578
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
1,005!
2579
  if (calcScanPlanListSize > 0) {
1,005✔
2580
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
1,003✔
2581
    if (pReq->calcScanPlanList == NULL) {
1,003!
2582
      TAOS_CHECK_EXIT(terrno);
×
2583
    }
2584
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
2,106✔
2585
      SStreamCalcScan calcScan = {0};
1,103✔
2586
      int32_t         vgListSize = 0;
1,103✔
2587
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
1,103!
2588
      if (vgListSize > 0) {
1,103!
2589
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
1,103✔
2590
        if (calcScan.vgList == NULL) {
1,103!
2591
          TAOS_CHECK_EXIT(terrno);
×
2592
        }
2593
        for (int32_t j = 0; j < vgListSize; ++j) {
2,206✔
2594
          int32_t vgId = 0;
1,103✔
2595
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
1,103!
2596
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
2,206!
2597
            TAOS_CHECK_EXIT(terrno);
×
2598
          }
2599
        }
2600
      }
2601
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
1,103!
2602
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
1,103!
2603
      taosArrayPush(pReq->calcScanPlanList, &calcScan);
1,103✔
2604
    }
2605
  }
2606

2607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
2,010!
2608
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
2,010!
2609
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
2,010!
2610
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
2,010!
2611

2612
  int32_t forceOutColsSize = 0;
1,005✔
2613
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
1,005!
2614
  if (forceOutColsSize > 0) {
1,005✔
2615
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
20✔
2616
    if (pReq->forceOutCols == NULL) {
20!
2617
      TAOS_CHECK_EXIT(terrno);
×
2618
    }
2619
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
150✔
2620
      SStreamOutCol outCol = {0};
130✔
2621
      int64_t       exprLen = 0;
130✔
2622
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
130!
2623
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
130!
2624
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
130!
2625
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
130!
2626
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
130!
2627
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
260!
2628
        TAOS_CHECK_EXIT(terrno);
×
2629
      }
2630
    }
2631
  }
2632

2633
_exit:
1,005✔
2634

2635
  return code;
1,005✔
2636
}
2637

2638

2639
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
694✔
2640
  SDecoder decoder = {0};
694✔
2641
  tDecoderInit(&decoder, buf, bufLen);
694✔
2642
  int32_t code = 0;
694✔
2643
  int32_t lino;
2644

2645
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
694!
2646
  
2647
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&decoder, pReq));
694!
2648

2649
  tEndDecode(&decoder);
694✔
2650

2651
_exit:
694✔
2652

2653
  tDecoderClear(&decoder);
694✔
2654
  return code;
694✔
2655
}
2656

2657

2658
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
×
2659
  int32_t  code = 0;
×
2660
  int32_t  lino;
2661
  int32_t  tlen;
2662
  SEncoder encoder = {0};
×
2663
  tEncoderInit(&encoder, buf, bufLen);
×
2664

2665
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2666

2667
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2668
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2669
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2670

2671
  tEndEncode(&encoder);
×
2672

2673
_exit:
×
2674
  if (code) {
×
2675
    tlen = code;
×
2676
  } else {
2677
    tlen = encoder.pos;
×
2678
  }
2679
  tEncoderClear(&encoder);
×
2680
  return tlen;
×
2681
}
2682

2683
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
30✔
2684
  SDecoder decoder = {0};
30✔
2685
  int32_t  code = 0;
30✔
2686
  int32_t  lino;
2687
  tDecoderInit(&decoder, buf, bufLen);
30✔
2688

2689
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
30!
2690
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
60!
2691
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
60!
2692

2693
  tEndDecode(&decoder);
30✔
2694

2695
_exit:
30✔
2696
  tDecoderClear(&decoder);
30✔
2697
  return code;
30✔
2698
}
2699

2700
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
30✔
2701
  taosMemoryFreeClear(pReq->name);
30!
2702
}
30✔
2703

2704
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,227✔
2705
  if (pScan == NULL) {
2,227!
2706
    return;
×
2707
  }
2708
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,227✔
2709
  taosArrayDestroy(pCalcScan->vgList);
2,227✔
2710
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,227!
2711
}
2712

2713
void tFreeStreamOutCol(void* pCol) {
195✔
2714
  if (pCol == NULL) {
195!
2715
    return;
×
2716
  }
2717
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
195✔
2718
  taosMemoryFreeClear(pOutCol->expr);
195!
2719
}
2720

2721

2722

2723
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
2,924✔
2724
  if (NULL == pReq) {
2,924✔
2725
    return;
331✔
2726
  }
2727
  taosMemoryFreeClear(pReq->name);
2,593!
2728
  taosMemoryFreeClear(pReq->sql);
2,593!
2729
  taosMemoryFreeClear(pReq->streamDB);
2,593!
2730
  taosMemoryFreeClear(pReq->triggerDB);
2,593!
2731
  taosMemoryFreeClear(pReq->outDB);
2,593!
2732
  taosMemoryFreeClear(pReq->triggerTblName);
2,593!
2733
  taosMemoryFreeClear(pReq->outTblName);
2,593!
2734

2735
  taosArrayDestroyP(pReq->calcDB, NULL);
2,593✔
2736
  pReq->calcDB = NULL;
2,593✔
2737
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
2,593✔
2738
  pReq->pNotifyAddrUrls = NULL;
2,593✔
2739

2740
  taosMemoryFreeClear(pReq->triggerFilterCols);
2,593!
2741
  taosMemoryFreeClear(pReq->triggerCols);
2,593!
2742
  taosMemoryFreeClear(pReq->partitionCols);
2,593!
2743

2744
  taosArrayDestroy(pReq->outTags);
2,593✔
2745
  pReq->outTags = NULL;
2,593✔
2746
  taosArrayDestroy(pReq->outCols);
2,593✔
2747
  pReq->outCols = NULL;
2,593✔
2748

2749
  switch (pReq->triggerType) {
2,593✔
2750
    case WINDOW_TYPE_EVENT:
105✔
2751
      taosMemoryFreeClear(pReq->trigger.event.startCond);
105!
2752
      taosMemoryFreeClear(pReq->trigger.event.endCond);
105!
2753
      break;
105✔
2754
    default:
2,488✔
2755
      break;
2,488✔
2756
  }
2757

2758
  taosMemoryFreeClear(pReq->triggerScanPlan);
2,593!
2759
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
2,593✔
2760
  pReq->calcScanPlanList = NULL;
2,593✔
2761
  taosMemoryFreeClear(pReq->triggerPrevFilter);
2,593!
2762

2763
  taosMemoryFreeClear(pReq->calcPlan);
2,593!
2764
  taosMemoryFreeClear(pReq->subTblNameExpr);
2,593!
2765
  taosMemoryFreeClear(pReq->tagValueExpr);
2,593!
2766
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
2,593✔
2767
  pReq->forceOutCols = NULL;
2,593✔
2768
}
2769

2770
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
259✔
2771
  int32_t code = 0, lino = 0;
259✔
2772
  if (NULL == pSrc) {
259!
2773
    return code;
×
2774
  } 
2775

2776
  void* p = NULL;
259✔
2777
  int32_t num = 0;
259✔
2778
  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
259!
2779
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
259!
2780

2781
  SCMCreateStreamReq* pDst = *ppDst;
259✔
2782

2783
  if (pSrc->outDB) {
259✔
2784
    pDst->outDB = COPY_STR(pSrc->outDB);
258!
2785
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
258!
2786
  }
2787
  
2788
  if (pSrc->triggerTblName) {
259✔
2789
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
258!
2790
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
258!
2791
  }
2792
  
2793
  if (pSrc->outTblName) {
259✔
2794
    pDst->outTblName = COPY_STR(pSrc->outTblName);
258!
2795
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
258!
2796
  }
2797
  
2798
  if (pSrc->pNotifyAddrUrls) {
259✔
2799
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
12✔
2800
    if (num > 0) {
12!
2801
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
12✔
2802
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
12!
2803
    }
2804
    for (int32_t i = 0; i < num; ++i) {
24✔
2805
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
12!
2806
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
12!
2807
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
24!
2808
    }
2809
  }
2810
  
2811
  if (pSrc->triggerFilterCols) {
259✔
2812
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
36!
2813
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
36!
2814
  }
2815
  
2816
  if (pSrc->triggerCols) {
259✔
2817
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
251!
2818
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
251!
2819
  }
2820
  
2821
  if (pSrc->partitionCols) {
259✔
2822
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
150!
2823
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
150!
2824
  }
2825
  
2826
  if (pSrc->outCols) {
259✔
2827
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
258✔
2828
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
258!
2829
  }
2830
  
2831
  if (pSrc->outTags) {
259✔
2832
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
150✔
2833
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
150!
2834
  }
2835

2836
  pDst->triggerType = pSrc->triggerType;
259✔
2837
  
2838
  switch (pSrc->triggerType) {
259✔
2839
    case WINDOW_TYPE_EVENT:
19✔
2840
      if (pSrc->trigger.event.startCond) {
19!
2841
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
19!
2842
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
19!
2843
      }
2844
      
2845
      if (pSrc->trigger.event.endCond) {
19!
2846
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
19!
2847
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
19!
2848
      }
2849
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
19✔
2850
      break;
19✔
2851
    default:
240✔
2852
      pDst->trigger = pSrc->trigger;
240✔
2853
      break;
240✔
2854
  }
2855

2856

2857
  if (pSrc->triggerScanPlan) {
259✔
2858
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
258!
2859
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
258!
2860
  }
2861
  
2862
  if (pSrc->calcScanPlanList) {
259✔
2863
    num = taosArrayGetSize(pSrc->calcScanPlanList);
258✔
2864
    if (num > 0) {
258!
2865
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
258✔
2866
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
258!
2867
    }
2868
    for (int32_t i = 0; i < num; ++i) {
551✔
2869
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
293✔
2870
      SStreamCalcScan  dscan = {.readFromCache = sscan->readFromCache};
293✔
2871

2872
      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
293✔
2873
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
293!
2874

2875
      dscan.scanPlan = COPY_STR(sscan->scanPlan);
293!
2876
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
293!
2877
      
2878
      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
586!
2879
    }
2880
  }
2881
  
2882
  if (pSrc->triggerPrevFilter) {
259✔
2883
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
36!
2884
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
36!
2885
  }
2886
  
2887
  if (pSrc->calcPlan) {
259✔
2888
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
258!
2889
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
258!
2890
  }
2891
  
2892
  if (pSrc->subTblNameExpr) {
259✔
2893
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
150!
2894
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
150!
2895
  }
2896
  
2897
  if (pSrc->tagValueExpr) {
259✔
2898
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
150!
2899
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
150!
2900
  }
2901
  
2902
  if (pSrc->forceOutCols) {
259✔
2903
    num = taosArrayGetSize(pSrc->forceOutCols);
10✔
2904
    if (num > 0) {
10!
2905
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
10✔
2906
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
10!
2907
    }
2908
    for (int32_t i = 0; i < num; ++i) {
75✔
2909
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
65✔
2910
      SStreamOutCol  dcol = {.type = scol->type};
65✔
2911

2912
      dcol.expr = COPY_STR(scol->expr);
65!
2913
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
65!
2914
      
2915
      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
130!
2916
    }
2917
  }
2918

2919
  pDst->triggerTblUid = pSrc->triggerTblUid;
259✔
2920
  pDst->triggerTblType = pSrc->triggerTblType;
259✔
2921
  pDst->deleteReCalc = pSrc->deleteReCalc;
259✔
2922
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
259✔
2923
  
2924
_exit:
259✔
2925

2926
  if (code) {
259!
2927
    tFreeSCMCreateStreamReq(pDst);
×
2928
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
2929
  }
2930

2931
  return code;
259✔
2932
}
2933

2934

2935
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
×
2936
  int32_t  code = 0;
×
2937
  int32_t  lino;
2938
  int32_t  tlen;
2939
  SEncoder encoder = {0};
×
2940
  tEncoderInit(&encoder, buf, bufLen);
×
2941
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2942

2943
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2944
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2945
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2946
  tEndEncode(&encoder);
×
2947

2948
_exit:
×
2949
  if (code) {
×
2950
    tlen = code;
×
2951
  } else {
2952
    tlen = encoder.pos;
×
2953
  }
2954
  tEncoderClear(&encoder);
×
2955
  return tlen;
×
2956
}
2957

2958
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
22✔
2959
  SDecoder decoder = {0};
22✔
2960
  int32_t  code = 0;
22✔
2961
  int32_t  lino;
2962

2963
  tDecoderInit(&decoder, buf, bufLen);
22✔
2964
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
22!
2965
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
44!
2966
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
44!
2967
  tEndDecode(&decoder);
22✔
2968

2969
_exit:
22✔
2970
  tDecoderClear(&decoder);
22✔
2971
  return code;
22✔
2972
}
2973

2974
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
×
2975
  taosMemoryFreeClear(pReq->name);
×
2976
}
×
2977

2978
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
×
2979
  SEncoder encoder = {0};
×
2980
  int32_t  code = 0;
×
2981
  int32_t  lino;
2982
  int32_t  tlen;
2983
  tEncoderInit(&encoder, buf, bufLen);
×
2984
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2985
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2986
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2987
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2988
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
×
2989
  tEndEncode(&encoder);
×
2990

2991
_exit:
×
2992
  if (code) {
×
2993
    tlen = code;
×
2994
  } else {
2995
    tlen = encoder.pos;
×
2996
  }
2997
  tEncoderClear(&encoder);
×
2998
  return tlen;
×
2999
}
3000

3001
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
22✔
3002
  SDecoder decoder = {0};
22✔
3003
  int32_t  code = 0;
22✔
3004
  int32_t  lino;
3005

3006
  tDecoderInit(&decoder, buf, bufLen);
22✔
3007
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
22!
3008
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
44!
3009
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
44!
3010
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igUntreated));
44!
3011
  tEndDecode(&decoder);
22✔
3012

3013
_exit:
22✔
3014
  tDecoderClear(&decoder);
22✔
3015
  return code;
22✔
3016
}
3017

3018
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
×
3019
  taosMemoryFreeClear(pReq->name);
×
3020
}
×
3021

3022
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
×
3023
  SEncoder encoder = {0};
×
3024
  int32_t  code = 0;
×
3025
  int32_t  lino;
3026
  int32_t  tlen;
3027
  tEncoderInit(&encoder, buf, bufLen);
×
3028
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3029
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
3030
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
3031
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
×
3032
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
×
3033
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
×
3034
  tEndEncode(&encoder);
×
3035

3036
_exit:
×
3037
  if (code) {
×
3038
    tlen = code;
×
3039
  } else {
3040
    tlen = encoder.pos;
×
3041
  }
3042
  tEncoderClear(&encoder);
×
3043
  return tlen;
×
3044
}
3045

3046
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
26✔
3047
  SDecoder decoder = {0};
26✔
3048
  int32_t  code = 0;
26✔
3049
  int32_t  lino;
3050

3051
  tDecoderInit(&decoder, buf, bufLen);
26✔
3052
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
26!
3053

3054
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
52!
3055
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->calcAll));
52!
3056
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.skey));
52!
3057
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.ekey));
52!
3058
  tEndDecode(&decoder);
26✔
3059

3060
_exit:
26✔
3061
  tDecoderClear(&decoder);
26✔
3062
  return code;
26✔
3063
}
3064

3065
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
26✔
3066
  taosMemoryFreeClear(pReq->name);
26!
3067
}
26✔
3068

3069
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
×
3070
  int32_t code = 0;
×
3071
  int32_t lino;
3072

3073
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
3074
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
×
3075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));
×
3076

3077
_exit:
×
3078
  return code;
×
3079
}
3080

3081
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
×
3082
  SEncoder encoder = {0};
×
3083
  int32_t  code = 0;
×
3084
  int32_t  lino;
3085
  int32_t  tlen;
3086
  tEncoderInit(&encoder, buf, bufLen);
×
3087

3088
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3089
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
×
3090

3091
  tEndEncode(&encoder);
×
3092

3093
_exit:
×
3094
  if (code) {
×
3095
    tlen = code;
×
3096
  } else {
3097
    tlen = encoder.pos;
×
3098
  }
3099
  tEncoderClear(&encoder);
×
3100
  return tlen;
×
3101
}
3102

3103
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
×
3104
  int32_t code = 0;
×
3105
  int32_t lino;
3106

3107
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
3108
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
×
3109
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));
×
3110

3111
_exit:
×
3112
  return code;
×
3113
}
3114

3115
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
×
3116
  SDecoder decoder = {0};
×
3117
  int32_t  code = 0;
×
3118
  int32_t  lino;
3119

3120
  tDecoderInit(&decoder, (char *)buf, bufLen);
×
3121

3122
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3123
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq));
×
3124

3125
  tEndDecode(&decoder);
×
3126

3127
_exit:
×
3128
  tDecoderClear(&decoder);
×
3129
  return code;
×
3130
}
3131

3132
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
×
3133
  int32_t code = 0;
×
3134
  int32_t lino;
3135

3136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
3137
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
×
3138
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
×
3139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));
×
3140

3141
_exit:
×
3142
  return code;
×
3143
}
3144

3145
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
×
3146
  SEncoder encoder = {0};
×
3147
  int32_t  code = 0;
×
3148
  int32_t  lino;
3149
  int32_t  tlen;
3150
  tEncoderInit(&encoder, buf, bufLen);
×
3151

3152
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3153
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
×
3154

3155
  tEndEncode(&encoder);
×
3156

3157
_exit:
×
3158
  if (code) {
×
3159
    tlen = code;
×
3160
  } else {
3161
    tlen = encoder.pos;
×
3162
  }
3163
  tEncoderClear(&encoder);
×
3164
  return tlen;
×
3165
}
3166

3167
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
×
3168
  int32_t code = 0;
×
3169
  int32_t lino;
3170

3171
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
3172
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
×
3173
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
×
3174
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));
×
3175

3176
_exit:
×
3177
  return code;
×
3178
}
3179

3180
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
×
3181
  SDecoder decoder = {0};
×
3182
  int32_t  code = 0;
×
3183
  int32_t  lino;
3184

3185
  tDecoderInit(&decoder, buf, bufLen);
×
3186

3187
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3188
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
×
3189

3190
  tEndDecode(&decoder);
×
3191

3192
_exit:
×
3193
  tDecoderClear(&decoder);
×
3194
  return code;
×
3195
}
3196

3197
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
120✔
3198
  SEncoder encoder = {0};
120✔
3199
  int32_t  code = TSDB_CODE_SUCCESS;
120✔
3200
  int32_t  lino = 0;
120✔
3201
  int32_t  tlen = 0;
120✔
3202

3203
  tEncoderInit(&encoder, buf, bufLen);
120✔
3204
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
120!
3205

3206
  int32_t size = taosArrayGetSize(pRsp->cols);
120✔
3207
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
120!
3208
  for (int32_t i = 0; i < size; ++i) {
582✔
3209
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
462✔
3210
    if (oInfo == NULL) {
462!
3211
      uError("col id is NULL at index %d", i);
×
3212
      code = TSDB_CODE_INVALID_PARA;
×
3213
      goto _exit;
×
3214
    }
3215
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
924!
3216
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
924!
3217
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
924!
3218
  }
3219

3220
  tEndEncode(&encoder);
120✔
3221

3222
_exit:
120✔
3223
  if (code != TSDB_CODE_SUCCESS) {
120!
3224
    tlen = code;
×
3225
  } else {
3226
    tlen = encoder.pos;
120✔
3227
  }
3228
  tEncoderClear(&encoder);
120✔
3229
  return tlen;
120✔
3230
}
3231

3232
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
60✔
3233
  SDecoder decoder = {0};
60✔
3234
  int32_t  code = TSDB_CODE_SUCCESS;
60✔
3235
  int32_t  lino = 0;
60✔
3236

3237
  tDecoderInit(&decoder, buf, bufLen);
60✔
3238
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
60!
3239

3240
  int32_t size = 0;
60✔
3241
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
60!
3242
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
60✔
3243
  if (pRsp->cols == NULL) {
60!
3244
    code = terrno;
×
3245
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3246
    goto _exit;
×
3247
  }
3248
  for (int32_t i = 0; i < size; ++i) {
291✔
3249
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
231✔
3250
    if (oInfo == NULL) {
231!
3251
      code = terrno;
×
3252
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3253
      goto _exit;
×
3254
    }
3255
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->suid));
462!
3256
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->uid));
462!
3257
    TAOS_CHECK_RETURN(tDecodeI16(&decoder, &oInfo->cid));
462!
3258
  }
3259

3260
  tEndDecode(&decoder);
60✔
3261

3262
_exit:
60✔
3263
  tDecoderClear(&decoder);
60✔
3264
  return code;
60✔
3265
}
3266

3267
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
17,259✔
3268
  taosArrayDestroy(pRsp->cols);
17,259✔
3269
}
17,260✔
3270

3271
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
47,483✔
3272
  if (pReq == NULL) return;
47,483!
3273
  if (pReq->base.type == STRIGGER_PULL_WAL_DATA) {
47,483✔
3274
    SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
2,254✔
3275
    if (pRequest->cids != NULL) {
2,254!
3276
      taosArrayDestroy(pRequest->cids);
2,256✔
3277
      pRequest->cids = NULL;
2,257✔
3278
    }
3279
  } else if (pReq->base.type == STRIGGER_PULL_TSDB_DATA) {
45,229✔
3280
    SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
268✔
3281
    if (pRequest->cids != NULL) {
268!
3282
      taosArrayDestroy(pRequest->cids);
268✔
3283
      pRequest->cids = NULL;
268✔
3284
    }
3285
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_INFO) {
44,961✔
3286
    SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
192✔
3287
    if (pRequest->cids != NULL) {
192!
3288
      taosArrayDestroy(pRequest->cids);
192✔
3289
      pRequest->cids = NULL;
192✔
3290
    }
3291
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_PSEUDO_COL) {
44,769✔
3292
    SSTriggerVirTablePseudoColRequest *pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
642✔
3293
    if (pRequest->cids != NULL) {
642!
3294
      taosArrayDestroy(pRequest->cids);
642✔
3295
      pRequest->cids = NULL;
642✔
3296
    }
3297
  } else if (pReq->base.type == STRIGGER_PULL_OTABLE_INFO) {
44,127✔
3298
    SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
61✔
3299
    if (pRequest->cols != NULL) {
61!
3300
      taosArrayDestroy(pRequest->cols);
61✔
3301
      pRequest->cols = NULL;
61✔
3302
    }
3303
  } else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
44,066✔
3304
    SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
60✔
3305
    if (pRequest->uids != NULL) {
60!
3306
      taosArrayDestroy(pRequest->uids);
×
3307
      pRequest->uids = NULL;
×
3308
    }
3309
  }
3310
}
3311

3312
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
17,624✔
3313
  int32_t  code = TSDB_CODE_SUCCESS;
17,624✔
3314
  int32_t  lino = 0;
17,624✔
3315
  int32_t size = taosArrayGetSize(cids);
17,624✔
3316
  TAOS_CHECK_EXIT(tEncodeI32(encoder, size));
17,622!
3317
  for (int32_t i = 0; i < size; ++i) {
35,259✔
3318
    col_id_t* pColId = taosArrayGet(cids, i);
17,641✔
3319
    if (pColId == NULL) {
17,638✔
3320
      uError("col id is NULL at index %d", i);
1!
3321
      code = TSDB_CODE_INVALID_PARA;
×
3322
      goto _exit;
×
3323
    }
3324
    TAOS_CHECK_EXIT(tEncodeI16(encoder, *pColId));
35,274!
3325
  }
3326
  _exit:
17,618✔
3327

3328
  return code;
17,618✔
3329
}
3330

3331
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
8,807✔
3332
  int32_t code = TSDB_CODE_SUCCESS;
8,807✔
3333
  int32_t lino = 0;
8,807✔
3334
  int32_t size = 0;
8,807✔
3335

3336
  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
8,806!
3337
  if (size > 0){
8,806✔
3338
    *cids = taosArrayInit(size, sizeof(col_id_t));
3,355✔
3339
    if (*cids == NULL) {
3,358!
3340
      code = terrno;
×
3341
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3342
      goto _exit;
×
3343
    }
3344
  
3345
    for (int32_t i = 0; i < size; ++i) {
12,180✔
3346
      col_id_t* pColId = taosArrayReserve(*cids, 1);
8,821✔
3347
      if (pColId == NULL) {
8,822!
3348
        code = terrno;
×
3349
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
×
3350
        goto _exit;
×
3351
      }
3352
      TAOS_CHECK_RETURN(tDecodeI16(decoder, pColId));
8,822!
3353
    }  
3354
  }
3355
  
3356
_exit:
8,810✔
3357
  if (code != TSDB_CODE_SUCCESS) {
8,810!
3358
    taosArrayDestroy(*cids);
×
3359
    *cids = NULL;
×
3360
  }
3361
  return code;
8,810✔
3362
}
3363

3364
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
95,474✔
3365
  SEncoder encoder = {0};
95,474✔
3366
  int32_t  code = TSDB_CODE_SUCCESS;
95,474✔
3367
  int32_t  lino = 0;
95,474✔
3368
  int32_t  tlen = 0;
95,474✔
3369

3370
  tEncoderInit(&encoder, buf, bufLen);
95,474✔
3371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
95,461!
3372

3373
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
190,890!
3374
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
190,890!
3375
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
190,890!
3376
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
190,890!
3377

3378
  switch (pReq->type) {
95,445!
3379
    case STRIGGER_PULL_SET_TABLE: {
120✔
3380
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
120✔
3381
      int32_t size = taosArrayGetSize(pRequest->uids);
120✔
3382
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
120!
3383
      for (int32_t i = 0; i < size; ++i) {
354✔
3384
        int64_t* uids = taosArrayGet(pRequest->uids, i);
234✔
3385
        if (uids == NULL) {
234!
3386
          uError("uid is NULL at index %d", i);
×
3387
          code = TSDB_CODE_INVALID_PARA;
×
3388
          goto _exit;
×
3389
        }
3390
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[0]));
468!
3391
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[1]));
468!
3392
      }
3393
      break;
120✔
3394
    }
3395
    case STRIGGER_PULL_LAST_TS: {
685✔
3396
      break;
685✔
3397
    }
3398
    case STRIGGER_PULL_FIRST_TS: {
741✔
3399
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
741✔
3400
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,482!
3401
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,482!
3402
      break;
741✔
3403
    }
3404
    case STRIGGER_PULL_TSDB_META: {
1,272✔
3405
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,272✔
3406
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,544!
3407
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,544!
3408
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,544!
3409
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,544!
3410
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,544!
3411
      break;
1,272✔
3412
    }
3413
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3414
      break;
×
3415
    }
3416
    case STRIGGER_PULL_TSDB_TS_DATA: {
92✔
3417
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
92✔
3418
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
184!
3419
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
184!
3420
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
184!
3421
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
184!
3422
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
184!
3423
      break;
92✔
3424
    }
3425
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
328✔
3426
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
328✔
3427
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
656!
3428
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
656!
3429
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
656!
3430
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
656!
3431
      break;
328✔
3432
    }
3433
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
352✔
3434
      break;
352✔
3435
    }
3436
    case STRIGGER_PULL_TSDB_CALC_DATA: {
56,330✔
3437
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
56,330✔
3438
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
112,660!
3439
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
112,660!
3440
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
112,660!
3441
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
112,660!
3442
      break;
56,330✔
3443
    }
3444
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3445
      break;
×
3446
    }
3447
    case STRIGGER_PULL_TSDB_DATA: {
536✔
3448
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
536✔
3449
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,072!
3450
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,072!
3451
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,072!
3452
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,072!
3453
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
536!
3454
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,072!
3455
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,072!
3456
      break;
536✔
3457
    }
3458
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3459
      break;
×
3460
    }
3461
    case STRIGGER_PULL_WAL_META: {
16,616✔
3462
      SSTriggerWalMetaRequest* pRequest = (SSTriggerWalMetaRequest*)pReq;
16,616✔
3463
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
33,232!
3464
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
33,232!
3465
      break;
16,616✔
3466
    }
3467
    case STRIGGER_PULL_WAL_TS_DATA:
15,418✔
3468
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3469
    case STRIGGER_PULL_WAL_CALC_DATA:
3470
    case STRIGGER_PULL_WAL_DATA: {
3471
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
15,418✔
3472
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
30,836!
3473
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
30,836!
3474
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
30,836!
3475
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
30,836!
3476
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
15,418!
3477
      break;
15,415✔
3478
    }
3479
    case STRIGGER_PULL_GROUP_COL_VALUE: {
1,184✔
3480
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
1,184✔
3481
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,368!
3482
      break;
1,184✔
3483
    }
3484
    case STRIGGER_PULL_VTABLE_INFO: {
384✔
3485
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
384✔
3486
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
384!
3487
      break;
384✔
3488
    }
3489
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
1,284✔
3490
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
1,284✔
3491
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
2,568!
3492
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
1,284!
3493
      break;
1,284✔
3494
    }
3495
    case STRIGGER_PULL_OTABLE_INFO: {
121✔
3496
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
121✔
3497
      int32_t size = taosArrayGetSize(pRequest->cols);
121✔
3498
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
121!
3499
      for (int32_t i = 0; i < size; ++i) {
591✔
3500
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
471✔
3501
        if (oInfo == NULL) {
471✔
3502
          uError("col id is NULL at index %d", i);
1!
3503
          code = TSDB_CODE_INVALID_PARA;
×
3504
          goto _exit;
×
3505
        }
3506
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
940!
3507
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
940!
3508
      }
3509
      break; 
120✔
3510
    }
3511
    default: {
×
3512
      uError("unknown pull type %d", pReq->type);
×
3513
      code = TSDB_CODE_INVALID_PARA;
×
3514
      break;
×
3515
    }
3516
  }
3517

3518
  tEndEncode(&encoder);
95,459✔
3519

3520
_exit:
95,461✔
3521
  if (code != TSDB_CODE_SUCCESS) {
95,461!
3522
    tlen = code;
×
3523
  } else {
3524
    tlen = encoder.pos;
95,461✔
3525
  }
3526
  tEncoderClear(&encoder);
95,461✔
3527
  return tlen;
95,480✔
3528
}
3529

3530

3531
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
47,488✔
3532
  SDecoder decoder = {0};
47,488✔
3533
  int32_t  code = TSDB_CODE_SUCCESS;
47,488✔
3534
  int32_t  lino = 0;
47,488✔
3535

3536
  tDecoderInit(&decoder, buf, bufLen);
47,488✔
3537
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
47,488!
3538

3539
  int32_t type = 0;
47,499✔
3540
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
47,496!
3541
  SSTriggerPullRequest* pBase = &(pReq->base);
47,496✔
3542
  pBase->type = type;
47,496✔
3543
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
94,989!
3544
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
94,977!
3545
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
94,973!
3546

3547
  switch (type) {
47,489!
3548
    case STRIGGER_PULL_SET_TABLE: {
60✔
3549
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
60✔
3550
      int32_t size = 0;
60✔
3551
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
60!
3552
      pRequest->uids = taosArrayInit(size, 2 * sizeof(int64_t));
60✔
3553
      if (pRequest->uids == NULL) {
60!
3554
        code = terrno;
×
3555
        uError("failed to allocate memory for uids, size: %d, errno: %d", size, code);
×
3556
        goto _exit;
×
3557
      }
3558
      for (int32_t i = 0; i < size; ++i) {
177✔
3559
        int64_t* uid = taosArrayReserve(pRequest->uids, 1);
117✔
3560
        if (uid == NULL) {
117!
3561
          code = terrno;
×
3562
          uError("failed to reserve memory for uid, size: %d, errno: %d", size, code);
×
3563
          goto _exit;
×
3564
        }
3565
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid));
117!
3566
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid + 1));
234!
3567
      }
3568
      break;
60✔
3569
    }
3570
    case STRIGGER_PULL_LAST_TS: {
343✔
3571
      break;
343✔
3572
    }
3573
    case STRIGGER_PULL_FIRST_TS: {
365✔
3574
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
365✔
3575
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
730!
3576
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
730!
3577
      break;
365✔
3578
    }
3579
    case STRIGGER_PULL_TSDB_META: {
636✔
3580
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
636✔
3581
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
1,272!
3582
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
1,272!
3583
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,272!
3584
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,272!
3585
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,272!
3586
      break;
636✔
3587
    }
3588
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3589
      break;
×
3590
    }
3591
    case STRIGGER_PULL_TSDB_TS_DATA: {
46✔
3592
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
46✔
3593
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
92!
3594
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
92!
3595
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
92!
3596
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
92!
3597
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
92!
3598
      break;
46✔
3599
    }
3600
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
163✔
3601
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
163✔
3602
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
327!
3603
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
328!
3604
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
328!
3605
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
328!
3606
      break;
164✔
3607
    }
3608
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
176✔
3609
      break;
176✔
3610
    }
3611
    case STRIGGER_PULL_TSDB_CALC_DATA: {
28,164✔
3612
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
28,164✔
3613
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
56,328!
3614
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
56,327!
3615
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
56,325!
3616
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
56,323!
3617
      break;
28,161✔
3618
    }
3619
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3620
      break;
×
3621
    }
3622
    case STRIGGER_PULL_TSDB_DATA: {
268✔
3623
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
268✔
3624
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
536!
3625
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
536!
3626
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
536!
3627
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
536!
3628
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
268!
3629
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
536!
3630
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
536!
3631
      break;
268✔
3632
    }
3633
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3634
      break;
×
3635
    }
3636
    case STRIGGER_PULL_WAL_META: {
8,067✔
3637
      SSTriggerWalMetaRequest* pRequest = &(pReq->walMetaReq);
8,067✔
3638
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
16,133!
3639
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
16,142!
3640
      break;
8,076✔
3641
    }
3642
    case STRIGGER_PULL_WAL_TS_DATA:
7,706✔
3643
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3644
    case STRIGGER_PULL_WAL_CALC_DATA:
3645
    case STRIGGER_PULL_WAL_DATA: {
3646
      SSTriggerWalDataRequest* pRequest = &(pReq->walDataReq);
7,706✔
3647
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
15,411!
3648
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
15,414!
3649
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
15,413!
3650
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
15,408!
3651
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
7,704!
3652
      break;
7,710✔
3653
    }
3654
    case STRIGGER_PULL_GROUP_COL_VALUE: {
592✔
3655
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
592✔
3656
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,184!
3657
      break;
592✔
3658
    }
3659
    case STRIGGER_PULL_VTABLE_INFO: {
192✔
3660
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
192✔
3661
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
192!
3662
      break;
192✔
3663
    }
3664
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
642✔
3665
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
642✔
3666
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
1,284!
3667
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
642!
3668
      break;
641✔
3669
    }
3670
    case STRIGGER_PULL_OTABLE_INFO: {
61✔
3671
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
61✔
3672
      int32_t size = 0;
61✔
3673
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
61!
3674
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
61✔
3675
      if (pRequest->cols == NULL) {
61!
3676
        code = terrno;
×
3677
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3678
        goto _exit;
×
3679
      }
3680
      for (int32_t i = 0; i < size; ++i) {
297✔
3681
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
236✔
3682
        if (oInfo == NULL) {
236!
3683
          code = terrno;
×
3684
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3685
          goto _exit;
×
3686
        }
3687
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
236!
3688
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
236!
3689
      }
3690
      break;
61✔
3691
    }
3692
    default: {
8✔
3693
      uError("unknown pull type %d", type);
8!
3694
      code = TSDB_CODE_INVALID_PARA;
×
3695
      break;
×
3696
    }
3697
  }
3698

3699
  tEndDecode(&decoder);
47,491✔
3700

3701
_exit:
47,466✔
3702
  tDecoderClear(&decoder);
47,466✔
3703
  return code;
47,480✔
3704
}
3705

3706
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo) {
44,214✔
3707
  int32_t size = taosArrayGetSize(pParams);
44,214✔
3708
  int32_t code = 0;
44,213✔
3709
  int32_t lino = 0;
44,213✔
3710
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
44,213!
3711
  for (int32_t i = 0; i < size; ++i) {
41,989,954✔
3712
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
41,946,854✔
3713
    if (param == NULL) {
41,946,545✔
3714
      TAOS_CHECK_EXIT(terrno);
804!
3715
    }
3716

3717
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevTs));
83,891,482!
3718
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->currentTs));
83,891,482!
3719
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextTs));
83,891,482!
3720

3721
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wstart));
83,891,482!
3722
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wend));
83,891,482!
3723
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wduration));
83,891,482!
3724
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wrownum));
83,891,482!
3725

3726
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevLocalTime));
83,891,482!
3727
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextLocalTime));
83,891,482!
3728

3729
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->triggerTime));
83,891,482!
3730
    if (!ignoreNotificationInfo) {
41,945,741✔
3731
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
1,124,652!
3732
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
562,326✔
3733
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
1,124,652!
3734
    }
3735
  }
3736
_exit:
43,100✔
3737
  return code;
43,100✔
3738
}
3739

3740
void tDestroySSTriggerCalcParam(void* ptr) {
21,257,248✔
3741
  SSTriggerCalcParam* pParam = ptr;
21,257,248✔
3742
  if (pParam && pParam->extraNotifyContent != NULL) {
21,257,248!
3743
    taosMemoryFreeClear(pParam->extraNotifyContent);
147!
3744
  }
3745
  if (pParam && pParam->resultNotifyContent != NULL) {
21,257,248✔
3746
    taosMemoryFreeClear(pParam->resultNotifyContent);
55!
3747
  }
3748
}
21,257,248✔
3749

3750
void tDestroySStreamGroupValue(void* ptr) {
47,069✔
3751
  SStreamGroupValue* pValue = ptr;
47,069✔
3752
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
47,069!
3753
    taosMemoryFreeClear(pValue->data.pData);
36,506!
3754
    pValue->data.nData = 0;
36,506✔
3755
  }
3756
}
47,069✔
3757

3758
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
22,104✔
3759
  int32_t size = 0, code = 0, lino = 0;
22,104✔
3760
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
22,106!
3761
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
22,106✔
3762
  if (*ppParams == NULL) {
22,105!
3763
    TAOS_CHECK_EXIT(terrno);
×
3764
  }
3765
  for (int32_t i = 0; i < size; ++i) {
20,998,112✔
3766
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
20,976,009✔
3767
    if (param == NULL) {
20,976,008!
3768
      TAOS_CHECK_EXIT(terrno);
×
3769
    }
3770
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevTs));
41,952,018!
3771
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->currentTs));
41,952,019!
3772
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextTs));
41,952,016!
3773

3774
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wstart));
41,952,012!
3775
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wend));
41,952,009!
3776
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wduration));
41,952,007!
3777
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wrownum));
41,952,007!
3778

3779
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevLocalTime));
41,952,010!
3780
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextLocalTime));
41,952,012!
3781

3782
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->triggerTime));
41,952,013!
3783
    if (!ignoreNotificationInfo) {
20,976,007✔
3784
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
562,326!
3785
      uint64_t len = 0;
281,163✔
3786
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
562,326!
3787
    }
3788
  }
3789

3790
_exit:
22,103✔
3791
  return code;
22,103✔
3792
}
3793

3794
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
45,399✔
3795
  int32_t code = TSDB_CODE_SUCCESS;
45,399✔
3796
  int32_t lino = 0;
45,399✔
3797

3798
  int32_t size = taosArrayGetSize(pGroupColVals);
45,399✔
3799
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
45,399!
3800
  for (int32_t i = 0; i < size; ++i) {
100,638✔
3801
    SStreamGroupValue* pValue = taosArrayGet(pGroupColVals, i);
55,238✔
3802
    if (pValue == NULL) {
55,238!
3803
      TAOS_CHECK_EXIT(terrno);
×
3804
    }
3805
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isNull));
55,238!
3806
    if (pValue->isNull) {
55,239!
3807
      continue;
×
3808
    }
3809
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isTbname));
55,239!
3810
    if (pValue->isTbname) {
55,239✔
3811
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->uid));
43,760!
3812
      if (vgId != -1) { pValue->vgId = vgId; }
21,880✔
3813
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pValue->vgId));
43,760!
3814
    }
3815
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pValue->data.type));
110,478!
3816
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
55,239!
3817
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pValue->data.pData, pValue->data.nData));
98,404!
3818
    } else {
3819
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->data.val));
12,074!
3820
    }
3821
  }
3822

3823
_exit:
45,400✔
3824
  return code;
45,400✔
3825
}
3826

3827
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
22,698✔
3828
  int32_t code = TSDB_CODE_SUCCESS;
22,698✔
3829
  int32_t lino = 0;
22,698✔
3830
  int32_t size = 0;
22,698✔
3831

3832
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
22,698!
3833
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);
22,698✔
3834
  if (size > 0) {
22,696✔
3835
    if (*ppGroupColVals == NULL) {
15,783✔
3836
      *ppGroupColVals = taosArrayInit(size, sizeof(SStreamGroupValue));
15,191✔
3837
      if (*ppGroupColVals == NULL) {
15,191!
3838
        TAOS_CHECK_EXIT(terrno);
×
3839
      }
3840
    } else {
3841
      TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, size));
592!
3842
    }
3843
  }
3844
  for (int32_t i = 0; i < size; ++i) {
50,319✔
3845
    SStreamGroupValue* pValue = taosArrayReserve(*ppGroupColVals, 1);
27,621✔
3846
    if (pValue == NULL) {
27,622!
3847
      TAOS_CHECK_EXIT(terrno);
×
3848
    }
3849
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isNull));
27,622!
3850
    if (pValue->isNull) {
27,622!
3851
      continue;
×
3852
    }
3853
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isTbname));
27,622!
3854
    if (pValue->isTbname) {
27,621✔
3855
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->uid));
21,882!
3856
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pValue->vgId));
21,882!
3857
    }
3858
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pValue->data.type));
55,242!
3859
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
27,621!
3860
      uint64_t len = 0;
24,602✔
3861
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pValue->data.pData, &len));
49,205!
3862
      pValue->data.nData = len;
24,603✔
3863
    } else {
3864
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->data.val));
6,038!
3865
    }
3866
  }
3867
_exit:
22,698✔
3868
  return code;
22,698✔
3869
}
3870

3871
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
1,184✔
3872
  SEncoder encoder = {0};
1,184✔
3873
  int32_t  code = TSDB_CODE_SUCCESS;
1,184✔
3874
  int32_t  lino = 0;
1,184✔
3875
  int32_t  tlen = 0;
1,184✔
3876

3877
  tEncoderInit(&encoder, buf, bufLen);
1,184✔
3878
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,184!
3879

3880
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
1,184!
3881

3882
  tEndEncode(&encoder);
1,184✔
3883

3884
_exit:
1,184✔
3885
  if (code != TSDB_CODE_SUCCESS) {
1,184!
3886
    tlen = code;
×
3887
  } else {
3888
    tlen = encoder.pos;
1,184✔
3889
  }
3890
  tEncoderClear(&encoder);
1,184✔
3891
  return tlen;
1,184✔
3892
}
3893

3894
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
592✔
3895
  SDecoder decoder = {0};
592✔
3896
  int32_t  code = TSDB_CODE_SUCCESS;
592✔
3897
  int32_t  lino = 0;
592✔
3898
  int32_t  size = 0;
592✔
3899

3900
  tDecoderInit(&decoder, buf, bufLen);
592✔
3901
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
592!
3902

3903
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));
592!
3904

3905
  tEndDecode(&decoder);
592✔
3906

3907
_exit:
592✔
3908
  tDecoderClear(&decoder);
592✔
3909
  return code;
592✔
3910
}
3911

3912
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
2,236✔
3913
  SEncoder encoder = {0};
2,236✔
3914
  int32_t  code = TSDB_CODE_SUCCESS;
2,236✔
3915
  int32_t  lino = 0;
2,236✔
3916
  int32_t  tlen = 0;
2,236✔
3917

3918
  tEncoderInit(&encoder, buf, bufLen);
2,236✔
3919
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,236!
3920

3921
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
4,472!
3922
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
4,472!
3923
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
4,472!
3924
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
4,472!
3925
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
4,472!
3926

3927
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false));
2,236!
3928
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
2,236!
3929
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
4,472!
3930

3931
  tEndEncode(&encoder);
2,236✔
3932

3933
_exit:
2,236✔
3934
  if (code != TSDB_CODE_SUCCESS) {
2,236!
3935
    tlen = code;
×
3936
  } else {
3937
    tlen = encoder.pos;
2,236✔
3938
  }
3939
  tEncoderClear(&encoder);
2,236✔
3940
  return tlen;
2,236✔
3941
}
3942

3943
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
1,118✔
3944
  SDecoder decoder = {0};
1,118✔
3945
  int32_t  code = TSDB_CODE_SUCCESS;
1,118✔
3946
  int32_t  lino = 0;
1,118✔
3947

3948
  tDecoderInit(&decoder, buf, bufLen);
1,118✔
3949
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,118!
3950

3951
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
2,236!
3952
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
2,236!
3953
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
2,236!
3954
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
2,236!
3955
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
2,236!
3956

3957
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
1,118!
3958
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
1,118!
3959
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));
2,235!
3960

3961
  tEndDecode(&decoder);
1,118✔
3962

3963
_exit:
1,118✔
3964
  tDecoderClear(&decoder);
1,118✔
3965
  return code;
1,118✔
3966
}
3967

3968
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
3,742✔
3969
  if (pReq != NULL) {
3,742!
3970
    if (pReq->params != NULL) {
3,742✔
3971
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
1,228✔
3972
      pReq->params = NULL;
1,228✔
3973
    }
3974
    if (pReq->groupColVals != NULL) {
3,742✔
3975
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
1,029✔
3976
      pReq->groupColVals = NULL;
1,029✔
3977
    }
3978
    blockDataDestroy(pReq->pOutBlock);
3,742✔
3979
  }
3980
}
3,742✔
3981

3982
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
31,288✔
3983
  SEncoder encoder = {0};
31,288✔
3984
  int32_t  code = TSDB_CODE_SUCCESS;
31,288✔
3985
  int32_t  lino = 0;
31,288✔
3986
  int32_t  tlen = 0;
31,288✔
3987

3988
  tEncoderInit(&encoder, buf, bufLen);
31,288✔
3989
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
31,288!
3990

3991
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
62,576!
3992
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
62,576!
3993
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
62,576!
3994
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
62,576!
3995

3996
  tEndEncode(&encoder);
31,288✔
3997

3998
_exit:
31,288✔
3999
  if (code != TSDB_CODE_SUCCESS) {
31,288!
4000
    tlen = code;
×
4001
  } else {
4002
    tlen = encoder.pos;
31,288✔
4003
  }
4004
  tEncoderClear(&encoder);
31,288✔
4005
  return tlen;
31,288✔
4006
}
4007

4008
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
46,378✔
4009
  SDecoder decoder = {0};
46,378✔
4010
  int32_t  code = TSDB_CODE_SUCCESS;
46,378✔
4011
  int32_t  lino = 0;
46,378✔
4012

4013
  tDecoderInit(&decoder, buf, bufLen);
46,378✔
4014
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
46,330!
4015

4016
  int32_t type = 0;
46,654✔
4017
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
46,611!
4018
  pReq->type = type;
46,611✔
4019
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
93,181!
4020
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
93,039!
4021
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
92,895!
4022

4023
  tEndDecode(&decoder);
46,426✔
4024

4025
_exit:
46,336✔
4026
  tDecoderClear(&decoder);
46,336✔
4027
  return code;
46,602✔
4028
}
4029

4030
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo) {
41,977✔
4031
  int32_t code = 0, lino = 0;
41,977✔
4032
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true));
41,977!
4033
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));
41,981!
4034
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
83,960!
4035
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
83,960!
4036
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
83,960!
4037
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
41,980!
4038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
83,958!
4039
_exit:
41,979✔
4040
  return code;
41,979✔
4041
}
4042

4043
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
20,987✔
4044
  int32_t code = 0, lino = 0;
20,987✔
4045
  int32_t size = 0;
20,987✔
4046
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
20,987!
4047
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
20,988!
4048
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
41,975!
4049
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
41,974!
4050
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
41,974!
4051
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
20,987!
4052
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
41,973!
4053
_exit:
20,986✔
4054
  return code;
20,986✔
4055
}
4056

4057
int32_t tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
25,414✔
4058
  if (pInfo == NULL) return TSDB_CODE_SUCCESS;
25,414!
4059
  if (pInfo->pStreamPesudoFuncVals != NULL) {
25,414✔
4060
    taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
21,702✔
4061
    pInfo->pStreamPesudoFuncVals = NULL;
21,703✔
4062
  }
4063
  if (pInfo->pStreamPartColVals != NULL) {
25,415✔
4064
    taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
14,987✔
4065
    pInfo->pStreamPartColVals = NULL;
14,986✔
4066
  }
4067
  return TSDB_CODE_SUCCESS;
25,414✔
4068
}
4069

4070
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
384✔
4071
  SEncoder encoder = {0};
384✔
4072
  int32_t  code = TSDB_CODE_SUCCESS;
384✔
4073
  int32_t  lino = 0;
384✔
4074
  int32_t  tlen = 0;
384✔
4075

4076
  tEncoderInit(&encoder, buf, bufLen);
384✔
4077
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
384!
4078

4079
  int32_t size = taosArrayGetSize(pRsp->infos);
384✔
4080
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
384!
4081
  for (int32_t i = 0; i < size; ++i) {
1,044✔
4082
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
660✔
4083
    if (info == NULL) {
660!
4084
      TAOS_CHECK_EXIT(terrno);
×
4085
    }
4086
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
1,320!
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
1,320!
4088
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
660!
4089
  }
4090

4091
  tEndEncode(&encoder);
384✔
4092

4093
_exit:
384✔
4094
  if (code != TSDB_CODE_SUCCESS) {
384!
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
384✔
4098
  }
4099
  tEncoderClear(&encoder);
384✔
4100
  return tlen;
384✔
4101
}
4102

4103
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
192✔
4104
  SDecoder decoder = {0};
192✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
192✔
4106
  int32_t  lino = 0;
192✔
4107
  int32_t  size = 0;
192✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
192✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
192!
4111

4112
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
192!
4113
  vTableInfo->infos = taosArrayInit(size, sizeof(VTableInfo));
192✔
4114
  if (vTableInfo->infos == NULL) {
192!
4115
    TAOS_CHECK_EXIT(terrno);
×
4116
  }
4117
  for (int32_t i = 0; i < size; ++i) {
522✔
4118
    VTableInfo* info = taosArrayReserve(vTableInfo->infos, 1);
330✔
4119
    if (info == NULL) {
330!
4120
      TAOS_CHECK_EXIT(terrno);
×
4121
    }
4122
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->gId));
660!
4123
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->uid));
660!
4124
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &info->cols, false));
330!
4125
  }
4126

4127
  tEndDecode(&decoder);
192✔
4128

4129
_exit:
192✔
4130
  tDecoderClear(&decoder);
192✔
4131
  return code;
192✔
4132
}
4133

4134

4135
void tDestroyVTableInfo(void *ptr) {
660✔
4136
  if (NULL == ptr) {
660!
4137
    return;
×
4138
  }
4139
  VTableInfo* pTable = (VTableInfo*)ptr;
660✔
4140
  taosMemoryFree(pTable->cols.pColRef);
660!
4141
}
4142

4143
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
17,388✔
4144
  if (ptr == NULL) return;
17,388!
4145
  taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
17,388✔
4146
  ptr->infos = NULL;
17,388✔
4147
}
4148

4149
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,414✔
4150
  SEncoder encoder = {0};
1,414✔
4151
  int32_t  code = TSDB_CODE_SUCCESS;
1,414✔
4152
  int32_t  lino = 0;
1,414✔
4153
  int32_t  tlen = 0;
1,414✔
4154

4155
  tEncoderInit(&encoder, buf, bufLen);
1,414✔
4156
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,413!
4157

4158
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,830!
4159
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,415✔
4160
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,415!
4161
  for (int32_t i = 0; i < size; ++i) {
2,897✔
4162
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,482✔
4163
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
2,964!
4164
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
2,964!
4165
  }
4166

4167
  tEndEncode(&encoder);
1,415✔
4168

4169
_exit:
1,415✔
4170
  if (code != TSDB_CODE_SUCCESS) {
1,415!
4171
    tlen = code;
×
4172
  } else {
4173
    tlen = encoder.pos;
1,415✔
4174
  }
4175
  tEncoderClear(&encoder);
1,415✔
4176
  return tlen;
1,415✔
4177
}
4178

4179
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
707✔
4180
  SDecoder decoder = {0};
707✔
4181
  int32_t  code = TSDB_CODE_SUCCESS;
707✔
4182
  int32_t  lino = 0;
707✔
4183
  SSDataBlock *pResBlock = pBlock;
707✔
4184

4185
  tDecoderInit(&decoder, buf, bufLen);
707✔
4186
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
707!
4187

4188
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,416!
4189
  int32_t numOfCols = 2;
708✔
4190
  if (pResBlock->pDataBlock == NULL) {
708!
4191
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
708✔
4192
    if (pResBlock->pDataBlock == NULL) {
708!
4193
      TAOS_CHECK_EXIT(terrno);
×
4194
    }
4195
    for (int32_t i = 0; i< numOfCols; ++i) {
2,124✔
4196
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,416✔
4197
      if (pColInfoData == NULL) {
1,416!
4198
        TAOS_CHECK_EXIT(terrno);
×
4199
      }
4200
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,416✔
4201
      pColInfoData->info.bytes = sizeof(int64_t);
1,416✔
4202
    }
4203
  }
4204
  int32_t numOfRows = 0;
708✔
4205
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
708!
4206
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
708!
4207
  for (int32_t i = 0; i < numOfRows; ++i) {
1,447✔
4208
    for (int32_t j = 0; j < numOfCols; ++j) {
2,223✔
4209
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,482✔
4210
      if (pColInfoData == NULL) {
1,482!
4211
        TAOS_CHECK_EXIT(terrno);
×
4212
      }
4213
      int64_t value = 0;
1,482✔
4214
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,482!
4215
      colDataSetInt64(pColInfoData, i, &value);
1,482✔
4216
    }
4217
  }
4218

4219
  pResBlock->info.dataLoad = 1;
706✔
4220
  pResBlock->info.rows = numOfRows;
706✔
4221

4222
  tEndDecode(&decoder);
706✔
4223

4224
_exit:
707✔
4225
  tDecoderClear(&decoder);
707✔
4226
  return code;
708✔
4227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc