• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4733

15 Sep 2025 08:54AM UTC coverage: 58.074% (-0.9%) from 59.006%
#4733

push

travis-ci

web-flow
test: submit stream case test_idmp_privilege.py (#32954)

133369 of 292975 branches covered (45.52%)

Branch coverage included in aggregate %.

202038 of 284577 relevant lines covered (71.0%)

5476618.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.15
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "tdatablock.h"
18
#include "tmsg.h"
19
#include "os.h"
20
#include "tcommon.h"
21

22
typedef struct STaskId {
23
  int64_t streamId;
24
  int64_t taskId;
25
} STaskId;
26

27
typedef enum EWindowType {
28
  WINDOW_TYPE_INTERVAL = 1,
29
  WINDOW_TYPE_SESSION,
30
  WINDOW_TYPE_STATE,
31
  WINDOW_TYPE_EVENT,
32
  WINDOW_TYPE_COUNT,
33
  WINDOW_TYPE_ANOMALY,
34
  WINDOW_TYPE_EXTERNAL,
35
  WINDOW_TYPE_PERIOD
36
} EWindowType;
37

38
typedef struct STaskCkptInfo {
39
  int64_t latestId;          // saved checkpoint id
40
  int64_t latestVer;         // saved checkpoint ver
41
  int64_t latestTime;        // latest checkpoint time
42
  int64_t latestSize;        // latest checkpoint size
43
  int8_t  remoteBackup;      // latest checkpoint backup done
44
  int64_t activeId;          // current active checkpoint id
45
  int32_t activeTransId;     // checkpoint trans id
46
  int8_t  failed;            // denote if the checkpoint is failed or not
47
  int8_t  consensusChkptId;  // required the consensus-checkpointId
48
  int64_t consensusTs;       //
49
} STaskCkptInfo;
50

51
typedef struct STaskStatusEntry {
52
  STaskId       id;
53
  int32_t       status;
54
  int32_t       statusLastDuration;  // to record the last duration of current status
55
  int64_t       stage;
56
  int32_t       nodeId;
57
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
58
  int64_t       processedVer;  // only valid for source task
59
  double        inputQUsed;    // in MiB
60
  double        inputRate;
61
  double        procsThroughput;   // duration between one element put into input queue and being processed.
62
  double        procsTotal;        // duration between one element put into input queue and being processed.
63
  double        outputThroughput;  // the size of dispatched result blocks in bytes
64
  double        outputTotal;       // the size of dispatched result blocks in bytes
65
  double        sinkQuota;         // existed quota size for sink task
66
  double        sinkDataSize;      // sink to dst data size
67
  int64_t       startTime;
68
  int64_t       startCheckpointId;
69
  int64_t       startCheckpointVer;
70
  int64_t       hTaskId;
71
  STaskCkptInfo checkpointInfo;
72
  STaskNotifyEventStat notifyEventStat;
73
} STaskStatusEntry;
74

75
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
×
76
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
×
77
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
×
78
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
×
79
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
×
80
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
×
81
  return 0;
×
82
}
83

84
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
×
85
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
×
86
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
×
87
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
×
88
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
×
89
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
×
90
  return 0;
×
91
}
92

93
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
×
94
  int32_t code = 0;
×
95
  int32_t lino;
96

97
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
98
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
×
99
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));
×
100

101
  int32_t size = taosArrayGetSize(pMsg->pNodeList);
×
102
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
×
103

104
  for (int32_t i = 0; i < size; ++i) {
×
105
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
×
106
    if (pInfo == NULL) {
×
107
      TAOS_CHECK_EXIT(terrno);
×
108
    }
109

110
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
×
111
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
×
112
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
×
113
  }
114

115
  // todo this new attribute will be result in being incompatible with previous version
116
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
×
117

118
  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
×
119
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));
×
120

121
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
122
    int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
×
123
    if (pId == NULL) {
×
124
      TAOS_CHECK_EXIT(terrno);
×
125
    }
126
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)pId));
×
127
  }
128

129
  tEndEncode(pEncoder);
×
130
_exit:
×
131
  if (code) {
×
132
    return code;
×
133
  } else {
134
    return pEncoder->pos;
×
135
  }
136
}
137

138
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
×
139
  int32_t code = 0;
×
140
  int32_t lino;
141

142
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
143
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
×
144
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));
×
145

146
  int32_t size = 0;
×
147
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
148

149
  pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
×
150
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);
×
151

152
  for (int32_t i = 0; i < size; ++i) {
×
153
    SNodeUpdateInfo info = {0};
×
154
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
×
155
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
×
156
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));
×
157

158
    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
×
159
      TAOS_CHECK_EXIT(terrno);
×
160
    }
161
  }
162

163
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
×
164

165
  // number of tasks
166
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
×
167
  pMsg->pTaskList = taosArrayInit(size, sizeof(int32_t));
×
168
  if (pMsg->pTaskList == NULL) {
×
169
    TAOS_CHECK_EXIT(terrno);
×
170
  }
171

172
  for (int32_t i = 0; i < size; ++i) {
×
173
    int32_t id = 0;
×
174
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
×
175
    if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
×
176
      TAOS_CHECK_EXIT(terrno);
×
177
    }
178
  }
179

180
  tEndDecode(pDecoder);
×
181
_exit:
×
182
  return code;
×
183
}
184

185
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
×
186
  taosArrayDestroy(pMsg->pNodeList);
×
187
  taosArrayDestroy(pMsg->pTaskList);
×
188
  pMsg->pNodeList = NULL;
×
189
  pMsg->pTaskList = NULL;
×
190
}
×
191

192
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
×
193
  int32_t code = 0;
×
194
  int32_t lino;
195

196
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
197
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
198
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
199
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
×
202
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
×
203
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
×
204
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
205
  tEndEncode(pEncoder);
×
206

207
_exit:
×
208
  if (code) {
×
209
    return code;
×
210
  } else {
211
    return pEncoder->pos;
×
212
  }
213
}
214

215
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
×
216
  int32_t code = 0;
×
217
  int32_t lino;
218

219
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
220
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
221
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
222
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
×
225
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
×
226
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
×
227
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
228
  tEndDecode(pDecoder);
×
229

230
_exit:
×
231
  return code;
×
232
}
233

234
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
×
235
  int32_t code = 0;
×
236
  int32_t lino;
237

238
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
×
240
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
241
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
×
242
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
×
243
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
×
244
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
×
245
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
×
246
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
×
247
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
×
248
  tEndEncode(pEncoder);
×
249

250
_exit:
×
251
  if (code) {
×
252
    return code;
×
253
  } else {
254
    return pEncoder->pos;
×
255
  }
256
}
257

258
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
×
259
  int32_t code = 0;
×
260
  int32_t lino;
261

262
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
263
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
×
264
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
265
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
×
266
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
×
267
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
×
268
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
×
269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
×
270
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
×
271
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
×
272
  tEndDecode(pDecoder);
×
273

274
_exit:
×
275
  return code;
×
276
}
277

278
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
×
279
  int32_t code = 0;
×
280
  int32_t lino;
281

282
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
283
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
×
284
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
×
285
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
×
286
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
287
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
288
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
289
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
×
290
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
×
291
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
×
292
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
×
293
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
×
294
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
×
295
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
×
296

297
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
×
298
    uError("invalid dispatch req msg");
×
299
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
300
  }
301

302
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
303
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
×
304
    void*    data = taosArrayGetP(pReq->data, i);
×
305
    if (data == NULL || pLen == NULL) {
×
306
      TAOS_CHECK_EXIT(terrno);
×
307
    }
308

309
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
×
310
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
×
311
  }
312
  tEndEncode(pEncoder);
×
313
_exit:
×
314
  if (code) {
×
315
    return code;
×
316
  } else {
317
    return pEncoder->pos;
×
318
  }
319
}
320

321
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
×
322
  int32_t code = 0;
×
323
  int32_t lino;
324

325
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
326
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
×
327
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
×
328
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
×
329
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
330
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
331
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
332
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
×
333
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
×
334
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
×
335
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
×
336
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
×
337
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
×
338
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));
×
339

340
  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
×
341
    TAOS_CHECK_EXIT(terrno);
×
342
  }
343
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
×
344
    TAOS_CHECK_EXIT(terrno);
×
345
  }
346
  for (int32_t i = 0; i < pReq->blockNum; i++) {
×
347
    int32_t  len1;
348
    uint64_t len2;
349
    void*    data;
350
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
×
351
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));
×
352

353
    if (len1 != len2) {
×
354
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
355
    }
356

357
    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
×
358
      TAOS_CHECK_EXIT(terrno);
×
359
    }
360

361
    if (taosArrayPush(pReq->data, &data) == NULL) {
×
362
      TAOS_CHECK_EXIT(terrno);
×
363
    }
364
  }
365

366
  tEndDecode(pDecoder);
×
367
_exit:
×
368
  return code;
×
369
}
370

371
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
×
372
  taosArrayDestroyP(pReq->data, NULL);
×
373
  taosArrayDestroy(pReq->dataLen);
×
374
}
×
375

376
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
×
377
  int32_t code = 0;
×
378
  int32_t lino;
379

380
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
381
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
382
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
×
383
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
×
384
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
×
385
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
×
386
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
×
387
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
×
388
  tEndEncode(pEncoder);
×
389

390
_exit:
×
391
  if (code) {
×
392
    return code;
×
393
  } else {
394
    return pEncoder->pos;
×
395
  }
396
}
397

398
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
×
399
  int32_t code = 0;
×
400
  int32_t lino;
401

402
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
403
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
404
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
×
405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
×
406
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
×
407
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
×
408
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));
×
409
  uint64_t len = 0;
×
410
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
×
411
  pReq->retrieveLen = (int32_t)len;
×
412
  tEndDecode(pDecoder);
×
413

414
_exit:
×
415
  return code;
×
416
}
417

418
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
×
419

420

421
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
130✔
422
  int32_t code = 0;
130✔
423
  int32_t lino = 0;
130✔
424
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
260!
425
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
260!
426
  switch (pReq->type) {
130!
427
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
130✔
428
      if (pReq->cont.fullTableNames) {
130!
429
        int32_t num = taosArrayGetSize(pReq->cont.fullTableNames);
130✔
430
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
130!
431
        for (int32_t i = 0; i < num; ++i) {
384✔
432
          SStreamDbTableName* pName = taosArrayGet(pReq->cont.fullTableNames, i);
254✔
433
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->dbFName, strlen(pName->dbFName) + 1));
508!
434
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->tbName, strlen(pName->tbName) + 1));
508!
435
        }
436
      } else {
437
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
×
438
      }
439
      break;
130✔
440
    }
441
    default:
×
442
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
443
      break;
×
444
  }
445

446
_exit:
130✔
447

448
  return code;
130✔
449
}
450

451
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
195✔
452
  if (NULL == pReq) {
195✔
453
    return;
65✔
454
  }
455

456
  taosArrayDestroy(pReq->cont.fullTableNames);
130✔
457
}
458

459

460
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
65✔
461
  *ppDst = NULL;
65✔
462
  
463
  if (NULL == pSrc) {
65!
464
    return TSDB_CODE_SUCCESS;
×
465
  }
466

467
  int32_t code = 0, lino = 0;
65✔
468
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
65!
469
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
65!
470

471
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
65✔
472
  if (pSrc->cont.fullTableNames) {
65!
473
    (*ppDst)->cont.fullTableNames = taosArrayDup(pSrc->cont.fullTableNames, NULL);
65✔
474
    TSDB_CHECK_NULL((*ppDst)->cont.fullTableNames, code, lino, _exit, terrno);
65!
475
  }
476
  
477
_exit:
65✔
478

479
  if (code) {
65!
480
    tFreeSStreamMgmtReq(*ppDst);
×
481
    taosMemoryFreeClear(*ppDst);
×
482
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
483
  }
484
  
485
  return code;
65✔
486
}
487

488

489
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
65✔
490
  int32_t code = 0;
65✔
491
  int32_t lino = 0;
65✔
492

493
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
130!
494
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
130!
495
  switch (pReq->type) {
65!
496
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
65✔
497
      int32_t num = 0;
65✔
498
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
65!
499
      if (num > 0) {
65!
500
        pReq->cont.fullTableNames = taosArrayInit(num, sizeof(SStreamDbTableName));
65✔
501
        TSDB_CHECK_NULL(pReq->cont.fullTableNames, code, lino, _exit, terrno);
65!
502
        for (int32_t i = 0; i < num; ++i) {
192✔
503
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.fullTableNames, 1);
127✔
504
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
127!
505
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
127!
506
        }
507
      }
508
      break;
65✔
509
    }
510
    default:
×
511
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
×
512
      break;
×
513
  }
514

515
_exit:
65✔
516

517
  return code;  
65✔
518
}
519

520
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
47,930✔
521
  int32_t code = 0;
47,930✔
522
  int32_t lino;
523

524
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
95,860!
525
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
95,860!
526
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));
95,860!
527

528
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
95,860!
529
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
95,860!
530
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
95,860!
531
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
95,860!
532
  // SKIP SESSIONID
533
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
95,860!
534
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
95,860!
535
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
95,860!
536
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));
95,860!
537
  if (pTask->pMgmtReq) {
47,930✔
538
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
130!
539
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
130!
540
  } else {
541
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
47,800!
542
  }
543

544
_exit:
47,800✔
545

546
  return code;
47,930✔
547
}
548

549

550
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
22,793✔
551
  int32_t code = 0;
22,793✔
552
  int32_t lino;
553

554
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
45,586!
555
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
45,586!
556
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));
45,586!
557
  
558
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
45,586!
559
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
45,586!
560
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
45,586!
561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
45,586!
562
  // SKIP SESSIONID
563
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
45,586!
564
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
45,586!
565
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
45,586!
566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));
45,586!
567
  int32_t req = 0;
22,793✔
568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
22,793!
569
  if (req) {
22,793✔
570
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
65!
571
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
65!
572
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
65!
573
  }
574

575
_exit:
22,793✔
576

577
  return code;
22,793✔
578
}
579

580
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
×
581
  int32_t code = 0;
×
582
  int32_t lino;
583

584
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
×
585
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
×
586
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
×
587
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));
×
588

589
_exit:
×
590

591
  return code;
×
592
}
593

594
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
×
595
  int32_t code = 0;
×
596
  int32_t lino;
597

598
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
×
599
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
×
600
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
×
601
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));
×
602

603
_exit:
×
604

605
  return code;
×
606
}
607

608

609
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
3,670✔
610
  int32_t code = 0;
3,670✔
611
  int32_t lino;
612

613
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
7,340!
614
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
7,340!
615
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
7,340!
616
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
7,340!
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));
7,340!
618

619
  int32_t recalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
3,670✔
620
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
3,670!
621
  for (int32_t i = 0; i < recalcNum; ++i) {
3,670!
622
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
623
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pProgress));
×
624
  }
625

626
_exit:
3,670✔
627

628
  return code;
3,670✔
629
}
630

631
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
1,729✔
632
  int32_t code = 0;
1,729✔
633
  int32_t lino;
634

635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
3,458!
636
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
3,458!
637
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
3,458!
638
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
3,458!
639
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));
3,458!
640

641
  int32_t recalcNum = 0;
1,729✔
642
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
1,729!
643
  if (recalcNum > 0) {
1,729!
644
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
×
645
    if (NULL == pStatus->userRecalcs) {
×
646
      code = terrno;
×
647
      goto _exit;
×
648
    }
649
  }
650

651
  for (int32_t i = 0; i < recalcNum; ++i) {
1,729!
652
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
×
653
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
×
654
  }
655

656
_exit:
1,729✔
657

658
  return code;
1,729✔
659
}
660

661

662
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
69,640✔
663
  int32_t code = 0;
69,640✔
664
  int32_t lino;
665

666
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
69,640!
667
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
139,280!
668
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
139,280!
669
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
139,280!
670
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));
139,280!
671

672
  int32_t vgLeaderNum = taosArrayGetSize(pReq->pVgLeaders);
69,640✔
673
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgLeaderNum));
69,640!
674
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
253,516✔
675
    int32_t* vgId = taosArrayGet(pReq->pVgLeaders, i);
183,876✔
676
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
367,752!
677
  }
678
  
679
  int32_t statusNum = taosArrayGetSize(pReq->pStreamStatus);
69,640✔
680
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, statusNum));
69,640!
681
  for (int32_t i = 0; i < statusNum; ++i) {
112,544✔
682
    SStmTaskStatusMsg* pStatus = taosArrayGet(pReq->pStreamStatus, i);
42,904✔
683
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pStatus));
42,904!
684
  }
685

686
  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
69,640✔
687
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqNum));
69,640!
688
  for (int32_t i = 0; i < reqNum; ++i) {
69,770✔
689
    int32_t* idx = taosArrayGet(pReq->pStreamReq, i);
130✔
690
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *idx));
260!
691
  }
692

693
  int32_t triggerNum = taosArrayGetSize(pReq->pTriggerStatus);
69,640✔
694
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, triggerNum));
69,640!
695
  for (int32_t i = 0; i < triggerNum; ++i) {
73,310✔
696
    SSTriggerRuntimeStatus* pTrigger = taosArrayGet(pReq->pTriggerStatus, i);
3,670✔
697
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigger));
3,670!
698
  }
699
  
700
  tEndEncode(pEncoder);
69,640✔
701

702
_exit:
69,640✔
703
  if (code) {
69,640!
704
    return code;
×
705
  } else {
706
    return pEncoder->pos;
69,640✔
707
  }
708
}
709

710
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
33,031✔
711
  int32_t code = 0;
33,031✔
712
  int32_t lino;
713

714
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
33,031!
715
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
66,062!
716
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
66,062!
717
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
66,062!
718
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));
66,062!
719

720
  int32_t vgLearderNum = 0;
33,031✔
721
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLearderNum));
33,031!
722
  if (vgLearderNum > 0) {
33,031✔
723
    pReq->pVgLeaders = taosArrayInit(vgLearderNum, sizeof(int32_t));
25,096✔
724
    if (NULL == pReq->pVgLeaders) {
25,096!
725
      code = terrno;
×
726
      goto _exit;
×
727
    }
728
  }
729
  for (int32_t i = 0; i < vgLearderNum; ++i) {
120,913✔
730
    int32_t vgId = 0;
87,882✔
731
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
87,882!
732
    if (NULL == taosArrayPush(pReq->pVgLeaders, &vgId)) {
175,764!
733
      code = terrno;
×
734
      goto _exit;
×
735
    }
736
  }
737

738

739
  int32_t statusNum = 0;
33,031✔
740
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
33,031!
741
  if (statusNum > 0) {
33,031✔
742
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
1,757✔
743
    if (NULL == pReq->pStreamStatus) {
1,757!
744
      code = terrno;
×
745
      goto _exit;
×
746
    }
747
  }
748
  for (int32_t i = 0; i < statusNum; ++i) {
53,311✔
749
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
20,280✔
750
    if (NULL == pTask) {
20,280!
751
      code = terrno;
×
752
      goto _exit;
×
753
    }
754
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTask));
20,280!
755
  }
756

757

758
  int32_t reqNum = 0;
33,031✔
759
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
33,031!
760
  if (reqNum > 0) {
33,031✔
761
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
24✔
762
    if (NULL == pReq->pStreamReq) {
24!
763
      code = terrno;
×
764
      goto _exit;
×
765
    }
766
  }
767
  for (int32_t i = 0; i < reqNum; ++i) {
33,096✔
768
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, i);
65✔
769
    if (NULL == pIdx) {
65!
770
      code = terrno;
×
771
      goto _exit;
×
772
    }
773
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pIdx));
65!
774
  }
775

776

777
  int32_t triggerNum = 0;
33,031✔
778
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
33,031!
779
  if (triggerNum > 0) {
33,031✔
780
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
708✔
781
    if (NULL == pReq->pTriggerStatus) {
708!
782
      code = terrno;
×
783
      goto _exit;
×
784
    }
785
  }
786
  for (int32_t i = 0; i < triggerNum; ++i) {
34,760✔
787
    SSTriggerRuntimeStatus* pStatus = taosArrayGet(pReq->pTriggerStatus, i);
1,729✔
788
    if (NULL == pStatus) {
1,729!
789
      code = terrno;
×
790
      goto _exit;
×
791
    }
792
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pStatus));
1,729!
793
  }
794

795
  
796
  tEndDecode(pDecoder);
33,031✔
797

798
_exit:
33,031✔
799
  return code;
33,031✔
800
}
801

802
void tFreeSSTriggerRuntimeStatus(void* param) {
3,564✔
803
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
3,564✔
804
  if (NULL == pStatus) {
3,564!
805
    return;
×
806
  }
807
  taosArrayDestroy(pStatus->userRecalcs);
3,564✔
808
}
809

810
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
206,090✔
811
  if (pMsg == NULL) {
206,090!
812
    return;
×
813
  }
814

815
  taosArrayDestroy(pMsg->pVgLeaders);
206,090✔
816
  if (deepClean) {
206,090!
817
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
206,090✔
818
    for (int32_t i = 0; i < reqNum; ++i) {
206,220✔
819
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
130✔
820
      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
130✔
821
      if (NULL == pTask) {
130!
822
        continue;
×
823
      }
824

825
      tFreeSStreamMgmtReq(pTask->pMgmtReq);
130✔
826
      taosMemoryFree(pTask->pMgmtReq);
130!
827
    }
828
  }
829
  taosArrayDestroy(pMsg->pStreamReq);
206,090✔
830
  taosArrayDestroy(pMsg->pStreamStatus);
206,090✔
831
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
206,090✔
832
}
833

834
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
862✔
835
  int32_t code = 0;
862✔
836
  int32_t lino;
837

838
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, pMsg->triggerTblName == NULL ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1));
1,724!
839
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
1,724!
840
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
1,724!
841
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
1,724!
842
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
1,724!
843
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
1,724!
844
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, pMsg->triggerCols == NULL ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1));
1,724!
845
  //TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, pMsg->triggerPrevFilter == NULL ? 0 : (int32_t)strlen(pMsg->triggerPrevFilter) + 1));
846
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1));
1,724!
847
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1));
1,724!
848

849
_exit:
862✔
850

851
  return code;
862✔
852
}
853

854
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
390✔
855
  int32_t code = 0;
390✔
856
  int32_t lino;
857

858
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
780!
859
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, pMsg->calcScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1));
780!
860

861
_exit:
390✔
862

863
  return code;
390✔
864
}
865

866

867
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
1,252✔
868
  int32_t code = 0;
1,252✔
869
  int32_t lino;
870

871
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));
2,504!
872
  if (pMsg->triggerReader) {
1,252✔
873
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
862!
874
  } else {
875
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
390!
876
  }
877
  
878
_exit:
390✔
879

880
  return code;
1,252✔
881
}
882

883
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
2,624✔
884
  int32_t code = 0;
2,624✔
885
  int32_t lino;
886

887
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
5,248!
888
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
5,248!
889
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));
2,624!
890

891
_exit:
2,624✔
892

893
  return code;
2,624✔
894
}
895

896
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
1,812✔
897
  int32_t code = 0;
1,812✔
898
  int32_t lino;
899

900
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
1,812!
901
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
3,624!
902

903
_exit:
1,812✔
904

905
  return code;
1,812✔
906
}
907

908

909
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
606✔
910
  int32_t code = 0;
606✔
911
  int32_t lino;
912

913
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
1,212!
914
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
1,212!
915
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
1,212!
916
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
1,212!
917
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
1,212!
918
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
1,212!
919
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->hasPartitionBy));
1,212!
920
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
1,212!
921
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
1,212!
922

923
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
606✔
924
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
606!
925
  for (int32_t i = 0; i < addrSize; ++i) {
668✔
926
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
62✔
927
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
124!
928
  }
929
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
1,212!
930
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
1,212!
931
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
1,212!
932

933
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
1,212!
934
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
1,212!
935
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
1,212!
936
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
1,212!
937

938
  switch (pMsg->triggerType) {
606!
939
    case WINDOW_TYPE_SESSION: {
10✔
940
      // session trigger
941
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
20!
942
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
20!
943
      break;
10✔
944
    }
945
    case WINDOW_TYPE_STATE: {
276✔
946
      // state trigger
947
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
552!
948
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
552!
949
      break;
276✔
950
    }
951
    case WINDOW_TYPE_INTERVAL: {
238✔
952
      // slide trigger
953
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
476!
954
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
476!
955
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
476!
956
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
476!
957
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
476!
958
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
476!
959
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
476!
960
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
476!
961
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
476!
962
      break;
238✔
963
    }
964
    case WINDOW_TYPE_EVENT: {
48✔
965
      // event trigger
966
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
48!
967
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
48!
968

969
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
96!
970
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
96!
971

972
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
96!
973
      break;
48✔
974
    }
975
    case WINDOW_TYPE_COUNT: {
20✔
976
      // count trigger
977
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
20!
978
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
40!
979

980
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
40!
981
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
40!
982
      break;
20✔
983
    }
984
    case WINDOW_TYPE_PERIOD: {
14✔
985
      // period trigger
986
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
28!
987
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
28!
988
      break;
14✔
989
    }
990
    default:
×
991
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
992
      break;
×
993
  }
994

995
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
1,212!
996
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
1,212!
997
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
1,212!
998
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
1,212!
999
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
606✔
1000
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
1,212!
1001
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
606✔
1002
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
1,212!
1003
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
606✔
1004
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
1,212!
1005

1006
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
606✔
1007
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
606!
1008
  for (int32_t i = 0; i < readerNum; ++i) {
1,374✔
1009
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
768✔
1010
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
768!
1011
  }
1012

1013
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
606✔
1014
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
606!
1015
  for (int32_t i = 0; i < runnerNum; ++i) {
2,418✔
1016
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,812✔
1017
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
1,812!
1018
  }
1019

1020
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
1,212!
1021
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
1,212!
1022

1023
_exit:
606✔
1024

1025
  return code;
606✔
1026
}
1027

1028

1029
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
17,732✔
1030
  int32_t code = 0;
17,732✔
1031
  int32_t lino;
1032

1033
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
35,464!
1034
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
35,464!
1035
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
35,464!
1036
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
35,464!
1037
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
35,464!
1038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));
35,464!
1039

1040
_exit:
17,732✔
1041

1042
  return code;
17,732✔
1043
}
1044

1045

1046
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
1,948✔
1047
  int32_t code = 0;
1,948✔
1048
  int32_t lino;
1049

1050
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
3,896!
1051
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
3,896!
1052
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
3,896!
1053
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
3,896!
1054
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
3,896!
1055
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
3,896!
1056
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
3,896!
1057
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));
3,896!
1058

1059
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
1,948✔
1060
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
1,948!
1061
  for (int32_t i = 0; i < addrSize; ++i) {
2,128✔
1062
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
180✔
1063
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
360!
1064
  }
1065
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
3,896!
1066

1067
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
1,948✔
1068
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
1,948!
1069
  for (int32_t i = 0; i < outColNum; ++i) {
9,766✔
1070
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
7,818✔
1071
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
7,818!
1072
  }
1073

1074
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
1,948✔
1075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
1,948!
1076
  for (int32_t i = 0; i < outTagNum; ++i) {
3,806✔
1077
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
1,858✔
1078
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
1,858!
1079
  }
1080

1081
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
3,896!
1082
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));
3,896!
1083

1084
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
3,896!
1085
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));
3,896!
1086

1087
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
1,948✔
1088
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
1,948!
1089
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
2,422✔
1090
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
474✔
1091
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
474!
1092

1093
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
948!
1094
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
948!
1095
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
948!
1096
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
948!
1097
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
948!
1098
  }
1099

1100
_exit:
1,948✔
1101

1102
  return code;
1,948✔
1103
}
1104

1105

1106
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
3,806✔
1107
  int32_t code = 0;
3,806✔
1108
  int32_t lino;
1109

1110
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
3,806!
1111
  switch (pTask->task.type) {
3,806!
1112
    case STREAM_READER_TASK:
1,252✔
1113
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
1,252!
1114
      break;
1,252✔
1115
    case STREAM_TRIGGER_TASK:
606✔
1116
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
606!
1117
      break;
606✔
1118
    case STREAM_RUNNER_TASK:
1,948✔
1119
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
1,948!
1120
      break;
1,948✔
1121
    default:
×
1122
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1123
      break;
×
1124
  }
1125
  
1126
_exit:
3,806✔
1127

1128
  return code;
3,806✔
1129
}
1130

1131

1132
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
802✔
1133
  int32_t code = 0;
802✔
1134
  int32_t lino;
1135

1136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));
1,604!
1137

1138
  int32_t readerNum = taosArrayGetSize(pStream->readerTasks);
802✔
1139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
802!
1140
  for (int32_t i = 0; i < readerNum; ++i) {
2,054✔
1141
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->readerTasks, i);
1,252✔
1142
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
1,252!
1143
  }
1144

1145
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStream->triggerTask ? 1 : 0));
1,604!
1146
  if (pStream->triggerTask) {
802✔
1147
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
606!
1148
  }
1149
  
1150
  int32_t runnerNum = taosArrayGetSize(pStream->runnerTasks);
802✔
1151
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
802!
1152
  for (int32_t i = 0; i < runnerNum; ++i) {
2,750✔
1153
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->runnerTasks, i);
1,948✔
1154
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
1,948!
1155
  }
1156

1157
_exit:
802✔
1158

1159
  return code;
802✔
1160
}
1161

1162
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
1,220✔
1163
  int32_t code = 0;
1,220✔
1164
  int32_t lino = 0;
1,220✔
1165

1166
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));
2,440!
1167

1168
_exit:
1,220✔
1169
  return code;
1,220✔
1170
}
1171

1172
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
610✔
1173
  int32_t code = 0;
610✔
1174
  int32_t lino;
1175

1176
  int32_t type = 0;
610✔
1177
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &type));
610!
1178
  pMsg->msgType = type;
610✔
1179

1180
_exit:
610✔
1181
  return code;
610✔
1182
}
1183

1184
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
722✔
1185
  int32_t code = 0;
722✔
1186
  int32_t lino;
1187

1188
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));
722!
1189

1190
_exit:
722✔
1191

1192
  return code;
722✔
1193
}
1194

1195
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
722✔
1196
  int32_t code = 0;
722✔
1197
  int32_t lino;
1198

1199
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
722!
1200
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));
722!
1201

1202
_exit:
722✔
1203

1204
  return code;
722✔
1205
}
1206

1207
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
368✔
1208
  int32_t code = 0;
368✔
1209
  int32_t lino;
1210

1211
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));
368!
1212
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
736!
1213
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));
736!
1214

1215
_exit:
368✔
1216

1217
  return code;
368✔
1218
}
1219

1220
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
368✔
1221
  int32_t code = 0;
368✔
1222
  int32_t lino;
1223

1224
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
368!
1225
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
368!
1226

1227
_exit:
368✔
1228

1229
  return code;
368✔
1230
}
1231

1232

1233
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
×
1234
  int32_t code = 0;
×
1235
  int32_t lino;
1236

1237
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));
×
1238
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
×
1239
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));
×
1240

1241
_exit:
×
1242

1243
  return code;
×
1244
}
1245

1246
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
130✔
1247
  int32_t code = 0;
130✔
1248
  int32_t lino;
1249

1250
  switch (msgType) {
130!
1251
    case STREAM_MSG_ORIGTBL_READER_INFO: {
130✔
1252
      int32_t vgNum = taosArrayGetSize(pRsp->vgIds);
130✔
1253
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
130!
1254

1255
      for (int32_t i = 0; i < vgNum; ++i) {
384✔
1256
        int32_t* vgId = taosArrayGet(pRsp->vgIds, i);
254✔
1257
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
508!
1258
      }
1259

1260
      int32_t readerNum = taosArrayGetSize(pRsp->readerList);
130✔
1261
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
130!
1262
      
1263
      for (int32_t i = 0; i < readerNum; ++i) {
174✔
1264
        SStreamTaskAddr* addr = taosArrayGet(pRsp->readerList, i);
44✔
1265
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, addr));
44!
1266
      }
1267
      break;
130✔
1268
    }
1269
    case STREAM_MSG_UPDATE_RUNNER: {
×
1270
      int32_t runnerNum = taosArrayGetSize(pRsp->runnerList);
×
1271
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
×
1272
      
1273
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1274
        SStreamRunnerTarget* target = taosArrayGet(pRsp->runnerList, i);
×
1275
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, target));
×
1276
      }
1277
      break;
×
1278
    }
1279
    case STREAM_MSG_USER_RECALC:{
×
1280
      int32_t recalcNum = taosArrayGetSize(pRsp->recalcList);
×
1281
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
×
1282
      
1283
      for (int32_t i = 0; i < recalcNum; ++i) {
×
1284
        SStreamRecalcReq* recalc = taosArrayGet(pRsp->recalcList, i);
×
1285
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, recalc));
×
1286
      }
1287
      break;
×
1288
    }
1289
    default:
×
1290
      break;
×
1291
  }
1292

1293
_exit:
130✔
1294

1295
  return code;
130✔
1296
}
1297

1298
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
130✔
1299
  int32_t code = 0;
130✔
1300
  int32_t lino;
1301

1302
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
130!
1303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
260!
1304
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
260!
1305
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));
130!
1306
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));
130!
1307

1308
_exit:
130✔
1309

1310
  return code;
130✔
1311
}
1312

1313

1314
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
65,444✔
1315
  int32_t code = 0;
65,444✔
1316
  int32_t lino;
1317

1318
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
65,444!
1319
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));
130,888!
1320
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
65,444✔
1321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
65,444!
1322
  for (int32_t i = 0; i < deployNum; ++i) {
66,246✔
1323
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
802✔
1324
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
802!
1325
  }
1326

1327
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
65,444✔
1328
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
65,444!
1329
  for (int32_t i = 0; i < startNum; ++i) {
66,166✔
1330
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
722✔
1331
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
722!
1332
  }
1333

1334
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
130,888!
1335
  if (!pRsp->undeploy.undeployAll) {
65,444!
1336
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
65,444✔
1337
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
65,444!
1338
    for (int32_t i = 0; i < undeployNum; ++i) {
65,812✔
1339
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
368✔
1340
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
368!
1341
    }
1342
  }
1343

1344
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
65,444✔
1345
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
65,444!
1346
  for (int32_t i = 0; i < rspNum; ++i) {
65,574✔
1347
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
130✔
1348
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
130!
1349
  }
1350
  
1351
_exit:
65,444✔
1352

1353
  tEndEncode(pEncoder);
65,444✔
1354

1355
  return code;
65,444✔
1356
}
1357

1358
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
431✔
1359
  int32_t code = 0;
431✔
1360
  int32_t lino;
1361

1362
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
862!
1363
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
862!
1364
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
862!
1365
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
862!
1366
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
862!
1367
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
862!
1368
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
862!
1369
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
862!
1370
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
862!
1371

1372
_exit:
431✔
1373

1374
  return code;
431✔
1375
}
1376

1377

1378
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
195✔
1379
  int32_t code = 0;
195✔
1380
  int32_t lino;
1381

1382
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
390!
1383
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));
390!
1384

1385
_exit:
195✔
1386

1387
  return code;
195✔
1388
}
1389

1390

1391
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
626✔
1392
  int32_t code = 0;
626✔
1393
  int32_t lino;
1394

1395
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));
1,252!
1396
  if (pMsg->triggerReader) {
626✔
1397
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
431!
1398
  } else {
1399
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
195!
1400
  }
1401
  
1402
_exit:
195✔
1403

1404
  return code;
626✔
1405
}
1406

1407

1408
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
1,312✔
1409
  int32_t code = 0;
1,312✔
1410
  int32_t lino;
1411

1412
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
2,624!
1413
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
2,624!
1414
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));
1,312!
1415

1416
_exit:
1,312✔
1417

1418
  return code;
1,312✔
1419
}
1420

1421

1422
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
906✔
1423
  int32_t code = 0;
906✔
1424
  int32_t lino;
1425

1426
  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
906!
1427
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1,812!
1428

1429
_exit:
906✔
1430

1431
  return code;
906✔
1432
}
1433

1434

1435
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
303✔
1436
  int32_t code = 0;
303✔
1437
  int32_t lino;
1438

1439
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
606!
1440
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
606!
1441
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
606!
1442
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
606!
1443
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
606!
1444
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
606!
1445
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->hasPartitionBy));
606!
1446
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
606!
1447
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
606!
1448

1449
  int32_t addrSize = 0;
303✔
1450
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
303!
1451
  if (addrSize > 0) {
303✔
1452
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
31✔
1453
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
31!
1454
  }
1455
  for (int32_t i = 0; i < addrSize; ++i) {
334✔
1456
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
31✔
1457
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
31!
1458
  }
1459
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
606!
1460
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
606!
1461
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));
606!
1462

1463
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
606!
1464
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
606!
1465
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
606!
1466
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
606!
1467

1468
  switch (pMsg->triggerType) {
303!
1469
    case WINDOW_TYPE_SESSION:
5✔
1470
      // session trigger
1471
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
10!
1472
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
10!
1473
      break;
5✔
1474
    case WINDOW_TYPE_STATE:
138✔
1475
      // state trigger
1476
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
276!
1477
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
276!
1478
      break;
138✔
1479
    
1480
    case WINDOW_TYPE_INTERVAL:
119✔
1481
      // slide trigger
1482
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
238!
1483
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
238!
1484
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
238!
1485
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
238!
1486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
238!
1487
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
238!
1488
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
238!
1489
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
238!
1490
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
238!
1491
      break;
119✔
1492
    
1493
    case WINDOW_TYPE_EVENT:
24✔
1494
      // event trigger
1495
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
48!
1496
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
48!
1497
      
1498
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
48!
1499
      break;
24✔
1500
    
1501
    case WINDOW_TYPE_COUNT:
10✔
1502
      // count trigger
1503
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
20!
1504
      
1505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
20!
1506
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
20!
1507
      break;
10✔
1508
    
1509
    case WINDOW_TYPE_PERIOD:
7✔
1510
      // period trigger
1511
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
14!
1512
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
14!
1513
      break;
7✔
1514
    default:
×
1515
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1516
      break;
×
1517
  }
1518

1519
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
606!
1520
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
606!
1521
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
606!
1522
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
606!
1523
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
606!
1524
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
606!
1525
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
606!
1526

1527
  int32_t readerNum = 0;
303✔
1528
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
303!
1529
  if (readerNum > 0) {
303✔
1530
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
301✔
1531
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
301!
1532
  }
1533
  for (int32_t i = 0; i < readerNum; ++i) {
687✔
1534
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
384✔
1535
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
384!
1536
  }
1537

1538
  int32_t runnerNum = 0;
303✔
1539
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
303!
1540
  if (runnerNum > 0) {
303✔
1541
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
302✔
1542
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
302!
1543
  }
1544
  for (int32_t i = 0; i < runnerNum; ++i) {
1,209✔
1545
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
906✔
1546
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
906!
1547
  }
1548

1549
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
606!
1550
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
606!
1551

1552
_exit:
303✔
1553

1554
  return code;
303✔
1555
}
1556

1557

1558

1559
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
8,387✔
1560
  int32_t code = 0;
8,387✔
1561
  int32_t lino;
1562

1563
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
8,387!
1564
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
16,774!
1565
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
16,774!
1566
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
16,774!
1567
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
16,774!
1568
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));
16,774!
1569

1570
_exit:
8,387✔
1571

1572
  return code;
8,387✔
1573
}
1574

1575
void destroySStreamOutCols(void* p){
237✔
1576
  if (p == NULL) return;
237!
1577
  SStreamOutCol* col = (SStreamOutCol*)p;
237✔
1578
  taosMemoryFreeClear(col->expr);
237!
1579
}
1580

1581
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
974✔
1582
  int32_t code = 0;
974✔
1583
  int32_t lino;
1584

1585
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1,948!
1586
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
1,948!
1587
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
1,948!
1588
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
1,948!
1589
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
1,948!
1590
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
1,948!
1591
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
1,948!
1592
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));
1,948!
1593

1594
  int32_t addrSize = 0;
974✔
1595
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
974!
1596
  if (addrSize > 0) {
974✔
1597
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
90✔
1598
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
90!
1599
  }
1600
  for (int32_t i = 0; i < addrSize; ++i) {
1,064✔
1601
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
90✔
1602
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
90!
1603
  }
1604
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
1,948!
1605

1606
  int32_t outColNum = 0;
974✔
1607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
974!
1608
  if (outColNum > 0) {
974!
1609
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
974✔
1610
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
974!
1611
  }
1612
  for (int32_t i = 0; i < outColNum; ++i) {
4,883✔
1613
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
3,909✔
1614
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
3,909!
1615
  }
1616

1617
  int32_t outTagNum = 0;
974✔
1618
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
974!
1619
  if (outTagNum > 0) {
974✔
1620
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
551✔
1621
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
551!
1622
  }
1623
  for (int32_t i = 0; i < outTagNum; ++i) {
1,903✔
1624
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
929✔
1625
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
929!
1626
  }
1627

1628
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
1,948!
1629
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
1,948!
1630

1631
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
1,948!
1632
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));
1,948!
1633

1634
  int32_t forceOutColsSize = 0;
974✔
1635
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
974!
1636
  if (forceOutColsSize > 0) {
974✔
1637
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
36✔
1638
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
36!
1639
  }
1640
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
1,211✔
1641
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
237✔
1642

1643
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
474!
1644
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
474!
1645
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
474!
1646
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
474!
1647
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
474!
1648
  }
1649

1650
_exit:
974✔
1651

1652
  return code;
974✔
1653
}
1654

1655
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
1,903✔
1656
  int32_t code = 0;
1,903✔
1657
  int32_t lino;
1658

1659
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
1,903!
1660
  switch (pTask->task.type) {
1,903!
1661
    case STREAM_READER_TASK:
626✔
1662
      TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
626!
1663
      break;
626✔
1664
    case STREAM_TRIGGER_TASK:
303✔
1665
      TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
303!
1666
      break;
303✔
1667
    case STREAM_RUNNER_TASK:
974✔
1668
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
974!
1669
      break;
974✔
1670
    default:
×
1671
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1672
      break;
×
1673
  }
1674
  
1675
_exit:
1,903✔
1676

1677
  return code;
1,903✔
1678
}
1679

1680

1681
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
401✔
1682
  int32_t code = 0;
401✔
1683
  int32_t lino;
1684

1685
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));
802!
1686

1687
  int32_t readerNum = 0;
401✔
1688
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
401!
1689
  if (readerNum > 0) {
401✔
1690
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), readerNum);
349✔
1691
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
349!
1692
  }
1693
  for (int32_t i = 0; i < readerNum; ++i) {
1,027✔
1694
    SStmTaskDeploy* pTask = taosArrayGet(pStream->readerTasks, i);
626✔
1695
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
626!
1696
  }
1697

1698
  int32_t triggerTask = 0;
401✔
1699
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerTask));
401!
1700
  if (triggerTask) {
401✔
1701
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
303!
1702
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
303!
1703
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
303!
1704
  }
1705
  
1706
  int32_t runnerNum = 0;
401✔
1707
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
401!
1708
  if (runnerNum > 0) {
401✔
1709
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), runnerNum);
346✔
1710
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
346!
1711
  }
1712
  for (int32_t i = 0; i < runnerNum; ++i) {
1,375✔
1713
    SStmTaskDeploy* pTask = taosArrayGet(pStream->runnerTasks, i);
974✔
1714
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
974!
1715
  }
1716

1717
_exit:
401✔
1718

1719
  return code;
401✔
1720
}
1721

1722

1723
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
361✔
1724
  int32_t code = 0;
361✔
1725
  int32_t lino;
1726

1727
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));
361!
1728

1729
_exit:
361✔
1730

1731
  return code;
361✔
1732
}
1733

1734

1735
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
361✔
1736
  int32_t code = 0;
361✔
1737
  int32_t lino;
1738

1739
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
361!
1740
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));
361!
1741

1742
_exit:
361✔
1743

1744
  return code;
361✔
1745
}
1746

1747

1748
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
184✔
1749
  int32_t code = 0;
184✔
1750
  int32_t lino;
1751

1752
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));
184!
1753
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
368!
1754
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));
368!
1755

1756
_exit:
184✔
1757

1758
  return code;
184✔
1759
}
1760

1761

1762
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
184✔
1763
  int32_t code = 0;
184✔
1764
  int32_t lino;
1765

1766
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
184!
1767
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
184!
1768

1769
_exit:
184✔
1770

1771
  return code;
184✔
1772
}
1773

1774
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
×
1775
  int32_t code = 0;
×
1776
  int32_t lino;
1777

1778
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));
×
1779
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
×
1780
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));
×
1781

1782
_exit:
×
1783

1784
  return code;
×
1785
}
1786

1787
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
65✔
1788
  int32_t code = 0;
65✔
1789
  int32_t lino;
1790

1791
  switch (msgType) {
65!
1792
    case STREAM_MSG_ORIGTBL_READER_INFO: {
65✔
1793
      int32_t vgNum = 0;
65✔
1794
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgNum));  
65!
1795
      if (vgNum > 0) {
65!
1796
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), vgNum);
65✔
1797
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
65!
1798
      }
1799
      for (int32_t i = 0; i < vgNum; ++i) {
192✔
1800
        int32_t *vgId = taosArrayGet(pCont->vgIds, i);
127✔
1801
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));  
127!
1802
      }
1803

1804
      int32_t readerNum = 0;
65✔
1805
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));  
65!
1806
      if (readerNum > 0) {
65✔
1807
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
21✔
1808
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
21!
1809
      }
1810
      for (int32_t i = 0; i < readerNum; ++i) {
87✔
1811
        SStreamTaskAddr *addr = taosArrayGet(pCont->readerList, i);
22✔
1812
        TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, addr));  
22!
1813
      }
1814
      break;
65✔
1815
    }
1816
    case STREAM_MSG_UPDATE_RUNNER: {
×
1817
      int32_t runnerNum = 0;
×
1818
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));  
×
1819
      if (runnerNum > 0) {
×
1820
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
×
1821
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
×
1822
      }
1823
      for (int32_t i = 0; i < runnerNum; ++i) {
×
1824
        SStreamRunnerTarget *target = taosArrayGet(pCont->runnerList, i);
×
1825
        TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, target));  
×
1826
      }
1827
      break;
×
1828
    }
1829
    case STREAM_MSG_USER_RECALC: {
×
1830
      int32_t recalcNum = 0;
×
1831
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));  
×
1832
      if (recalcNum > 0) {
×
1833
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), recalcNum);
×
1834
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
×
1835
      }
1836
      for (int32_t i = 0; i < recalcNum; ++i) {
×
1837
        SStreamRecalcReq *recalc = taosArrayGet(pCont->recalcList, i);
×
1838
        TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, recalc));  
×
1839
      }
1840
      break;
×
1841
    }
1842
    default:
×
1843
      break;
×
1844
  }
1845

1846
_exit:
65✔
1847

1848
  return code;
65✔
1849
}
1850

1851

1852
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
65✔
1853
  int32_t code = 0;
65✔
1854
  int32_t lino;
1855

1856
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
65!
1857
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
130!
1858
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
130!
1859
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));
65!
1860
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));
65!
1861

1862
_exit:
65✔
1863

1864
  return code;
65✔
1865
}
1866

1867
void tFreeSStreamMgmtRsp(void* param) {
130✔
1868
  if (NULL == param) {
130!
1869
    return;
×
1870
  }
1871
  
1872
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
130✔
1873

1874
  taosArrayDestroy(pRsp->cont.vgIds);
130✔
1875
  taosArrayDestroy(pRsp->cont.readerList);
130✔
1876
  taosArrayDestroy(pRsp->cont.runnerList);
130✔
1877
  taosArrayDestroy(pRsp->cont.recalcList);
130✔
1878
}
1879

1880
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
626✔
1881
  if (NULL == pReader) {
626!
1882
    return;
×
1883
  }
1884
  
1885
  if (pReader->triggerReader) {
626✔
1886
    SStreamReaderDeployFromTrigger* pMsg = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
431✔
1887
    taosMemoryFree(pMsg->triggerTblName);
431!
1888
    taosMemoryFree(pMsg->partitionCols);
431!
1889
    taosMemoryFree(pMsg->triggerCols);
431!
1890
    taosMemoryFree(pMsg->triggerScanPlan);
431!
1891
    taosMemoryFree(pMsg->calcCacheScanPlan);
431!
1892
  } else {
1893
    SStreamReaderDeployFromCalc* pMsg = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
195✔
1894
    taosMemoryFree(pMsg->calcScanPlan);
195!
1895
  }
1896
}
1897

1898
void tFreeStreamNotifyUrl(void* param) {
×
1899
  if (NULL == param) {
×
1900
    return;
×
1901
  }
1902

1903
  taosMemoryFree(*(void**)param);
×
1904
}
1905

1906
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
303✔
1907
  if (NULL == pTrigger) {
303!
1908
    return;
×
1909
  }
1910
  
1911
  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);
303✔
1912
  switch (pTrigger->triggerType) {
303✔
1913
    case WINDOW_TYPE_EVENT:
24✔
1914
      taosMemoryFree(pTrigger->trigger.event.startCond);
24!
1915
      taosMemoryFree(pTrigger->trigger.event.endCond);
24!
1916
      break;
24✔
1917
    case WINDOW_TYPE_COUNT:
10✔
1918
      taosMemoryFree(pTrigger->trigger.count.condCols);  
10!
1919
      break;
10✔
1920
    default:
269✔
1921
      break;
269✔
1922
  }
1923

1924
  taosMemoryFree(pTrigger->triggerPrevFilter);
303!
1925
  taosMemoryFree(pTrigger->triggerScanPlan);
303!
1926
  taosMemoryFree(pTrigger->calcCacheScanPlan);
303!
1927

1928
  taosArrayDestroy(pTrigger->readerList);
303✔
1929
  taosArrayDestroy(pTrigger->runnerList);
303✔
1930
  taosMemoryFree(pTrigger->streamName);
303!
1931
}
1932

1933
void tFreeSStreamOutCol(void* param) {
×
1934
  if (NULL == param) {
×
1935
    return;
×
1936
  }
1937

1938
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1939
  taosMemoryFree(pOut->expr);
×
1940
}
1941

1942
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
974✔
1943
  if (NULL == pRunner) {
974!
1944
    return;
×
1945
  }
1946

1947
  taosMemoryFree(pRunner->streamName);
974!
1948
  taosMemoryFree(pRunner->pPlan);
974!
1949
  taosMemoryFree(pRunner->outDBFName);
974!
1950
  taosMemoryFree(pRunner->outTblName);
974!
1951

1952
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
974✔
1953
  taosArrayDestroy(pRunner->outCols);
974✔
1954
  taosArrayDestroy(pRunner->outTags);
974✔
1955

1956
  taosMemoryFree(pRunner->subTblNameExpr);
974!
1957
  taosMemoryFree(pRunner->tagValueExpr);
974!
1958
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
974✔
1959
}
1960

1961
void tFreeSStmTaskDeploy(void* param) {
2,402✔
1962
  if (NULL == param) {
2,402✔
1963
    return;
499✔
1964
  }
1965

1966
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,903✔
1967
  switch (pTask->task.type)  {
1,903!
1968
    case STREAM_READER_TASK:
626✔
1969
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
626✔
1970
      break;
626✔
1971
    case STREAM_TRIGGER_TASK:
303✔
1972
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
303✔
1973
      break;
303✔
1974
    case STREAM_RUNNER_TASK:
974✔
1975
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
974✔
1976
      break;
974✔
1977
    default:
×
1978
      break;
×
1979
  }
1980
}
1981

1982
void tFreeSStmStreamDeploy(void* param) {
401✔
1983
  if (NULL == param) {
401!
1984
    return;
×
1985
  }
1986
  
1987
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
401✔
1988
  taosArrayDestroy(pDeploy->readerTasks);
401✔
1989
  if (pDeploy->triggerTask) {
401✔
1990
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
303✔
1991
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
303✔
1992
    taosMemoryFree(pDeploy->triggerTask);
303!
1993
  }
1994

1995
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
401✔
1996
  for (int32_t i = 0; i < runnerNum; ++i) {
1,375✔
1997
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
974✔
1998
    taosMemoryFree(pRunner->msg.runner.pPlan);
974!
1999
  }
2000
  taosArrayDestroy(pDeploy->runnerTasks);
401✔
2001
}
2002

2003
void tDeepFreeSStmStreamDeploy(void* param) {
802✔
2004
  if (NULL == param) {
802!
2005
    return;
×
2006
  }
2007
  
2008
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
802✔
2009
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
802✔
2010
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
802✔
2011
  taosMemoryFree(pDeploy->triggerTask);
802!
2012
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
802✔
2013
}
2014

2015

2016
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
66,062✔
2017
  if (NULL == pRsp) {
66,062!
2018
    return;
×
2019
  }
2020
  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
66,062✔
2021
  taosArrayDestroy(pRsp->start.taskList);
66,062✔
2022
  taosArrayDestroy(pRsp->undeploy.taskList);
66,062✔
2023
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
66,062✔
2024
}
2025

2026
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
32,712✔
2027
  if (NULL == pRsp) {
32,712!
2028
    return;
×
2029
  }
2030
  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
32,712✔
2031
  taosArrayDestroy(pRsp->start.taskList);
32,712✔
2032
  taosArrayDestroy(pRsp->undeploy.taskList);
32,712✔
2033
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
32,712✔
2034
}
2035

2036

2037

2038
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
32,712✔
2039
  int32_t code = 0;
32,712✔
2040
  int32_t lino;
2041

2042
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
32,712!
2043
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));
65,424!
2044
  int32_t deployNum = 0;
32,712✔
2045
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &deployNum));
32,712!
2046
  if (deployNum > 0) {
32,712✔
2047
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), deployNum);
116✔
2048
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
116!
2049
  }
2050
  for (int32_t i = 0; i < deployNum; ++i) {
33,113✔
2051
    SStmStreamDeploy* pStream = taosArrayGet(pRsp->deploy.streamList, i);
401✔
2052
    TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pStream));
401!
2053
  }
2054

2055
  int32_t startNum = 0;
32,712✔
2056
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &startNum));
32,712!
2057
  if (startNum > 0) {
32,712✔
2058
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), startNum);
171✔
2059
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
171!
2060
  }
2061
  for (int32_t i = 0; i < startNum; ++i) {
33,073✔
2062
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
361✔
2063
    TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pTask));
361!
2064
  }
2065

2066
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
65,424!
2067
  if (!pRsp->undeploy.undeployAll) {
32,712!
2068
    int32_t undeployNum = 0;
32,712✔
2069
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &undeployNum));
32,712!
2070
    if (undeployNum > 0) {
32,712✔
2071
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), undeployNum);
66✔
2072
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
66!
2073
    }
2074
    for (int32_t i = 0; i < undeployNum; ++i) {
32,896✔
2075
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
184✔
2076
      TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pTask));
184!
2077
    }
2078
  }  
2079

2080
  int32_t rspNum = 0;
32,712✔
2081
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspNum));
32,712!
2082
  if (rspNum > 0) {
32,712✔
2083
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), rspNum);
24✔
2084
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
24!
2085
    for (int32_t i = 0; i < rspNum; ++i) {
89✔
2086
      SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
65✔
2087
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmtRsp));
65!
2088
    }
2089
  }
2090

2091
  tEndDecode(pDecoder);
32,712✔
2092

2093
_exit:
32,712✔
2094
  return code;
32,712✔
2095
}
2096

2097
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
×
2098
  int32_t code = 0;
×
2099
  int32_t lino;
2100

2101
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2102
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2103
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
×
2104
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
×
2105
  tEndEncode(pEncoder);
×
2106

2107
_exit:
×
2108
  return code;
×
2109
}
2110

2111
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
×
2112
  int32_t code = 0;
×
2113
  int32_t lino;
2114

2115
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2116
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2117
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
×
2118
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
×
2119
  tEndDecode(pDecoder);
×
2120

2121
_exit:
×
2122
  return code;
×
2123
}
2124

2125
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
×
2126
  int32_t code = 0;
×
2127
  int32_t lino;
2128

2129
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
×
2130
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
2131
  tEndEncode(pEncoder);
×
2132

2133
_exit:
×
2134
  return code;
×
2135
}
2136

2137
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
×
2138
  int32_t code = 0;
×
2139
  int32_t lino;
2140

2141
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
×
2142
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2143
  tEndDecode(pDecoder);
×
2144

2145
_exit:
×
2146
  return code;
×
2147

2148
}
2149

2150

2151
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
2,642✔
2152
  int32_t code = 0;
2,642✔
2153
  int32_t lino;
2154

2155
  // name part
2156
  int32_t sqlLen = pReq->sql == NULL ? 0 : (int32_t)strlen(pReq->sql) + 1;
2,642!
2157
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2,642!
2158
  int32_t outDbLen = pReq->outDB == NULL ? 0 : (int32_t)strlen(pReq->outDB) + 1;
2,642✔
2159
  int32_t streamDBLen = pReq->streamDB == NULL ? 0 : (int32_t)strlen(pReq->streamDB) + 1;
2,642!
2160
  int32_t triggerDBLen = pReq->triggerDB == NULL ? 0 : (int32_t)strlen(pReq->triggerDB) + 1;
2,642✔
2161
  int32_t triggerTblNameLen = pReq->triggerTblName == NULL ? 0 : (int32_t)strlen(pReq->triggerTblName) + 1;
2,642✔
2162
  int32_t outTblNameLen = pReq->outTblName == NULL ? 0 : (int32_t)strlen(pReq->outTblName) + 1;
2,642✔
2163

2164
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
5,284!
2165

2166
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, nameLen));
5,284!
2167
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, sqlLen));
5,284!
2168
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, outDbLen));
5,284!
2169
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, streamDBLen));
5,284!
2170
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, triggerDBLen));
5,284!
2171
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, triggerTblNameLen));
5,284!
2172
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, outTblNameLen));
5,284!
2173

2174
  int32_t calcDbSize = (int32_t)taosArrayGetSize(pReq->calcDB);
2,642✔
2175
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcDbSize));
2,642!
2176
  for (int32_t i = 0; i < calcDbSize; ++i) {
5,280✔
2177
    const char *dbName = taosArrayGetP(pReq->calcDB, i);
2,638✔
2178
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, dbName)));
2,638!
2179
  }
2180

2181
  // trigger control part
2182
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
5,284!
2183
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
5,284!
2184
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
5,284!
2185
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
5,284!
2186
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
5,284!
2187
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
5,284!
2188
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
5,284!
2189
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
5,284!
2190
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
5,284!
2191
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));
5,284!
2192

2193
  // notify part
2194
  int32_t addrSize = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
2,642✔
2195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
2,642!
2196
  for (int32_t i = 0; i < addrSize; ++i) {
2,898✔
2197
    const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
256✔
2198
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, url)));
256!
2199
  }
2200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
5,284!
2201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyErrorHandle));
5,284!
2202
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));
5,284!
2203

2204
  // out table part
2205

2206
  // trigger cols and partition cols
2207
  int32_t filterColsLen = pReq->triggerFilterCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerFilterCols) + 1;
2,642✔
2208
  int32_t triggerColsLen = pReq->triggerCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerCols) + 1;
2,642✔
2209
  int32_t partitionColsLen = pReq->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pReq->partitionCols) + 1;
2,642✔
2210
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, filterColsLen));
5,284!
2211
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, triggerColsLen));
5,284!
2212
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, partitionColsLen));
5,284!
2213

2214
  // out col
2215
  int32_t outColSize = (int32_t )taosArrayGetSize(pReq->outCols);
2,642✔
2216
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColSize));
2,642!
2217
  for (int32_t i = 0; i < outColSize; ++i) {
10,698✔
2218
    SFieldWithOptions *pField = taosArrayGet(pReq->outCols, i);
8,056✔
2219
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pField));
8,056!
2220
  }
2221

2222
  // out tag
2223
  int32_t outTagSize = (int32_t )taosArrayGetSize(pReq->outTags);
2,642✔
2224
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagSize));
2,642!
2225
  for (int32_t i = 0; i < outTagSize; ++i) {
4,314✔
2226
    SField *pField = taosArrayGet(pReq->outTags, i);
1,672✔
2227
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->type));
3,344!
2228
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
3,344!
2229
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
3,344!
2230
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
3,344!
2231
  }
2232

2233
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
5,284!
2234
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
5,284!
2235
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
5,284!
2236
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));
5,284!
2237

2238
  switch (pReq->triggerType) {
2,642!
2239
    case WINDOW_TYPE_SESSION: {
68✔
2240
      // session trigger
2241
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
136!
2242
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
136!
2243
      break;
68✔
2244
    }
2245
    case WINDOW_TYPE_STATE: {
632✔
2246
      // state trigger
2247
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
1,264!
2248
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
1,264!
2249
      break;
632✔
2250
    }
2251
    case WINDOW_TYPE_INTERVAL: {
1,506✔
2252
      // slide trigger
2253
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
3,012!
2254
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
3,012!
2255
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
3,012!
2256
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
3,012!
2257
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
3,012!
2258
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
3,012!
2259
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
3,012!
2260
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
3,012!
2261
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
3,012!
2262
      break;
1,506✔
2263
    }
2264
    case WINDOW_TYPE_EVENT: {
144✔
2265
      // event trigger
2266
      int32_t eventWindowStartCondLen = pReq->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.startCond) + 1;
144!
2267
      int32_t eventWindowEndCondLen = pReq->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.endCond) + 1;
144!
2268

2269
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond, eventWindowStartCondLen));
288!
2270
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond, eventWindowEndCondLen));
288!
2271

2272
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
288!
2273
      break;
144✔
2274
    }
2275
    case WINDOW_TYPE_COUNT: {
136✔
2276
      // count trigger
2277
      int32_t countWindowCondColsLen = pReq->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.count.condCols) + 1;
136!
2278
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols, countWindowCondColsLen));
272!
2279

2280
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
272!
2281
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
272!
2282
      break;
136✔
2283
    }
2284
    case WINDOW_TYPE_PERIOD: {
156✔
2285
      // period trigger
2286
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
312!
2287
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
312!
2288
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
312!
2289
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
312!
2290
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
312!
2291
      break;
156✔
2292
    }
2293
  }
2294

2295
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
5,284!
2296
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
5,284!
2297
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
5,284!
2298
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
5,284!
2299
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
5,284!
2300
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
5,284!
2301
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
5,284!
2302
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
5,284!
2303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
5,284!
2304
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
5,284!
2305
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
5,284!
2306
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
5,284!
2307
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
5,284!
2308
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));
5,284!
2309

2310
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
5,284!
2311
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));
5,284!
2312

2313
  int32_t triggerScanPlanLen = pReq->triggerScanPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerScanPlan) + 1;
2,642✔
2314
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, triggerScanPlanLen));
5,284!
2315

2316
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerHasPF));
5,284!
2317
  int32_t triggerFilterLen = pReq->triggerPrevFilter == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerPrevFilter) + 1;
2,642✔
2318
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, triggerFilterLen));
5,284!
2319

2320
  int32_t calcScanPlanListSize = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
2,642✔
2321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcScanPlanListSize));
2,642!
2322
  for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
5,436✔
2323
    SStreamCalcScan* pCalcScanPlan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, i);
2,794✔
2324
    int32_t          vgListSize = (int32_t)taosArrayGetSize(pCalcScanPlan->vgList);
2,794✔
2325
    int32_t          scanPlanLen = pCalcScanPlan->scanPlan == NULL ? 0 : (int32_t)strlen((char*)pCalcScanPlan->scanPlan) + 1;
2,794!
2326
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgListSize));
2,794!
2327
    for (int32_t j = 0; j < vgListSize; ++j) {
5,588✔
2328
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pCalcScanPlan->vgList, j)));
5,588!
2329
    }
2330
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pCalcScanPlan->readFromCache));
5,588!
2331
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCalcScanPlan->scanPlan, scanPlanLen));
5,588!
2332
  }
2333

2334
  int32_t calcPlanLen = pReq->calcPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->calcPlan) + 1;
2,642✔
2335
  int32_t subTblNameExprLen = pReq->subTblNameExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->subTblNameExpr) + 1;
2,642✔
2336
  int32_t tagValueExprLen = pReq->tagValueExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->tagValueExpr) + 1;
2,642✔
2337

2338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
5,284!
2339
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, calcPlanLen));
5,284!
2340
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, subTblNameExprLen));
5,284!
2341
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, tagValueExprLen));
5,284!
2342

2343
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pReq->forceOutCols);
2,642✔
2344
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
2,642!
2345
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
2,930✔
2346
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, i);
288✔
2347
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
288!
2348

2349
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
576!
2350
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
576!
2351
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
576!
2352
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
576!
2353
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
576!
2354
  }
2355

2356
_exit:
2,642✔
2357

2358
  if (code) {
2,642!
2359
    return code;
×
2360
  }
2361
  
2362
  return 0;
2,642✔
2363
}
2364

2365
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
1,452✔
2366
  SEncoder encoder = {0};
1,452✔
2367
  tEncoderInit(&encoder, buf, bufLen);
1,452✔
2368
  int32_t code = 0;
1,452✔
2369
  int32_t lino;
2370

2371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,452!
2372

2373
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
1,452!
2374

2375
  tEndEncode(&encoder);
1,452✔
2376

2377
_exit:
1,452✔
2378
  if (code) {
1,452!
2379
    tEncoderClear(&encoder);
×
2380
    return code;
×
2381
  } else {
2382
    int32_t tlen = encoder.pos;
1,452✔
2383
    tEncoderClear(&encoder);
1,452✔
2384
    return tlen;
1,452✔
2385
  }
2386
  return 0;
2387
}
2388

2389

2390
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
1,042✔
2391
  int32_t code = 0;
1,042✔
2392
  int32_t lino;
2393

2394
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
2,084!
2395

2396
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
2,084!
2397
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
2,084!
2398
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
2,084!
2399
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
2,084!
2400
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
2,084!
2401
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
2,084!
2402
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
2,084!
2403

2404
  int32_t calcDbSize = 0;
1,042✔
2405
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
1,042!
2406
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
1,042✔
2407
  if (pReq->calcDB == NULL) {
1,042!
2408
    TAOS_CHECK_EXIT(terrno);
×
2409
  }
2410
  for (int32_t i = 0; i < calcDbSize; ++i) {
2,082✔
2411
    char *calcDb = NULL;
1,040✔
2412
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
1,040!
2413
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
1,040!
2414
    if (calcDb == NULL) {
1,040!
2415
      TAOS_CHECK_EXIT(terrno);
×
2416
    }
2417
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
2,080!
2418
      taosMemoryFree(calcDb);
×
2419
      TAOS_CHECK_EXIT(terrno);
×
2420
    }
2421
  }
2422

2423
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
2,084!
2424
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
2,084!
2425
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
2,084!
2426
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
2,084!
2427
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
2,084!
2428
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
2,084!
2429
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
2,084!
2430
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
2,084!
2431
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
2,084!
2432
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
2,084!
2433

2434
  int32_t addrSize = 0;
1,042✔
2435
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1,042!
2436
  if (addrSize > 0) {
1,042✔
2437
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
80✔
2438
    if (pReq->pNotifyAddrUrls == NULL) {
80!
2439
      TAOS_CHECK_EXIT(terrno);
×
2440
    }
2441
  }
2442
  for (int32_t i = 0; i < addrSize; ++i) {
1,137✔
2443
    char *url = NULL;
95✔
2444
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
95!
2445
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
95!
2446
    if (url == NULL) {
95!
2447
      TAOS_CHECK_EXIT(terrno);
×
2448
    }
2449
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
190!
2450
      taosMemoryFree(url);
×
2451
      TAOS_CHECK_EXIT(terrno);
×
2452
    }
2453
  }
2454
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
2,084!
2455
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyErrorHandle));
2,084!
2456
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
2,084!
2457

2458
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
2,084!
2459
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
2,084!
2460
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
2,084!
2461

2462
  int32_t outColSize = 0;
1,042✔
2463
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
1,042!
2464
  if (outColSize > 0) {
1,042✔
2465
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
1,040✔
2466
    if (pReq->outCols == NULL) {
1,040!
2467
      TAOS_CHECK_EXIT(terrno);
×
2468
    }
2469

2470
    for (int32_t i = 0; i < outColSize; ++i) {
4,589✔
2471
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
3,549✔
2472
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
3,549!
2473
    }
2474
  }
2475

2476
  int32_t outTagSize = 0;
1,042✔
2477
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
1,042!
2478
  if (outTagSize > 0) {
1,042✔
2479
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
460✔
2480
    if (pReq->outTags == NULL) {
460!
2481
      TAOS_CHECK_EXIT(terrno);
×
2482
    }
2483

2484
    for (int32_t i = 0; i < outTagSize; ++i) {
1,236✔
2485
      SFieldWithOptions field = {0};
776✔
2486
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
776!
2487
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
776!
2488
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
776!
2489
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
776!
2490
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
1,552!
2491
        TAOS_CHECK_EXIT(terrno);
×
2492
      }
2493
    }
2494
  }
2495

2496
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
2,084!
2497
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
2,084!
2498
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
2,084!
2499
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
2,084!
2500

2501
  switch (pReq->triggerType) {
1,042!
2502
    case WINDOW_TYPE_SESSION: {
22✔
2503
      // session trigger
2504
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
44!
2505
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
44!
2506
      break;
22✔
2507
    }
2508
      case WINDOW_TYPE_STATE: {
301✔
2509
        // state trigger
2510
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
602!
2511
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
602!
2512
        break;
301✔
2513
      }
2514
      case WINDOW_TYPE_INTERVAL: {
569✔
2515
        // slide trigger
2516
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
1,138!
2517
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
1,138!
2518
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
1,138!
2519
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
1,138!
2520
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
1,138!
2521
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
1,138!
2522
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
1,138!
2523
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
1,138!
2524
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
1,138!
2525
        break;
569✔
2526
      }
2527
      case WINDOW_TYPE_EVENT: {
60✔
2528
        // event trigger
2529
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
120!
2530
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
120!
2531
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
120!
2532
        break;
60✔
2533
      }
2534
      case WINDOW_TYPE_COUNT: {
44✔
2535
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
88!
2536

2537
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
88!
2538
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
88!
2539
        break;
44✔
2540
      }
2541
      case WINDOW_TYPE_PERIOD: {
46✔
2542
        // period trigger
2543
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
92!
2544
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
92!
2545
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
92!
2546
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
92!
2547
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
92!
2548
        break;
46✔
2549
      }
2550
      default:
×
2551
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2552
  }
2553

2554
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
2,084!
2555
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
2,084!
2556
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
2,084!
2557
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
2,084!
2558
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
2,084!
2559
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
2,084!
2560
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
2,084!
2561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
2,084!
2562
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
2,084!
2563
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
2,084!
2564
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
2,084!
2565
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
2,084!
2566
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
2,084!
2567
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
2,084!
2568

2569
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
2,084!
2570
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
2,084!
2571

2572
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
2,084!
2573

2574
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
2,084!
2575
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
2,084!
2576

2577
  int32_t calcScanPlanListSize = 0;
1,042✔
2578
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
1,042!
2579
  if (calcScanPlanListSize > 0) {
1,042✔
2580
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
1,040✔
2581
    if (pReq->calcScanPlanList == NULL) {
1,040!
2582
      TAOS_CHECK_EXIT(terrno);
×
2583
    }
2584
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
2,178✔
2585
      SStreamCalcScan calcScan = {0};
1,138✔
2586
      int32_t         vgListSize = 0;
1,138✔
2587
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
1,138!
2588
      if (vgListSize > 0) {
1,138!
2589
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
1,138✔
2590
        if (calcScan.vgList == NULL) {
1,138!
2591
          TAOS_CHECK_EXIT(terrno);
×
2592
        }
2593
        for (int32_t j = 0; j < vgListSize; ++j) {
2,276✔
2594
          int32_t vgId = 0;
1,138✔
2595
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
1,138!
2596
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
2,276!
2597
            TAOS_CHECK_EXIT(terrno);
×
2598
          }
2599
        }
2600
      }
2601
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
1,138!
2602
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
1,138!
2603
      taosArrayPush(pReq->calcScanPlanList, &calcScan);
1,138✔
2604
    }
2605
  }
2606

2607
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
2,084!
2608
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
2,084!
2609
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
2,084!
2610
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
2,084!
2611

2612
  int32_t forceOutColsSize = 0;
1,042✔
2613
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
1,042!
2614
  if (forceOutColsSize > 0) {
1,042✔
2615
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
22✔
2616
    if (pReq->forceOutCols == NULL) {
22!
2617
      TAOS_CHECK_EXIT(terrno);
×
2618
    }
2619
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
166✔
2620
      SStreamOutCol outCol = {0};
144✔
2621
      int64_t       exprLen = 0;
144✔
2622
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
144!
2623
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
144!
2624
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
144!
2625
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
144!
2626
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
144!
2627
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
288!
2628
        TAOS_CHECK_EXIT(terrno);
×
2629
      }
2630
    }
2631
  }
2632

2633
_exit:
1,042✔
2634

2635
  return code;
1,042✔
2636
}
2637

2638

2639
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
709✔
2640
  SDecoder decoder = {0};
709✔
2641
  tDecoderInit(&decoder, buf, bufLen);
709✔
2642
  int32_t code = 0;
709✔
2643
  int32_t lino;
2644

2645
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
709!
2646
  
2647
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&decoder, pReq));
709!
2648

2649
  tEndDecode(&decoder);
709✔
2650

2651
_exit:
709✔
2652

2653
  tDecoderClear(&decoder);
709✔
2654
  return code;
709✔
2655
}
2656

2657

2658
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
×
2659
  int32_t  code = 0;
×
2660
  int32_t  lino;
2661
  int32_t  tlen;
2662
  SEncoder encoder = {0};
×
2663
  tEncoderInit(&encoder, buf, bufLen);
×
2664

2665
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2666

2667
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2668
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2669
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2670

2671
  tEndEncode(&encoder);
×
2672

2673
_exit:
×
2674
  if (code) {
×
2675
    tlen = code;
×
2676
  } else {
2677
    tlen = encoder.pos;
×
2678
  }
2679
  tEncoderClear(&encoder);
×
2680
  return tlen;
×
2681
}
2682

2683
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
29✔
2684
  SDecoder decoder = {0};
29✔
2685
  int32_t  code = 0;
29✔
2686
  int32_t  lino;
2687
  tDecoderInit(&decoder, buf, bufLen);
29✔
2688

2689
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
29!
2690
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
58!
2691
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
58!
2692

2693
  tEndDecode(&decoder);
29✔
2694

2695
_exit:
29✔
2696
  tDecoderClear(&decoder);
29✔
2697
  return code;
29✔
2698
}
2699

2700
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
29✔
2701
  taosMemoryFreeClear(pReq->name);
29!
2702
}
29✔
2703

2704
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,281✔
2705
  if (pScan == NULL) {
2,281!
2706
    return;
×
2707
  }
2708
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,281✔
2709
  taosArrayDestroy(pCalcScan->vgList);
2,281✔
2710
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,281!
2711
}
2712

2713
void tFreeStreamOutCol(void* pCol) {
216✔
2714
  if (pCol == NULL) {
216!
2715
    return;
×
2716
  }
2717
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
216✔
2718
  taosMemoryFreeClear(pOutCol->expr);
216!
2719
}
2720

2721

2722

2723
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
2,996✔
2724
  if (NULL == pReq) {
2,996✔
2725
    return;
346✔
2726
  }
2727
  taosMemoryFreeClear(pReq->name);
2,650!
2728
  taosMemoryFreeClear(pReq->sql);
2,650!
2729
  taosMemoryFreeClear(pReq->streamDB);
2,650!
2730
  taosMemoryFreeClear(pReq->triggerDB);
2,650!
2731
  taosMemoryFreeClear(pReq->outDB);
2,650!
2732
  taosMemoryFreeClear(pReq->triggerTblName);
2,650!
2733
  taosMemoryFreeClear(pReq->outTblName);
2,650!
2734

2735
  taosArrayDestroyP(pReq->calcDB, NULL);
2,650✔
2736
  pReq->calcDB = NULL;
2,650✔
2737
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
2,650✔
2738
  pReq->pNotifyAddrUrls = NULL;
2,650✔
2739

2740
  taosMemoryFreeClear(pReq->triggerFilterCols);
2,650!
2741
  taosMemoryFreeClear(pReq->triggerCols);
2,650!
2742
  taosMemoryFreeClear(pReq->partitionCols);
2,650!
2743

2744
  taosArrayDestroy(pReq->outTags);
2,650✔
2745
  pReq->outTags = NULL;
2,650✔
2746
  taosArrayDestroy(pReq->outCols);
2,650✔
2747
  pReq->outCols = NULL;
2,650✔
2748

2749
  switch (pReq->triggerType) {
2,650✔
2750
    case WINDOW_TYPE_EVENT:
120✔
2751
      taosMemoryFreeClear(pReq->trigger.event.startCond);
120!
2752
      taosMemoryFreeClear(pReq->trigger.event.endCond);
120!
2753
      break;
120✔
2754
    default:
2,530✔
2755
      break;
2,530✔
2756
  }
2757

2758
  taosMemoryFreeClear(pReq->triggerScanPlan);
2,650!
2759
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
2,650✔
2760
  pReq->calcScanPlanList = NULL;
2,650✔
2761
  taosMemoryFreeClear(pReq->triggerPrevFilter);
2,650!
2762

2763
  taosMemoryFreeClear(pReq->calcPlan);
2,650!
2764
  taosMemoryFreeClear(pReq->subTblNameExpr);
2,650!
2765
  taosMemoryFreeClear(pReq->tagValueExpr);
2,650!
2766
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
2,650✔
2767
  pReq->forceOutCols = NULL;
2,650✔
2768
}
2769

2770
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
279✔
2771
  int32_t code = 0, lino = 0;
279✔
2772
  if (NULL == pSrc) {
279!
2773
    return code;
×
2774
  } 
2775

2776
  void* p = NULL;
279✔
2777
  int32_t num = 0;
279✔
2778
  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
279!
2779
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
279!
2780

2781
  SCMCreateStreamReq* pDst = *ppDst;
279✔
2782

2783
  if (pSrc->outDB) {
279✔
2784
    pDst->outDB = COPY_STR(pSrc->outDB);
278!
2785
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
278!
2786
  }
2787
  
2788
  if (pSrc->triggerTblName) {
279✔
2789
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
278!
2790
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
278!
2791
  }
2792
  
2793
  if (pSrc->outTblName) {
279✔
2794
    pDst->outTblName = COPY_STR(pSrc->outTblName);
278!
2795
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
278!
2796
  }
2797
  
2798
  if (pSrc->pNotifyAddrUrls) {
279✔
2799
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
31✔
2800
    if (num > 0) {
31!
2801
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
31✔
2802
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
31!
2803
    }
2804
    for (int32_t i = 0; i < num; ++i) {
62✔
2805
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
31!
2806
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
31!
2807
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
62!
2808
    }
2809
  }
2810
  
2811
  if (pSrc->triggerFilterCols) {
279✔
2812
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
36!
2813
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
36!
2814
  }
2815
  
2816
  if (pSrc->triggerCols) {
279✔
2817
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
273!
2818
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
273!
2819
  }
2820
  
2821
  if (pSrc->partitionCols) {
279✔
2822
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
157!
2823
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
157!
2824
  }
2825
  
2826
  if (pSrc->outCols) {
279✔
2827
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
278✔
2828
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
278!
2829
  }
2830
  
2831
  if (pSrc->outTags) {
279✔
2832
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
157✔
2833
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
157!
2834
  }
2835

2836
  pDst->triggerType = pSrc->triggerType;
279✔
2837
  
2838
  switch (pSrc->triggerType) {
279✔
2839
    case WINDOW_TYPE_EVENT:
24✔
2840
      if (pSrc->trigger.event.startCond) {
24!
2841
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
24!
2842
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
24!
2843
      }
2844
      
2845
      if (pSrc->trigger.event.endCond) {
24!
2846
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
24!
2847
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
24!
2848
      }
2849
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
24✔
2850
      break;
24✔
2851
    default:
255✔
2852
      pDst->trigger = pSrc->trigger;
255✔
2853
      break;
255✔
2854
  }
2855

2856

2857
  if (pSrc->triggerScanPlan) {
279✔
2858
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
278!
2859
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
278!
2860
  }
2861
  
2862
  if (pSrc->calcScanPlanList) {
279✔
2863
    num = taosArrayGetSize(pSrc->calcScanPlanList);
278✔
2864
    if (num > 0) {
278!
2865
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
278✔
2866
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
278!
2867
    }
2868
    for (int32_t i = 0; i < num; ++i) {
590✔
2869
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
312✔
2870
      SStreamCalcScan  dscan = {.readFromCache = sscan->readFromCache};
312✔
2871

2872
      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
312✔
2873
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
312!
2874

2875
      dscan.scanPlan = COPY_STR(sscan->scanPlan);
312!
2876
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
312!
2877
      
2878
      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
624!
2879
    }
2880
  }
2881
  
2882
  if (pSrc->triggerPrevFilter) {
279✔
2883
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
36!
2884
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
36!
2885
  }
2886
  
2887
  if (pSrc->calcPlan) {
279✔
2888
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
278!
2889
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
278!
2890
  }
2891
  
2892
  if (pSrc->subTblNameExpr) {
279✔
2893
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
157!
2894
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
157!
2895
  }
2896
  
2897
  if (pSrc->tagValueExpr) {
279✔
2898
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
157!
2899
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
157!
2900
  }
2901
  
2902
  if (pSrc->forceOutCols) {
279✔
2903
    num = taosArrayGetSize(pSrc->forceOutCols);
11✔
2904
    if (num > 0) {
11!
2905
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
11✔
2906
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
11!
2907
    }
2908
    for (int32_t i = 0; i < num; ++i) {
83✔
2909
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
72✔
2910
      SStreamOutCol  dcol = {.type = scol->type};
72✔
2911

2912
      dcol.expr = COPY_STR(scol->expr);
72!
2913
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
72!
2914
      
2915
      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
144!
2916
    }
2917
  }
2918

2919
  pDst->triggerTblUid = pSrc->triggerTblUid;
279✔
2920
  pDst->triggerTblType = pSrc->triggerTblType;
279✔
2921
  pDst->deleteReCalc = pSrc->deleteReCalc;
279✔
2922
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
279✔
2923
  
2924
_exit:
279✔
2925

2926
  if (code) {
279!
2927
    tFreeSCMCreateStreamReq(pDst);
×
2928
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
2929
  }
2930

2931
  return code;
279✔
2932
}
2933

2934

2935
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
×
2936
  int32_t  code = 0;
×
2937
  int32_t  lino;
2938
  int32_t  tlen;
2939
  SEncoder encoder = {0};
×
2940
  tEncoderInit(&encoder, buf, bufLen);
×
2941
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2942

2943
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2944
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2945
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2946
  tEndEncode(&encoder);
×
2947

2948
_exit:
×
2949
  if (code) {
×
2950
    tlen = code;
×
2951
  } else {
2952
    tlen = encoder.pos;
×
2953
  }
2954
  tEncoderClear(&encoder);
×
2955
  return tlen;
×
2956
}
2957

2958
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
21✔
2959
  SDecoder decoder = {0};
21✔
2960
  int32_t  code = 0;
21✔
2961
  int32_t  lino;
2962

2963
  tDecoderInit(&decoder, buf, bufLen);
21✔
2964
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
2965
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
2966
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
42!
2967
  tEndDecode(&decoder);
21✔
2968

2969
_exit:
21✔
2970
  tDecoderClear(&decoder);
21✔
2971
  return code;
21✔
2972
}
2973

2974
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
×
2975
  taosMemoryFreeClear(pReq->name);
×
2976
}
×
2977

2978
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
×
2979
  SEncoder encoder = {0};
×
2980
  int32_t  code = 0;
×
2981
  int32_t  lino;
2982
  int32_t  tlen;
2983
  tEncoderInit(&encoder, buf, bufLen);
×
2984
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2985
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2986
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2987
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2988
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
×
2989
  tEndEncode(&encoder);
×
2990

2991
_exit:
×
2992
  if (code) {
×
2993
    tlen = code;
×
2994
  } else {
2995
    tlen = encoder.pos;
×
2996
  }
2997
  tEncoderClear(&encoder);
×
2998
  return tlen;
×
2999
}
3000

3001
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
21✔
3002
  SDecoder decoder = {0};
21✔
3003
  int32_t  code = 0;
21✔
3004
  int32_t  lino;
3005

3006
  tDecoderInit(&decoder, buf, bufLen);
21✔
3007
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
3008
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
3009
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
42!
3010
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igUntreated));
42!
3011
  tEndDecode(&decoder);
21✔
3012

3013
_exit:
21✔
3014
  tDecoderClear(&decoder);
21✔
3015
  return code;
21✔
3016
}
3017

3018
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
×
3019
  taosMemoryFreeClear(pReq->name);
×
3020
}
×
3021

3022
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
×
3023
  SEncoder encoder = {0};
×
3024
  int32_t  code = 0;
×
3025
  int32_t  lino;
3026
  int32_t  tlen;
3027
  tEncoderInit(&encoder, buf, bufLen);
×
3028
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3029
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
3030
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
3031
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
×
3032
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
×
3033
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
×
3034
  tEndEncode(&encoder);
×
3035

3036
_exit:
×
3037
  if (code) {
×
3038
    tlen = code;
×
3039
  } else {
3040
    tlen = encoder.pos;
×
3041
  }
3042
  tEncoderClear(&encoder);
×
3043
  return tlen;
×
3044
}
3045

3046
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
21✔
3047
  SDecoder decoder = {0};
21✔
3048
  int32_t  code = 0;
21✔
3049
  int32_t  lino;
3050

3051
  tDecoderInit(&decoder, buf, bufLen);
21✔
3052
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
21!
3053

3054
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
42!
3055
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->calcAll));
42!
3056
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.skey));
42!
3057
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.ekey));
42!
3058
  tEndDecode(&decoder);
21✔
3059

3060
_exit:
21✔
3061
  tDecoderClear(&decoder);
21✔
3062
  return code;
21✔
3063
}
3064

3065
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
21✔
3066
  taosMemoryFreeClear(pReq->name);
21!
3067
}
21✔
3068

3069
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
×
3070
  int32_t code = 0;
×
3071
  int32_t lino;
3072

3073
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
×
3074
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
×
3075
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));
×
3076

3077
_exit:
×
3078
  return code;
×
3079
}
3080

3081
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
×
3082
  SEncoder encoder = {0};
×
3083
  int32_t  code = 0;
×
3084
  int32_t  lino;
3085
  int32_t  tlen;
3086
  tEncoderInit(&encoder, buf, bufLen);
×
3087

3088
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3089
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
×
3090

3091
  tEndEncode(&encoder);
×
3092

3093
_exit:
×
3094
  if (code) {
×
3095
    tlen = code;
×
3096
  } else {
3097
    tlen = encoder.pos;
×
3098
  }
3099
  tEncoderClear(&encoder);
×
3100
  return tlen;
×
3101
}
3102

3103
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
×
3104
  int32_t code = 0;
×
3105
  int32_t lino;
3106

3107
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
3108
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
×
3109
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));
×
3110

3111
_exit:
×
3112
  return code;
×
3113
}
3114

3115
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
×
3116
  SDecoder decoder = {0};
×
3117
  int32_t  code = 0;
×
3118
  int32_t  lino;
3119

3120
  tDecoderInit(&decoder, (char *)buf, bufLen);
×
3121

3122
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3123
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq));
×
3124

3125
  tEndDecode(&decoder);
×
3126

3127
_exit:
×
3128
  tDecoderClear(&decoder);
×
3129
  return code;
×
3130
}
3131

3132
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
×
3133
  int32_t code = 0;
×
3134
  int32_t lino;
3135

3136
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
×
3137
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
×
3138
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
×
3139
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));
×
3140

3141
_exit:
×
3142
  return code;
×
3143
}
3144

3145
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
×
3146
  SEncoder encoder = {0};
×
3147
  int32_t  code = 0;
×
3148
  int32_t  lino;
3149
  int32_t  tlen;
3150
  tEncoderInit(&encoder, buf, bufLen);
×
3151

3152
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3153
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
×
3154

3155
  tEndEncode(&encoder);
×
3156

3157
_exit:
×
3158
  if (code) {
×
3159
    tlen = code;
×
3160
  } else {
3161
    tlen = encoder.pos;
×
3162
  }
3163
  tEncoderClear(&encoder);
×
3164
  return tlen;
×
3165
}
3166

3167
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
×
3168
  int32_t code = 0;
×
3169
  int32_t lino;
3170

3171
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
×
3172
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
×
3173
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
×
3174
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));
×
3175

3176
_exit:
×
3177
  return code;
×
3178
}
3179

3180
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
×
3181
  SDecoder decoder = {0};
×
3182
  int32_t  code = 0;
×
3183
  int32_t  lino;
3184

3185
  tDecoderInit(&decoder, buf, bufLen);
×
3186

3187
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
3188
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
×
3189

3190
  tEndDecode(&decoder);
×
3191

3192
_exit:
×
3193
  tDecoderClear(&decoder);
×
3194
  return code;
×
3195
}
3196

3197
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
129✔
3198
  SEncoder encoder = {0};
129✔
3199
  int32_t  code = TSDB_CODE_SUCCESS;
129✔
3200
  int32_t  lino = 0;
129✔
3201
  int32_t  tlen = 0;
129✔
3202

3203
  tEncoderInit(&encoder, buf, bufLen);
129✔
3204
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
129!
3205

3206
  int32_t size = taosArrayGetSize(pRsp->cols);
130✔
3207
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
130!
3208
  for (int32_t i = 0; i < size; ++i) {
856✔
3209
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
726✔
3210
    if (oInfo == NULL) {
726!
3211
      uError("col id is NULL at index %d", i);
×
3212
      code = TSDB_CODE_INVALID_PARA;
×
3213
      goto _exit;
×
3214
    }
3215
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
1,452!
3216
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
1,452!
3217
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
1,452!
3218
  }
3219

3220
  tEndEncode(&encoder);
130✔
3221

3222
_exit:
130✔
3223
  if (code != TSDB_CODE_SUCCESS) {
130!
3224
    tlen = code;
×
3225
  } else {
3226
    tlen = encoder.pos;
130✔
3227
  }
3228
  tEncoderClear(&encoder);
130✔
3229
  return tlen;
130✔
3230
}
3231

3232
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
65✔
3233
  SDecoder decoder = {0};
65✔
3234
  int32_t  code = TSDB_CODE_SUCCESS;
65✔
3235
  int32_t  lino = 0;
65✔
3236

3237
  tDecoderInit(&decoder, buf, bufLen);
65✔
3238
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
65!
3239

3240
  int32_t size = 0;
65✔
3241
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
65!
3242
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
65✔
3243
  if (pRsp->cols == NULL) {
65!
3244
    code = terrno;
×
3245
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3246
    goto _exit;
×
3247
  }
3248
  for (int32_t i = 0; i < size; ++i) {
428✔
3249
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
363✔
3250
    if (oInfo == NULL) {
363!
3251
      code = terrno;
×
3252
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3253
      goto _exit;
×
3254
    }
3255
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->suid));
726!
3256
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->uid));
726!
3257
    TAOS_CHECK_RETURN(tDecodeI16(&decoder, &oInfo->cid));
726!
3258
  }
3259

3260
  tEndDecode(&decoder);
65✔
3261

3262
_exit:
65✔
3263
  tDecoderClear(&decoder);
65✔
3264
  return code;
65✔
3265
}
3266

3267
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
17,751✔
3268
  taosArrayDestroy(pRsp->cols);
17,751✔
3269
}
17,751✔
3270

3271
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
48,384✔
3272
  if (pReq == NULL) return;
48,384!
3273
  if (pReq->base.type == STRIGGER_PULL_WAL_DATA) {
48,384✔
3274
    SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
2,547✔
3275
    if (pRequest->cids != NULL) {
2,547!
3276
      taosArrayDestroy(pRequest->cids);
2,548✔
3277
      pRequest->cids = NULL;
2,549✔
3278
    }
3279
  } else if (pReq->base.type == STRIGGER_PULL_TSDB_DATA) {
45,837✔
3280
    SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
342✔
3281
    if (pRequest->cids != NULL) {
342!
3282
      taosArrayDestroy(pRequest->cids);
342✔
3283
      pRequest->cids = NULL;
342✔
3284
    }
3285
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_INFO) {
45,495✔
3286
    SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
195✔
3287
    if (pRequest->cids != NULL) {
195!
3288
      taosArrayDestroy(pRequest->cids);
195✔
3289
      pRequest->cids = NULL;
195✔
3290
    }
3291
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_PSEUDO_COL) {
45,300✔
3292
    SSTriggerVirTablePseudoColRequest *pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
821✔
3293
    if (pRequest->cids != NULL) {
821!
3294
      taosArrayDestroy(pRequest->cids);
821✔
3295
      pRequest->cids = NULL;
821✔
3296
    }
3297
  } else if (pReq->base.type == STRIGGER_PULL_OTABLE_INFO) {
44,479✔
3298
    SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
66✔
3299
    if (pRequest->cols != NULL) {
66!
3300
      taosArrayDestroy(pRequest->cols);
66✔
3301
      pRequest->cols = NULL;
66✔
3302
    }
3303
  } else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
44,413✔
3304
    SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
65✔
3305
    if (pRequest->uids != NULL) {
65!
3306
      taosArrayDestroy(pRequest->uids);
×
3307
      pRequest->uids = NULL;
×
3308
    }
3309
  }
3310
}
3311

3312
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
18,459✔
3313
  int32_t  code = TSDB_CODE_SUCCESS;
18,459✔
3314
  int32_t  lino = 0;
18,459✔
3315
  int32_t size = taosArrayGetSize(cids);
18,459✔
3316
  TAOS_CHECK_EXIT(tEncodeI32(encoder, size));
18,457!
3317
  for (int32_t i = 0; i < size; ++i) {
46,499✔
3318
    col_id_t* pColId = taosArrayGet(cids, i);
28,041✔
3319
    if (pColId == NULL) {
28,041!
3320
      uError("col id is NULL at index %d", i);
×
3321
      code = TSDB_CODE_INVALID_PARA;
×
3322
      goto _exit;
×
3323
    }
3324
    TAOS_CHECK_EXIT(tEncodeI16(encoder, *pColId));
56,084!
3325
  }
3326
  _exit:
18,458✔
3327

3328
  return code;
18,458✔
3329
}
3330

3331
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
9,219✔
3332
  int32_t code = TSDB_CODE_SUCCESS;
9,219✔
3333
  int32_t lino = 0;
9,219✔
3334
  int32_t size = 0;
9,219✔
3335

3336
  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
9,220!
3337
  if (size > 0){
9,220✔
3338
    *cids = taosArrayInit(size, sizeof(col_id_t));
3,905✔
3339
    if (*cids == NULL) {
3,906✔
3340
      code = terrno;
2✔
3341
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3342
      goto _exit;
×
3343
    }
3344
  
3345
    for (int32_t i = 0; i < size; ++i) {
17,925✔
3346
      col_id_t* pColId = taosArrayReserve(*cids, 1);
14,018✔
3347
      if (pColId == NULL) {
14,019!
3348
        code = terrno;
×
3349
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
×
3350
        goto _exit;
×
3351
      }
3352
      TAOS_CHECK_RETURN(tDecodeI16(decoder, pColId));
14,021!
3353
    }  
3354
  }
3355
  
3356
_exit:
9,222✔
3357
  if (code != TSDB_CODE_SUCCESS) {
9,222!
3358
    taosArrayDestroy(*cids);
×
3359
    *cids = NULL;
×
3360
  }
3361
  return code;
9,222✔
3362
}
3363

3364
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
97,290✔
3365
  SEncoder encoder = {0};
97,290✔
3366
  int32_t  code = TSDB_CODE_SUCCESS;
97,290✔
3367
  int32_t  lino = 0;
97,290✔
3368
  int32_t  tlen = 0;
97,290✔
3369

3370
  tEncoderInit(&encoder, buf, bufLen);
97,290✔
3371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
97,285!
3372

3373
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
194,542!
3374
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
194,542!
3375
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
194,542!
3376
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
194,542!
3377

3378
  switch (pReq->type) {
97,271!
3379
    case STRIGGER_PULL_SET_TABLE: {
130✔
3380
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
130✔
3381
      int32_t size = taosArrayGetSize(pRequest->uids);
130✔
3382
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
130!
3383
      for (int32_t i = 0; i < size; ++i) {
374✔
3384
        int64_t* uids = taosArrayGet(pRequest->uids, i);
244✔
3385
        if (uids == NULL) {
244!
3386
          uError("uid is NULL at index %d", i);
×
3387
          code = TSDB_CODE_INVALID_PARA;
×
3388
          goto _exit;
×
3389
        }
3390
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[0]));
488!
3391
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[1]));
488!
3392
      }
3393
      break;
130✔
3394
    }
3395
    case STRIGGER_PULL_LAST_TS: {
728✔
3396
      break;
728✔
3397
    }
3398
    case STRIGGER_PULL_FIRST_TS: {
747✔
3399
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
747✔
3400
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,494!
3401
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,494!
3402
      break;
747✔
3403
    }
3404
    case STRIGGER_PULL_TSDB_META: {
1,254✔
3405
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,254✔
3406
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,508!
3407
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,508!
3408
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,508!
3409
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,508!
3410
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,508!
3411
      break;
1,254✔
3412
    }
3413
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3414
      break;
×
3415
    }
3416
    case STRIGGER_PULL_TSDB_TS_DATA: {
82✔
3417
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
82✔
3418
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
164!
3419
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
164!
3420
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
164!
3421
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
164!
3422
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
164!
3423
      break;
82✔
3424
    }
3425
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
330✔
3426
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
330✔
3427
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
660!
3428
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
660!
3429
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
660!
3430
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
660!
3431
      break;
330✔
3432
    }
3433
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
352✔
3434
      break;
352✔
3435
    }
3436
    case STRIGGER_PULL_TSDB_CALC_DATA: {
56,738✔
3437
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
56,738✔
3438
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
113,476!
3439
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
113,476!
3440
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
113,476!
3441
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
113,476!
3442
      break;
56,738✔
3443
    }
3444
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3445
      break;
×
3446
    }
3447
    case STRIGGER_PULL_TSDB_DATA: {
686✔
3448
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
686✔
3449
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,372!
3450
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,372!
3451
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,372!
3452
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,372!
3453
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
686!
3454
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,372!
3455
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,372!
3456
      break;
686✔
3457
    }
3458
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3459
      break;
×
3460
    }
3461
    case STRIGGER_PULL_WAL_META: {
17,116✔
3462
      SSTriggerWalMetaRequest* pRequest = (SSTriggerWalMetaRequest*)pReq;
17,116✔
3463
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
34,232!
3464
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
34,232!
3465
      break;
17,116✔
3466
    }
3467
    case STRIGGER_PULL_WAL_TS_DATA:
15,744✔
3468
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3469
    case STRIGGER_PULL_WAL_CALC_DATA:
3470
    case STRIGGER_PULL_WAL_DATA: {
3471
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
15,744✔
3472
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
31,488!
3473
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
31,488!
3474
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
31,488!
3475
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
31,488!
3476
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
15,744!
3477
      break;
15,744✔
3478
    }
3479
    case STRIGGER_PULL_GROUP_COL_VALUE: {
1,218✔
3480
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
1,218✔
3481
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,436!
3482
      break;
1,218✔
3483
    }
3484
    case STRIGGER_PULL_VTABLE_INFO: {
389✔
3485
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
389✔
3486
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
389!
3487
      break;
389✔
3488
    }
3489
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
1,642✔
3490
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
1,642✔
3491
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
3,284!
3492
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
1,642!
3493
      break;
1,642✔
3494
    }
3495
    case STRIGGER_PULL_OTABLE_INFO: {
132✔
3496
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
132✔
3497
      int32_t size = taosArrayGetSize(pRequest->cols);
132✔
3498
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
132!
3499
      for (int32_t i = 0; i < size; ++i) {
867✔
3500
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
734✔
3501
        if (oInfo == NULL) {
734!
3502
          uError("col id is NULL at index %d", i);
×
3503
          code = TSDB_CODE_INVALID_PARA;
×
3504
          goto _exit;
×
3505
        }
3506
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
1,470!
3507
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
1,470!
3508
      }
3509
      break; 
133✔
3510
    }
3511
    default: {
×
3512
      uError("unknown pull type %d", pReq->type);
×
3513
      code = TSDB_CODE_INVALID_PARA;
×
3514
      break;
×
3515
    }
3516
  }
3517

3518
  tEndEncode(&encoder);
97,289✔
3519

3520
_exit:
97,274✔
3521
  if (code != TSDB_CODE_SUCCESS) {
97,274!
3522
    tlen = code;
×
3523
  } else {
3524
    tlen = encoder.pos;
97,274✔
3525
  }
3526
  tEncoderClear(&encoder);
97,274✔
3527
  return tlen;
97,312✔
3528
}
3529

3530

3531
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
48,370✔
3532
  SDecoder decoder = {0};
48,370✔
3533
  int32_t  code = TSDB_CODE_SUCCESS;
48,370✔
3534
  int32_t  lino = 0;
48,370✔
3535

3536
  tDecoderInit(&decoder, buf, bufLen);
48,370✔
3537
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
48,368!
3538

3539
  int32_t type = 0;
48,389✔
3540
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
48,387!
3541
  SSTriggerPullRequest* pBase = &(pReq->base);
48,387✔
3542
  pBase->type = type;
48,387✔
3543
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
96,769!
3544
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
96,754!
3545
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
96,744!
3546

3547
  switch (type) {
48,372!
3548
    case STRIGGER_PULL_SET_TABLE: {
65✔
3549
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
65✔
3550
      int32_t size = 0;
65✔
3551
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
65!
3552
      pRequest->uids = taosArrayInit(size, 2 * sizeof(int64_t));
65✔
3553
      if (pRequest->uids == NULL) {
65!
3554
        code = terrno;
×
3555
        uError("failed to allocate memory for uids, size: %d, errno: %d", size, code);
×
3556
        goto _exit;
×
3557
      }
3558
      for (int32_t i = 0; i < size; ++i) {
187✔
3559
        int64_t* uid = taosArrayReserve(pRequest->uids, 1);
122✔
3560
        if (uid == NULL) {
122!
3561
          code = terrno;
×
3562
          uError("failed to reserve memory for uid, size: %d, errno: %d", size, code);
×
3563
          goto _exit;
×
3564
        }
3565
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid));
122!
3566
        TAOS_CHECK_RETURN(tDecodeI64(&decoder, uid + 1));
244!
3567
      }
3568
      break;
65✔
3569
    }
3570
    case STRIGGER_PULL_LAST_TS: {
365✔
3571
      break;
365✔
3572
    }
3573
    case STRIGGER_PULL_FIRST_TS: {
368✔
3574
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
368✔
3575
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
736!
3576
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
736!
3577
      break;
368✔
3578
    }
3579
    case STRIGGER_PULL_TSDB_META: {
627✔
3580
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
627✔
3581
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
1,254!
3582
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
1,254!
3583
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,254!
3584
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,254!
3585
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,254!
3586
      break;
627✔
3587
    }
3588
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3589
      break;
×
3590
    }
3591
    case STRIGGER_PULL_TSDB_TS_DATA: {
41✔
3592
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
41✔
3593
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
82!
3594
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
82!
3595
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
82!
3596
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
82!
3597
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
82!
3598
      break;
41✔
3599
    }
3600
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
165✔
3601
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
165✔
3602
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
330!
3603
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
330!
3604
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
330!
3605
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
330!
3606
      break;
165✔
3607
    }
3608
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
176✔
3609
      break;
176✔
3610
    }
3611
    case STRIGGER_PULL_TSDB_CALC_DATA: {
28,363✔
3612
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
28,363✔
3613
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
56,727!
3614
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
56,728!
3615
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
56,726!
3616
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
56,725!
3617
      break;
28,363✔
3618
    }
3619
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3620
      break;
×
3621
    }
3622
    case STRIGGER_PULL_TSDB_DATA: {
342✔
3623
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
342✔
3624
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
684!
3625
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
684!
3626
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
684!
3627
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
684!
3628
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
342!
3629
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
684!
3630
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
684!
3631
      break;
342✔
3632
    }
3633
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3634
      break;
×
3635
    }
3636
    case STRIGGER_PULL_WAL_META: {
8,283✔
3637
      SSTriggerWalMetaRequest* pRequest = &(pReq->walMetaReq);
8,283✔
3638
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
16,558!
3639
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
16,565!
3640
      break;
8,290✔
3641
    }
3642
    case STRIGGER_PULL_WAL_TS_DATA:
7,864✔
3643
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3644
    case STRIGGER_PULL_WAL_CALC_DATA:
3645
    case STRIGGER_PULL_WAL_DATA: {
3646
      SSTriggerWalDataRequest* pRequest = &(pReq->walDataReq);
7,864✔
3647
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
15,726!
3648
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
15,730!
3649
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
15,734!
3650
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
15,735!
3651
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
7,869!
3652
      break;
7,866✔
3653
    }
3654
    case STRIGGER_PULL_GROUP_COL_VALUE: {
609✔
3655
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
609✔
3656
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,218!
3657
      break;
609✔
3658
    }
3659
    case STRIGGER_PULL_VTABLE_INFO: {
195✔
3660
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
195✔
3661
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
195!
3662
      break;
195✔
3663
    }
3664
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
821✔
3665
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
821✔
3666
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
1,642!
3667
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
821!
3668
      break;
821✔
3669
    }
3670
    case STRIGGER_PULL_OTABLE_INFO: {
66✔
3671
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
66✔
3672
      int32_t size = 0;
66✔
3673
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
66!
3674
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
66✔
3675
      if (pRequest->cols == NULL) {
66!
3676
        code = terrno;
×
3677
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3678
        goto _exit;
×
3679
      }
3680
      for (int32_t i = 0; i < size; ++i) {
434✔
3681
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
368✔
3682
        if (oInfo == NULL) {
368!
3683
          code = terrno;
×
3684
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3685
          goto _exit;
×
3686
        }
3687
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
368!
3688
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
368!
3689
      }
3690
      break;
66✔
3691
    }
3692
    default: {
22✔
3693
      uError("unknown pull type %d", type);
22!
3694
      code = TSDB_CODE_INVALID_PARA;
×
3695
      break;
×
3696
    }
3697
  }
3698

3699
  tEndDecode(&decoder);
48,359✔
3700

3701
_exit:
48,339✔
3702
  tDecoderClear(&decoder);
48,339✔
3703
  return code;
48,368✔
3704
}
3705

3706
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo) {
51,804✔
3707
  int32_t size = taosArrayGetSize(pParams);
51,804✔
3708
  int32_t code = 0;
51,801✔
3709
  int32_t lino = 0;
51,801✔
3710
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
51,801!
3711
  for (int32_t i = 0; i < size; ++i) {
42,897,131✔
3712
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
42,848,213✔
3713
    if (param == NULL) {
42,847,113✔
3714
      TAOS_CHECK_EXIT(terrno);
1,783!
3715
    }
3716

3717
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevTs));
85,690,660!
3718
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->currentTs));
85,690,660!
3719
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextTs));
85,690,660!
3720

3721
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wstart));
85,690,660!
3722
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wend));
85,690,660!
3723
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wduration));
85,690,660!
3724
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wrownum));
85,690,660!
3725

3726
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevLocalTime));
85,690,660!
3727
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextLocalTime));
85,690,660!
3728

3729
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->triggerTime));
85,690,660!
3730
    if (!ignoreNotificationInfo) {
42,845,330✔
3731
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
87,476!
3732
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
43,738✔
3733
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
87,476!
3734
    }
3735
  }
3736
_exit:
48,918✔
3737
  return code;
48,918✔
3738
}
3739

3740
void tDestroySSTriggerCalcParam(void* ptr) {
21,451,551✔
3741
  SSTriggerCalcParam* pParam = ptr;
21,451,551✔
3742
  if (pParam && pParam->extraNotifyContent != NULL) {
21,451,551!
3743
    taosMemoryFreeClear(pParam->extraNotifyContent);
195!
3744
  }
3745
  if (pParam && pParam->resultNotifyContent != NULL) {
21,451,551!
3746
    taosMemoryFreeClear(pParam->resultNotifyContent);
63!
3747
  }
3748
}
21,451,551✔
3749

3750
void tDestroySStreamGroupValue(void* ptr) {
48,555✔
3751
  SStreamGroupValue* pValue = ptr;
48,555✔
3752
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
48,555!
3753
    taosMemoryFreeClear(pValue->data.pData);
38,526!
3754
    pValue->data.nData = 0;
38,529✔
3755
  }
3756
}
48,558✔
3757

3758
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
25,903✔
3759
  int32_t size = 0, code = 0, lino = 0;
25,903✔
3760
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
25,903!
3761
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
25,903✔
3762
  if (*ppParams == NULL) {
25,903!
3763
    TAOS_CHECK_EXIT(terrno);
×
3764
  }
3765
  for (int32_t i = 0; i < size; ++i) {
21,453,411✔
3766
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
21,427,510✔
3767
    if (param == NULL) {
21,427,510!
3768
      TAOS_CHECK_EXIT(terrno);
×
3769
    }
3770
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevTs));
42,855,018!
3771
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->currentTs));
42,855,016!
3772
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextTs));
42,855,015!
3773

3774
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wstart));
42,855,012!
3775
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wend));
42,855,009!
3776
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wduration));
42,855,010!
3777
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wrownum));
42,855,011!
3778

3779
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevLocalTime));
42,855,008!
3780
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextLocalTime));
42,855,008!
3781

3782
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->triggerTime));
42,855,013!
3783
    if (!ignoreNotificationInfo) {
21,427,508✔
3784
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
43,738!
3785
      uint64_t len = 0;
21,869✔
3786
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
43,738!
3787
    }
3788
  }
3789

3790
_exit:
25,901✔
3791
  return code;
25,901✔
3792
}
3793

3794
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
53,023✔
3795
  int32_t code = TSDB_CODE_SUCCESS;
53,023✔
3796
  int32_t lino = 0;
53,023✔
3797

3798
  int32_t size = taosArrayGetSize(pGroupColVals);
53,023✔
3799
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
53,023!
3800
  for (int32_t i = 0; i < size; ++i) {
112,396✔
3801
    SStreamGroupValue* pValue = taosArrayGet(pGroupColVals, i);
59,382✔
3802
    if (pValue == NULL) {
59,378!
3803
      TAOS_CHECK_EXIT(terrno);
×
3804
    }
3805
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isNull));
59,378!
3806
    if (pValue->isNull) {
59,376!
3807
      continue;
×
3808
    }
3809
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isTbname));
59,376!
3810
    if (pValue->isTbname) {
59,373✔
3811
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->uid));
53,474!
3812
      if (vgId != -1) { pValue->vgId = vgId; }
26,737✔
3813
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pValue->vgId));
53,474!
3814
    }
3815
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pValue->data.type));
118,746!
3816
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
59,373!
3817
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pValue->data.pData, pValue->data.nData));
106,870!
3818
    } else {
3819
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->data.val));
11,876!
3820
    }
3821
  }
3822

3823
_exit:
53,014✔
3824
  return code;
53,014✔
3825
}
3826

3827
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
26,511✔
3828
  int32_t code = TSDB_CODE_SUCCESS;
26,511✔
3829
  int32_t lino = 0;
26,511✔
3830
  int32_t size = 0;
26,511✔
3831

3832
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
26,510!
3833
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);
26,510✔
3834
  if (size > 0) {
26,511✔
3835
    if (*ppGroupColVals == NULL) {
17,865✔
3836
      *ppGroupColVals = taosArrayInit(size, sizeof(SStreamGroupValue));
17,256✔
3837
      if (*ppGroupColVals == NULL) {
17,257!
3838
        TAOS_CHECK_EXIT(terrno);
×
3839
      }
3840
    } else {
3841
      TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, size));
609!
3842
    }
3843
  }
3844
  for (int32_t i = 0; i < size; ++i) {
56,205✔
3845
    SStreamGroupValue* pValue = taosArrayReserve(*ppGroupColVals, 1);
29,691✔
3846
    if (pValue == NULL) {
29,690!
3847
      TAOS_CHECK_EXIT(terrno);
×
3848
    }
3849
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isNull));
29,690!
3850
    if (pValue->isNull) {
29,691!
3851
      continue;
×
3852
    }
3853
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isTbname));
29,691!
3854
    if (pValue->isTbname) {
29,690✔
3855
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->uid));
26,742!
3856
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pValue->vgId));
26,742!
3857
    }
3858
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pValue->data.type));
59,380!
3859
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
29,690!
3860
      uint64_t len = 0;
26,720✔
3861
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pValue->data.pData, &len));
53,443!
3862
      pValue->data.nData = len;
26,723✔
3863
    } else {
3864
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->data.val));
5,940!
3865
    }
3866
  }
3867
_exit:
26,514✔
3868
  return code;
26,514✔
3869
}
3870

3871
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
1,218✔
3872
  SEncoder encoder = {0};
1,218✔
3873
  int32_t  code = TSDB_CODE_SUCCESS;
1,218✔
3874
  int32_t  lino = 0;
1,218✔
3875
  int32_t  tlen = 0;
1,218✔
3876

3877
  tEncoderInit(&encoder, buf, bufLen);
1,218✔
3878
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,218!
3879

3880
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
1,218!
3881

3882
  tEndEncode(&encoder);
1,218✔
3883

3884
_exit:
1,217✔
3885
  if (code != TSDB_CODE_SUCCESS) {
1,217!
3886
    tlen = code;
×
3887
  } else {
3888
    tlen = encoder.pos;
1,217✔
3889
  }
3890
  tEncoderClear(&encoder);
1,217✔
3891
  return tlen;
1,218✔
3892
}
3893

3894
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
609✔
3895
  SDecoder decoder = {0};
609✔
3896
  int32_t  code = TSDB_CODE_SUCCESS;
609✔
3897
  int32_t  lino = 0;
609✔
3898
  int32_t  size = 0;
609✔
3899

3900
  tDecoderInit(&decoder, buf, bufLen);
609✔
3901
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
609!
3902

3903
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));
609!
3904

3905
  tEndDecode(&decoder);
609✔
3906

3907
_exit:
609✔
3908
  tDecoderClear(&decoder);
609✔
3909
  return code;
609✔
3910
}
3911

3912
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
2,190✔
3913
  SEncoder encoder = {0};
2,190✔
3914
  int32_t  code = TSDB_CODE_SUCCESS;
2,190✔
3915
  int32_t  lino = 0;
2,190✔
3916
  int32_t  tlen = 0;
2,190✔
3917

3918
  tEncoderInit(&encoder, buf, bufLen);
2,190✔
3919
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,190!
3920

3921
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
4,380!
3922
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
4,380!
3923
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
4,380!
3924
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
4,380!
3925
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
4,380!
3926

3927
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false));
2,190!
3928
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
2,190!
3929
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
4,380!
3930

3931
  tEndEncode(&encoder);
2,190✔
3932

3933
_exit:
2,190✔
3934
  if (code != TSDB_CODE_SUCCESS) {
2,190!
3935
    tlen = code;
×
3936
  } else {
3937
    tlen = encoder.pos;
2,190✔
3938
  }
3939
  tEncoderClear(&encoder);
2,190✔
3940
  return tlen;
2,190✔
3941
}
3942

3943
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
1,095✔
3944
  SDecoder decoder = {0};
1,095✔
3945
  int32_t  code = TSDB_CODE_SUCCESS;
1,095✔
3946
  int32_t  lino = 0;
1,095✔
3947

3948
  tDecoderInit(&decoder, buf, bufLen);
1,095✔
3949
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,095!
3950

3951
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
2,190!
3952
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
2,190!
3953
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
2,190!
3954
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
2,190!
3955
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
2,190!
3956

3957
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
1,095!
3958
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
1,095!
3959
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));
2,190!
3960

3961
  tEndDecode(&decoder);
1,095✔
3962

3963
_exit:
1,095✔
3964
  tDecoderClear(&decoder);
1,095✔
3965
  return code;
1,095✔
3966
}
3967

3968
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
3,823✔
3969
  if (pReq != NULL) {
3,823!
3970
    if (pReq->params != NULL) {
3,824✔
3971
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
1,157✔
3972
      pReq->params = NULL;
1,157✔
3973
    }
3974
    if (pReq->groupColVals != NULL) {
3,824✔
3975
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
1,025✔
3976
      pReq->groupColVals = NULL;
1,025✔
3977
    }
3978
    blockDataDestroy(pReq->pOutBlock);
3,824✔
3979
  }
3980
}
3,823✔
3981

3982
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
33,054✔
3983
  SEncoder encoder = {0};
33,054✔
3984
  int32_t  code = TSDB_CODE_SUCCESS;
33,054✔
3985
  int32_t  lino = 0;
33,054✔
3986
  int32_t  tlen = 0;
33,054✔
3987

3988
  tEncoderInit(&encoder, buf, bufLen);
33,054✔
3989
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
33,054!
3990

3991
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
66,108!
3992
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
66,108!
3993
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
66,108!
3994
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
66,108!
3995

3996
  tEndEncode(&encoder);
33,054✔
3997

3998
_exit:
33,054✔
3999
  if (code != TSDB_CODE_SUCCESS) {
33,054!
4000
    tlen = code;
×
4001
  } else {
4002
    tlen = encoder.pos;
33,054✔
4003
  }
4004
  tEncoderClear(&encoder);
33,054✔
4005
  return tlen;
33,054✔
4006
}
4007

4008
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
48,808✔
4009
  SDecoder decoder = {0};
48,808✔
4010
  int32_t  code = TSDB_CODE_SUCCESS;
48,808✔
4011
  int32_t  lino = 0;
48,808✔
4012

4013
  tDecoderInit(&decoder, buf, bufLen);
48,808✔
4014
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
48,815!
4015

4016
  int32_t type = 0;
49,250✔
4017
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
49,193!
4018
  pReq->type = type;
49,193✔
4019
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
98,382!
4020
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
98,255!
4021
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
98,066!
4022

4023
  tEndDecode(&decoder);
49,000✔
4024

4025
_exit:
48,800✔
4026
  tDecoderClear(&decoder);
48,800✔
4027
  return code;
49,173✔
4028
}
4029

4030
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo) {
49,616✔
4031
  int32_t code = 0, lino = 0;
49,616✔
4032
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true));
49,616!
4033
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));
49,617!
4034
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
99,228!
4035
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
99,228!
4036
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
99,228!
4037
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
49,614!
4038
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
99,236!
4039
_exit:
49,618✔
4040
  return code;
49,618✔
4041
}
4042

4043
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
24,808✔
4044
  int32_t code = 0, lino = 0;
24,808✔
4045
  int32_t size = 0;
24,808✔
4046
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
24,808!
4047
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
24,807!
4048
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
49,616!
4049
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
49,615!
4050
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
49,614!
4051
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
24,807!
4052
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
49,614!
4053
_exit:
24,807✔
4054
  return code;
24,807✔
4055
}
4056

4057
int32_t tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
29,457✔
4058
  if (pInfo == NULL) return TSDB_CODE_SUCCESS;
29,457!
4059
  if (pInfo->pStreamPesudoFuncVals != NULL) {
29,457✔
4060
    taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
25,577✔
4061
    pInfo->pStreamPesudoFuncVals = NULL;
25,576✔
4062
  }
4063
  if (pInfo->pStreamPartColVals != NULL) {
29,456✔
4064
    taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
17,064✔
4065
    pInfo->pStreamPartColVals = NULL;
17,063✔
4066
  }
4067
  return TSDB_CODE_SUCCESS;
29,455✔
4068
}
4069

4070
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
390✔
4071
  SEncoder encoder = {0};
390✔
4072
  int32_t  code = TSDB_CODE_SUCCESS;
390✔
4073
  int32_t  lino = 0;
390✔
4074
  int32_t  tlen = 0;
390✔
4075

4076
  tEncoderInit(&encoder, buf, bufLen);
390✔
4077
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
390!
4078

4079
  int32_t size = taosArrayGetSize(pRsp->infos);
390✔
4080
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
390!
4081
  for (int32_t i = 0; i < size; ++i) {
1,071✔
4082
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
681✔
4083
    if (info == NULL) {
681!
4084
      TAOS_CHECK_EXIT(terrno);
×
4085
    }
4086
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
1,362!
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
1,362!
4088
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
681!
4089
  }
4090

4091
  tEndEncode(&encoder);
390✔
4092

4093
_exit:
390✔
4094
  if (code != TSDB_CODE_SUCCESS) {
390!
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
390✔
4098
  }
4099
  tEncoderClear(&encoder);
390✔
4100
  return tlen;
390✔
4101
}
4102

4103
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
195✔
4104
  SDecoder decoder = {0};
195✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
195✔
4106
  int32_t  lino = 0;
195✔
4107
  int32_t  size = 0;
195✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
195✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
195!
4111

4112
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
195!
4113
  vTableInfo->infos = taosArrayInit(size, sizeof(VTableInfo));
195✔
4114
  if (vTableInfo->infos == NULL) {
195!
4115
    TAOS_CHECK_EXIT(terrno);
×
4116
  }
4117
  for (int32_t i = 0; i < size; ++i) {
535✔
4118
    VTableInfo* info = taosArrayReserve(vTableInfo->infos, 1);
339✔
4119
    if (info == NULL) {
340!
4120
      TAOS_CHECK_EXIT(terrno);
×
4121
    }
4122
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->gId));
680!
4123
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->uid));
680!
4124
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &info->cols, false));
340!
4125
  }
4126

4127
  tEndDecode(&decoder);
196✔
4128

4129
_exit:
195✔
4130
  tDecoderClear(&decoder);
195✔
4131
  return code;
195✔
4132
}
4133

4134

4135
void tDestroyVTableInfo(void *ptr) {
682✔
4136
  if (NULL == ptr) {
682!
4137
    return;
×
4138
  }
4139
  VTableInfo* pTable = (VTableInfo*)ptr;
682✔
4140
  taosMemoryFree(pTable->cols.pColRef);
682!
4141
}
4142

4143
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
17,882✔
4144
  if (ptr == NULL) return;
17,882!
4145
  taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
17,882✔
4146
  ptr->infos = NULL;
17,878✔
4147
}
4148

4149
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,466✔
4150
  SEncoder encoder = {0};
1,466✔
4151
  int32_t  code = TSDB_CODE_SUCCESS;
1,466✔
4152
  int32_t  lino = 0;
1,466✔
4153
  int32_t  tlen = 0;
1,466✔
4154

4155
  tEncoderInit(&encoder, buf, bufLen);
1,466✔
4156
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,465!
4157

4158
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,930!
4159
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,465✔
4160
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,465!
4161
  for (int32_t i = 0; i < size; ++i) {
3,097✔
4162
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,632✔
4163
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
3,264!
4164
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
3,264!
4165
  }
4166

4167
  tEndEncode(&encoder);
1,465✔
4168

4169
_exit:
1,465✔
4170
  if (code != TSDB_CODE_SUCCESS) {
1,465!
4171
    tlen = code;
×
4172
  } else {
4173
    tlen = encoder.pos;
1,465✔
4174
  }
4175
  tEncoderClear(&encoder);
1,465✔
4176
  return tlen;
1,465✔
4177
}
4178

4179
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
733✔
4180
  SDecoder decoder = {0};
733✔
4181
  int32_t  code = TSDB_CODE_SUCCESS;
733✔
4182
  int32_t  lino = 0;
733✔
4183
  SSDataBlock *pResBlock = pBlock;
733✔
4184

4185
  tDecoderInit(&decoder, buf, bufLen);
733✔
4186
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
733!
4187

4188
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,466!
4189
  int32_t numOfCols = 2;
733✔
4190
  if (pResBlock->pDataBlock == NULL) {
733!
4191
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
733✔
4192
    if (pResBlock->pDataBlock == NULL) {
733!
4193
      TAOS_CHECK_EXIT(terrno);
×
4194
    }
4195
    for (int32_t i = 0; i< numOfCols; ++i) {
2,199✔
4196
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,466✔
4197
      if (pColInfoData == NULL) {
1,466!
4198
        TAOS_CHECK_EXIT(terrno);
×
4199
      }
4200
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,466✔
4201
      pColInfoData->info.bytes = sizeof(int64_t);
1,466✔
4202
    }
4203
  }
4204
  int32_t numOfRows = 0;
733✔
4205
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
733!
4206
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
733!
4207
  for (int32_t i = 0; i < numOfRows; ++i) {
1,549✔
4208
    for (int32_t j = 0; j < numOfCols; ++j) {
2,448✔
4209
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,632✔
4210
      if (pColInfoData == NULL) {
1,632!
4211
        TAOS_CHECK_EXIT(terrno);
×
4212
      }
4213
      int64_t value = 0;
1,632✔
4214
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,632!
4215
      colDataSetInt64(pColInfoData, i, &value);
1,632✔
4216
    }
4217
  }
4218

4219
  pResBlock->info.dataLoad = 1;
733✔
4220
  pResBlock->info.rows = numOfRows;
733✔
4221

4222
  tEndDecode(&decoder);
733✔
4223

4224
_exit:
733✔
4225
  tDecoderClear(&decoder);
733✔
4226
  return code;
733✔
4227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc