• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4808

16 Oct 2025 11:40AM UTC coverage: 57.938% (-0.6%) from 58.524%
#4808

push

travis-ci

web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

137662 of 303532 branches covered (45.35%)

Branch coverage included in aggregate %.

209234 of 295200 relevant lines covered (70.88%)

4035326.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.87
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26

27
/* Identifies a task by the pair (stream id, task id). */
typedef struct STaskId {
  int64_t streamId; /* id of the stream */
  int64_t taskId;   /* id of the task */
} STaskId;
31

32
/* Window kinds; numbering starts at 1 and is consecutive. */
typedef enum EWindowType {
  WINDOW_TYPE_INTERVAL = 1,
  WINDOW_TYPE_SESSION = 2,
  WINDOW_TYPE_STATE = 3,
  WINDOW_TYPE_EVENT = 4,
  WINDOW_TYPE_COUNT = 5,
  WINDOW_TYPE_ANOMALY = 6,
  WINDOW_TYPE_EXTERNAL = 7,
  WINDOW_TYPE_PERIOD = 8
} EWindowType;
42

43
/* Checkpoint bookkeeping carried with a task status entry. */
typedef struct STaskCkptInfo {
  int64_t latestId;         /* saved checkpoint id */
  int64_t latestVer;        /* saved checkpoint version */
  int64_t latestTime;       /* time of the latest checkpoint */
  int64_t latestSize;       /* size of the latest checkpoint */
  int8_t  remoteBackup;     /* latest checkpoint backup done */
  int64_t activeId;         /* currently active checkpoint id */
  int32_t activeTransId;    /* checkpoint transaction id */
  int8_t  failed;           /* whether the checkpoint failed */
  int8_t  consensusChkptId; /* the consensus checkpoint id is required */
  int64_t consensusTs;      /* NOTE(review): undocumented in the original; presumably a timestamp - confirm */
} STaskCkptInfo;
55

56
typedef struct STaskStatusEntry {
57
  STaskId       id;
58
  int32_t       status;
59
  int32_t       statusLastDuration;  // to record the last duration of current status
60
  int64_t       stage;
61
  int32_t       nodeId;
62
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
63
  int64_t       processedVer;  // only valid for source task
64
  double        inputQUsed;    // in MiB
65
  double        inputRate;
66
  double        procsThroughput;   // duration between one element put into input queue and being processed.
67
  double        procsTotal;        // duration between one element put into input queue and being processed.
68
  double        outputThroughput;  // the size of dispatched result blocks in bytes
69
  double        outputTotal;       // the size of dispatched result blocks in bytes
70
  double        sinkQuota;         // existed quota size for sink task
71
  double        sinkDataSize;      // sink to dst data size
72
  int64_t       startTime;
73
  int64_t       startCheckpointId;
74
  int64_t       startCheckpointVer;
75
  int64_t       hTaskId;
76
  STaskCkptInfo checkpointInfo;
77
  STaskNotifyEventStat notifyEventStat;
78
} STaskStatusEntry;
79

80
/*
 * Write an upstream endpoint descriptor to the encoder.
 * Wire order: taskId, nodeId, childId, epSet, stage.
 * Any failing step is propagated through TAOS_CHECK_RETURN; otherwise returns 0.
 */
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  /* identifiers */
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  /* endpoint set followed by the stage */
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
88

89
/*
 * Read an upstream endpoint descriptor from the decoder into pInfo.
 * Wire order mirrors tEncodeStreamEpInfo: taskId, nodeId, childId, epSet, stage.
 * Any failing step is propagated through TAOS_CHECK_RETURN; otherwise returns 0.
 */
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  /* identifiers */
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  /* endpoint set followed by the stage */
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
97

98
/*
 * Serialize a node-update message.
 * Wire order: streamId, taskId, node count, {nodeId, prevEp, newEp}*, transId,
 * task count, {taskId}*.
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  /* updated nodes */
  int32_t numOfNodes = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfNodes));

  for (int32_t k = 0; k < numOfNodes; k++) {
    SNodeUpdateInfo* pNode = taosArrayGet(pMsg->pNodeList, k);
    if (pNode == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pNode->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->newEp));
  }

  // todo: this attribute was added later and makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));

  /* affected task ids */
  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));

  for (int32_t k = 0; k < numOfTasks; k++) {
    int32_t* pTaskId = taosArrayGet(pMsg->pTaskList, k);
    if (pTaskId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pTaskId));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
142

143
/*
 * Deserialize a node-update message into pMsg.
 * Allocates pMsg->pNodeList and pMsg->pTaskList; they are not released here on failure,
 * so whatever was allocated stays attached to pMsg (tDestroyNodeUpdateMsg destroys both).
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  /* updated nodes */
  int32_t numOfNodes = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);

  for (int32_t k = 0; k < numOfNodes; k++) {
    SNodeUpdateInfo node = {0};
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &node.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.newEp));

    if (taosArrayPush(pMsg->pNodeList, &node) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  /* affected task ids */
  int32_t numOfTasks = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTasks));
  pMsg->pTaskList = taosArrayInit(numOfTasks, sizeof(int32_t));
  if (pMsg->pTaskList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t k = 0; k < numOfTasks; k++) {
    int32_t taskId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
    if (taosArrayPush(pMsg->pTaskList, &taskId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
189

190
/* Destroy both arrays held by a node-update message and reset the pointers to NULL. */
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
  taosArrayDestroy(pMsg->pNodeList);
  pMsg->pNodeList = NULL;

  taosArrayDestroy(pMsg->pTaskList);
  pMsg->pTaskList = NULL;
}
×
196

197
/*
 * Serialize a task-check request.
 * Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, stage.
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  /* upstream side */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));

  /* downstream side */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
219

220
/*
 * Deserialize a task-check request into pReq.
 * Wire order mirrors tEncodeStreamTaskCheckReq.
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  /* upstream side */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));

  /* downstream side */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));

  tEndDecode(pDecoder);

_exit:
  return code;
}
238

239
/*
 * Serialize a task-check response.
 * Wire order: reqId, streamId, upstreamNodeId, upstreamTaskId, downstreamNodeId,
 * downstreamTaskId, childId, oldStage, status (int8).
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  /* upstream side */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));

  /* downstream side */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
262

263
/*
 * Deserialize a task-check response into pRsp.
 * Wire order mirrors tEncodeStreamTaskCheckRsp.
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  /* upstream side */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));

  /* downstream side */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));

  tEndDecode(pDecoder);

_exit:
  return code;
}
282

283
/*
 * Serialize a dispatch request header followed by blockNum (length, payload) pairs.
 * Fails with TSDB_CODE_INVALID_MSG when the data / dataLen arrays do not both hold
 * exactly blockNum entries.
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  /* 'type' is written a second time here; tDecodeStreamDispatchReq reads it twice as well,
   * so this is part of the wire layout and must stay. */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  /* both arrays must agree with the advertised block count */
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    uError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t blk = 0; blk < pReq->blockNum; ++blk) {
    int32_t* pBlockLen = taosArrayGet(pReq->dataLen, blk);
    void*    pBlock = taosArrayGetP(pReq->data, blk);
    if (pBlock == NULL || pBlockLen == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pBlockLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pBlock, *pBlockLen));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
325

326
/*
 * Deserialize a dispatch request into pReq.
 *
 * Allocates pReq->data (array of payload pointers) and pReq->dataLen (array of int32 lengths).
 * Payload buffers come from tDecodeBinaryAlloc; once pushed into pReq->data they belong to
 * pReq (tCleanupStreamDispatchReq destroys both arrays). A buffer that could not be handed
 * over to pReq->data is freed here before bailing out, so it is never leaked.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when a block's explicit length disagrees with
 * its binary length, or the failing step's code.
 */
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  /* 'type' appears twice on the wire (the encoder writes it twice); read it again to stay aligned. */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    /* Initialized so the comparison and the free below never read indeterminate values,
     * whatever the decoder helpers leave untouched. */
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    /* A negative len1 can never match an unsigned length; reject it explicitly. */
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(terrno);
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
375

376
/* Release the two arrays owned by a dispatch request: the payload pointers and their lengths. */
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  /* payload array is destroyed with the pointer-aware variant, passing no custom callback */
  taosArrayDestroyP(pReq->data, NULL);

  taosArrayDestroy(pReq->dataLen);
}
×
380

381
/*
 * Serialize a retrieve request.
 * Wire order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId,
 * then pRetrieve as a binary blob of retrieveLen bytes.
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  /* destination, then source */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  /* payload */
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
402

403
/*
 * Deserialize a retrieve request into pReq.
 * The payload is allocated by tDecodeBinaryAlloc and stored in pReq->pRetrieve
 * (tCleanupStreamRetrieveReq frees it); its length is narrowed to int32 in retrieveLen.
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t payloadLen = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  /* destination, then source */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  /* payload */
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &payloadLen));
  pReq->retrieveLen = (int32_t)payloadLen;

  tEndDecode(pDecoder);

_exit:
  return code;
}
422

423
/* Free the payload buffer held by a retrieve request. */
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
}
×
424

425

426
/*
 * Serialize a stream management request: reqId, type, then a type-specific list.
 *  - TRIGGER_ORIGTBL_READER: count, then {dbFName, tbName} C strings (terminator included).
 *  - RUNNER_ORIGTBL_READER:  count, then {execId, uid, vgId count, vgId*}.
 * A missing list (cont.pReqs == NULL) is written as count 0.
 * Returns 0 on success, TSDB_CODE_STREAM_INVALID_TASK_TYPE for any other type,
 * or the failing step's code.
 */
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      if (NULL == pReq->cont.pReqs) {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t nameNum = taosArrayGetSize(pReq->cont.pReqs);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nameNum));
      for (int32_t k = 0; k < nameNum; k++) {
        SStreamDbTableName* pTbl = taosArrayGet(pReq->cont.pReqs, k);
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->dbFName, strlen(pTbl->dbFName) + 1));
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->tbName, strlen(pTbl->tbName) + 1));
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      if (NULL == pReq->cont.pReqs) {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t deployNum = taosArrayGetSize(pReq->cont.pReqs);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
      for (int32_t k = 0; k < deployNum; k++) {
        SStreamOReaderDeployReq* pItem = taosArrayGet(pReq->cont.pReqs, k);
        int32_t                  vgNum = taosArrayGetSize(pItem->vgIds);

        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pItem->execId));
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pItem->uid));
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
        for (int32_t v = 0; v < vgNum; v++) {
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pItem->vgIds, v)));
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
474

475
void tFreeRunnerOReaderDeployReq(void* param) {
51✔
476
  if (NULL == param || NULL == *(void**)param) {
51!
477
    return;
8✔
478
  }
479

480
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
43✔
481
  taosArrayDestroy(pReq->vgIds);
43✔
482
}
483

484
/*
 * Release the type-specific list held in pReq->cont.pReqs. The request object itself is
 * not freed here. NULL requests and unknown types are ignored.
 */
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (pReq == NULL) {
    return;
  }

  if (pReq->type == STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER) {
    /* flat elements: nothing to release per entry */
    taosArrayDestroy(pReq->cont.pReqs);
  } else if (pReq->type == STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER) {
    /* each entry owns a vgIds array, released by the element destructor */
    taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
  }
}
500

501

502
/*
 * Deep-copy a management request.
 * *ppDst is always reset to NULL first; a NULL pSrc yields success with *ppDst == NULL.
 * The struct is copied bytewise, then cont.pReqs is duplicated per type:
 *  - TRIGGER_ORIGTBL_READER: the array is duplicated as-is.
 *  - RUNNER_ORIGTBL_READER:  a new array is built and each entry's vgIds is duplicated.
 * For any other type the bytewise copy is left untouched.
 * On failure the partial copy is released, *ppDst is NULL, and the error is logged.
 */
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppDst = NULL;
  if (pSrc == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  memcpy(*ppDst, pSrc, sizeof(*pSrc));

  if (pSrc->cont.pReqs != NULL) {
    if (pSrc->type == STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER) {
      (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
      TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
    } else if (pSrc->type == STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER) {
      int32_t deployNum = taosArrayGetSize(pSrc->cont.pReqs);
      (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), deployNum);
      TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);

      for (int32_t k = 0; k < deployNum; k++) {
        SStreamOReaderDeployReq* pTo = taosArrayGet((*ppDst)->cont.pReqs, k);
        SStreamOReaderDeployReq* pFrom = taosArrayGet(pSrc->cont.pReqs, k);

        pTo->vgIds = taosArrayDup(pFrom->vgIds, NULL);
        TSDB_CHECK_NULL(pTo->vgIds, code, lino, _exit, terrno);
        pTo->execId = pFrom->execId;
        pTo->uid = pFrom->uid;
      }
    }
  }

_exit:

  if (code) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
549

550

551
/*
 * Deserialize a stream management request: reqId, type, then a type-specific list.
 *  - TRIGGER_ORIGTBL_READER: count, then {dbFName, tbName} strings into an array of
 *    SStreamDbTableName.
 *  - RUNNER_ORIGTBL_READER:  count, then {execId, uid, vgId count, vgId*} into an array of
 *    SStreamOReaderDeployReq, each with its own vgIds array.
 * cont.pReqs is only allocated when the count is positive. Nothing is released here on
 * failure; partially decoded lists stay attached to pReq (tFreeSStreamMgmtReq releases them).
 *
 * Every array slot pointer is checked before use, so a failed reserve/lookup returns terrno
 * instead of dereferencing NULL.
 *
 * Returns 0 on success, TSDB_CODE_STREAM_INVALID_TASK_TYPE for any other type,
 * or the failing step's code.
 */
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          int32_t vgIdNum = 0;

          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
          if (vgIdNum > 0) {
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
          }
          for (int32_t n = 0; n < vgIdNum; ++n) {
            int32_t* vgId = taosArrayGet(p->vgIds, n);
            TSDB_CHECK_NULL(vgId, code, lino, _exit, terrno);
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
          }
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
604

605
/*
 * Serialize the common task header.
 * Wire order: type, streamId, taskId, flags, seriousId, deployId, nodeId, taskIdx, status,
 * detailStatus, errorCode, then a presence flag (1/0) followed by the management request
 * when pMgmtReq is set. The session id is intentionally not written.
 * Returns 0 on success or the failing step's code.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  /* identity */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  /* placement */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));

  /* state */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  /* optional management request, announced by a 1/0 flag */
  if (NULL == pTask->pMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
  } else {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
633

634

635
/*
 * Deserialize the common task header into pTask (mirror of tEncodeStreamTask).
 * When the presence flag is non-zero, pTask->pMgmtReq is allocated and decoded; it is not
 * released here if decoding fails and remains attached to pTask.
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t hasMgmtReq = 0;

  /* identity */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  /* placement */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));

  /* state */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  /* optional management request */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasMgmtReq));
  if (hasMgmtReq != 0) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
664

665
/*
 * Serialize one recalculation progress record.
 * Wire order: recalcId (i64), progress (i32), start (i64), end (i64).
 * Returns 0 on success or the failing step's code.
 */
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));

  /* covered range */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:

  return code;
}
678

679
/*
 * Deserialize one recalculation progress record into pProgress.
 * Wire order: recalcId (i64), progress (i32), start (i64), end (i64).
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));

  /* covered range */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:

  return code;
}
692

693

694
/*
 * Serialize a trigger runtime status.
 * Wire order: autoRecalcNum, realtimeSessionNum, historySessionNum, recalcSessionNum,
 * histroyProgress, user recalc count, then one recalc progress record per entry.
 * Returns 0 on success or the failing step's code.
 */
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  /* counters */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  /* user-requested recalculations */
  int32_t numOfRecalcs = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfRecalcs));
  for (int32_t k = 0; k < numOfRecalcs; k++) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
  }

_exit:

  return code;
}
715

716
/*
 * Deserialize a trigger runtime status into pStatus.
 * Wire order (must match tEncodeSSTriggerRuntimeStatus): autoRecalcNum, realtimeSessionNum,
 * historySessionNum, recalcSessionNum, histroyProgress, user recalc count, then one recalc
 * progress record per entry.
 *
 * The fourth field is recalcSessionNum. It was previously decoded into realtimeSessionNum a
 * second time, which overwrote the realtime count and left recalcSessionNum unset.
 *
 * pStatus->userRecalcs is only allocated when the count is positive; it is not released
 * here on failure (tFreeSSTriggerRuntimeStatus destroys it).
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    TSDB_CHECK_NULL(pStatus->userRecalcs, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TSDB_CHECK_NULL(pProgress, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
745

746

747
/*
 * Serialize a stream heartbeat message.
 * Wire order: dnodeId, streamGId, snodeId, runnerThreadNum, then four counted lists:
 * vgroup leader ids, task status entries, indexes into the status list (pStreamReq),
 * and trigger runtime statuses.
 * Returns the failing step's code on error, otherwise the encoder position (pEncoder->pos).
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  /* 1) vgroup leaders */
  int32_t numOfLeaders = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfLeaders));
  for (int32_t k = 0; k < numOfLeaders; k++) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  /* 2) task status entries, written through the common task header encoder */
  int32_t numOfStatus = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfStatus));
  for (int32_t k = 0; k < numOfStatus; k++) {
    SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pEntry));
  }

  /* 3) indexes of status entries that carry a request */
  int32_t numOfReqs = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfReqs));
  for (int32_t k = 0; k < numOfReqs; k++) {
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
  }

  /* 4) trigger runtime statuses */
  int32_t numOfTriggers = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTriggers));
  for (int32_t k = 0; k < numOfTriggers; k++) {
    SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pRuntime));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
794

795
/*
 * Deserialize a stream heartbeat message into pReq (mirror of tEncodeStreamHbMsg).
 * Each of the four lists is only allocated when its count is positive. Nothing is released
 * here on failure; whatever was allocated stays attached to pReq (tCleanupStreamHbMsg
 * destroys the lists).
 * Returns 0 on success or the failing step's code.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  /* 1) vgroup leaders: appended one by one */
  int32_t numOfLeaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfLeaders));
  if (numOfLeaders > 0) {
    pReq->pVgLeaders = taosArrayInit(numOfLeaders, sizeof(int32_t));
    TSDB_CHECK_NULL(pReq->pVgLeaders, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < numOfLeaders; k++) {
    int32_t leaderVgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
    TSDB_CHECK_NULL(taosArrayPush(pReq->pVgLeaders, &leaderVgId), code, lino, _exit, terrno);
  }

  /* 2) task status entries: array is pre-sized, entries are decoded in place */
  int32_t numOfStatus = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfStatus));
  if (numOfStatus > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), numOfStatus);
    TSDB_CHECK_NULL(pReq->pStreamStatus, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < numOfStatus; k++) {
    SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
    TSDB_CHECK_NULL(pEntry, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pEntry));
  }

  /* 3) indexes of status entries that carry a request */
  int32_t numOfReqs = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfReqs));
  if (numOfReqs > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), numOfReqs);
    TSDB_CHECK_NULL(pReq->pStreamReq, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < numOfReqs; k++) {
    int32_t* pSlot = taosArrayGet(pReq->pStreamReq, k);
    TSDB_CHECK_NULL(pSlot, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pSlot));
  }

  /* 4) trigger runtime statuses */
  int32_t numOfTriggers = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTriggers));
  if (numOfTriggers > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), numOfTriggers);
    TSDB_CHECK_NULL(pReq->pTriggerStatus, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < numOfTriggers; k++) {
    SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
    TSDB_CHECK_NULL(pRuntime, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pRuntime));
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
886

887
void tFreeSSTriggerRuntimeStatus(void* param) {
7,242✔
888
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
7,242✔
889
  if (NULL == pStatus) {
7,242!
890
    return;
×
891
  }
892
  taosArrayDestroy(pStatus->userRecalcs);
7,242✔
893
}
894

895
/*
 * Release everything owned by a stream heartbeat message.
 *
 * pMsg      - message to clean; NULL is ignored. The struct itself is not freed.
 * deepClean - when true, also free the management request (`pMgmtReq`) of every
 *             task status referenced by an index stored in `pStreamReq`.
 *
 * Every pointer released here is reset to NULL afterwards, so calling the
 * function again on the same message does not free anything twice.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (pMsg == NULL) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);
  pMsg->pVgLeaders = NULL;

  if (deepClean) {
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t i = 0; i < reqNum; ++i) {
      // pStreamReq holds indexes into pStreamStatus; check the lookup before dereferencing it.
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
      if (NULL == idx) {
        continue;
      }

      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
      if (NULL == pTask) {
        continue;
      }

      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      taosMemoryFree(pTask->pMgmtReq);
      pTask->pMgmtReq = NULL;  // avoid a dangling pointer if the same task is visited again
    }
  }

  taosArrayDestroy(pMsg->pStreamReq);
  pMsg->pStreamReq = NULL;

  taosArrayDestroy(pMsg->pStreamStatus);
  pMsg->pStreamStatus = NULL;

  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
  pMsg->pTriggerStatus = NULL;
}
918

919
/*
 * Encode the trigger-side part of a reader deploy message.
 *
 * String fields are written as binaries including their terminating NUL; a
 * NULL string is written with length 0. `triggerPrevFilter` is not written
 * by this function.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t tblNameLen = 0;
  if (pMsg->triggerTblName != NULL) {
    tblNameLen = (int32_t)strlen(pMsg->triggerTblName) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, tblNameLen));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  int32_t partColsLen = 0;
  if (pMsg->partitionCols != NULL) {
    partColsLen = (int32_t)strlen(pMsg->partitionCols) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partColsLen));

  int32_t trigColsLen = 0;
  if (pMsg->triggerCols != NULL) {
    trigColsLen = (int32_t)strlen(pMsg->triggerCols) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, trigColsLen));

  int32_t scanPlanLen = 0;
  if (pMsg->triggerScanPlan != NULL) {
    scanPlanLen = (int32_t)strlen(pMsg->triggerScanPlan) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, scanPlanLen));

  int32_t cachePlanLen = 0;
  if (pMsg->calcCacheScanPlan != NULL) {
    cachePlanLen = (int32_t)strlen(pMsg->calcCacheScanPlan) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, cachePlanLen));

_exit:
  return code;
}
939

940
/*
 * Encode the calc-side part of a reader deploy message: the exec replica
 * count followed by the calc scan plan (NUL included, length 0 when NULL).
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  int32_t planLen = 0;
  if (pMsg->calcScanPlan != NULL) {
    planLen = (int32_t)strlen(pMsg->calcScanPlan) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:
  return code;
}
951

952

953
/*
 * Encode a reader deploy message.
 *
 * The `triggerReader` flag is written first; it selects which union member
 * follows: the trigger variant when non-zero, the calc variant otherwise.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
968

969
/*
 * Encode a task address as: task id (i64), node id (i32), endpoint set.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:
  return code;
}
981

982
/*
 * Encode a runner target: its task address followed by the exec replica
 * count (i32).
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:
  return code;
}
993

994

995
/*
 * Encode a trigger task deploy message.
 *
 * Layout, in order: the option flags, partition columns, notify URLs and
 * notify options, timing parameters, the trigger-type specific payload
 * (selected by `triggerType`), event/placeholder info, filter and scan plans,
 * the reader and runner lists, the leader snode id and the stream name.
 *
 * String fields are written as binaries including their terminating NUL; a
 * NULL string is written with length 0. This now also applies to
 * `streamName`, which was previously passed to strlen() unguarded.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step. An unsupported `triggerType` is reported as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // Option flags.
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));
  int32_t partitionColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partitionColsLen));

  // Notification targets and options.
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));

  // Timing parameters.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));

  // Trigger-type specific payload.
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
      int32_t stateWindowExprLen =
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateWindowExprLen));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      int32_t eventWindowStartCondLen =
          pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
      int32_t eventWindowEndCondLen =
          pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;

      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      int32_t countWindowCondColsLen =
          pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
      break;
    }
    default:
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

  // Event/placeholder info and plans.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));

  // Reader task addresses.
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
  for (int32_t i = 0; i < readerNum; ++i) {
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
  }

  // Runner targets.
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
  for (int32_t i = 0; i < runnerNum; ++i) {
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));

  // Guard against a NULL name the same way as every other string field above.
  int32_t streamNameLen = (pMsg->streamName == NULL) ? 0 : ((int32_t)strlen(pMsg->streamName) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, streamNameLen));

_exit:

  return code;
}
1119

1120

1121
/*
 * Encode one field descriptor as: name (C string), type (u8), flags (i8),
 * bytes (i32), compress (u32), typeMod (i32).
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions* pField) {
  int32_t code = 0;
  int32_t lino = 0;

  // Identity of the field.
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));

  // Size and storage options.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
1136

1137

1138
/*
 * Encode a runner task deploy message.
 *
 * Layout, in order: exec replica, stream name, plan, output db/table names
 * and flags, notify URLs and options, output columns, output tags, output
 * super-table uid/version, sub-table name and tag value expressions, forced
 * output columns, and the low-latency flag.
 *
 * String fields are written as binaries including their terminating NUL; a
 * NULL string is written with length 0. This now also applies to
 * `streamName`, which was previously passed to strlen() unguarded.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  // Guard against a NULL name the same way as the other string fields below.
  int32_t streamNameLen = (NULL == pMsg->streamName) ? 0 : (int32_t)strlen(pMsg->streamName) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, streamNameLen));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // Notification targets and options.
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));

  // Output columns.
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions* pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // Output tags.
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions* pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));

  // Forced output columns: expression followed by its data type description.
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol* pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));

_exit:

  return code;
}
1198

1199

1200
/*
 * Encode one task deployment: the common task header followed by the
 * reader, trigger or runner deploy message selected by `task.type`.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step. Any other task type is reported as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  switch (pTask->task.type) {
    case STREAM_READER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
      break;
    }
    case STREAM_RUNNER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:
  return code;
}
1224

1225

1226
/*
 * Encode the deployment of one stream: stream id, the reader task list, an
 * i32 presence marker (1/0) followed by the trigger task when present, and
 * the runner task list. Each list is preceded by its element count.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  // Reader tasks.
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (int32_t r = 0; r < nReaders; ++r) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, r);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  // Optional trigger task, announced by a 1/0 marker.
  int32_t hasTrigger = 0;
  if (pStream->triggerTask) {
    hasTrigger = 1;
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (pStream->triggerTask) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  // Runner tasks.
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (int32_t r = 0; r < nRunners; ++r) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, r);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
1255

1256
/*
 * Encode the common stream message header, which consists of the message
 * type written as an i32.
 *
 * Returns `code`: 0 on success, otherwise the value left by the failing
 * TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));

_exit:

  return code;
}
1265

1266
/*
 * Decode the common stream message header: reads one i32 and stores it in
 * `pMsg->msgType`. `pMsg` is left untouched when the read fails.
 *
 * Returns `code`: 0 on success, otherwise the value left by the failing
 * TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t rawType = 0;

  // Read into a plain i32 first; msgType is assigned only after a successful decode.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
1277

1278
/*
 * Encode a start-task message; only its common header is written.
 *
 * Returns `code`: 0 on success, otherwise the value left by the failing
 * TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
1288

1289
/*
 * Encode a task start entry: the common task header followed by its
 * start-task message.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1300

1301
/*
 * Encode an undeploy-task message: the common header followed by the
 * `doCheckpoint` and `doCleanup` flags (i8 each).
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  // Flags.
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
1313

1314
/*
 * Encode a task undeploy entry: the common task header followed by its
 * undeploy message.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1325

1326

1327
/*
 * Encode a recalculation request as three i64 values: recalc id, start, end.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  // Range bounds.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
1339

1340
/*
 * Encode the body of a management response. `msgType` selects which lists of
 * `pRsp` are written; each list is preceded by its element count (i32):
 *
 *   STREAM_MSG_ORIGTBL_READER_INFO   - vgroup ids, then reader addresses
 *   STREAM_MSG_UPDATE_RUNNER         - runner targets
 *   STREAM_MSG_USER_RECALC           - recalc requests
 *   STREAM_MSG_RUNNER_ORIGTBL_READER - per-exec responses, each with its
 *                                      exec id and vgroup address list
 *
 * Any other message type writes nothing and still returns 0.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t nVg = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVg));
      for (int32_t k = 0; k < nVg; ++k) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      int32_t nReader = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReader));
      for (int32_t k = 0; k < nReader; ++k) {
        SStreamTaskAddr* pReaderAddr = taosArrayGet(pRsp->readerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReaderAddr));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunner = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunner));
      for (int32_t k = 0; k < nRunner; ++k) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalc = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalc));
      for (int32_t k = 0; k < nRecalc; ++k) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nExecRsp = taosArrayGetSize(pRsp->execRspList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nExecRsp));
      for (int32_t k = 0; k < nExecRsp; ++k) {
        SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pRsp->execRspList, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pExecRsp->execId));

        int32_t nVgAddr = taosArrayGetSize(pExecRsp->vgList);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVgAddr));
        for (int32_t v = 0; v < nVgAddr; ++v) {
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pExecRsp->vgList, v)));
        }
      }
      break;
    }

    default:
      // Other message types carry no body.
      break;
  }

_exit:
  return code;
}
1406

1407
/*
 * Encode a management response: common header, request id (i64), result
 * code (i32), the task header, and the body selected by the header's
 * message type.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  // Envelope.
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  // Body, whose shape depends on the message type.
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));

_exit:
  return code;
}
1421

1422

1423
/*
 * Encode a stream heartbeat response inside one encoder frame.
 *
 * Layout, in order: stream group id, the stream deploy list, the task start
 * list, the `undeployAll` flag (followed by the undeploy task list only when
 * the flag is 0), and the management response list. Each list is preceded by
 * its element count (i32).
 *
 * The frame is closed with tEndEncode() only if tStartEncode() succeeded;
 * previously it was closed unconditionally, including when the frame had
 * never been opened.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    frameOpen = false;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  frameOpen = true;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // Streams to deploy.
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
  for (int32_t i = 0; i < deployNum; ++i) {
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
  }

  // Tasks to start.
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
  for (int32_t i = 0; i < startNum; ++i) {
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
  }

  // Tasks to undeploy; the explicit list is written only when not undeploying everything.
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
    for (int32_t i = 0; i < undeployNum; ++i) {
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
    }
  }

  // Management responses.
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
  for (int32_t i = 0; i < rspNum; ++i) {
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
  }

_exit:

  // Only close a frame that was actually opened.
  if (frameOpen) {
    tEndEncode(pEncoder);
  }

  return code;
}
1466

1467
/*
 * Decode the trigger-side part of a reader deploy message, reading the
 * fields in the order tEncodeSStreamReaderDeployFromTrigger() writes them.
 *
 * The binary fields are read with tDecodeBinaryAlloc() into `pMsg`; on an
 * error return, fields decoded so far stay set in `pMsg`.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // Trigger table identity.
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));

  // Delete-handling flags.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  // Column lists and plans.
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1486

1487

1488
/*
 * Decode the calc-side part of a reader deploy message: the exec replica
 * count (i32) followed by the calc scan plan, read with tDecodeBinaryAlloc().
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));

_exit:
  return code;
}
1499

1500

1501
/*
 * Decode a reader deploy message.
 *
 * The `triggerReader` flag is read first; it selects which union member is
 * decoded next: the trigger variant when non-zero, the calc variant otherwise.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1516

1517

1518
/*
 * Decode a task address: task id (i64), node id (i32), endpoint set.
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1530

1531

1532
/*
 * Decode a runner target: its task address followed by the exec replica
 * count (i32).
 *
 * Returns `code`: 0 when every step succeeded, otherwise the value left by the
 * failing TAOS_CHECK_EXIT step.
 */
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1543

1544

1545
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
405✔
1546
  int32_t code = 0;
405✔
1547
  int32_t lino;
1548

1549
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
810!
1550
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
810!
1551
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
810!
1552
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
810!
1553
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
810!
1554
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
810!
1555
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
810!
1556
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
810!
1557
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
810!
1558
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
810!
1559

1560
  int32_t addrSize = 0;
405✔
1561
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
405!
1562
  if (addrSize > 0) {
405✔
1563
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
87✔
1564
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
87!
1565
  }
1566
  for (int32_t i = 0; i < addrSize; ++i) {
492✔
1567
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
87✔
1568
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
87!
1569
  }
1570
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
810!
1571
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
810!
1572
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));
810!
1573

1574
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
810!
1575
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
810!
1576
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
810!
1577
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
810!
1578

1579
  switch (pMsg->triggerType) {
405!
1580
    case WINDOW_TYPE_SESSION:
15✔
1581
      // session trigger
1582
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
30!
1583
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
30!
1584
      break;
15✔
1585
    case WINDOW_TYPE_STATE:
146✔
1586
      // state trigger
1587
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
292!
1588
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
292!
1589
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
292!
1590
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
292!
1591
      break;
146✔
1592
    
1593
    case WINDOW_TYPE_INTERVAL:
169✔
1594
      // slide trigger
1595
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
338!
1596
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
338!
1597
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
338!
1598
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
338!
1599
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
338!
1600
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
338!
1601
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
338!
1602
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
338!
1603
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
338!
1604
      break;
169✔
1605
    
1606
    case WINDOW_TYPE_EVENT:
40✔
1607
      // event trigger
1608
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
80!
1609
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
80!
1610
      
1611
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
80!
1612
      break;
40✔
1613
    
1614
    case WINDOW_TYPE_COUNT:
20✔
1615
      // count trigger
1616
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
40!
1617
      
1618
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
40!
1619
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
40!
1620
      break;
20✔
1621
    
1622
    case WINDOW_TYPE_PERIOD:
15✔
1623
      // period trigger
1624
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
30!
1625
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
30!
1626
      break;
15✔
1627
    default:
×
1628
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1629
      break;
×
1630
  }
1631

1632
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
810!
1633
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
810!
1634
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
810!
1635
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
810!
1636
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
810!
1637
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
810!
1638
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
810!
1639

1640
  int32_t readerNum = 0;
405✔
1641
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
405!
1642
  if (readerNum > 0) {
405✔
1643
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
403✔
1644
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
403!
1645
  }
1646
  for (int32_t i = 0; i < readerNum; ++i) {
921✔
1647
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
516✔
1648
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
516!
1649
  }
1650

1651
  int32_t runnerNum = 0;
405✔
1652
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
405!
1653
  if (runnerNum > 0) {
405✔
1654
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
400✔
1655
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
400!
1656
  }
1657
  for (int32_t i = 0; i < runnerNum; ++i) {
1,605✔
1658
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,200✔
1659
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
1,200!
1660
  }
1661

1662
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
810!
1663
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
810!
1664

1665
_exit:
405✔
1666

1667
  return code;
405✔
1668
}
1669

1670

1671

1672
/*
 * Decode a single SFieldWithOptions from pDecoder into pField.
 *
 * Fields are read in this order: name (C string copied into pField->name),
 * type, flags, bytes, compress, typeMod.
 *
 * Returns 0 when every read succeeds; otherwise the value left in `code` by
 * the first failing read (TAOS_CHECK_EXIT is expected to jump to _exit on
 * failure -- macro is defined outside this file section).
 */
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t lino;  // written by TAOS_CHECK_EXIT only
  int32_t code = 0;

  // identity
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));

  // type description
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));

  // options
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1687

1688
void destroySStreamOutCols(void* p){
225✔
1689
  if (p == NULL) return;
225!
1690
  SStreamOutCol* col = (SStreamOutCol*)p;
225✔
1691
  taosMemoryFreeClear(col->expr);
225!
1692
}
1693

1694
/*
 * Decode a runner-task deploy message from pDecoder into pMsg.
 *
 * Read order (must match the encoder): execReplica, streamName, pPlan,
 * outDBFName, outTblName, outTblType, calcNotifyOnly, topPlan, notify URL
 * list, addOptions, output columns, output tags, outStbUid, outStbSversion,
 * subTblNameExpr, tagValueExpr, forced output columns and -- only when the
 * decoder still has data -- lowLatencyCalc.
 *
 * Each list is stored as a count followed by that many entries; the array is
 * only created when the count is positive.
 *
 * Returns 0 on success, otherwise the first failing decode code, or terrno
 * when an array cannot be created. On failure pMsg may be partially filled;
 * this function does not release anything it already allocated.
 */
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t lino;
  int32_t code = 0;
  int32_t nUrls = 0;
  int32_t nCols = 0;
  int32_t nTags = 0;
  int32_t nForced = 0;

  // fixed header: replica, names, plan and flags
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // notify URLs: every slot receives its own allocated buffer
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUrls));
  if (nUrls > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, nUrls);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nUrls; ++k) {
      const char **ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));

  // output columns
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nCols));
  if (nCols > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), nCols);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nCols; ++k) {
      SFieldWithOptions *pColField = taosArrayGet(pMsg->outCols, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pColField));
    }
  }

  // output tags
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTags));
  if (nTags > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), nTags);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nTags; ++k) {
      SFieldWithOptions *pTagField = taosArrayGet(pMsg->outTags, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTagField));
    }
  }

  // output super table identity
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));

  // sub-table name / tag value expressions
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // forced output columns: expression followed by its data type
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nForced));
  if (nForced > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), nForced);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nForced; ++k) {
      SStreamOutCol *pForced = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, k);

      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pForced->expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pForced->type.bytes));
    }
  }

  // trailing field is read only if the buffer has more data
  // (presumably so payloads written without it still decode -- confirm)
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  }

_exit:
  return code;
}
1771

1772
/*
 * Decode one task deployment entry: the common task header first, then the
 * type-specific deploy message selected by pTask->task.type.
 *
 * Returns 0 on success or the first failing decode code. A task type other
 * than reader / trigger / runner is passed to TAOS_CHECK_EXIT as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (STREAM_READER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (STREAM_TRIGGER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (STREAM_RUNNER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    // unknown task type
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1796

1797

1798
/*
 * Decode the deployment description of one stream into pStream.
 *
 * Read order: streamId, reader task count + reader tasks, a trigger-task
 * presence flag (int32) + the trigger task when the flag is non-zero, runner
 * task count + runner tasks.
 *
 * Returns 0 on success, the first failing decode code otherwise, or terrno
 * when an array / the trigger task cannot be allocated. Nothing allocated
 * here is released on failure.
 */
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t lino;
  int32_t code = 0;
  int32_t nReaders = 0;
  int32_t hasTrigger = 0;
  int32_t nRunners = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nReaders; ++k) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, k);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // optional single trigger task, allocated zero-filled on the heap
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRunners; ++k) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, k);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1838

1839

1840
/*
 * Decode a start-task message. The message carries only the common stream
 * message header, which is read into pStart->header.
 *
 * Returns 0 on success or the code reported by the header decoder.
 */
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t lino;
  int32_t code = 0;

  // header is the whole payload
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:
  return code;
}
1850

1851

1852
/*
 * Decode one "start task" entry: the task header followed by its start
 * message.
 *
 * Returns 0 on success or the first failing decode code.
 */
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t lino;
  int32_t code = 0;

  // 1) which task
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // 2) the start message for it
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1863

1864

1865
/*
 * Decode an undeploy-task message: common message header, then the
 * doCheckpoint and doCleanup flags (one int8 each).
 *
 * Returns 0 on success or the first failing decode code.
 */
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // behaviour flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1877

1878

1879
/*
 * Decode one "undeploy task" entry: the task header followed by its undeploy
 * message.
 *
 * Returns 0 on success or the first failing decode code.
 */
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  // 1) which task
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // 2) the undeploy message for it
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1890

1891
/*
 * Decode one recalculation request: recalcId, start and end, each stored as
 * an int64.
 *
 * Returns 0 on success or the first failing decode code.
 */
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // range bounds
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1903

1904
/*
 * Decode the message-type specific content of a management response.
 *
 *   STREAM_MSG_ORIGTBL_READER_INFO   -> vgIds (int32 list) + readerList
 *   STREAM_MSG_UPDATE_RUNNER         -> runnerList
 *   STREAM_MSG_USER_RECALC           -> recalcList
 *   STREAM_MSG_RUNNER_ORIGTBL_READER -> execRspList, each entry holding an
 *                                       execId and its own vgList
 *   any other type                   -> nothing is read
 *
 * Every list is a count followed by that many entries; the array is created
 * only when the count is positive.
 *
 * Returns 0 on success, the first failing decode code otherwise, or terrno
 * when an array cannot be created. Partially built lists are left in pCont.
 */
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t lino;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t nVgs = 0;
      int32_t nReaders = 0;

      // vgroup ids
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVgs));
      if (nVgs > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), nVgs);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nVgs; ++k) {
          int32_t *pVgId = taosArrayGet(pCont->vgIds, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      // reader task addresses
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
      if (nReaders > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nReaders; ++k) {
          SStreamTaskAddr *pReaderAddr = taosArrayGet(pCont->readerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = 0;

      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
      if (nRunners > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRunners; ++k) {
          SStreamRunnerTarget *pRunnerTarget = taosArrayGet(pCont->runnerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunnerTarget));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = 0;

      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRecalcs));
      if (nRecalcs > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), nRecalcs);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRecalcs; ++k) {
          SStreamRecalcReq *pRecalc = taosArrayGet(pCont->recalcList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalc));
        }
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nRsps = 0;

      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsps));
      if (nRsps > 0) {
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), nRsps);
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRsps; ++k) {
          SStreamOReaderDeployRsp *pExecRsp = taosArrayGet(pCont->execRspList, k);
          int32_t                  nAddrs = 0;

          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pExecRsp->execId));

          // per-response vgroup address list
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nAddrs));
          if (nAddrs > 0) {
            pExecRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), nAddrs);
            TSDB_CHECK_NULL(pExecRsp->vgList, code, lino, _exit, terrno);

            for (int32_t j = 0; j < nAddrs; ++j) {
              SStreamTaskAddr* pVgAddr = taosArrayGet(pExecRsp->vgList, j);
              TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pVgAddr));
            }
          }
        }
      }
      break;
    }

    default:
      // no content is decoded for any other message type
      break;
  }

_exit:
  return code;
}
1989

1990

1991
/*
 * Decode one management response: message header, reqId, code, task header,
 * and finally the content whose layout depends on header.msgType.
 *
 * Returns 0 on success or the first failing decode code.
 */
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t lino;
  int32_t code = 0;

  // envelope
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // payload; the header decoded above selects its shape
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
2005

2006
void tFreeSStreamOReaderDeployRsp(void* param) {
34✔
2007
  if (NULL == param) {
34!
2008
    return;
×
2009
  }
2010

2011
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
34✔
2012
  taosArrayDestroy(pRsp->vgList);
34✔
2013
}
2014

2015
void tFreeSStreamMgmtRsp(void* param) {
324✔
2016
  if (NULL == param) {
324!
2017
    return;
×
2018
  }
2019
  
2020
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
324✔
2021

2022
  taosArrayDestroy(pRsp->cont.vgIds);
324✔
2023
  taosArrayDestroy(pRsp->cont.readerList);
324✔
2024
  taosArrayDestroy(pRsp->cont.runnerList);
324✔
2025
  taosArrayDestroy(pRsp->cont.recalcList);
324✔
2026
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
324✔
2027
}
2028

2029
/*
 * Free the heap buffers referenced by a reader deploy message. Which member
 * of pReader->msg is used depends on pReader->triggerReader. The message
 * structure itself is not freed. A NULL pReader is ignored.
 */
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    // calc reader: only the scan plan is released
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  // trigger reader
  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
2046

2047
/*
 * Element destructor for an array of pointers: param is the address of a
 * slot, and the pointer stored in that slot is freed. A NULL param is
 * ignored.
 */
void tFreeStreamNotifyUrl(void* param) {
  if (param != NULL) {
    void** ppSlot = (void**)param;
    taosMemoryFree(*ppSlot);
  }
}
2054

2055
/*
 * Free everything referenced by a trigger deploy message: the notify URL
 * list (including each URL), the buffers of the trigger variant selected by
 * triggerType (state / event / count only), the plan and filter buffers, the
 * reader / runner lists and the stream name. The message structure itself is
 * not freed. A NULL pTrigger is ignored.
 */
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

  // buffers released here depend on the trigger type; other types release none
  if (WINDOW_TYPE_STATE == pTrigger->triggerType) {
    taosMemoryFree(pTrigger->trigger.stateWin.expr);
  } else if (WINDOW_TYPE_EVENT == pTrigger->triggerType) {
    taosMemoryFree(pTrigger->trigger.event.startCond);
    taosMemoryFree(pTrigger->trigger.event.endCond);
  } else if (WINDOW_TYPE_COUNT == pTrigger->triggerType) {
    taosMemoryFree(pTrigger->trigger.count.condCols);
  }

  // type-independent buffers
  taosMemoryFree(pTrigger->partitionCols);
  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  // address lists and name
  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
2085

2086
void tFreeSStreamOutCol(void* param) {
×
2087
  if (NULL == param) {
×
2088
    return;
×
2089
  }
2090

2091
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
2092
  taosMemoryFree(pOut->expr);
×
2093
}
2094

2095
/*
 * Free everything referenced by a runner deploy message: names, plan,
 * notify URL list (including each URL), output column / tag lists, the
 * sub-table and tag expressions, and the forced output columns (including
 * each column's expression). The message structure itself is not freed.
 * A NULL pRunner is ignored.
 */
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // names and plan
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // lists; URL slots own their strings, column / tag entries are destroyed flat
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  // expressions
  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
2113

2114
void tFreeSStmTaskDeploy(void* param) {
3,469✔
2115
  if (NULL == param) {
3,469✔
2116
    return;
687✔
2117
  }
2118

2119
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
2,782✔
2120
  switch (pTask->task.type)  {
2,782!
2121
    case STREAM_READER_TASK:
1,111✔
2122
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
1,111✔
2123
      break;
1,111✔
2124
    case STREAM_TRIGGER_TASK:
405✔
2125
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
405✔
2126
      break;
405✔
2127
    case STREAM_RUNNER_TASK:
1,266✔
2128
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
1,266✔
2129
      break;
1,266✔
2130
    default:
×
2131
      break;
×
2132
  }
2133
}
2134

2135
void tFreeSStmStreamDeploy(void* param) {
546✔
2136
  if (NULL == param) {
546!
2137
    return;
×
2138
  }
2139
  
2140
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
546✔
2141
  taosArrayDestroy(pDeploy->readerTasks);
546✔
2142
  if (pDeploy->triggerTask) {
546✔
2143
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
405✔
2144
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
405✔
2145
    taosMemoryFree(pDeploy->triggerTask);
405!
2146
  }
2147

2148
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
546✔
2149
  for (int32_t i = 0; i < runnerNum; ++i) {
1,812✔
2150
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
1,266✔
2151
    taosMemoryFree(pRunner->msg.runner.pPlan);
1,266!
2152
  }
2153
  taosArrayDestroy(pDeploy->runnerTasks);
546✔
2154
}
2155

2156
void tDeepFreeSStmStreamDeploy(void* param) {
1,092✔
2157
  if (NULL == param) {
1,092!
2158
    return;
×
2159
  }
2160
  
2161
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
1,092✔
2162
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
1,092✔
2163
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
1,092✔
2164
  taosMemoryFree(pDeploy->triggerTask);
1,092!
2165
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
1,092✔
2166
}
2167

2168

2169
/*
 * Release the lists of a heartbeat response, using the shallow stream
 * deployment destructor (tFreeSStmStreamDeploy) for deploy.streamList.
 * The response structure itself is not freed. A NULL pRsp is ignored.
 */
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2178

2179
/*
 * Release the lists of a heartbeat response, using the deep stream
 * deployment destructor (tDeepFreeSStmStreamDeploy) for deploy.streamList.
 * The response structure itself is not freed. A NULL pRsp is ignored.
 */
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2188

2189

2190

2191
/*
 * Decode a stream heartbeat response into pRsp.
 *
 * Read order inside the tStartDecode / tEndDecode frame: streamGId, stream
 * deploy list, task start list, the undeployAll flag, the task undeploy list
 * (present only when undeployAll is zero), management response list.
 *
 * Every list is a count followed by that many entries; the array is created
 * only when the count is positive.
 *
 * Returns 0 on success, the first failing decode code otherwise, or terrno
 * when an array cannot be created. On failure tEndDecode is not called and
 * whatever was already decoded stays in pRsp.
 */
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t lino;
  int32_t code = 0;
  int32_t nDeploys = 0;
  int32_t nStarts = 0;
  int32_t nMgmtRsps = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploys));
  if (nDeploys > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploys);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nDeploys; ++k) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, k);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStarts));
  if (nStarts > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStarts);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nStarts; ++k) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // tasks to undeploy; the explicit list is read only when undeployAll is zero
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploys = 0;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploys));
    if (nUndeploys > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploys);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);

      for (int32_t k = 0; k < nUndeploys; ++k) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // management responses
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nMgmtRsps));
  if (nMgmtRsps > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nMgmtRsps);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nMgmtRsps; ++k) {
      SStreamMgmtRsp* pOneRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pOneRsp));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
2249

2250
/*
 * Encode a task run request inside a tStartEncode / tEndEncode frame:
 * streamId (int64), taskId (int32), reqType (int32).
 *
 * Returns 0 on success or the first failing encode code; tEndEncode is not
 * called on failure.
 */
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // payload
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2263

2264
/*
 * Decode a task run request inside a tStartDecode / tEndDecode frame:
 * streamId (int64), taskId (int32), reqType (int32).
 *
 * Returns 0 on success or the first failing decode code; tEndDecode is not
 * called on failure.
 */
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // payload
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2277

2278
/*
 * Encode a task stop request inside a tStartEncode / tEndEncode frame; the
 * only payload field is streamId (int64).
 *
 * Returns 0 on success or the first failing encode code; tEndEncode is not
 * called on failure.
 */
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // payload
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2289

2290
/*
 * Decode a task stop request inside a tStartDecode / tEndDecode frame; the
 * only payload field is streamId (int64).
 *
 * Returns 0 on success or the first failing decode code; tEndDecode is not
 * called on failure.
 */
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // payload
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2302

2303

2304
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
3,086✔
2305
  int32_t code = 0;
3,086✔
2306
  int32_t lino;
2307

2308
  // name part
2309
  int32_t sqlLen = pReq->sql == NULL ? 0 : (int32_t)strlen(pReq->sql) + 1;
3,086!
2310
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
3,086!
2311
  int32_t outDbLen = pReq->outDB == NULL ? 0 : (int32_t)strlen(pReq->outDB) + 1;
3,086✔
2312
  int32_t streamDBLen = pReq->streamDB == NULL ? 0 : (int32_t)strlen(pReq->streamDB) + 1;
3,086!
2313
  int32_t triggerDBLen = pReq->triggerDB == NULL ? 0 : (int32_t)strlen(pReq->triggerDB) + 1;
3,086✔
2314
  int32_t triggerTblNameLen = pReq->triggerTblName == NULL ? 0 : (int32_t)strlen(pReq->triggerTblName) + 1;
3,086✔
2315
  int32_t outTblNameLen = pReq->outTblName == NULL ? 0 : (int32_t)strlen(pReq->outTblName) + 1;
3,086✔
2316

2317
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
6,172!
2318

2319
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, nameLen));
6,172!
2320
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, sqlLen));
6,172!
2321
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, outDbLen));
6,172!
2322
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, streamDBLen));
6,172!
2323
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, triggerDBLen));
6,172!
2324
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, triggerTblNameLen));
6,172!
2325
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, outTblNameLen));
6,172!
2326

2327
  int32_t calcDbSize = (int32_t)taosArrayGetSize(pReq->calcDB);
3,086✔
2328
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcDbSize));
3,086!
2329
  for (int32_t i = 0; i < calcDbSize; ++i) {
6,152✔
2330
    const char *dbName = taosArrayGetP(pReq->calcDB, i);
3,066✔
2331
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, dbName)));
3,066!
2332
  }
2333

2334
  // trigger control part
2335
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
6,172!
2336
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
6,172!
2337
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
6,172!
2338
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
6,172!
2339
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
6,172!
2340
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
6,172!
2341
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
6,172!
2342
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
6,172!
2343
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
6,172!
2344
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));
6,172!
2345

2346
  // notify part
2347
  int32_t addrSize = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
3,086✔
2348
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
3,086!
2349
  for (int32_t i = 0; i < addrSize; ++i) {
3,566✔
2350
    const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
480✔
2351
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, url)));
480!
2352
  }
2353
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
6,172!
2354
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->addOptions));
6,172!
2355
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));
6,172!
2356

2357
  // out table part
2358

2359
  // trigger cols and partition cols
2360
  int32_t filterColsLen = pReq->triggerFilterCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerFilterCols) + 1;
3,086✔
2361
  int32_t triggerColsLen = pReq->triggerCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerCols) + 1;
3,086✔
2362
  int32_t partitionColsLen = pReq->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pReq->partitionCols) + 1;
3,086✔
2363
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, filterColsLen));
6,172!
2364
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, triggerColsLen));
6,172!
2365
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, partitionColsLen));
6,172!
2366

2367
  // out col
2368
  int32_t outColSize = (int32_t )taosArrayGetSize(pReq->outCols);
3,086✔
2369
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColSize));
3,086!
2370
  for (int32_t i = 0; i < outColSize; ++i) {
13,230✔
2371
    SFieldWithOptions *pField = taosArrayGet(pReq->outCols, i);
10,144✔
2372
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pField));
10,144!
2373
  }
2374

2375
  // out tag
2376
  int32_t outTagSize = (int32_t )taosArrayGetSize(pReq->outTags);
3,086✔
2377
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagSize));
3,086!
2378
  for (int32_t i = 0; i < outTagSize; ++i) {
4,918✔
2379
    SField *pField = taosArrayGet(pReq->outTags, i);
1,832✔
2380
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->type));
3,664!
2381
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
3,664!
2382
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
3,664!
2383
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
3,664!
2384
  }
2385

2386
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
6,172!
2387
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
6,172!
2388
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
6,172!
2389
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));
6,172!
2390

2391
  switch (pReq->triggerType) {
3,086!
2392
    case WINDOW_TYPE_SESSION: {
108✔
2393
      // session trigger
2394
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
216!
2395
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
216!
2396
      break;
108✔
2397
    }
2398
    case WINDOW_TYPE_STATE: {
684✔
2399
      // state trigger
2400
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
1,368!
2401
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
1,368!
2402
      break;
684✔
2403
    }
2404
    case WINDOW_TYPE_INTERVAL: {
1,714✔
2405
      // slide trigger
2406
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
3,428!
2407
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
3,428!
2408
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
3,428!
2409
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
3,428!
2410
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
3,428!
2411
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
3,428!
2412
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
3,428!
2413
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
3,428!
2414
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
3,428!
2415
      break;
1,714✔
2416
    }
2417
    case WINDOW_TYPE_EVENT: {
208✔
2418
      // event trigger
2419
      int32_t eventWindowStartCondLen = pReq->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.startCond) + 1;
208!
2420
      int32_t eventWindowEndCondLen = pReq->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.endCond) + 1;
208!
2421

2422
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond, eventWindowStartCondLen));
416!
2423
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond, eventWindowEndCondLen));
416!
2424

2425
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
416!
2426
      break;
208✔
2427
    }
2428
    case WINDOW_TYPE_COUNT: {
176✔
2429
      // count trigger
2430
      int32_t countWindowCondColsLen = pReq->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.count.condCols) + 1;
176!
2431
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols, countWindowCondColsLen));
352!
2432

2433
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
352!
2434
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
352!
2435
      break;
176✔
2436
    }
2437
    case WINDOW_TYPE_PERIOD: {
196✔
2438
      // period trigger
2439
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
392!
2440
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
392!
2441
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
392!
2442
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
392!
2443
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
392!
2444
      break;
196✔
2445
    }
2446
  }
2447

2448
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
6,172!
2449
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
6,172!
2450
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
6,172!
2451
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
6,172!
2452
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
6,172!
2453
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
6,172!
2454
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
6,172!
2455
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
6,172!
2456
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
6,172!
2457
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
6,172!
2458
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
6,172!
2459
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
6,172!
2460
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
6,172!
2461
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));
6,172!
2462

2463
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
6,172!
2464
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));
6,172!
2465

2466
  int32_t triggerScanPlanLen = pReq->triggerScanPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerScanPlan) + 1;
3,086✔
2467
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, triggerScanPlanLen));
6,172!
2468

2469
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerHasPF));
6,172!
2470
  int32_t triggerFilterLen = pReq->triggerPrevFilter == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerPrevFilter) + 1;
3,086✔
2471
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, triggerFilterLen));
6,172!
2472

2473
  int32_t calcScanPlanListSize = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
3,086✔
2474
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcScanPlanListSize));
3,086!
2475
  for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
7,292✔
2476
    SStreamCalcScan* pCalcScanPlan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, i);
4,206✔
2477
    int32_t          vgListSize = (int32_t)taosArrayGetSize(pCalcScanPlan->vgList);
4,206✔
2478
    int32_t          scanPlanLen = pCalcScanPlan->scanPlan == NULL ? 0 : (int32_t)strlen((char*)pCalcScanPlan->scanPlan) + 1;
4,206!
2479
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgListSize));
4,206!
2480
    for (int32_t j = 0; j < vgListSize; ++j) {
8,412✔
2481
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pCalcScanPlan->vgList, j)));
8,412!
2482
    }
2483
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pCalcScanPlan->readFromCache));
8,412!
2484
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCalcScanPlan->scanPlan, scanPlanLen));
8,412!
2485
  }
2486

2487
  int32_t calcPlanLen = pReq->calcPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->calcPlan) + 1;
3,086✔
2488
  int32_t subTblNameExprLen = pReq->subTblNameExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->subTblNameExpr) + 1;
3,086✔
2489
  int32_t tagValueExprLen = pReq->tagValueExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->tagValueExpr) + 1;
3,086✔
2490

2491
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
6,172!
2492
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, calcPlanLen));
6,172!
2493
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, subTblNameExprLen));
6,172!
2494
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, tagValueExprLen));
6,172!
2495

2496
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pReq->forceOutCols);
3,086✔
2497
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
3,086!
2498
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
3,386✔
2499
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, i);
300✔
2500
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
300!
2501

2502
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
600!
2503
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
600!
2504
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
600!
2505
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
600!
2506
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
600!
2507
  }
2508

2509
  switch (pReq->triggerType) {
3,086✔
2510
    case WINDOW_TYPE_STATE: {
684✔
2511
      // state trigger
2512
      int32_t stateExprLen = pReq->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.stateWin.expr) + 1;
684!
2513
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.stateWin.expr, stateExprLen));
1,368!
2514
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.extend));
1,368!
2515
      break;
684✔
2516
    }
2517
    case WINDOW_TYPE_INTERVAL: {
1,714✔
2518
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.overlap));
3,428!
2519
      break;
1,714✔
2520
    }
2521
    default: {
688✔
2522
      break;
688✔
2523
    }
2524
  }
2525

2526
_exit:
3,086✔
2527

2528
  if (code) {
3,086!
2529
    return code;
×
2530
  }
2531
  
2532
  return 0;
3,086✔
2533
}
2534

2535
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
1,452✔
2536
  SEncoder encoder = {0};
1,452✔
2537
  tEncoderInit(&encoder, buf, bufLen);
1,452✔
2538
  int32_t code = 0;
1,452✔
2539
  int32_t lino;
2540

2541
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,452!
2542

2543
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
1,452!
2544

2545
  tEndEncode(&encoder);
1,452✔
2546

2547
_exit:
1,452✔
2548
  if (code) {
1,452!
2549
    tEncoderClear(&encoder);
×
2550
    return code;
×
2551
  } else {
2552
    int32_t tlen = encoder.pos;
1,452✔
2553
    tEncoderClear(&encoder);
1,452✔
2554
    return tlen;
1,452✔
2555
  }
2556
  return 0;
2557
}
2558

2559

2560
/*
 * Decode the body of a create-stream request from an already started decoder.
 *
 * Fields are read in the same order tSerializeSCMCreateStreamReqImpl writes
 * them. Strings and arrays are allocated while decoding and stored in pReq.
 *
 * @param pDecoder  decoder positioned at the start of the request body
 * @param pReq      destination request; members are filled in as they are read
 * @return 0 on success, otherwise the code of the first failing step.
 *
 * On failure pReq may be left partially populated: members that were already
 * decoded are NOT released here (presumably the caller releases them with
 * tFreeSCMCreateStreamReq - confirm against callers). Temporaries that had not
 * yet been handed over to pReq (a calc-scan entry or a forced output column
 * under construction) are released before returning so they do not leak.
 *
 * The trailing per-trigger fields (stateWin.expr / stateWin.extend /
 * sliding.overlap) are optional: they are read only if the decoder has not
 * reached the end of the message.
 */
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
  int32_t code = 0;
  int32_t lino;

  // Entries under construction. They live at function scope (and are zeroed
  // again once ownership moves into pReq) so that _exit can release whatever a
  // failed iteration left behind.
  SStreamCalcScan calcScan = {0};
  SStreamOutCol   outCol = {0};

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));

  // calc databases: array of owned, duplicated names
  int32_t calcDbSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
  if (pReq->calcDB == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < calcDbSize; ++i) {
    char *calcDb = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
    if (calcDb == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
      taosMemoryFree(calcDb);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // trigger control part
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));

  // notify part: the URL array is only created when at least one URL follows
  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  if (addrSize > 0) {
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
    if (pReq->pNotifyAddrUrls == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }
  for (int32_t i = 0; i < addrSize; ++i) {
    char *url = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
    if (url == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
      taosMemoryFree(url);
      TAOS_CHECK_EXIT(terrno);
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));

  // trigger cols and partition cols
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));

  // out col: the array is created with outColSize elements and filled in place
  int32_t outColSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
  if (outColSize > 0) {
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
    if (pReq->outCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outColSize; ++i) {
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // out tag
  int32_t outTagSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
  if (outTagSize > 0) {
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
    if (pReq->outTags == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outTagSize; ++i) {
      SFieldWithOptions field = {0};
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));

  // trigger-specific payload; an unrecognised trigger type is rejected
  switch (pReq->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
      // default used when the optional trailing "extend" field is absent
      pReq->trigger.stateWin.extend = 0;
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));

      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
      break;
    }
    default:
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));

  // calc scan plans: each entry owns a vgroup-id array and a plan string
  int32_t calcScanPlanListSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
  if (calcScanPlanListSize > 0) {
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
    if (pReq->calcScanPlanList == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
      int32_t vgListSize = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
      if (vgListSize > 0) {
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
        if (calcScan.vgList == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        for (int32_t j = 0; j < vgListSize; ++j) {
          int32_t vgId = 0;
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }
      }
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // The array now holds a copy of the entry (and its pointers); start the
      // next iteration from a clean slate so _exit never frees them twice.
      calcScan = (SStreamCalcScan){0};
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));

  // forced output columns: each entry owns its expression string
  int32_t forceOutColsSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
  if (forceOutColsSize > 0) {
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
    if (pReq->forceOutCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // Ownership of expr moved into the array; reset for the next entry.
      outCol = (SStreamOutCol){0};
    }
  }

  // Optional trailing fields: only read when the message still has data left.
  switch (pReq->triggerType) {
    case WINDOW_TYPE_STATE: {
      // state trigger
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
      }
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
      }
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      if (!tDecodeIsEnd(pDecoder)) {
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
      }
      break;
    }
    default:
      break;
  }

_exit:
  // Release an entry that was still under construction when decoding stopped.
  // On the success path both temporaries are zeroed, so this is a no-op.
  taosArrayDestroy(calcScan.vgList);
  taosMemoryFreeClear(calcScan.scanPlan);
  taosMemoryFreeClear(outCol.expr);

  return code;
}
2832

2833

2834
/*
 * Decode a create-stream request from a raw message buffer.
 *
 * Sets up a decoder over buf, then reads the request body with
 * tDeserializeSCMCreateStreamReqImpl inside a tStartDecode/tEndDecode pair.
 *
 * @param buf     buffer handed to tDecoderInit
 * @param bufLen  length handed to tDecoderInit together with buf
 * @param pReq    destination request
 * @return 0 when every step succeeded, otherwise the code of the failing step.
 *         The decoder is cleared on both paths; pReq is not released here.
 */
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;  // presumably written by TAOS_CHECK_EXIT; not read here
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2851

2852

2853
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
×
2854
  int32_t  code = 0;
×
2855
  int32_t  lino;
2856
  int32_t  tlen;
2857
  SEncoder encoder = {0};
×
2858
  tEncoderInit(&encoder, buf, bufLen);
×
2859

2860
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2861

2862
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2863
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2864
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2865

2866
  tEndEncode(&encoder);
×
2867

2868
_exit:
×
2869
  if (code) {
×
2870
    tlen = code;
×
2871
  } else {
2872
    tlen = encoder.pos;
×
2873
  }
2874
  tEncoderClear(&encoder);
×
2875
  return tlen;
×
2876
}
2877

2878
/*
 * Decode a drop-stream request from a raw message buffer.
 *
 * Reads the stream name (allocated by tDecodeBinaryAlloc and stored in
 * pReq->name) followed by the igNotExists flag.
 *
 * @param buf     buffer handed to tDecoderInit
 * @param bufLen  length handed to tDecoderInit together with buf
 * @param pReq    destination request
 * @return 0 when every step succeeded, otherwise the code of the failing step.
 *         The decoder is cleared on both paths; pReq->name is not released
 *         here.
 */
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;  // presumably written by TAOS_CHECK_EXIT; not read here
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2894

2895
/*
 * Release the heap-owned members of a drop-stream request.
 *
 * Frees pReq->name and resets it to NULL. The request struct itself is not
 * freed. A NULL pReq is accepted and ignored, matching the other free
 * routines in this file.
 *
 * @param pReq  request whose members are released; may be NULL
 */
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
30✔
2898

2899
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
3,632✔
2900
  if (pScan == NULL) {
3,632!
2901
    return;
×
2902
  }
2903
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
3,632✔
2904
  taosArrayDestroy(pCalcScan->vgList);
3,632✔
2905
  taosMemoryFreeClear(pCalcScan->scanPlan);
3,632!
2906
}
2907

2908
void tFreeStreamOutCol(void* pCol) {
225✔
2909
  if (pCol == NULL) {
225!
2910
    return;
×
2911
  }
2912
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
225✔
2913
  taosMemoryFreeClear(pOutCol->expr);
225!
2914
}
2915

2916

2917

2918
// Release every heap member of a create-stream request and reset the pointers,
// so the call is safe to repeat on the same struct. The struct itself is not freed.
// A NULL request is ignored.
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }

  // identity and database/table names
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // arrays of pointers: elements are released together with the array
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  // column description strings
  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  // flat arrays
  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  // only the state and event trigger variants carry heap strings
  if (pReq->triggerType == WINDOW_TYPE_STATE) {
    taosMemoryFreeClear(pReq->trigger.stateWin.expr);
  } else if (pReq->triggerType == WINDOW_TYPE_EVENT) {
    taosMemoryFreeClear(pReq->trigger.event.startCond);
    taosMemoryFreeClear(pReq->trigger.event.endCond);
  }

  // plans and expressions
  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
}
2967

2968
// Deep-copy the deployment-related members of pSrc into a newly allocated request
// stored in *ppDst.
//
// Only the fields copied below are populated; everything else in the new struct stays
// zeroed by the calloc. When pSrc is NULL nothing is allocated, *ppDst is left
// untouched and 0 is returned.
//
// On failure the members copied so far are released through tFreeSCMCreateStreamReq.
// The struct itself stays allocated in *ppDst (as before this change), so the caller
// remains responsible for freeing it.
//
// Returns 0 on success, otherwise the error code captured from terrno.
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t             code = 0, lino = 0;
  int32_t             num = 0;
  void*               p = NULL;
  // Declared (and NULL) before the first goto so the _exit path never reads an
  // indeterminate pointer when the allocation itself fails.
  SCMCreateStreamReq* pDst = NULL;
  // Element under construction; kept at function scope so _exit can release it when
  // it was never handed over to its array.
  SStreamCalcScan     dscan = {0};
  SStreamOutCol       dcol = {0};

  if (NULL == pSrc) {
    return code;
  }

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
      p = NULL;  // now owned by the destination array
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  pDst->triggerType = pSrc->triggerType;

  switch (pSrc->triggerType) {
    case WINDOW_TYPE_STATE:
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
      if (pSrc->trigger.stateWin.expr) {
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
      }
      break;
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    default:
      // remaining trigger variants are copied by value
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
      dscan = (SStreamCalcScan){.readFromCache = sscan->readFromCache};

      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);

      dscan.scanPlan = COPY_STR(sscan->scanPlan);
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // contents now owned by the destination array
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
      dcol = (SStreamOutCol){.type = scol->type};

      dcol.expr = COPY_STR(scol->expr);
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // contents now owned by the destination array
    }
  }

  pDst->triggerTblUid = pSrc->triggerTblUid;
  pDst->triggerTblType = pSrc->triggerTblType;
  pDst->deleteReCalc = pSrc->deleteReCalc;
  pDst->deleteOutTbl = pSrc->deleteOutTbl;

_exit:

  if (code) {
    // Release whichever element was built but never pushed; each of these is a
    // no-op when the corresponding pointers are NULL.
    taosMemoryFreeClear(p);
    tFreeStreamCalcScan(&dscan);
    tFreeStreamOutCol(&dcol);
    tFreeSCMCreateStreamReq(pDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3140

3141

3142
// Encode a pause-stream request into buf.
// Wire order: name as a binary blob (length includes the trailing NUL, or 0 when
// name is NULL), then igNotExists as an int8.
// Returns the encoder position when code is 0, otherwise code itself.
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
  SEncoder enc = {0};
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  written = 0;
  int32_t  nameLen = 0;

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igNotExists));
  tEndEncode(&enc);

_exit:
  written = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return written;
}
3164

3165
// Decode a pause-stream request from buf into pReq.
// Reads the name (allocated by tDecodeBinaryAlloc into pReq->name) followed by igNotExists.
// The decoder is always cleared before returning code.
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3180

3181
// Release the name string held by a pause-stream request and reset the pointer.
// A NULL request is ignored, in line with tFreeSCMCreateStreamReq.
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
3184

3185
// Encode a resume-stream request into buf.
// Wire order: name as a binary blob (length includes the trailing NUL, or 0 when
// name is NULL), igNotExists (int8), igUntreated (int8).
// Returns the encoder position when code is 0, otherwise code itself.
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  written = 0;
  int32_t  nameLen = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igNotExists));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igUntreated));
  tEndEncode(&enc);

_exit:
  written = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return written;
}
3207

3208
// Decode a resume-stream request from buf into pReq.
// Reads the name (allocated by tDecodeBinaryAlloc into pReq->name), then igNotExists
// and igUntreated. The decoder is always cleared before returning code.
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3224

3225
// Release the name string held by a resume-stream request and reset the pointer.
// A NULL request is ignored, in line with tFreeSCMCreateStreamReq.
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
3228

3229
// Encode a recalc-stream request into buf.
// Wire order: name as a binary blob (length includes the trailing NUL, or 0 when
// name is NULL), calcAll (int8), timeRange.skey (int64), timeRange.ekey (int64).
// Returns the encoder position when code is 0, otherwise code itself.
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  written = 0;
  int32_t  nameLen = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->calcAll));
  TAOS_CHECK_EXIT(tEncodeI64(&enc, pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tEncodeI64(&enc, pReq->timeRange.ekey));
  tEndEncode(&enc);

_exit:
  written = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return written;
}
3252

3253
// Decode a recalc-stream request from buf into pReq.
// Reads the name (allocated by tDecodeBinaryAlloc into pReq->name), calcAll, and the
// two time-range keys. The decoder is always cleared before returning code.
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3271

3272
// Release the name string held by a recalc-stream request and reset the pointer.
// A NULL request is ignored, in line with tFreeSCMCreateStreamReq.
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
38✔
3275

3276
// Write the fields of a stream-progress request to pEncoder in wire order:
// streamId (int64), taskId (int64), fetchIdx (int32). Returns the resulting code.
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
3287

3288
// Encode a stream-progress request into buf; the field layout is delegated to
// tEncodeStreamProgressReq.
// Returns the encoder position when code is 0, otherwise code itself.
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  written = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);

  TAOS_CHECK_EXIT(tStartEncode(&enc));
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&enc, pReq));
  tEndEncode(&enc);

_exit:
  written = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return written;
}
3309

3310
// Read the fields of a stream-progress request from pDecoder in wire order:
// streamId (int64), taskId (int64), fetchIdx (int32). Returns the resulting code.
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
3321

3322
// Decode a stream-progress request from buf into pReq; the field layout is delegated
// to tDecodeStreamProgressReq. The decoder is always cleared before returning code.
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, (char *)buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3338

3339
// Write the fields of a stream-progress response to pEncoder in wire order:
// streamId (int64), fillHisFinished (int8), progressDelay (int64), fetchIdx (int32).
// Returns the resulting code.
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
3351

3352
// Encode a stream-progress response into buf; the field layout is delegated to
// tEncodeStreamProgressRsp.
// Returns the encoder position when code is 0, otherwise code itself.
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  written = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);

  TAOS_CHECK_EXIT(tStartEncode(&enc));
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&enc, pRsp));
  tEndEncode(&enc);

_exit:
  written = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return written;
}
3373

3374
// Read the fields of a stream-progress response from pDecoder in wire order:
// streamId (int64), fillHisFinished (decoded through an int8_t view of the field),
// progressDelay (int64), fetchIdx (int32). Returns the resulting code.
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
3386

3387
// Decode a stream-progress response from buf into pRsp; the field layout is delegated
// to tDecodeStreamProgressRsp. The decoder is always cleared before returning code.
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&dec, pRsp));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3403

3404
// Encode an original-table-info response into buf.
// Wire order: entry count (int32), then per entry suid (int64), uid (int64), cid (int16).
// A NULL array element aborts with TSDB_CODE_INVALID_PARA.
// Returns the encoder position on success, otherwise the error code.
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  written = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  int32_t nEntries = taosArrayGetSize(pRsp->cols);
  TAOS_CHECK_EXIT(tEncodeI32(&enc, nEntries));

  for (int32_t idx = 0; idx < nEntries; ++idx) {
    OTableInfoRsp* pEntry = taosArrayGet(pRsp->cols, idx);
    if (pEntry == NULL) {
      uError("col id is NULL at index %d", idx);
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tEncodeI64(&enc, pEntry->suid));
    TAOS_CHECK_EXIT(tEncodeI64(&enc, pEntry->uid));
    TAOS_CHECK_EXIT(tEncodeI16(&enc, pEntry->cid));
  }

  tEndEncode(&enc);

_exit:
  written = (code == TSDB_CODE_SUCCESS) ? (int32_t)enc.pos : code;
  tEncoderClear(&enc);
  return written;
}
3438

3439
// Decode an original-table-info response from buf into pRsp.
// Reads the entry count (int32), allocates pRsp->cols, then fills one OTableInfoRsp
// per entry with suid (int64), uid (int64), cid (int16).
//
// Every failure, including a field decode error inside the loop, leaves through _exit
// so the decoder is always cleared. pRsp->cols is not released here on failure;
// tDestroySTriggerOrigTableInfoRsp destroys it.
// Returns TSDB_CODE_SUCCESS or the error code.
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    // Jump to _exit (not return) on error so tDecoderClear still runs.
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3473

3474
// Destroy the cols array of an original-table-info response and reset the pointer so
// a repeated call does not touch freed memory. A NULL response is ignored.
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) {
    return;
  }
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
32,918✔
3477

3478
// Release the heap members owned by a trigger pull request, selected by base.type.
// Request types not listed below own nothing that is released here.
// The request struct itself is not freed. NULL is ignored.
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pWal = (SSTriggerWalDataNewRequest*)pReq;
      taosArrayDestroy(pWal->versions);
      tSimpleHashCleanup(pWal->ranges);
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pTsdb = (SSTriggerTsdbDataRequest*)pReq;
      if (pTsdb->cids != NULL) {
        taosArrayDestroy(pTsdb->cids);
        pTsdb->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pVtb = (SSTriggerVirTableInfoRequest*)pReq;
      if (pVtb->cids != NULL) {
        taosArrayDestroy(pVtb->cids);
        pVtb->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pPseudo = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pPseudo->cids != NULL) {
        taosArrayDestroy(pPseudo->cids);
        pPseudo->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pOrig = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pOrig->cols != NULL) {
        taosArrayDestroy(pOrig->cols);
        pOrig->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pSet = (SSTriggerSetTableRequest*)pReq;
      tSimpleHashCleanup(pSet->uidInfoTrigger);
      tSimpleHashCleanup(pSet->uidInfoCalc);
      break;
    }
    default:
      break;
  }
}
3514

3515
// Encode an array of column ids: element count (int32) followed by each id (int16).
// A NULL element aborts with TSDB_CODE_INVALID_PARA. Returns the resulting code.
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t nCols = taosArrayGetSize(cids);

  TAOS_CHECK_EXIT(tEncodeI32(encoder, nCols));

  for (int32_t idx = 0; idx < nCols; ++idx) {
    col_id_t* pId = taosArrayGet(cids, idx);
    if (pId != NULL) {
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pId));
      continue;
    }
    uError("col id is NULL at index %d", idx);
    code = TSDB_CODE_INVALID_PARA;
    goto _exit;
  }

_exit:
  return code;
}
3533

3534
// Decode an array of column ids written by encodeColsArray: element count (int32)
// followed by each id (int16).
//
// *cids is only allocated when the decoded count is positive; otherwise it is left
// as passed in. On any failure, including a decode error part-way through the
// elements, *cids is destroyed and set to NULL.
// Returns TSDB_CODE_SUCCESS or the error code.
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = 0;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  if (size > 0){
    *cids = taosArrayInit(size, sizeof(col_id_t));
    if (*cids == NULL) {
      code = terrno;
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
      goto _exit;
    }

    for (int32_t i = 0; i < size; ++i) {
      col_id_t* pColId = taosArrayReserve(*cids, 1);
      if (pColId == NULL) {
        code = terrno;
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
        goto _exit;
      }
      // Jump to _exit (not return) so the partially filled array is released below.
      TAOS_CHECK_EXIT(tDecodeI16(decoder, pColId));
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(*cids);
    *cids = NULL;
  }
  return code;
}
3566

3567
// Encode a two-level hash map.
// Wire order: outer entry count (int32); per outer entry the two int64 values stored
// in its key, the inner entry count (int32), then per inner entry its int16 key
// followed by its int16 value. The outer value is read as a SSHashObj* to the inner map.
// Returns the resulting code.
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t nOuter = tSimpleHashGetSize(pInfo);

  TAOS_CHECK_EXIT(tEncodeI32(encoder, nOuter));

  int32_t outerIter = 0;
  for (void* pOuter = tSimpleHashIterate(pInfo, NULL, &outerIter); pOuter != NULL;
       pOuter = tSimpleHashIterate(pInfo, pOuter, &outerIter)) {
    int64_t* pIds = tSimpleHashGetKey(pOuter, NULL);
    TAOS_CHECK_EXIT(tEncodeI64(encoder, pIds[0]));
    TAOS_CHECK_EXIT(tEncodeI64(encoder, pIds[1]));

    SSHashObj* pInnerMap = *(SSHashObj**)pOuter;
    int32_t    nInner = tSimpleHashGetSize(pInnerMap);
    TAOS_CHECK_EXIT(tEncodeI32(encoder, nInner));

    int32_t innerIter = 0;
    for (void* pInner = tSimpleHashIterate(pInnerMap, NULL, &innerIter); pInner != NULL;
         pInner = tSimpleHashIterate(pInnerMap, pInner, &innerIter)) {
      int16_t* pSlot = tSimpleHashGetKey(pInner, NULL);
      int16_t* pCid = (int16_t*)pInner;
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pSlot));
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pCid));
    }
  }

_exit:
  return code;
}
3598

3599
// Encode a trigger pull request into buf.
//
// Every request starts with a common header: type (int32), streamId (int64),
// readerTaskId (int64), sessionId (int64). The type then selects which concrete
// request struct pReq is cast to and which extra fields follow. Several types carry
// no payload beyond the header.
//
// An unknown type is rejected with TSDB_CODE_INVALID_PARA and, like every other
// failure, leaves through _exit without finishing the encode.
//
// Returns the encoder position on success, otherwise the error code.
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
  SEncoder encoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  tlen = 0;

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));

  switch (pReq->type) {
    // header-only requests
    case STRIGGER_PULL_LAST_TS:
    case STRIGGER_PULL_TSDB_META_NEXT:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
      break;
    }
    case STRIGGER_PULL_FIRST_TS: {
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_META: {
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_TS_DATA: {
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_CALC_DATA: {
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
      break;
    }
    case STRIGGER_PULL_WAL_META_NEW: {
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
      break;
    }
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;

      // version list: count followed by each int64 version
      int32_t nVersion = taosArrayGetSize(pRequest->versions);
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
      for (int32_t i = 0; i < nVersion; i++) {
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
      }

      // range map: count, then per entry the uint64 key and the two int64 values
      // stored in the entry
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
      int32_t iter = 0;
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
      while (px != NULL) {
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
        int64_t* key = (int64_t*)px;
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));

        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
      }
      break;
    }
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
      break;
    }
    case STRIGGER_PULL_GROUP_COL_VALUE: {
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
      int32_t size = taosArrayGetSize(pRequest->cols);
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
      for (int32_t i = 0; i < size; ++i) {
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
        if (oInfo == NULL) {
          uError("col id is NULL at index %d", i);
          code = TSDB_CODE_INVALID_PARA;
          goto _exit;
        }
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
      }
      break;
    }
    default: {
      uError("unknown pull type %d", pReq->type);
      code = TSDB_CODE_INVALID_PARA;
      // Skip tEndEncode: the message is invalid, so take the same exit as other errors.
      goto _exit;
    }
  }

  tEndEncode(&encoder);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tlen = code;
  } else {
    tlen = encoder.pos;
  }
  tEncoderClear(&encoder);
  return tlen;
}
3772

3773
static void destroyHash(void* data){
1,680✔
3774
  if (data){
1,680!
3775
    SSHashObj* tmp = *(SSHashObj**)data;
1,680✔
3776
    tSimpleHashCleanup(tmp);
1,680✔
3777
  }
3778
}
1,680✔
3779

3780
// Decode a two-level table map from `decoder` into a newly created hash.
//
// Data read, in order: an int32 entry count; then per entry two int64 ids
// (used together as the outer key), an int32 pair count, and that many
// (int16 slotId, int16 cid) pairs. The pairs of one entry are stored in an
// inner hash keyed by slotId; the outer hash stores the pointer to that inner
// hash and releases it through destroyHash.
//
// On failure the outer hash is cleaned up and *ppInfo is set to NULL.
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = 0;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (*ppInfo == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);

  for (int32_t i = 0; i < size; ++i) {
    int64_t id[2] = {0};
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id + 1));

    int32_t len = 0;
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));
    SSHashObj* tmp = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
    if (tmp == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    code = tSimpleHashPut(*ppInfo, id, sizeof(id), &tmp, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The outer hash did not store tmp, so its free callback will never see
      // it; release it here to avoid leaking the inner table.
      lino = __LINE__;
      tSimpleHashCleanup(tmp);
      goto _exit;
    }

    // From here on tmp is reachable through *ppInfo and is released with it.
    for (int32_t j = 0; j < len; ++j) {
      int16_t slotId = 0;
      int16_t cid = 0;
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(*ppInfo);
    *ppInfo = NULL;
  }
  return code;
}
3818

3819
// Decode a trigger pull request from `buf` (`bufLen` bytes) into `pReq`.
//
// The common header (type, streamId, readerTaskId, sessionId) is written to
// pReq->base; the remaining fields are decoded into the union member that
// matches the decoded type. Arrays and hashes allocated while decoding are
// stored in `pReq` and are not released here, including on failure.
//
// Returns TSDB_CODE_SUCCESS, the first decode/allocation error, or
// TSDB_CODE_INVALID_PARA for an unknown type. The decoder is always cleared
// before returning.
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t type = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
  SSTriggerPullRequest* pBase = &(pReq->base);
  pBase->type = type;
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));

  switch (type) {
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
      break;
    }
    case STRIGGER_PULL_LAST_TS: {
      // No fields beyond the common header.
      break;
    }
    case STRIGGER_PULL_FIRST_TS: {
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_META: {
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_META_NEXT: {
      // No fields beyond the common header.
      break;
    }
    case STRIGGER_PULL_TSDB_TS_DATA: {
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
      // No fields beyond the common header.
      break;
    }
    case STRIGGER_PULL_TSDB_CALC_DATA: {
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
      // No fields beyond the common header.
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
      // No fields beyond the common header.
      break;
    }
    case STRIGGER_PULL_WAL_META_NEW: {
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
      break;
    }
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
      int32_t                     nVersion = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
      if (pRequest->versions == NULL) {
        // Without this check the element access below would go through NULL.
        TAOS_CHECK_EXIT(terrno);
      }
      for (int32_t i = 0; i < nVersion; i++) {
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
      }
      int32_t nRanges = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
      if (pRequest->ranges == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      for (int32_t i = 0; i < nRanges; i++) {
        uint64_t gid = 0;
        int64_t  pRange[2] = {0};
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
      }
      break;
    }
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
      break;
    }
    case STRIGGER_PULL_GROUP_COL_VALUE: {
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
      int32_t                        size = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
      if (pRequest->cols == NULL) {
        code = terrno;
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
        goto _exit;
      }
      for (int32_t i = 0; i < size; ++i) {
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
        if (oInfo == NULL) {
          code = terrno;
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
          goto _exit;
        }
        // Use the _exit path (not a direct return) so the decoder is cleared
        // on failure like every other branch.
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refTableName));
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refColName));
      }
      break;
    }
    default: {
      uError("unknown pull type %d", type);
      code = TSDB_CODE_INVALID_PARA;
      break;
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3997

3998
// Encode an array of SSTriggerCalcParam into `pEncoder`.
//
// An int32 element count is written first; when `full` is false the count is
// 0 and no elements follow. For each element, the bytes of the struct that
// precede the `notifyType` member are copied verbatim. Unless
// `ignoreNotificationInfo` is set, `notifyType` and `extraNotifyContent`
// (including its terminating NUL, or zero length when NULL) follow.
//
// When the encoder has no data buffer only the position is advanced.
// Returns 0 on success or the first error code.
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
  int32_t size = full ? taosArrayGetSize(pParams) : 0;
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    if (pEncoder->data) {
      // The raw copy bypasses the tEncode* helpers, so verify the remaining
      // capacity here before writing into the buffer.
      if ((int64_t)pEncoder->size - (int64_t)pEncoder->pos < plainFieldSize) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
      }
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
    }
    pEncoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
4023

4024
void tDestroySSTriggerCalcParam(void* ptr) {
7,386,182✔
4025
  SSTriggerCalcParam* pParam = ptr;
7,386,182✔
4026
  if (pParam && pParam->extraNotifyContent != NULL) {
7,386,182✔
4027
    taosMemoryFreeClear(pParam->extraNotifyContent);
361!
4028
  }
4029
  if (pParam && pParam->resultNotifyContent != NULL) {
7,386,182!
4030
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
4031
  }
4032
}
7,386,182✔
4033

4034
void tDestroySStreamGroupValue(void* ptr) {
49,391✔
4035
  SStreamGroupValue* pValue = ptr;
49,391✔
4036
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
49,391!
4037
    taosMemoryFreeClear(pValue->data.pData);
46,176!
4038
    pValue->data.nData = 0;
46,180✔
4039
  }
4040
}
49,395✔
4041

4042
// Decode an array of SSTriggerCalcParam from `pDecoder` into a newly created
// array stored in *ppParams.
//
// An int32 element count is read first. For each element, the bytes of the
// struct that precede the `notifyType` member are copied verbatim from the
// buffer. Unless `ignoreNotificationInfo` is set, `notifyType` and an
// allocated copy of `extraNotifyContent` follow.
//
// Returns 0 on success or the first error code. On failure *ppParams may
// hold a partially filled array; it is not released here.
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray** ppParams, bool ignoreNotificationInfo) {
  int32_t size = 0, code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < size; ++i) {
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    // The raw copy bypasses the tDecode* helpers, so make sure the buffer
    // still holds a full element before reading; a truncated or malformed
    // message must not cause a read past the end of the buffer.
    if ((int64_t)pDecoder->size - (int64_t)pDecoder->pos < plainFieldSize) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
    }

    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
    pDecoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
4068

4069
// Encode an array of SStreamGroupValue into `pEncoder`.
//
// Layout: int32 count, then per value: isNull; if not null, isTbname; if
// isTbname, uid and vgId; then the data type followed by either the binary
// payload (variable-length types) or the int64 value.
//
// When `vgId` is not -1 it is stored into each tbname value before that
// value's vgId is encoded, so the input array is modified.
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numVals));

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (pVal->isNull) {
      // A null value carries no further fields.
      continue;
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
      if (vgId != -1) {
        pVal->vgId = vgId;
      }
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
    }

    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
    if (IS_VAR_DATA_TYPE(pVal->data.type)) {
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
    } else {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
    }
  }

_exit:
  return code;
}
4101

4102
// Decode an array of SStreamGroupValue from `pDecoder` into *ppGroupColVals.
//
// Any existing array is first cleared with tDestroySStreamGroupValue. When
// the decoded count is positive the array is created if it does not exist,
// otherwise its capacity is grown to hold the decoded count. Fields are read
// in the order written by tSerializeStriggerGroupColVals.
//
// Returns TSDB_CODE_SUCCESS or the first error code; on failure the array is
// left as-is and is not released here.
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numVals));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (numVals > 0 && *ppGroupColVals == NULL) {
    *ppGroupColVals = taosArrayInit(numVals, sizeof(SStreamGroupValue));
    if (*ppGroupColVals == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else if (numVals > 0) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, numVals));
  }

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (pVal->isNull) {
      // A null value carries no further fields.
      continue;
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
    }

    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
    if (IS_VAR_DATA_TYPE(pVal->data.type)) {
      uint64_t payloadLen = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &payloadLen));
      pVal->data.nData = payloadLen;
    } else {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
    }
  }

_exit:
  return code;
}
4145

4146
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
1,516✔
4147
  SEncoder encoder = {0};
1,516✔
4148
  int32_t  code = TSDB_CODE_SUCCESS;
1,516✔
4149
  int32_t  lino = 0;
1,516✔
4150
  int32_t  tlen = 0;
1,516✔
4151

4152
  tEncoderInit(&encoder, buf, bufLen);
1,516✔
4153
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,516!
4154

4155
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
1,516!
4156

4157
  tEndEncode(&encoder);
1,516✔
4158

4159
_exit:
1,516✔
4160
  if (code != TSDB_CODE_SUCCESS) {
1,516!
4161
    tlen = code;
×
4162
  } else {
4163
    tlen = encoder.pos;
1,516✔
4164
  }
4165
  tEncoderClear(&encoder);
1,516✔
4166
  return tlen;
1,516✔
4167
}
4168

4169
// Decode group column values from `buf` (`bufLen` bytes) into gInfo->gInfo.
// Returns TSDB_CODE_SUCCESS or the first error code; the decoder is always
// cleared before returning.
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4186

4187
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
37,897✔
4188
  SEncoder encoder = {0};
37,897✔
4189
  int32_t  code = TSDB_CODE_SUCCESS;
37,897✔
4190
  int32_t  lino = 0;
37,897✔
4191
  int32_t  tlen = 0;
37,897✔
4192

4193
  tEncoderInit(&encoder, buf, bufLen);
37,897✔
4194
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
37,896!
4195

4196
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
75,792!
4197
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
75,792!
4198
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
75,792!
4199
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
75,792!
4200
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
75,792!
4201

4202
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
37,896!
4203
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
37,896!
4204
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
75,790!
4205

4206
  tEndEncode(&encoder);
37,895✔
4207

4208
_exit:
37,897✔
4209
  if (code != TSDB_CODE_SUCCESS) {
37,897!
4210
    tlen = code;
×
4211
  } else {
4212
    tlen = encoder.pos;
37,897✔
4213
  }
4214
  tEncoderClear(&encoder);
37,897✔
4215
  return tlen;
37,897✔
4216
}
4217

4218
// Decode a trigger calc request from `buf` (`bufLen` bytes) into `pReq`.
// Fields are read in the order written by tSerializeSTriggerCalcRequest.
// Returns TSDB_CODE_SUCCESS or the first error code; arrays already stored
// in `pReq` are not released here on failure.
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Scalar header fields.
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // Array payloads followed by the trailing flag.
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4242

4243
// Release everything owned by a trigger calc request: the params array, the
// group column values array and the output block. Each pointer is reset to
// NULL after release so a repeated call does not touch freed memory.
// `pReq` may be NULL; the request struct itself is not freed.
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq != NULL) {
    if (pReq->params != NULL) {
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
      pReq->params = NULL;
    }
    if (pReq->groupColVals != NULL) {
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
      pReq->groupColVals = NULL;
    }
    blockDataDestroy(pReq->pOutBlock);
    // Clear the pointer like the sibling fields above to avoid a dangling
    // reference to the destroyed block.
    pReq->pOutBlock = NULL;
  }
}
25,027✔
4256

4257
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
2✔
4258
  SEncoder encoder = {0};
2✔
4259
  int32_t  code = TSDB_CODE_SUCCESS;
2✔
4260
  int32_t  lino = 0;
2✔
4261
  int32_t  tlen = 0;
2✔
4262

4263
  tEncoderInit(&encoder, buf, bufLen);
2✔
4264
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2!
4265

4266
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
4!
4267
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
4!
4268
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
4!
4269
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
4!
4270

4271
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
2!
4272

4273
  tEndEncode(&encoder);
2✔
4274

4275
_exit:
2✔
4276
  if (code != TSDB_CODE_SUCCESS) {
2!
4277
    tlen = code;
×
4278
  } else {
4279
    tlen = encoder.pos;
2✔
4280
  }
4281
  tEncoderClear(&encoder);
2✔
4282
  return tlen;
2✔
4283
}
4284

4285
// Decode a trigger drop-table request from `buf` (`bufLen` bytes) into
// `pReq`, reading fields in the order written by
// tSerializeSTriggerDropTableRequest.
// Returns TSDB_CODE_SUCCESS or the first error code.
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Scalar header fields.
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // Group column values payload.
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4306

4307
// Release the group column values array owned by a drop request and reset
// the pointer. `pReq` may be NULL; the request struct itself is not freed.
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
  if (pReq == NULL || pReq->groupColVals == NULL) {
    return;
  }

  taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
  pReq->groupColVals = NULL;
}
1✔
4315

4316
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
68,031✔
4317
  SEncoder encoder = {0};
68,031✔
4318
  int32_t  code = TSDB_CODE_SUCCESS;
68,031✔
4319
  int32_t  lino = 0;
68,031✔
4320
  int32_t  tlen = 0;
68,031✔
4321

4322
  tEncoderInit(&encoder, buf, bufLen);
68,031✔
4323
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
68,031!
4324

4325
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
136,064!
4326
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
136,064!
4327
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
136,064!
4328
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
136,064!
4329

4330
  tEndEncode(&encoder);
68,032✔
4331

4332
_exit:
68,032✔
4333
  if (code != TSDB_CODE_SUCCESS) {
68,032!
4334
    tlen = code;
×
4335
  } else {
4336
    tlen = encoder.pos;
68,032✔
4337
  }
4338
  tEncoderClear(&encoder);
68,032✔
4339
  return tlen;
68,032✔
4340
}
4341

4342
// Decode a trigger control request from `buf` (`bufLen` bytes) into `pReq`.
// Field order: type, streamId, taskId, sessionId.
// Returns TSDB_CODE_SUCCESS or the first error code.
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // The type is decoded into a plain int32 first and then assigned to the
  // request field.
  int32_t ctrlType = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &ctrlType));
  pReq->type = ctrlType;

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4363

4364
// Encode a stream runtime function info into an already started encoder.
// Field order: pseudo-function values (without notification info; emitted as
// an empty list when `full` is false), partition column values, current
// window skey/ekey, groupId, curIdx, sessionId, withExternalWindow,
// triggerType.
// Returns 0 on success or the first error code.
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool full) {
  int32_t code = 0;
  int32_t lino = 0;

  // Array payloads.
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, full));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // Current window.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));

  // Remaining scalar fields.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));

_exit:
  return code;
}
4378

4379
// Decode a stream runtime function info from an already started decoder.
// Fields are read in the order written by tSerializeStRtFuncInfo:
// pseudo-function values (without notification info), partition column
// values, current window skey/ekey, groupId, curIdx, sessionId,
// withExternalWindow, triggerType.
// Returns 0 on success or the first error code; arrays already stored in
// `pInfo` are not released here on failure.
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
_exit:
  return code;
}
4394

4395
// Release the two arrays owned by a stream runtime function info and reset
// their pointers. `pInfo` may be NULL; the struct itself is not freed.
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo) {
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }
}
4406

4407
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
1,580✔
4408
  SEncoder encoder = {0};
1,580✔
4409
  int32_t  code = TSDB_CODE_SUCCESS;
1,580✔
4410
  int32_t  lino = 0;
1,580✔
4411
  int32_t  tlen = 0;
1,580✔
4412

4413
  tEncoderInit(&encoder, buf, bufLen);
1,580✔
4414
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,580!
4415

4416
  int32_t size = taosArrayGetSize(pRsp->infos);
1,581✔
4417
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,581!
4418
  for (int32_t i = 0; i < size; ++i) {
4,951✔
4419
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
3,370✔
4420
    if (info == NULL) {
3,370!
4421
      TAOS_CHECK_EXIT(terrno);
×
4422
    }
4423
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
6,740!
4424
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
6,740!
4425
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
3,370!
4426
  }
4427

4428
  tEndEncode(&encoder);
1,581✔
4429

4430
_exit:
1,582✔
4431
  if (code != TSDB_CODE_SUCCESS) {
1,582!
4432
    tlen = code;
×
4433
  } else {
4434
    tlen = encoder.pos;
1,582✔
4435
  }
4436
  tEncoderClear(&encoder);
1,582✔
4437
  return tlen;
1,582✔
4438
}
4439

4440
// Decode a virtual-table info response from `buf` (`bufLen` bytes) into a
// newly created vTableInfo->infos array.
// Layout: int32 count, then per entry gId, uid and the column reference
// wrapper.
// Returns TSDB_CODE_SUCCESS or the first error code; a partially filled
// array is not released here on failure.
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo* vTableInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numInfos = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numInfos));

  vTableInfo->infos = taosArrayInit(numInfos, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numInfos; ++idx) {
    VTableInfo* pEntry = taosArrayReserve(vTableInfo->infos, 1);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pEntry->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pEntry->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &pEntry->cols, false));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4470

4471

4472
void tDestroyVTableInfo(void *ptr) {
3,370✔
4473
  if (NULL == ptr) {
3,370!
4474
    return;
×
4475
  }
4476
  VTableInfo* pTable = (VTableInfo*)ptr;
3,370✔
4477
  taosMemoryFree(pTable->cols.pColRef);
3,370!
4478
}
4479

4480
// Destroys the `infos` array of a SStreamMsgVTableInfo, running
// tDestroyVTableInfo on every element, and resets the field to NULL.
// The SStreamMsgVTableInfo object itself is left allocated. NULL is ignored.
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4485

4486
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
2,114✔
4487
  SEncoder encoder = {0};
2,114✔
4488
  int32_t  code = TSDB_CODE_SUCCESS;
2,114✔
4489
  int32_t  lino = 0;
2,114✔
4490
  int32_t  tlen = 0;
2,114✔
4491

4492
  tEncoderInit(&encoder, buf, bufLen);
2,114✔
4493
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,114!
4494

4495
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
4,228!
4496
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
2,114✔
4497
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
2,114!
4498
  for (int32_t i = 0; i < size; ++i) {
4,076✔
4499
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,962✔
4500
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
3,924!
4501
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
3,924!
4502
  }
4503

4504
  tEndEncode(&encoder);
2,114✔
4505

4506
_exit:
2,114✔
4507
  if (code != TSDB_CODE_SUCCESS) {
2,114!
4508
    tlen = code;
×
4509
  } else {
4510
    tlen = encoder.pos;
2,114✔
4511
  }
4512
  tEncoderClear(&encoder);
2,114✔
4513
  return tlen;
2,114✔
4514
}
4515

4516
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
1,057✔
4517
  SDecoder decoder = {0};
1,057✔
4518
  int32_t  code = TSDB_CODE_SUCCESS;
1,057✔
4519
  int32_t  lino = 0;
1,057✔
4520
  SSDataBlock *pResBlock = pBlock;
1,057✔
4521

4522
  tDecoderInit(&decoder, buf, bufLen);
1,057✔
4523
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,057!
4524

4525
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
2,114!
4526
  int32_t numOfCols = 2;
1,057✔
4527
  if (pResBlock->pDataBlock == NULL) {
1,057!
4528
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
1,057✔
4529
    if (pResBlock->pDataBlock == NULL) {
1,057!
4530
      TAOS_CHECK_EXIT(terrno);
×
4531
    }
4532
    for (int32_t i = 0; i< numOfCols; ++i) {
3,171✔
4533
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
2,114✔
4534
      if (pColInfoData == NULL) {
2,114!
4535
        TAOS_CHECK_EXIT(terrno);
×
4536
      }
4537
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
2,114✔
4538
      pColInfoData->info.bytes = sizeof(int64_t);
2,114✔
4539
    }
4540
  }
4541
  int32_t numOfRows = 0;
1,057✔
4542
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
1,057!
4543
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
1,057!
4544
  for (int32_t i = 0; i < numOfRows; ++i) {
2,038✔
4545
    for (int32_t j = 0; j < numOfCols; ++j) {
2,943✔
4546
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,962✔
4547
      if (pColInfoData == NULL) {
1,962!
4548
        TAOS_CHECK_EXIT(terrno);
×
4549
      }
4550
      int64_t value = 0;
1,962✔
4551
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,962!
4552
      colDataSetInt64(pColInfoData, i, &value);
1,962✔
4553
    }
4554
  }
4555

4556
  pResBlock->info.dataLoad = 1;
1,057✔
4557
  pResBlock->info.rows = numOfRows;
1,057✔
4558

4559
  tEndDecode(&decoder);
1,057✔
4560

4561
_exit:
1,057✔
4562
  tDecoderClear(&decoder);
1,057✔
4563
  return code;
1,057✔
4564
}
4565

4566
// Appends one data block to `encoder` and, when `indexHash` is given, a
// per-table slice index after it.
//
// Block part: when the encoder has no buffer (encoder->data == NULL) only the
// encoded size is added to encoder->pos; otherwise the block is encoded in
// place at the current position.
//
// Index part (only when indexHash != NULL): a 4-byte table count followed by
// one record {int64 uid, uint64 gId, int32 startRowIdx, int32 numRows} for
// every hash entry whose gId is not -1. The count is not known until the
// iteration ends, so its slot is reserved first and filled in afterwards.
//
// Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t blockLen = 0;

  if (encoder->data != NULL) {
    blockLen = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos,
                           blockDataGetNumOfCols(pBlock));
    if (blockLen < 0) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else {
    blockLen = blockGetEncodeSize(pBlock);
  }
  encoder->pos += blockLen;

  if (indexHash != NULL) {
    uint32_t countPos = encoder->pos;  // where the table count will be written
    encoder->pos += sizeof(uint32_t);  // reserve space for tables
    int32_t numTables = 0;

    void*   pEntry = NULL;
    int32_t iter = 0;
    for (pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
         pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
      SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
      if (pSlice->gId == -1) {
        continue;  // entries with gId == -1 are not emitted
      }
      int64_t uid = *(int64_t*)(tSimpleHashGetKey(pEntry, NULL));
      TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
      TAOS_CHECK_EXIT(tEncodeU64(encoder, pSlice->gId));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->startRowIdx));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->numRows));
      ++numTables;
    }

    // Rewind to the reserved slot, write the count, then restore the cursor.
    uint32_t endPos = encoder->pos;
    encoder->pos = countPos;
    TAOS_CHECK_EXIT(tEncodeI32(encoder, numTables));
    encoder->pos = endPos;
  }

_exit:
  return code;
}
4609
 
4610
// Serializes a WAL data response into `buf` (`bufLen` bytes).
//
// Four optional sections are written in fixed order: data, meta, delete,
// drop. Each section is an int8 flag (1 when the block is non-NULL and has
// more than zero rows, else 0) followed, when the flag is 1, by the encoded
// block. Only the data section also carries the slice index built from
// `indexHash`. The sections are followed by int64 ver and int64 verTime.
//
// Returns the encoder position after encoding on success, otherwise the
// error code of the first failing step.
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp, SSHashObj* indexHash) {
  SEncoder encoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  tlen = 0;

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  {
    // Section table: block to encode and the slice index that goes with it.
    struct {
      void*      block;
      SSHashObj* slices;
    } sections[] = {
        {rsp->dataBlock, indexHash},  // real data
        {rsp->metaBlock, NULL},       // meta data
        {rsp->deleteBlock, NULL},     // delete data
        {rsp->dropBlock, NULL},       // drop table data
    };

    for (size_t s = 0; s < sizeof(sections) / sizeof(sections[0]); ++s) {
      void*  block = sections[s].block;
      int8_t hasRows = (block != NULL && ((SSDataBlock*)block)->info.rows > 0) ? 1 : 0;

      TAOS_CHECK_EXIT(tEncodeI8(&encoder, hasRows));
      if (hasRows) {
        TAOS_CHECK_EXIT(encodeData(&encoder, block, sections[s].slices));
      }
    }
  }

  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
  tEndEncode(&encoder);

_exit:
  tlen = (code != TSDB_CODE_SUCCESS) ? code : (int32_t)encoder.pos;
  tEncoderClear(&encoder);
  return tlen;
}
4660

4661
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices){
1,777✔
4662
  SDecoder     decoder = {0};
1,777✔
4663
  int32_t      code = TSDB_CODE_SUCCESS;
1,777✔
4664
  int32_t      lino = 0;
1,777✔
4665
  SSDataBlock* pBlock = NULL;
1,777✔
4666

4667
  tDecoderInit(&decoder, buf, bufLen);
1,777✔
4668
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,777!
4669

4670
  // decode data block
4671
  int8_t hasData = false;
1,777✔
4672
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
1,776!
4673
  pBlock = pRsp->dataBlock;
1,776✔
4674
  if (hasData) {
1,776✔
4675
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
1,286!
4676
    const char* pEndPos = NULL;
1,286✔
4677
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
1,286!
4678
    decoder.pos = (uint8_t*)pEndPos - decoder.data;
1,286✔
4679

4680
    int32_t nSlices = 0;
1,286✔
4681
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
1,286!
4682
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
1,286!
4683
    taosArrayClear(pSlices);
1,286✔
4684
    int64_t  uid = 0;
1,286✔
4685
    uint64_t gid = 0;
1,286✔
4686
    int32_t  startIdx = 0;
1,286✔
4687
    int32_t  numRows = 0;
1,286✔
4688
    for (int32_t i = 0; i < nSlices; i++) {
2,904✔
4689
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
1,618!
4690
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
1,618!
4691
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
1,618!
4692
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));
1,618!
4693
      int32_t endIdx = startIdx + numRows;
1,618✔
4694
      int64_t value[3] = {gid, uid, (int64_t)startIdx << 32 | endIdx};
1,618✔
4695
      void*   px = taosArrayPush(pSlices, value);
1,618✔
4696
      if (px == NULL) {
1,618!
4697
        code = terrno;
×
4698
        goto _exit;
×
4699
      }
4700
    }
4701
  } else if (pBlock != NULL) {
490✔
4702
    blockDataEmpty(pBlock);
11✔
4703
    taosArrayClear(pSlices);
11✔
4704
  }
4705

4706
  int8_t hasMeta = false;
1,776✔
4707
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasMeta));
1,776!
4708
  pBlock = pRsp->metaBlock;
1,776✔
4709
  if (hasMeta) {
1,776✔
4710
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
760!
4711
    const char* pEndPos = NULL;
760✔
4712
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
760!
4713
    decoder.pos = (uint8_t*)pEndPos - decoder.data;
761✔
4714
  } else if (pBlock != NULL) {
1,016✔
4715
    blockDataEmpty(pBlock);
17✔
4716
  }
4717

4718
  int8_t hasDel = false;
1,777✔
4719
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasDel));
1,777!
4720
  pBlock = pRsp->deleteBlock;
1,777✔
4721
  if (hasDel) {
1,777✔
4722
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
29!
4723
    const char* pEndPos = NULL;
29✔
4724
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
29!
4725
    decoder.pos = (uint8_t*)pEndPos - decoder.data;
29✔
4726
  } else if (pBlock != NULL) {
1,748✔
4727
    blockDataEmpty(pBlock);
749✔
4728
  }
4729

4730
  int8_t hasDrop = false;
1,777✔
4731
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasDrop));
1,777!
4732
  pBlock = pRsp->dropBlock;
1,777✔
4733
  if (hasDrop) {
1,777✔
4734
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
1!
4735
    const char* pEndPos = NULL;
1✔
4736
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
1!
4737
    decoder.pos = (uint8_t*)pEndPos - decoder.data;
1✔
4738
  } else if (pBlock != NULL) {
1,776✔
4739
    blockDataEmpty(pBlock);
777✔
4740
  }
4741
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
3,554!
4742
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));
3,554!
4743

4744
  tEndDecode(&decoder);
1,777✔
4745

4746
_exit:
1,777✔
4747
  if (code != TSDB_CODE_SUCCESS) {
1,777!
4748
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4749
  }
4750
  tDecoderClear(&decoder);
1,777✔
4751
  return code;
1,777✔
4752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc