• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4658

09 Aug 2025 02:19PM UTC coverage: 59.866% (-1.3%) from 61.2%
#4658

push

travis-ci

GitHub
fix(stream)[TD-37079]: force close the last window in fill_history (#32436)

137251 of 291849 branches covered (47.03%)

Branch coverage included in aggregate %.

207628 of 284239 relevant lines covered (73.05%)

4861547.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.02
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "tdatablock.h"
18
#include "tmsg.h"
19
#include "os.h"
20
#include "tcommon.h"
21

22
// Identifier pair for a stream task: the owning stream's id plus the task's own id.
typedef struct STaskId {
  int64_t streamId;  // id of the stream
  int64_t taskId;    // id of the task
} STaskId;
26

27
// Window kinds, numbered consecutively starting at 1.
// The values are spelled out so the numbering is visible at a glance.
typedef enum EWindowType {
  WINDOW_TYPE_INTERVAL = 1,
  WINDOW_TYPE_SESSION = 2,
  WINDOW_TYPE_STATE = 3,
  WINDOW_TYPE_EVENT = 4,
  WINDOW_TYPE_COUNT = 5,
  WINDOW_TYPE_ANOMALY = 6,
  WINDOW_TYPE_EXTERNAL = 7,
  WINDOW_TYPE_PERIOD = 8
} EWindowType;
37

38
// Checkpoint bookkeeping carried for a task.
typedef struct STaskCkptInfo {
  int64_t latestId;          // id of the saved checkpoint
  int64_t latestVer;         // version of the saved checkpoint
  int64_t latestTime;        // time of the latest checkpoint
  int64_t latestSize;        // size of the latest checkpoint
  int8_t  remoteBackup;      // whether the latest checkpoint backup is done
  int64_t activeId;          // id of the currently active checkpoint
  int32_t activeTransId;     // transaction id of the checkpoint
  int8_t  failed;            // whether the checkpoint failed
  int8_t  consensusChkptId;  // whether a consensus checkpoint id is required
  int64_t consensusTs;       // NOTE(review): undocumented; presumably the consensus timestamp -- confirm
} STaskCkptInfo;
50

51
typedef struct STaskStatusEntry {
52
  STaskId       id;
53
  int32_t       status;
54
  int32_t       statusLastDuration;  // to record the last duration of current status
55
  int64_t       stage;
56
  int32_t       nodeId;
57
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
58
  int64_t       processedVer;  // only valid for source task
59
  double        inputQUsed;    // in MiB
60
  double        inputRate;
61
  double        procsThroughput;   // duration between one element put into input queue and being processed.
62
  double        procsTotal;        // duration between one element put into input queue and being processed.
63
  double        outputThroughput;  // the size of dispatched result blocks in bytes
64
  double        outputTotal;       // the size of dispatched result blocks in bytes
65
  double        sinkQuota;         // existed quota size for sink task
66
  double        sinkDataSize;      // sink to dst data size
67
  int64_t       startTime;
68
  int64_t       startCheckpointId;
69
  int64_t       startCheckpointVer;
70
  int64_t       hTaskId;
71
  STaskCkptInfo checkpointInfo;
72
  STaskNotifyEventStat notifyEventStat;
73
} STaskStatusEntry;
74

75
// Writes one upstream endpoint descriptor to pEncoder in the order:
// taskId, nodeId, childId, epSet, stage.
// Every step goes through TAOS_CHECK_RETURN (defined elsewhere; presumably returns the
// failing code immediately). Returns 0 after all fields have been written.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  // task / node / child identifiers
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));

  // endpoint set followed by the stage
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));

  return 0;
}
83

84
// Reads one upstream endpoint descriptor from pDecoder into pInfo, in the same field
// order tEncodeStreamEpInfo writes: taskId, nodeId, childId, epSet, stage.
// Returns 0 after all fields have been read; TAOS_CHECK_RETURN (defined elsewhere)
// presumably returns the failing code early.
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  // task / node / child identifiers
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));

  // endpoint set followed by the stage
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));

  return 0;
}
92

93
// Encodes a task node-update message.
// Layout: streamId, taskId, node count, {nodeId, prevEp, newEp} per node, transId,
// task count, one task id per task.
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  // updated nodes: id plus previous and new endpoint sets
  int32_t numOfNodes = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfNodes));

  for (int32_t n = 0; n < numOfNodes; ++n) {
    SNodeUpdateInfo* pNode = taosArrayGet(pMsg->pNodeList, n);
    if (pNode == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pNode->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pNode->newEp));
  }

  // TODO: transId was added later, which makes this layout incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));

  // ids of the affected tasks
  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));

  for (int32_t t = 0; t < numOfTasks; ++t) {
    int32_t* pTaskId = taosArrayGet(pMsg->pTaskList, t);
    if (pTaskId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pTaskId));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
137

138
// Decodes a task node-update message, mirroring tEncodeStreamTaskUpdateMsg.
// Allocates pMsg->pNodeList and pMsg->pTaskList; nothing is released here on failure,
// so the caller is presumably expected to call tDestroyNodeUpdateMsg -- confirm.
// Returns the value left in `code` (0 when every step succeeded).
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  // updated nodes
  int32_t numOfNodes = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);

  for (int32_t n = 0; n < numOfNodes; ++n) {
    SNodeUpdateInfo node = {0};
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &node.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &node.newEp));

    if (taosArrayPush(pMsg->pNodeList, &node) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  // ids of the affected tasks
  int32_t numOfTasks = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTasks));

  pMsg->pTaskList = taosArrayInit(numOfTasks, sizeof(int32_t));
  if (pMsg->pTaskList == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t t = 0; t < numOfTasks; ++t) {
    int32_t taskId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
    if (taosArrayPush(pMsg->pTaskList, &taskId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
184

185
// Destroys both arrays held by a node-update message and resets the pointers to NULL.
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
  taosArrayDestroy(pMsg->pNodeList);
  pMsg->pNodeList = NULL;

  taosArrayDestroy(pMsg->pTaskList);
  pMsg->pTaskList = NULL;
}
×
191

192
// Encodes a task check request.
// Layout: reqId, streamId, upstream node/task ids, downstream node/task ids, childId, stage.
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request and stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // upstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));

  // downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
214

215
// Decodes a task check request in the field order written by tEncodeStreamTaskCheckReq.
// Returns the value left in `code` (0 when every step succeeded).
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request and stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // upstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));

  // downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));

  tEndDecode(pDecoder);

_exit:
  return code;
}
233

234
// Encodes a task check response.
// Layout: reqId, streamId, upstream node/task ids, downstream node/task ids, childId,
// oldStage, status (one byte).
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // request and stream identity
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));

  // upstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));

  // downstream side
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
257

258
// Decodes a task check response in the field order written by tEncodeStreamTaskCheckRsp.
// Returns the value left in `code` (0 when every step succeeded).
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // request and stream identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));

  // upstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));

  // downstream side
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));

  tEndDecode(pDecoder);

_exit:
  return code;
}
277

278
// Encodes a dispatch request: a fixed header followed by blockNum (length, payload) pairs.
// Fails with TSDB_CODE_INVALID_MSG when the `data` or `dataLen` array size differs from blockNum.
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // fixed header
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` is written a second time here; tDecodeStreamDispatchReq reads it twice at the
  // same positions, so the duplicate is part of the wire layout and must stay.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  // both arrays must hold exactly blockNum entries
  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    uError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  // payload blocks: explicit length, then the bytes
  for (int32_t blk = 0; blk < pReq->blockNum; blk++) {
    int32_t* pBlockLen = taosArrayGet(pReq->dataLen, blk);
    void*    pBlock = taosArrayGetP(pReq->data, blk);
    if (pBlock == NULL || pBlockLen == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pBlockLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pBlock, *pBlockLen));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
320

321
// Decodes a dispatch request written by tEncodeStreamDispatchReq.
// Allocates pReq->data (array of payload pointers) and pReq->dataLen (array of int32 lengths).
// Each payload buffer comes from tDecodeBinaryAlloc; once pushed into pReq->data it is
// released by tCleanupStreamDispatchReq. A buffer that has not been pushed yet is freed
// here before bailing out, so a failed decode does not leak it.
// Returns the value left in `code` (0 when every step succeeded).
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // `type` appears twice in the wire layout (the encoder writes it twice)
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;     // length stored explicitly in front of the payload
    uint64_t len2 = 0;     // length reported by the binary decoder
    void*    data = NULL;  // owned by this iteration until pushed into pReq->data

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    // the two lengths must agree; a negative explicit length is never valid
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      code = terrno;  // read terrno before releasing the buffer
      taosMemoryFree(data);
      goto _exit;
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      code = terrno;
      taosMemoryFree(data);
      goto _exit;
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
370

371
// Releases the two arrays owned by a dispatch request: the length array, and the payload
// array via taosArrayDestroyP with a NULL element callback.
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  taosArrayDestroy(pReq->dataLen);
  taosArrayDestroyP(pReq->data, NULL);
}
×
375

376
// Encodes a retrieve request.
// Layout: streamId, reqId, dst node/task ids, src node/task ids, then the retrieve
// payload as a binary blob of retrieveLen bytes.
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));

  // destination, then source
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));

  // payload
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
397

398
// Decodes a retrieve request written by tEncodeStreamRetrieveReq.
// pReq->pRetrieve receives a buffer from tDecodeBinaryAlloc; tCleanupStreamRetrieveReq
// frees it. The decoded 64-bit length is narrowed to int32 for retrieveLen.
// Returns the value left in `code` (0 when every step succeeded).
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));

  // destination, then source
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  // payload
  uint64_t retrieveBytes = 0;
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &retrieveBytes));
  pReq->retrieveLen = (int32_t)retrieveBytes;

  tEndDecode(pDecoder);

_exit:
  return code;
}
417

418
// Frees the retrieve payload buffer held by the request.
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
}
×
419

420

421
// Encodes a stream management request: reqId, type, then a type-specific body.
// For STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER the body is a count followed by
// (dbFName, tbName) string pairs; a NULL name list is written as count 0.
// Any other type sets TSDB_CODE_STREAM_INVALID_TASK_TYPE.
// Returns the value left in `code`.
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      // no table list: encode an empty one
      if (NULL == pReq->cont.fullTableNames) {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t nameNum = taosArrayGetSize(pReq->cont.fullTableNames);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nameNum));
      for (int32_t k = 0; k < nameNum; ++k) {
        SStreamDbTableName* pEntry = taosArrayGet(pReq->cont.fullTableNames, k);
        // both strings are written with their terminating NUL included
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->dbFName, strlen(pEntry->dbFName) + 1));
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->tbName, strlen(pEntry->tbName) + 1));
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
450

451
// Releases the table-name array owned by a management request.
// The request struct itself is not freed; a NULL request is ignored.
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (pReq != NULL) {
    taosArrayDestroy(pReq->cont.fullTableNames);
  }
}
458

459

460
// Makes a heap copy of a management request.
// *ppDst is set to NULL first; a NULL source yields TSDB_CODE_SUCCESS with *ppDst left NULL.
// Otherwise the struct is copied bytewise and the table-name array (when present) is
// duplicated with taosArrayDup. On failure the partial copy is released, *ppDst is
// cleared and the error is logged.
// Returns 0 on success, otherwise the terrno value captured at the failing step.
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppDst = NULL;
  if (pSrc == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  SStreamMgmtReq* pCopy = *ppDst;
  memcpy(pCopy, pSrc, sizeof(*pSrc));

  // the bytewise copy aliases the source array; replace it with an independent duplicate
  if (pSrc->cont.fullTableNames != NULL) {
    pCopy->cont.fullTableNames = taosArrayDup(pSrc->cont.fullTableNames, NULL);
    TSDB_CHECK_NULL(pCopy->cont.fullTableNames, code, lino, _exit, terrno);
  }

_exit:

  if (code != 0) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
487

488

489
// Decodes a stream management request written by tEncodeSStreamMgmtReq.
// For STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER a positive count allocates
// pReq->cont.fullTableNames and fills it with (dbFName, tbName) pairs; a count <= 0
// leaves the field untouched. Any other type sets TSDB_CODE_STREAM_INVALID_TASK_TYPE.
// Nothing is released here on failure; tFreeSStreamMgmtReq destroys the array.
// Returns the value left in `code`.
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.fullTableNames = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.fullTableNames, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.fullTableNames, 1);
          // the reserved slot is written through below, so a failed reserve must bail out
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
519

520
// Encodes the common part of a stream task.
// Layout: type, streamId, taskId, flags, seriousId, deployId, nodeId, taskIdx, status,
// detailStatus, errorCode, then an int32 presence flag (1/0) followed by the management
// request when pTask->pMgmtReq is non-NULL. The session id is deliberately not encoded.
// Returns the value left in `code`.
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino;

  // identity
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  // placement and state (session id skipped)
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  // optional management request, announced by a presence flag
  int32_t hasMgmtReq = (pTask->pMgmtReq != NULL) ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasMgmtReq));
  if (hasMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
548

549

550
// Decodes the common part of a stream task in the field order written by tEncodeStreamTask.
// When the presence flag is non-zero, pTask->pMgmtReq is allocated with taosMemoryCalloc
// and filled by tDecodeSStreamMgmtReq; it is not released here if that decode fails.
// Returns the value left in `code`.
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino;

  // identity
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  // placement and state (session id is not part of the encoding)
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  // optional management request
  int32_t hasMgmtReq = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasMgmtReq));
  if (hasMgmtReq != 0) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
579

580
// Encodes one recalculation progress record: recalcId, progress, start, end.
// Returns the value left in `code`.
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));

  // recalculation range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:
  return code;
}
593

594
// Decodes one recalculation progress record: recalcId, progress, start, end.
// Returns the value left in `code`.
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));

  // recalculation range
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:
  return code;
}
607

608

609
// Encodes a trigger runtime status.
// Layout: autoRecalcNum, realtimeSessionNum, historySessionNum, recalcSessionNum,
// histroyProgress, then the number of user recalcs followed by one progress record each.
// Returns the value left in `code`.
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino;

  // counters
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  // user-requested recalculations
  int32_t userRecalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, userRecalcNum));
  for (int32_t k = 0; k < userRecalcNum; ++k) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
  }

_exit:
  return code;
}
630

631
// Decodes a trigger runtime status in the order tEncodeSSTriggerRuntimeStatus writes it:
// autoRecalcNum, realtimeSessionNum, historySessionNum, recalcSessionNum, histroyProgress,
// then the user recalc count followed by one progress record per entry.
// When the count is positive, pStatus->userRecalcs is allocated with taosArrayInit_s and
// its entries are filled in place; tFreeSSTriggerRuntimeStatus destroys that array.
// Returns the value left in `code`.
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  // the fourth encoded field is recalcSessionNum; decoding it into realtimeSessionNum
  // would overwrite the real-time counter and leave recalcSessionNum unset
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    if (NULL == pStatus->userRecalcs) {
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    if (NULL == pProgress) {
      code = terrno;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
660

661

662
// Encodes a stream heartbeat message.
// Layout: dnodeId, streamGId, snodeId, runnerThreadNum, then four counted sections:
// vgroup leader ids, task status entries (via tEncodeStreamTask), request indexes,
// and trigger runtime statuses.
// Returns pEncoder->pos when `code` is zero at exit, otherwise `code`.
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // fixed header
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  // section 1: vgroup leader ids
  int32_t numOfLeaders = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfLeaders));
  for (int32_t k = 0; k < numOfLeaders; ++k) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  // section 2: task status entries, encoded through their SStreamTask view
  int32_t numOfStatus = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfStatus));
  for (int32_t k = 0; k < numOfStatus; ++k) {
    SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pEntry));
  }

  // section 3: request indexes
  int32_t numOfReqs = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfReqs));
  for (int32_t k = 0; k < numOfReqs; ++k) {
    int32_t* pReqIdx = taosArrayGet(pReq->pStreamReq, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pReqIdx));
  }

  // section 4: trigger runtime statuses
  int32_t numOfTriggers = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTriggers));
  for (int32_t k = 0; k < numOfTriggers; ++k) {
    SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pRuntime));
  }

  tEndEncode(pEncoder);

_exit:
  if (code != 0) {
    return code;
  }
  return pEncoder->pos;
}
709

710
// Decodes a stream heartbeat message in the layout written by tEncodeStreamHbMsg.
// Each of the four sections allocates its array only when its count is positive:
// pVgLeaders is filled by pushing, while pStreamStatus, pStreamReq and pTriggerStatus
// are created with taosArrayInit_s and then filled element by element.
// Nothing is released here on failure; tCleanupStreamHbMsg destroys the arrays.
// Returns the value left in `code`.
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // fixed header
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  // section 1: vgroup leader ids
  int32_t numOfLeaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfLeaders));
  if (numOfLeaders > 0) {
    pReq->pVgLeaders = taosArrayInit(numOfLeaders, sizeof(int32_t));
    if (pReq->pVgLeaders == NULL) {
      code = terrno;
      goto _exit;
    }

    for (int32_t k = 0; k < numOfLeaders; ++k) {
      int32_t leaderVgId = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
      if (taosArrayPush(pReq->pVgLeaders, &leaderVgId) == NULL) {
        code = terrno;
        goto _exit;
      }
    }
  }

  // section 2: task status entries, decoded through their SStreamTask view
  int32_t numOfStatus = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfStatus));
  if (numOfStatus > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), numOfStatus);
    if (pReq->pStreamStatus == NULL) {
      code = terrno;
      goto _exit;
    }

    for (int32_t k = 0; k < numOfStatus; ++k) {
      SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
      if (pEntry == NULL) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pEntry));
    }
  }

  // section 3: request indexes
  int32_t numOfReqs = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfReqs));
  if (numOfReqs > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), numOfReqs);
    if (pReq->pStreamReq == NULL) {
      code = terrno;
      goto _exit;
    }

    for (int32_t k = 0; k < numOfReqs; ++k) {
      int32_t* pReqIdx = taosArrayGet(pReq->pStreamReq, k);
      if (pReqIdx == NULL) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pReqIdx));
    }
  }

  // section 4: trigger runtime statuses
  int32_t numOfTriggers = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTriggers));
  if (numOfTriggers > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), numOfTriggers);
    if (pReq->pTriggerStatus == NULL) {
      code = terrno;
      goto _exit;
    }

    for (int32_t k = 0; k < numOfTriggers; ++k) {
      SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
      if (pRuntime == NULL) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pRuntime));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
801

802
void tFreeSSTriggerRuntimeStatus(void* param) {
2,744✔
803
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
2,744✔
804
  if (NULL == pStatus) {
2,744!
805
    return;
×
806
  }
807
  taosArrayDestroy(pStatus->userRecalcs);
2,744✔
808
}
809

810
/*
 * Release the arrays owned by a stream heartbeat message.
 *
 * pMsg      - message to clean up; NULL is a no-op. The struct itself is not
 *             freed, only the arrays it points to.
 * deepClean - when true, the management request (pMgmtReq) of every task
 *             referenced by an index stored in pStreamReq is freed before the
 *             arrays are destroyed.
 *
 * Every array pointer is reset to NULL after it is destroyed so that the
 * message does not keep dangling pointers.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (pMsg == NULL) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);
  pMsg->pVgLeaders = NULL;

  if (deepClean) {
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t i = 0; i < reqNum; ++i) {
      // pStreamReq stores indexes into pStreamStatus
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
      if (NULL == idx) {
        continue;
      }

      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
      if (NULL == pTask) {
        continue;
      }

      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      // clear the pointer as well, so the status entry never refers to freed memory
      taosMemoryFreeClear(pTask->pMgmtReq);
    }
  }

  taosArrayDestroy(pMsg->pStreamReq);
  pMsg->pStreamReq = NULL;
  taosArrayDestroy(pMsg->pStreamStatus);
  pMsg->pStreamStatus = NULL;
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
  pMsg->pTriggerStatus = NULL;
}
833

834
/*
 * Encode the trigger-side part of a reader deployment message.
 * Each string field goes through tEncodeBinary with a length of strlen + 1
 * (terminator included); a NULL string is written with length 0.
 * triggerPrevFilter is not encoded here, and the matching decoder does not
 * read it either.
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t len = 0;

  len = (NULL != pMsg->triggerTblName) ? (int32_t)strlen(pMsg->triggerTblName) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, len));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  len = (NULL != pMsg->partitionCols) ? (int32_t)strlen(pMsg->partitionCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, len));

  len = (NULL != pMsg->triggerCols) ? (int32_t)strlen(pMsg->triggerCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, len));

  len = (NULL != pMsg->triggerScanPlan) ? (int32_t)strlen(pMsg->triggerScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, len));

  len = (NULL != pMsg->calcCacheScanPlan) ? (int32_t)strlen(pMsg->calcCacheScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, len));

_exit:
  return code;
}
853

854
/*
 * Encode the calc-side part of a reader deployment message:
 * execReplica (I32) followed by calcScanPlan (binary, terminator included,
 * length 0 when the plan is NULL).
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t planLen = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  if (pMsg->calcScanPlan != NULL) {
    planLen = (int32_t)strlen(pMsg->calcScanPlan) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:
  return code;
}
865

866

867
/*
 * Encode a reader deployment message.
 * The triggerReader flag is written first and selects which payload follows:
 * msg.trigger when it is non-zero, msg.calc otherwise.
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
882

883
/* Encode a task address: taskId (I64), nodeId (I32), then the endpoint set. */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t       lino = 0;
  int32_t       code = 0;
  const int64_t taskId = pMsg->taskId;
  const int32_t nodeId = pMsg->nodeId;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:
  return code;
}
895

896
/* Encode a runner target: its task address followed by execReplica (I32). */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t       lino = 0;
  int32_t       code = 0;
  const int32_t replica = pMsg->execReplica;

  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, replica));

_exit:
  return code;
}
907

908

909
/*
 * Encode a trigger task deployment message.
 *
 * Layout, in order: option flags, notify URLs and notify options, timing
 * parameters, the window-specific section selected by triggerType, slot ids
 * and plans, the reader and runner lists, leaderSnodeId and the stream name.
 * The field order must stay in sync with tDecodeSStreamTriggerDeployMsg.
 *
 * String fields are written through tEncodeBinary with strlen + 1 bytes
 * (terminator included); a NULL string is written with length 0. This now
 * includes streamName, which used to be passed to strlen() unguarded.
 *
 * An unknown triggerType fails with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // option flags
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->hasPartitionBy));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));

  // notify URLs and options
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));

  // timing parameters
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));

  // window-specific section
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      int32_t eventWindowStartCondLen =
          pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
      int32_t eventWindowEndCondLen =
          pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;

      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      int32_t countWindowCondColsLen =
          pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
      break;
    }
    default:
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

  // slot ids and plans
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));

  // reader task addresses
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
  for (int32_t i = 0; i < readerNum; ++i) {
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
  }

  // runner targets
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
  for (int32_t i = 0; i < runnerNum; ++i) {
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));

  // streamName is a pointer like the other strings; guard it the same way instead of calling strlen(NULL)
  int32_t streamNameLen = (pMsg->streamName == NULL) ? 0 : ((int32_t)strlen(pMsg->streamName) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, streamNameLen));

_exit:

  return code;
}
1027

1028

1029
/*
 * Encode one field descriptor: name (C string), type (U8), flags (I8),
 * bytes (I32), compress (U32) and typeMod (I32), in that order.
 */
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions* pField) {
  int32_t lino = 0;
  int32_t code = 0;

  // identity
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));

  // storage attributes
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
1044

1045

1046
/*
 * Encode a runner task deployment message.
 *
 * Layout, in order: execReplica, stream name, plan, output db/table names,
 * output flags, notify URLs and error handling, output columns, output tags,
 * output super-table uid/version, sub-table name and tag value expressions,
 * and the forced output columns.
 *
 * String fields are written through tEncodeBinary with strlen + 1 bytes
 * (terminator included); a NULL string is written with length 0. This now
 * includes streamName, which used to be passed to strlen() unguarded.
 */
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  // guard streamName like every other string field instead of calling strlen(NULL)
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, NULL == pMsg->streamName ? 0 : (int32_t)strlen(pMsg->streamName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // notify URLs
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));

  // output columns
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions* pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // output tags
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions* pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));

  // forced output columns: expression followed by its data type description
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol* pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
  }

_exit:

  return code;
}
1104

1105

1106
/*
 * Encode one task deployment: the common task header followed by the
 * reader, trigger or runner payload selected by task.type.
 * Any other task type fails with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
  } else {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1130

1131

1132
/*
 * Encode the deployment of one stream: streamId, the reader tasks
 * (count + entries), a 0/1 marker followed by the trigger task when one is
 * present, and the runner tasks (count + entries).
 */
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t idx = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  // reader tasks
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (idx = 0; idx < nReaders; ++idx) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  // optional trigger task, announced by a 0/1 marker
  int32_t hasTrigger = (pStream->triggerTask) ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (hasTrigger) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (idx = 0; idx < nRunners; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
1161

1162
/* Encode the common stream message header, which consists of msgType written as I32. */
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;
  int32_t type = pMsg->msgType;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, type));

_exit:
  return code;
}
1171

1172
/*
 * Decode the common stream message header.
 * The I32 on the wire is read into a temporary and then stored in msgType;
 * msgType is left untouched when decoding fails.
 */
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t rawType = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
1183

1184
/* Encode a start-task message; only its common header is written. */
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
1194

1195
/* Encode a task start entry: the task header followed by its start message. */
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // 1) task identity
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  // 2) start message
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1206

1207
/*
 * Encode an undeploy-task message: the common header followed by the
 * doCheckpoint and doCleanup flags (I8 each).
 */
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  // behaviour flags
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
1219

1220
/* Encode a task undeploy entry: the task header followed by its undeploy message. */
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // 1) task identity
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  // 2) undeploy message
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1231

1232

1233
/* Encode a recalculation request: recalcId, start and end, each as I64. */
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  // requested range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
1245

1246
/*
 * Encode the type-specific content of a management response.
 *
 *   STREAM_MSG_ORIGTBL_READER_INFO - vgroup id list, then reader address list
 *   STREAM_MSG_UPDATE_RUNNER       - runner target list
 *   STREAM_MSG_USER_RECALC         - recalculation request list
 *
 * Each list is written as an I32 count followed by its entries. Any other
 * message type writes nothing and succeeds.
 */
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t k = 0;

  if (msgType == STREAM_MSG_ORIGTBL_READER_INFO) {
    int32_t nVgs = taosArrayGetSize(pRsp->vgIds);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVgs));
    for (k = 0; k < nVgs; ++k) {
      int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
    }

    int32_t nReaders = taosArrayGetSize(pRsp->readerList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
    for (k = 0; k < nReaders; ++k) {
      SStreamTaskAddr* pReader = taosArrayGet(pRsp->readerList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReader));
    }
  } else if (msgType == STREAM_MSG_UPDATE_RUNNER) {
    int32_t nRunners = taosArrayGetSize(pRsp->runnerList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
    for (k = 0; k < nRunners; ++k) {
      SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
    }
  } else if (msgType == STREAM_MSG_USER_RECALC) {
    int32_t nRecalcs = taosArrayGetSize(pRsp->recalcList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalcs));
    for (k = 0; k < nRecalcs; ++k) {
      SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
    }
  }

_exit:
  return code;
}
1297

1298
/*
 * Encode a management response: common header, reqId (I64), code (I32),
 * the task header, then the content selected by header.msgType.
 */
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  // envelope
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  // type-specific content
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));

_exit:
  return code;
}
1312

1313

1314
/*
 * Encode a stream heartbeat response inside one tStartEncode/tEndEncode frame.
 *
 * Layout, in order: streamGId, stream deployments (count + entries), tasks to
 * start (count + entries), the undeployAll flag followed by the undeploy task
 * list when the flag is not set, and the management responses (count + entries).
 *
 * The frame is closed with tEndEncode only after every field was written.
 * On any failure, including a failing tStartEncode, the function jumps
 * straight to _exit without touching the frame, the same way the heartbeat
 * request decoder in this file calls tEndDecode before its _exit label.
 */
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // streams to deploy
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
  for (int32_t i = 0; i < deployNum; ++i) {
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
  }

  // tasks to start
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
  for (int32_t i = 0; i < startNum; ++i) {
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
  }

  // tasks to undeploy; the explicit list is only sent when undeployAll is not set
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
    for (int32_t i = 0; i < undeployNum; ++i) {
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
    }
  }

  // management responses
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
  for (int32_t i = 0; i < rspNum; ++i) {
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
  }

  // close the frame only when it was opened and completely written
  tEndEncode(pEncoder);

_exit:

  return code;
}
1357

1358
/*
 * Decode the trigger-side part of a reader deployment message.
 * String fields are read with tDecodeBinaryAlloc into the corresponding
 * pointer members of pMsg; nothing is released here if a later field fails.
 */
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  // trigger table identity
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));

  // delete handling flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  // columns and plans
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1376

1377

1378
/*
 * Decode the calc-side part of a reader deployment message:
 * execReplica (I32) followed by calcScanPlan, read with tDecodeBinaryAlloc.
 */
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;
  void**  ppPlan = (void**)&pMsg->calcScanPlan;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppPlan, NULL));

_exit:
  return code;
}
1389

1390

1391
/*
 * Decode a reader deployment message.
 * The triggerReader flag is read first and selects which payload is decoded:
 * msg.trigger when it is non-zero, msg.calc otherwise.
 */
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1406

1407

1408
/* Decode a task address: taskId (I64), nodeId (I32), then the endpoint set. */
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  // identity
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));

  // endpoints
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1420

1421

1422
/* Decode a runner target: its task address followed by execReplica (I32). */
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t  lino = 0;
  int32_t  code = 0;
  int32_t* pReplica = &pMsg->execReplica;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pReplica));

_exit:
  return code;
}
1433

1434

1435
/*
 * Decode a trigger task deployment message; the counterpart of
 * tEncodeSStreamTriggerDeployMsg, reading the fields in the same order.
 *
 * The notify URL, reader and runner arrays are only created when their
 * decoded count is positive. Strings are read with tDecodeBinaryAlloc.
 * Nothing that was already decoded is released here when a later field
 * fails. An unknown triggerType fails with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t urlNum = 0;
  int32_t nReaders = 0;
  int32_t nRunners = 0;

  // option flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->hasPartitionBy));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));

  // notify URLs and options
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &urlNum));
  if (urlNum > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, urlNum);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
    for (int32_t k = 0; k < urlNum; ++k) {
      const char** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  // timing parameters
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));

  // window-specific section
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // units and precision first, then the four I64 values
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

  // slot ids and plans
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  // reader task addresses
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
    for (int32_t k = 0; k < nReaders; ++k) {
      SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
    }
  }

  // runner targets
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
    for (int32_t k = 0; k < nRunners; ++k) {
      SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));

_exit:
  return code;
}
1556

1557

1558

1559
/*
 * Decode one field descriptor: name (copied into pField->name by
 * tDecodeCStrTo), type (U8), flags (I8), bytes (I32), compress (U32) and
 * typeMod (I32), in that order.
 */
int32_t tDeserializeSFieldWithOptions(SDecoder* pDecoder, SFieldWithOptions* pField) {
  int32_t lino = 0;
  int32_t code = 0;

  // identity
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));

  // storage attributes
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1574

1575
void destroySStreamOutCols(void* p){
174✔
1576
  if (p == NULL) return;
174!
1577
  SStreamOutCol* col = (SStreamOutCol*)p;
174✔
1578
  taosMemoryFreeClear(col->expr);
174!
1579
}
1580

1581
/**
 * Decode the runner-task part of a deploy message.
 *
 * Layout consumed, in order: execReplica, four binaries (streamName, pPlan,
 * outDBFName, outTblName), three int8 flags, the notify-URL list,
 * notifyErrorHandle, output columns, output tags, outStbUid, outStbSversion,
 * subTblNameExpr, tagValueExpr and finally the forced output columns.
 *
 * Each list is preceded by an int32 element count; an array is only created
 * when that count is positive.
 *
 * @param pDecoder decoder to read from
 * @param pMsg     destination message; arrays and buffers decoded so far stay
 *                 attached to it when the function bails out early
 * @return the value left in `code`: 0 unless a check macro jumped to _exit
 */
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t lino;
  int32_t code = 0;

  // ---- fixed leading part ----
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // ---- notify URLs: array of pointers, one decoded buffer per slot ----
  int32_t urlCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &urlCnt));
  if (urlCnt > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, urlCnt);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    for (int32_t k = 0; k < urlCnt; ++k) {
      void** ppUrl = (void**)taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));

  // ---- output columns ----
  int32_t colCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colCnt));
  if (colCnt > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), colCnt);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < colCnt; ++k) {
      SFieldWithOptions* pOutCol = taosArrayGet(pMsg->outCols, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pOutCol));
    }
  }

  // ---- output tags ----
  int32_t tagCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagCnt));
  if (tagCnt > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), tagCnt);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);

    for (int32_t k = 0; k < tagCnt; ++k) {
      SFieldWithOptions* pOutTag = taosArrayGet(pMsg->outTags, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pOutTag));
    }
  }

  // ---- output super table identity and name/tag expressions ----
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // ---- forced output columns: expression buffer followed by its data type ----
  int32_t forcedCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forcedCnt));
  if (forcedCnt > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forcedCnt);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < forcedCnt; ++k) {
      SStreamOutCol* pForced = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, k);

      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pForced->expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pForced->type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pForced->type.bytes));
    }
  }

_exit:
  return code;
}
1654

1655
/**
 * Decode one task deployment: the common task header first, then the
 * type-specific deploy message selected by the decoded task type.
 *
 * @param pDecoder decoder to read from
 * @param pTask    destination; `task` is filled first, then the matching
 *                 member of `msg` (reader, trigger or runner)
 * @return the value left in `code`; an unrecognised task type is routed
 *         through TAOS_CHECK_EXIT with TSDB_CODE_MND_STREAM_INTERNAL_ERROR
 */
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  // the payload that follows depends on the task type just decoded
  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1679

1680

1681
/**
 * Decode the deployment description of a single stream.
 *
 * Layout consumed, in order: streamId, reader-task count + tasks, an int32
 * "has trigger task" marker optionally followed by the trigger task, and
 * runner-task count + tasks.
 *
 * @param pDecoder decoder to read from
 * @param pStream  destination; task arrays are created only for positive
 *                 counts and triggerTask is allocated only when the marker is
 *                 non-zero
 * @return the value left in `code`: 0 unless a check macro jumped to _exit
 */
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nReaders; ++idx) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // optional single trigger task
  int32_t hasTrigger = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nRunners; ++idx) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1721

1722

1723
/**
 * Decode a start-task message; its only content is the common stream
 * message header.
 *
 * @param pDecoder decoder to read from
 * @param pStart   destination message
 * @return the value left in `code` after decoding the header
 */
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:
  return code;
}
1733

1734

1735
/**
 * Decode one entry of a start-task list: the task header followed by its
 * start message.
 *
 * @param pDecoder decoder to read from
 * @param pTask    destination entry (`task` and `startMsg` are filled)
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t lino;
  int32_t code = 0;

  // who the message is for ...
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // ... and the start message itself
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1746

1747

1748
/**
 * Decode an undeploy-task message: common header, then the doCheckpoint and
 * doCleanup int8 flags.
 *
 * @param pDecoder  decoder to read from
 * @param pUndeploy destination message
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // two one-byte flags follow the header
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1760

1761

1762
/**
 * Decode one entry of an undeploy-task list: the task header followed by its
 * undeploy message.
 *
 * @param pDecoder decoder to read from
 * @param pTask    destination entry (`task` and `undeployMsg` are filled)
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  // who the message is for ...
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // ... and the undeploy message itself
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1773

1774
/**
 * Decode a recalculation request: three int64 values read in the order
 * recalcId, start, end.
 *
 * @param pDecoder decoder to read from
 * @param recalc   destination request
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // range boundaries
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1786

1787
/**
 * Decode the message-type specific content of a management response.
 *
 *  - STREAM_MSG_ORIGTBL_READER_INFO: a list of int32 vgroup ids followed by a
 *    list of SStreamTaskAddr reader addresses
 *  - STREAM_MSG_UPDATE_RUNNER:       a list of SStreamRunnerTarget
 *  - STREAM_MSG_USER_RECALC:         a list of SStreamRecalcReq
 *  - any other type:                 nothing is read
 *
 * Every list is preceded by an int32 element count and its array is created
 * only when that count is positive.
 *
 * @param pDecoder decoder to read from
 * @param msgType  selects which layout is expected
 * @param pCont    destination content
 * @return the value left in `code`: 0 unless a check macro jumped to _exit
 */
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t lino;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      // vgroup id list
      int32_t nVgroups = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVgroups));
      if (nVgroups > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), nVgroups);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nVgroups; ++k) {
          int32_t* pVgId = taosArrayGet(pCont->vgIds, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      // reader address list
      int32_t nReaders = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
      if (nReaders > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nReaders; ++k) {
          SStreamTaskAddr* pReaderAddr = taosArrayGet(pCont->readerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
      if (nRunners > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRunners; ++k) {
          SStreamRunnerTarget* pRunnerTarget = taosArrayGet(pCont->runnerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunnerTarget));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRecalcs));
      if (nRecalcs > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), nRecalcs);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRecalcs; ++k) {
          SStreamRecalcReq* pRecalcReq = taosArrayGet(pCont->recalcList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalcReq));
        }
      }
      break;
    }

    default:
      // no content is read for other message types
      break;
  }

_exit:
  return code;
}
1850

1851

1852
/**
 * Decode a management response: common header, request id, result code,
 * task header, then the content whose layout is chosen by the msgType found
 * in the decoded header.
 *
 * @param pDecoder decoder to read from
 * @param pRsp     destination response
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t lino;
  int32_t code = 0;

  // envelope
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // payload; the header must already be decoded because it carries the type
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
1866

1867
void tFreeSStreamMgmtRsp(void* param) {
56✔
1868
  if (NULL == param) {
56!
1869
    return;
×
1870
  }
1871
  
1872
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
56✔
1873

1874
  taosArrayDestroy(pRsp->cont.vgIds);
56✔
1875
  taosArrayDestroy(pRsp->cont.readerList);
56✔
1876
  taosArrayDestroy(pRsp->cont.runnerList);
56✔
1877
  taosArrayDestroy(pRsp->cont.recalcList);
56✔
1878
}
1879

1880
/**
 * Free the buffers held by a reader deploy message. Which set is freed
 * depends on `triggerReader`: the trigger variant owns five buffers, the calc
 * variant owns only its scan plan. The message struct itself is not freed.
 *
 * @param pReader message to clean up (may be NULL)
 */
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    // calc-side reader
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  // trigger-side reader
  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
1897

1898
/**
 * Element destructor for an array of URL pointers: `param` is the address of
 * a slot, and the pointer stored in that slot is what gets freed.
 *
 * @param param address of a pointer slot (may be NULL)
 */
void tFreeStreamNotifyUrl(void* param) {
  if (param != NULL) {
    void** ppSlot = (void**)param;
    taosMemoryFree(*ppSlot);
  }
}
1905

1906
/**
 * Free everything a trigger deploy message holds: the notify URL array
 * (including each URL), the window-type specific buffers for event and count
 * triggers, the filter/plan buffers, the reader and runner arrays, and the
 * stream name. The message struct itself is not freed.
 *
 * @param pTrigger message to clean up (may be NULL)
 */
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

  // only event and count triggers free buffers from the trigger union here
  if (pTrigger->triggerType == WINDOW_TYPE_EVENT) {
    taosMemoryFree(pTrigger->trigger.event.startCond);
    taosMemoryFree(pTrigger->trigger.event.endCond);
  } else if (pTrigger->triggerType == WINDOW_TYPE_COUNT) {
    taosMemoryFree(pTrigger->trigger.count.condCols);
  }

  // serialized filter and plans
  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  // peer lists and name
  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
1932

1933
void tFreeSStreamOutCol(void* param) {
×
1934
  if (NULL == param) {
×
1935
    return;
×
1936
  }
1937

1938
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1939
  taosMemoryFree(pOut->expr);
×
1940
}
1941

1942
/**
 * Free everything a runner deploy message holds: name/plan/output-table
 * buffers, the notify URL array (including each URL), the output column and
 * tag arrays, the two expression buffers and the forced output column array
 * (including each column's expression). The message struct itself is not
 * freed.
 *
 * @param pRunner message to clean up (may be NULL)
 */
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // plain buffers
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // arrays; the URL array needs a per-element destructor
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  // expressions
  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
1960

1961
void tFreeSStmTaskDeploy(void* param) {
1,677✔
1962
  if (NULL == param) {
1,677✔
1963
    return;
347✔
1964
  }
1965

1966
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,330✔
1967
  switch (pTask->task.type)  {
1,330!
1968
    case STREAM_READER_TASK:
426✔
1969
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
426✔
1970
      break;
426✔
1971
    case STREAM_TRIGGER_TASK:
226✔
1972
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
226✔
1973
      break;
226✔
1974
    case STREAM_RUNNER_TASK:
678✔
1975
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
678✔
1976
      break;
678✔
1977
    default:
×
1978
      break;
×
1979
  }
1980
}
1981

1982
void tFreeSStmStreamDeploy(void* param) {
290✔
1983
  if (NULL == param) {
290!
1984
    return;
×
1985
  }
1986
  
1987
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
290✔
1988
  taosArrayDestroy(pDeploy->readerTasks);
290✔
1989
  if (pDeploy->triggerTask) {
290✔
1990
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
228✔
1991
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
228✔
1992
    taosMemoryFree(pDeploy->triggerTask);
228!
1993
  }
1994

1995
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
290✔
1996
  for (int32_t i = 0; i < runnerNum; ++i) {
977✔
1997
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
687✔
1998
    taosMemoryFree(pRunner->msg.runner.pPlan);
687!
1999
  }
2000
  taosArrayDestroy(pDeploy->runnerTasks);
290✔
2001
}
2002

2003
void tDeepFreeSStmStreamDeploy(void* param) {
573✔
2004
  if (NULL == param) {
573!
2005
    return;
×
2006
  }
2007
  
2008
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
573✔
2009
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
573✔
2010
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
573✔
2011
  taosMemoryFree(pDeploy->triggerTask);
573!
2012
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
573✔
2013
}
2014

2015

2016
/**
 * Release the lists of a heartbeat response using the partial per-stream
 * destructor (tFreeSStmStreamDeploy) for the deploy list. The response
 * struct itself is not freed.
 *
 * @param pRsp response to clean up (may be NULL)
 */
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2025

2026
/**
 * Release the lists of a heartbeat response using the full per-stream
 * destructor (tDeepFreeSStmStreamDeploy) for the deploy list. The response
 * struct itself is not freed.
 *
 * @param pRsp response to clean up (may be NULL)
 */
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2035

2036

2037

2038
/**
 * Decode a stream heartbeat response.
 *
 * Layout consumed between tStartDecode and tEndDecode, in order: streamGId,
 * deploy list, start list, the undeployAll flag (the undeploy list is read
 * only when the flag is zero), and the management response list. Every list
 * is preceded by an int32 count and its array is created only when that
 * count is positive.
 *
 * tEndDecode is reached only on the success path; any check macro that
 * fails jumps straight to _exit.
 *
 * @param pDecoder decoder to read from
 * @param pRsp     destination; whatever was decoded before a failure stays
 *                 attached to it
 * @return the value left in `code`: 0 unless a check macro jumped to _exit
 */
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  int32_t nDeploy = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploy));
  if (nDeploy > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploy);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nDeploy; ++k) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, k);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  int32_t nStart = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStart));
  if (nStart > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStart);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nStart; ++k) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // tasks to undeploy; the explicit list is absent when undeployAll is set
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploy = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploy));
    if (nUndeploy > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploy);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);

      for (int32_t k = 0; k < nUndeploy; ++k) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // management responses
  int32_t nMgmtRsp = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nMgmtRsp));
  if (nMgmtRsp > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nMgmtRsp);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nMgmtRsp; ++k) {
      SStreamMgmtRsp* pOne = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pOne));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
2096

2097
/**
 * Encode a task run request as: streamId (int64), taskId (int32),
 * reqType (int32), wrapped in tStartEncode / tEndEncode.
 *
 * tEndEncode is reached only when every field was written.
 *
 * @param pEncoder encoder to write to
 * @param pReq     request to serialize
 * @return the value left in `code`: 0 unless an encode step bailed out
 */
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2110

2111
/**
 * Decode a task run request: streamId (int64), taskId (int32),
 * reqType (int32), wrapped in tStartDecode / tEndDecode.
 *
 * tEndDecode is reached only when every field was read.
 *
 * @param pDecoder decoder to read from
 * @param pReq     destination request
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2124

2125
/**
 * Encode a task stop request; the only field written is streamId (int64),
 * wrapped in tStartEncode / tEndEncode.
 *
 * @param pEncoder encoder to write to
 * @param pReq     request to serialize
 * @return the value left in `code`: 0 unless an encode step bailed out
 */
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2136

2137
/**
 * Decode a task stop request; the only field read is streamId (int64),
 * wrapped in tStartDecode / tEndDecode.
 *
 * @param pDecoder decoder to read from
 * @param pReq     destination request
 * @return the value left in `code`: 0 unless a decode step bailed out
 */
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2149

2150

2151
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
1,402✔
2152
  int32_t code = 0;
1,402✔
2153
  int32_t lino;
2154

2155
  // name part
2156
  int32_t sqlLen = pReq->sql == NULL ? 0 : (int32_t)strlen(pReq->sql) + 1;
1,402!
2157
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
1,402!
2158
  int32_t outDbLen = pReq->outDB == NULL ? 0 : (int32_t)strlen(pReq->outDB) + 1;
1,402✔
2159
  int32_t streamDBLen = pReq->streamDB == NULL ? 0 : (int32_t)strlen(pReq->streamDB) + 1;
1,402!
2160
  int32_t triggerDBLen = pReq->triggerDB == NULL ? 0 : (int32_t)strlen(pReq->triggerDB) + 1;
1,402✔
2161
  int32_t triggerTblNameLen = pReq->triggerTblName == NULL ? 0 : (int32_t)strlen(pReq->triggerTblName) + 1;
1,402✔
2162
  int32_t outTblNameLen = pReq->outTblName == NULL ? 0 : (int32_t)strlen(pReq->outTblName) + 1;
1,402✔
2163

2164
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
2,804!
2165

2166
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, nameLen));
2,804!
2167
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, sqlLen));
2,804!
2168
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, outDbLen));
2,804!
2169
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, streamDBLen));
2,804!
2170
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, triggerDBLen));
2,804!
2171
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, triggerTblNameLen));
2,804!
2172
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, outTblNameLen));
2,804!
2173

2174
  int32_t calcDbSize = (int32_t)taosArrayGetSize(pReq->calcDB);
1,402✔
2175
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcDbSize));
1,402!
2176
  for (int32_t i = 0; i < calcDbSize; ++i) {
2,800✔
2177
    const char *dbName = taosArrayGetP(pReq->calcDB, i);
1,398✔
2178
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, dbName)));
1,398!
2179
  }
2180

2181
  // trigger control part
2182
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
2,804!
2183
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
2,804!
2184
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
2,804!
2185
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
2,804!
2186
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
2,804!
2187
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
2,804!
2188
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
2,804!
2189
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
2,804!
2190
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
2,804!
2191
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));
2,804!
2192

2193
  // notify part
2194
  int32_t addrSize = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
1,402✔
2195
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
1,402!
2196
  for (int32_t i = 0; i < addrSize; ++i) {
1,482✔
2197
    const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
80✔
2198
    TAOS_CHECK_EXIT((tEncodeCStr(pEncoder, url)));
80!
2199
  }
2200
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
2,804!
2201
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyErrorHandle));
2,804!
2202
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));
2,804!
2203

2204
  // out table part
2205

2206
  // trigger cols and partition cols
2207
  int32_t filterColsLen = pReq->triggerFilterCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerFilterCols) + 1;
1,402✔
2208
  int32_t triggerColsLen = pReq->triggerCols == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerCols) + 1;
1,402✔
2209
  int32_t partitionColsLen = pReq->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pReq->partitionCols) + 1;
1,402✔
2210
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, filterColsLen));
2,804!
2211
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, triggerColsLen));
2,804!
2212
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, partitionColsLen));
2,804!
2213

2214
  // out col
2215
  int32_t outColSize = (int32_t )taosArrayGetSize(pReq->outCols);
1,402✔
2216
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColSize));
1,402!
2217
  for (int32_t i = 0; i < outColSize; ++i) {
6,350✔
2218
    SFieldWithOptions *pField = taosArrayGet(pReq->outCols, i);
4,948✔
2219
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pField));
4,948!
2220
  }
2221

2222
  // out tag
2223
  int32_t outTagSize = (int32_t )taosArrayGetSize(pReq->outTags);
1,402✔
2224
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagSize));
1,402!
2225
  for (int32_t i = 0; i < outTagSize; ++i) {
2,502✔
2226
    SField *pField = taosArrayGet(pReq->outTags, i);
1,100✔
2227
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->type));
2,200!
2228
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
2,200!
2229
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
2,200!
2230
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
2,200!
2231
  }
2232

2233
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
2,804!
2234
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
2,804!
2235
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
2,804!
2236
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));
2,804!
2237

2238
  switch (pReq->triggerType) {
1,402!
2239
    case WINDOW_TYPE_SESSION: {
28✔
2240
      // session trigger
2241
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
56!
2242
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
56!
2243
      break;
28✔
2244
    }
2245
    case WINDOW_TYPE_STATE: {
452✔
2246
      // state trigger
2247
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
904!
2248
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
904!
2249
      break;
452✔
2250
    }
2251
    case WINDOW_TYPE_INTERVAL: {
718✔
2252
      // slide trigger
2253
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
1,436!
2254
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
1,436!
2255
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
1,436!
2256
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
1,436!
2257
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
1,436!
2258
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
1,436!
2259
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
1,436!
2260
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
1,436!
2261
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
1,436!
2262
      break;
718✔
2263
    }
2264
    case WINDOW_TYPE_EVENT: {
72✔
2265
      // event trigger
2266
      int32_t eventWindowStartCondLen = pReq->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.startCond) + 1;
72!
2267
      int32_t eventWindowEndCondLen = pReq->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.event.endCond) + 1;
72!
2268

2269
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond, eventWindowStartCondLen));
144!
2270
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond, eventWindowEndCondLen));
144!
2271

2272
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
144!
2273
      break;
72✔
2274
    }
2275
    case WINDOW_TYPE_COUNT: {
68✔
2276
      // count trigger
2277
      int32_t countWindowCondColsLen = pReq->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pReq->trigger.count.condCols) + 1;
68!
2278
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols, countWindowCondColsLen));
136!
2279

2280
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
136!
2281
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
136!
2282
      break;
68✔
2283
    }
2284
    case WINDOW_TYPE_PERIOD: {
64✔
2285
      // period trigger
2286
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
128!
2287
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
128!
2288
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
128!
2289
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
128!
2290
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
128!
2291
      break;
64✔
2292
    }
2293
  }
2294

2295
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
2,804!
2296
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
2,804!
2297
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
2,804!
2298
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
2,804!
2299
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
2,804!
2300
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
2,804!
2301
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
2,804!
2302
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
2,804!
2303
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
2,804!
2304
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
2,804!
2305
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
2,804!
2306
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
2,804!
2307
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
2,804!
2308
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));
2,804!
2309

2310
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
2,804!
2311
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));
2,804!
2312

2313
  int32_t triggerScanPlanLen = pReq->triggerScanPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerScanPlan) + 1;
1,402✔
2314
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, triggerScanPlanLen));
2,804!
2315

2316
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerHasPF));
2,804!
2317
  int32_t triggerFilterLen = pReq->triggerPrevFilter == NULL ? 0 : (int32_t)strlen((char*)pReq->triggerPrevFilter) + 1;
1,402✔
2318
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, triggerFilterLen));
2,804!
2319

2320
  int32_t calcScanPlanListSize = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
1,402✔
2321
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, calcScanPlanListSize));
1,402!
2322
  for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
2,812✔
2323
    SStreamCalcScan* pCalcScanPlan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, i);
1,410✔
2324
    int32_t          vgListSize = (int32_t)taosArrayGetSize(pCalcScanPlan->vgList);
1,410✔
2325
    int32_t          scanPlanLen = pCalcScanPlan->scanPlan == NULL ? 0 : (int32_t)strlen((char*)pCalcScanPlan->scanPlan) + 1;
1,410!
2326
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgListSize));
1,410!
2327
    for (int32_t j = 0; j < vgListSize; ++j) {
2,820✔
2328
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pCalcScanPlan->vgList, j)));
2,820!
2329
    }
2330
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pCalcScanPlan->readFromCache));
2,820!
2331
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCalcScanPlan->scanPlan, scanPlanLen));
2,820!
2332
  }
2333

2334
  int32_t calcPlanLen = pReq->calcPlan == NULL ? 0 : (int32_t)strlen((char*)pReq->calcPlan) + 1;
1,402✔
2335
  int32_t subTblNameExprLen = pReq->subTblNameExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->subTblNameExpr) + 1;
1,402✔
2336
  int32_t tagValueExprLen = pReq->tagValueExpr == NULL ? 0 : (int32_t)strlen((char*)pReq->tagValueExpr) + 1;
1,402✔
2337

2338
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
2,804!
2339
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, calcPlanLen));
2,804!
2340
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, subTblNameExprLen));
2,804!
2341
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, tagValueExprLen));
2,804!
2342

2343
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pReq->forceOutCols);
1,402✔
2344
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
1,402!
2345
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
1,634✔
2346
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, i);
232✔
2347
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
232!
2348

2349
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
464!
2350
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
464!
2351
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
464!
2352
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
464!
2353
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
464!
2354
  }
2355

2356
_exit:
1,402✔
2357

2358
  if (code) {
1,402!
2359
    return code;
×
2360
  }
2361
  
2362
  return 0;
1,402✔
2363
}
2364

2365
// Serialize a create-stream request into the caller supplied buffer.
//
// buf/bufLen are handed to tEncoderInit() unchanged; pReq is the request to encode.
// Returns the encoder position after encoding when every step succeeds, otherwise
// the error code of the step that failed.
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  result = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);

  TAOS_CHECK_EXIT(tStartEncode(&enc));
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&enc, pReq));
  tEndEncode(&enc);

_exit:
  // The position has to be sampled before the encoder is cleared.
  result = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return result;
}
2388

2389

2390
// Decode the body of a create-stream request from pDecoder into pReq.
//
// The field order mirrors the encoder: identifiers and names, calc databases,
// option flags, notify settings, column descriptions, output schema, timing
// options, the trigger-specific payload selected by pReq->triggerType, table
// metadata, scan plans, calc plan/expressions and finally the forced output
// columns.
//
// Strings and arrays are heap allocated and stored in pReq as they are decoded.
// On failure the members decoded so far stay in pReq; elements that were decoded
// but not yet stored in their array are released here.
//
// Returns the result of the last decode step on success, or the first error code.
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  // In-flight array elements. They own heap memory until pushed into their
  // array, so they live at function scope where _exit can release them.
  SStreamCalcScan calcScan = {0};
  SStreamOutCol   outCol = {0};

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));

  // calc databases: array of heap-allocated strings
  int32_t calcDbSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
  if (pReq->calcDB == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < calcDbSize; ++i) {
    char *calcDb = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
    // tDecodeCStr hands back a pointer; keep a private copy for the request
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
    if (calcDb == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
      taosMemoryFree(calcDb);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));

  // notify URLs: the array is only created when at least one URL is present
  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  if (addrSize > 0) {
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
    if (pReq->pNotifyAddrUrls == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }
  for (int32_t i = 0; i < addrSize; ++i) {
    char *url = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
    if (url == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
      taosMemoryFree(url);
      TAOS_CHECK_EXIT(terrno);
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyErrorHandle));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));

  // output columns: the array is pre-sized and each slot is decoded in place
  int32_t outColSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
  if (outColSize > 0) {
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
    if (pReq->outCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outColSize; ++i) {
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // output tags
  int32_t outTagSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
  if (outTagSize > 0) {
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
    if (pReq->outTags == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outTagSize; ++i) {
      SFieldWithOptions field = {0};
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));

  // trigger-specific payload; an unknown trigger type makes the message invalid
  switch (pReq->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));

      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
      break;
    }
    default:
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
      break;
  }

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));

  // calc scan plans: each entry carries a vgroup id list and a plan string
  int32_t calcScanPlanListSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
  if (calcScanPlanListSize > 0) {
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
    if (pReq->calcScanPlanList == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
      // calcScan is all-zero here: it starts zeroed and is reset after every push
      int32_t vgListSize = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
      if (vgListSize > 0) {
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
        if (calcScan.vgList == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        for (int32_t j = 0; j < vgListSize; ++j) {
          int32_t vgId = 0;
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }
      }
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // ownership of vgList/scanPlan moved into the array
      calcScan = (SStreamCalcScan){0};
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));

  // forced output columns: expression string plus data type description
  int32_t forceOutColsSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
  if (forceOutColsSize > 0) {
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
    if (pReq->forceOutCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
      // outCol is all-zero here: it starts zeroed and is reset after every push
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // ownership of expr moved into the array
      outCol = (SStreamOutCol){0};
    }
  }

_exit:

  // Release an element that was decoded but never stored in its array. Both
  // temporaries are zeroed after every successful push, so this is a no-op on
  // the success path.
  taosArrayDestroy(calcScan.vgList);
  taosMemoryFreeClear(calcScan.scanPlan);
  taosMemoryFreeClear(outCol.expr);

  return code;
}
2637

2638

2639
// Decode a create-stream request from buf (bufLen bytes) into pReq.
//
// Wraps tDeserializeSCMCreateStreamReqImpl() in a tStartDecode()/tEndDecode()
// pair. The decoder is cleared on every path. Returns the resulting status code.
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2656

2657

2658
// Serialize a drop-stream request (name, igNotExists) into buf.
//
// A NULL name is encoded as a zero-length binary; otherwise the terminating
// NUL byte is included in the encoded length.
// Returns the encoder position on success, or the failing step's error code.
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
  SEncoder enc = {0};
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  tlen = 0;

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  int32_t nameLen = 0;
  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igNotExists));

  tEndEncode(&enc);

_exit:
  // read the position before the encoder is cleared
  tlen = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return tlen;
}
2682

2683
// Decode a drop-stream request from buf (bufLen bytes) into pReq.
//
// pReq->name is allocated by tDecodeBinaryAlloc(); tFreeMDropStreamReq() in this
// file releases it. The decoder is cleared on every path.
// Returns the resulting status code.
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2699

2700
// Release the heap-allocated name held by a drop-stream request and reset the
// pointer. The request struct itself is not freed. A NULL request is ignored,
// matching tFreeSCMCreateStreamReq().
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
29✔
2703

2704
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
1,148✔
2705
  if (pScan == NULL) {
1,148!
2706
    return;
×
2707
  }
2708
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
1,148✔
2709
  taosArrayDestroy(pCalcScan->vgList);
1,148✔
2710
  taosMemoryFreeClear(pCalcScan->scanPlan);
1,148!
2711
}
2712

2713
void tFreeStreamOutCol(void* pCol) {
174✔
2714
  if (pCol == NULL) {
174!
2715
    return;
×
2716
  }
2717
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
174✔
2718
  taosMemoryFreeClear(pOutCol->expr);
174!
2719
}
2720

2721

2722

2723
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
1,584✔
2724
  if (NULL == pReq) {
1,584✔
2725
    return;
273✔
2726
  }
2727
  taosMemoryFreeClear(pReq->name);
1,311!
2728
  taosMemoryFreeClear(pReq->sql);
1,311!
2729
  taosMemoryFreeClear(pReq->streamDB);
1,311!
2730
  taosMemoryFreeClear(pReq->triggerDB);
1,311!
2731
  taosMemoryFreeClear(pReq->outDB);
1,311!
2732
  taosMemoryFreeClear(pReq->triggerTblName);
1,311!
2733
  taosMemoryFreeClear(pReq->outTblName);
1,311!
2734

2735
  taosArrayDestroyP(pReq->calcDB, NULL);
1,311✔
2736
  pReq->calcDB = NULL;
1,311✔
2737
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
1,311✔
2738
  pReq->pNotifyAddrUrls = NULL;
1,311✔
2739

2740
  taosMemoryFreeClear(pReq->triggerFilterCols);
1,311!
2741
  taosMemoryFreeClear(pReq->triggerCols);
1,311!
2742
  taosMemoryFreeClear(pReq->partitionCols);
1,311!
2743

2744
  taosArrayDestroy(pReq->outTags);
1,311✔
2745
  pReq->outTags = NULL;
1,311✔
2746
  taosArrayDestroy(pReq->outCols);
1,311✔
2747
  pReq->outCols = NULL;
1,311✔
2748

2749
  switch (pReq->triggerType) {
1,311✔
2750
    case WINDOW_TYPE_EVENT:
58✔
2751
      taosMemoryFreeClear(pReq->trigger.event.startCond);
58!
2752
      taosMemoryFreeClear(pReq->trigger.event.endCond);
58!
2753
      break;
58✔
2754
    default:
1,253✔
2755
      break;
1,253✔
2756
  }
2757

2758
  taosMemoryFreeClear(pReq->triggerScanPlan);
1,311!
2759
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
1,311✔
2760
  pReq->calcScanPlanList = NULL;
1,311✔
2761
  taosMemoryFreeClear(pReq->triggerPrevFilter);
1,311!
2762

2763
  taosMemoryFreeClear(pReq->calcPlan);
1,311!
2764
  taosMemoryFreeClear(pReq->subTblNameExpr);
1,311!
2765
  taosMemoryFreeClear(pReq->tagValueExpr);
1,311!
2766
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
1,311✔
2767
  pReq->forceOutCols = NULL;
1,311✔
2768
}
2769

2770
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
212✔
2771
  int32_t code = 0, lino = 0;
212✔
2772
  if (NULL == pSrc) {
212!
2773
    return code;
×
2774
  } 
2775

2776
  void* p = NULL;
212✔
2777
  int32_t num = 0;
212✔
2778
  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
212!
2779
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
212!
2780

2781
  SCMCreateStreamReq* pDst = *ppDst;
212✔
2782

2783
  if (pSrc->outDB) {
212✔
2784
    pDst->outDB = COPY_STR(pSrc->outDB);
211!
2785
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
211!
2786
  }
2787
  
2788
  if (pSrc->triggerTblName) {
212✔
2789
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
211!
2790
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
211!
2791
  }
2792
  
2793
  if (pSrc->outTblName) {
212✔
2794
    pDst->outTblName = COPY_STR(pSrc->outTblName);
211!
2795
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
211!
2796
  }
2797
  
2798
  if (pSrc->pNotifyAddrUrls) {
212✔
2799
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
9✔
2800
    if (num > 0) {
9!
2801
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
9✔
2802
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
9!
2803
    }
2804
    for (int32_t i = 0; i < num; ++i) {
18✔
2805
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
9!
2806
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
9!
2807
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
18!
2808
    }
2809
  }
2810
  
2811
  if (pSrc->triggerFilterCols) {
212✔
2812
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
24!
2813
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
24!
2814
  }
2815
  
2816
  if (pSrc->triggerCols) {
212✔
2817
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
207!
2818
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
207!
2819
  }
2820
  
2821
  if (pSrc->partitionCols) {
212✔
2822
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
121!
2823
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
121!
2824
  }
2825
  
2826
  if (pSrc->outCols) {
212✔
2827
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
211✔
2828
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
211!
2829
  }
2830
  
2831
  if (pSrc->outTags) {
212✔
2832
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
121✔
2833
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
121!
2834
  }
2835

2836
  pDst->triggerType = pSrc->triggerType;
212✔
2837
  
2838
  switch (pSrc->triggerType) {
212✔
2839
    case WINDOW_TYPE_EVENT:
14✔
2840
      if (pSrc->trigger.event.startCond) {
14!
2841
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
14!
2842
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
14!
2843
      }
2844
      
2845
      if (pSrc->trigger.event.endCond) {
14!
2846
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
14!
2847
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
14!
2848
      }
2849
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
14✔
2850
      break;
14✔
2851
    default:
198✔
2852
      pDst->trigger = pSrc->trigger;
198✔
2853
      break;
198✔
2854
  }
2855

2856

2857
  if (pSrc->triggerScanPlan) {
212✔
2858
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
211!
2859
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
211!
2860
  }
2861
  
2862
  if (pSrc->calcScanPlanList) {
212✔
2863
    num = taosArrayGetSize(pSrc->calcScanPlanList);
211✔
2864
    if (num > 0) {
211!
2865
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
211✔
2866
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
211!
2867
    }
2868
    for (int32_t i = 0; i < num; ++i) {
422✔
2869
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
211✔
2870
      SStreamCalcScan  dscan = {.readFromCache = sscan->readFromCache};
211✔
2871

2872
      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
211✔
2873
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
211!
2874

2875
      dscan.scanPlan = COPY_STR(sscan->scanPlan);
211!
2876
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
211!
2877
      
2878
      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
422!
2879
    }
2880
  }
2881
  
2882
  if (pSrc->triggerPrevFilter) {
212✔
2883
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
24!
2884
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
24!
2885
  }
2886
  
2887
  if (pSrc->calcPlan) {
212✔
2888
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
211!
2889
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
211!
2890
  }
2891
  
2892
  if (pSrc->subTblNameExpr) {
212✔
2893
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
121!
2894
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
121!
2895
  }
2896
  
2897
  if (pSrc->tagValueExpr) {
212✔
2898
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
121!
2899
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
121!
2900
  }
2901
  
2902
  if (pSrc->forceOutCols) {
212✔
2903
    num = taosArrayGetSize(pSrc->forceOutCols);
9✔
2904
    if (num > 0) {
9!
2905
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
9✔
2906
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
9!
2907
    }
2908
    for (int32_t i = 0; i < num; ++i) {
67✔
2909
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
58✔
2910
      SStreamOutCol  dcol = {.type = scol->type};
58✔
2911

2912
      dcol.expr = COPY_STR(scol->expr);
58!
2913
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
58!
2914
      
2915
      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
116!
2916
    }
2917
  }
2918

2919
  pDst->triggerTblUid = pSrc->triggerTblUid;
212✔
2920
  pDst->triggerTblType = pSrc->triggerTblType;
212✔
2921
  pDst->deleteReCalc = pSrc->deleteReCalc;
212✔
2922
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
212✔
2923
  
2924
_exit:
212✔
2925

2926
  if (code) {
212!
2927
    tFreeSCMCreateStreamReq(pDst);
×
2928
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
2929
  }
2930

2931
  return code;
212✔
2932
}
2933

2934

2935
// Serialize a pause-stream request (name, igNotExists) into buf.
//
// A NULL name is encoded as a zero-length binary; otherwise the terminating
// NUL byte is included in the encoded length.
// Returns the encoder position on success, or the failing step's error code.
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
  SEncoder enc = {0};
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  tlen = 0;

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  int32_t nameLen = 0;
  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igNotExists));
  tEndEncode(&enc);

_exit:
  // read the position before the encoder is cleared
  tlen = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return tlen;
}
2957

2958
// Decode a pause-stream request from buf (bufLen bytes) into pReq.
//
// pReq->name is allocated by tDecodeBinaryAlloc(); tFreeMPauseStreamReq() in
// this file releases it. The decoder is cleared on every path.
// Returns the resulting status code.
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2973

2974
// Release the heap-allocated name held by a pause-stream request and reset the
// pointer. The request struct itself is not freed. A NULL request is ignored,
// matching tFreeSCMCreateStreamReq().
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
2977

2978
// Serialize a resume-stream request (name, igNotExists, igUntreated) into buf.
//
// A NULL name is encoded as a zero-length binary; otherwise the terminating
// NUL byte is included in the encoded length.
// Returns the encoder position on success, or the failing step's error code.
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  tlen = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  int32_t nameLen = 0;
  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igNotExists));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->igUntreated));
  tEndEncode(&enc);

_exit:
  // read the position before the encoder is cleared
  tlen = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return tlen;
}
3000

3001
// Decode a resume-stream request from buf (bufLen bytes) into pReq.
//
// pReq->name is allocated by tDecodeBinaryAlloc(); tFreeMResumeStreamReq() in
// this file releases it. The decoder is cleared on every path.
// Returns the resulting status code.
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3017

3018
// Release the heap-allocated name held by a resume-stream request and reset the
// pointer. The request struct itself is not freed. A NULL request is ignored,
// matching tFreeSCMCreateStreamReq().
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
3021

3022
// Serialize a recalc-stream request (name, calcAll, timeRange.skey/ekey) into buf.
//
// A NULL name is encoded as a zero-length binary; otherwise the terminating
// NUL byte is included in the encoded length.
// Returns the encoder position on success, or the failing step's error code.
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  tlen = 0;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&enc));

  int32_t nameLen = 0;
  if (pReq->name != NULL) {
    nameLen = (int32_t)strlen(pReq->name) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(&enc, pReq->name, nameLen));
  TAOS_CHECK_EXIT(tEncodeI8(&enc, pReq->calcAll));
  TAOS_CHECK_EXIT(tEncodeI64(&enc, pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tEncodeI64(&enc, pReq->timeRange.ekey));
  tEndEncode(&enc);

_exit:
  // read the position before the encoder is cleared
  tlen = code ? code : (int32_t)enc.pos;
  tEncoderClear(&enc);
  return tlen;
}
3045

3046
// Decode a recalc-stream request from buf (bufLen bytes) into pReq.
//
// pReq->name is allocated by tDecodeBinaryAlloc(); tFreeMRecalcStreamReq() in
// this file releases it. The decoder is cleared on every path.
// Returns the resulting status code.
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3064

3065
// Frees the name buffer held by a recalc-stream request (allocated by
// tDeserializeSMRecalcStreamReq via tDecodeBinaryAlloc) and clears the pointer.
// The request struct itself is not freed. A NULL request is ignored.
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
21✔
3068

3069
// Writes the body of a stream-progress request to an already started encoder:
// streamId (i64), taskId (i64), fetchIdx (i32).
// Returns 0 on success or the error code of the first failing write.
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
3080

3081
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
×
3082
  SEncoder encoder = {0};
×
3083
  int32_t  code = 0;
×
3084
  int32_t  lino;
3085
  int32_t  tlen;
3086
  tEncoderInit(&encoder, buf, bufLen);
×
3087

3088
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3089
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
×
3090

3091
  tEndEncode(&encoder);
×
3092

3093
_exit:
×
3094
  if (code) {
×
3095
    tlen = code;
×
3096
  } else {
3097
    tlen = encoder.pos;
×
3098
  }
3099
  tEncoderClear(&encoder);
×
3100
  return tlen;
×
3101
}
3102

3103
// Reads the body of a stream-progress request from an already started decoder:
// streamId (i64), taskId (i64), fetchIdx (i32).
// Returns 0 on success or the error code of the first failing read.
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
3114

3115
// Deserializes a stream-progress request from buf (bufLen bytes) into pReq.
// Returns 0 on success or the error code of the failing step. The decoder is
// always cleared before returning.
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, (char *)buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3131

3132
// Writes the body of a stream-progress response to an already started encoder:
// streamId (i64), fillHisFinished (i8), progressDelay (i64), fetchIdx (i32).
// Returns 0 on success or the error code of the first failing write.
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
3144

3145
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
×
3146
  SEncoder encoder = {0};
×
3147
  int32_t  code = 0;
×
3148
  int32_t  lino;
3149
  int32_t  tlen;
3150
  tEncoderInit(&encoder, buf, bufLen);
×
3151

3152
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3153
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
×
3154

3155
  tEndEncode(&encoder);
×
3156

3157
_exit:
×
3158
  if (code) {
×
3159
    tlen = code;
×
3160
  } else {
3161
    tlen = encoder.pos;
×
3162
  }
3163
  tEncoderClear(&encoder);
×
3164
  return tlen;
×
3165
}
3166

3167
// Reads the body of a stream-progress response from an already started decoder:
// streamId (i64), fillHisFinished (read as i8), progressDelay (i64),
// fetchIdx (i32). Returns 0 on success or the first failing read's error code.
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  // fillHisFinished is written through an int8_t view of the field.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
3179

3180
// Deserializes a stream-progress response from buf (bufLen bytes) into pRsp.
// Returns 0 on success or the error code of the failing step. The decoder is
// always cleared before returning.
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3196

3197
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
55✔
3198
  SEncoder encoder = {0};
55✔
3199
  int32_t  code = TSDB_CODE_SUCCESS;
55✔
3200
  int32_t  lino = 0;
55✔
3201
  int32_t  tlen = 0;
55✔
3202

3203
  tEncoderInit(&encoder, buf, bufLen);
55✔
3204
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
55!
3205

3206
  int32_t size = taosArrayGetSize(pRsp->cols);
54✔
3207
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
54!
3208
  for (int32_t i = 0; i < size; ++i) {
185✔
3209
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
131✔
3210
    if (oInfo == NULL) {
131!
3211
      uError("col id is NULL at index %d", i);
×
3212
      code = TSDB_CODE_INVALID_PARA;
×
3213
      goto _exit;
×
3214
    }
3215
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
262!
3216
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
262!
3217
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
262!
3218
  }
3219

3220
  tEndEncode(&encoder);
54✔
3221

3222
_exit:
55✔
3223
  if (code != TSDB_CODE_SUCCESS) {
55!
3224
    tlen = code;
×
3225
  } else {
3226
    tlen = encoder.pos;
55✔
3227
  }
3228
  tEncoderClear(&encoder);
55✔
3229
  return tlen;
55✔
3230
}
3231

3232
// Deserializes an original-table-info response from buf (bufLen bytes).
// Layout: element count (i32), then per entry suid (i64), uid (i64), cid (i16).
// pRsp->cols is assigned a newly created array; on failure it may be left
// allocated and partially filled, so the caller should still release it with
// tDestroySTriggerOrigTableInfoRsp.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    // Use the EXIT variant so a decode failure still reaches tDecoderClear
    // below instead of returning straight out of the function.
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3266

3267
// Destroys the cols array of an original-table-info response.
// The response struct itself is not freed. A NULL response is ignored, and
// cols is reset to NULL so that a repeated call does not free it twice.
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) {
    return;
  }
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
13,255✔
3270

3271
// Releases the array owned by a pull request, selected by its type, and resets
// the pointer to NULL. Request types without an array member are left
// untouched. The request struct itself is not freed; NULL is ignored.
//
// All four WAL data request types are handled here because
// tDeserializeSTriggerPullRequest decodes each of them into walDataReq and may
// allocate its cids array; freeing only STRIGGER_PULL_WAL_DATA would leak the
// array for the other three.
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA: {
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pRequest->cols != NULL) {
        taosArrayDestroy(pRequest->cols);
        pRequest->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
      if (pRequest->uids != NULL) {
        taosArrayDestroy(pRequest->uids);
        pRequest->uids = NULL;
      }
      break;
    }
    default:
      // Remaining request types own no heap data.
      break;
  }
}
3311

3312
// Encodes an array of column ids: element count (i32) followed by each
// col_id_t as an i16. Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when an
// element cannot be fetched, or the error code of a failing encode call.
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  const int32_t nCols = taosArrayGetSize(cids);

  TAOS_CHECK_EXIT(tEncodeI32(encoder, nCols));

  for (int32_t idx = 0; idx < nCols; idx++) {
    const col_id_t* pCid = taosArrayGet(cids, idx);
    if (pCid == NULL) {
      uError("col id is NULL at index %d", idx);
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tEncodeI16(encoder, *pCid));
  }

_exit:
  return code;
}
3330

3331
// Decodes an array of column ids written by encodeColsArray: element count
// (i32) followed by that many i16 values.
// When the decoded count is positive, *cids is assigned a newly created array
// holding the ids; otherwise *cids is left untouched.
// On any failure, whatever *cids points to is destroyed and *cids is set to
// NULL. Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = 0;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  if (size > 0){
    *cids = taosArrayInit(size, sizeof(col_id_t));
    if (*cids == NULL) {
      code = terrno;
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
      goto _exit;
    }

    for (int32_t i = 0; i < size; ++i) {
      col_id_t* pColId = taosArrayReserve(*cids, 1);
      if (pColId == NULL) {
        code = terrno;
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
        goto _exit;
      }
      // Use the EXIT variant: returning directly here would bypass the cleanup
      // below and hand a partially filled array back on error.
      TAOS_CHECK_EXIT(tDecodeI16(decoder, pColId));
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(*cids);
    *cids = NULL;
  }
  return code;
}
3363

3364
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
357,705✔
3365
  SEncoder encoder = {0};
357,705✔
3366
  int32_t  code = TSDB_CODE_SUCCESS;
357,705✔
3367
  int32_t  lino = 0;
357,705✔
3368
  int32_t  tlen = 0;
357,705✔
3369

3370
  tEncoderInit(&encoder, buf, bufLen);
357,705✔
3371
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
357,693!
3372

3373
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
715,334!
3374
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
715,334!
3375
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
715,334!
3376
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
715,334!
3377

3378
  switch (pReq->type) {
357,667!
3379
    case STRIGGER_PULL_SET_TABLE: {
56✔
3380
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
56✔
3381
      int32_t size = taosArrayGetSize(pRequest->uids);
56✔
3382
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
56!
3383
      for (int32_t i = 0; i < size; ++i) {
176✔
3384
        int64_t* uids = taosArrayGet(pRequest->uids, i);
120✔
3385
        if (uids == NULL) {
120!
3386
          uError("uid is NULL at index %d", i);
×
3387
          code = TSDB_CODE_INVALID_PARA;
×
3388
          goto _exit;
×
3389
        }
3390
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[0]));
240!
3391
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[1]));
240!
3392
      }
3393
      break;
56✔
3394
    }
3395
    case STRIGGER_PULL_LAST_TS: {
570✔
3396
      break;
570✔
3397
    }
3398
    case STRIGGER_PULL_FIRST_TS: {
652✔
3399
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
652✔
3400
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,304!
3401
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,304!
3402
      break;
652✔
3403
    }
3404
    case STRIGGER_PULL_TSDB_META: {
1,180✔
3405
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,180✔
3406
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,360!
3407
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,360!
3408
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,360!
3409
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,360!
3410
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,360!
3411
      break;
1,180✔
3412
    }
3413
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3414
      break;
×
3415
    }
3416
    case STRIGGER_PULL_TSDB_TS_DATA: {
80✔
3417
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
80✔
3418
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
160!
3419
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
160!
3420
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
160!
3421
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
160!
3422
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
160!
3423
      break;
80✔
3424
    }
3425
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
324✔
3426
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
324✔
3427
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
648!
3428
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
648!
3429
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
648!
3430
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
648!
3431
      break;
324✔
3432
    }
3433
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
348✔
3434
      break;
348✔
3435
    }
3436
    case STRIGGER_PULL_TSDB_CALC_DATA: {
326,786✔
3437
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
326,786✔
3438
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
653,572!
3439
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
653,572!
3440
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
653,572!
3441
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
653,572!
3442
      break;
326,786✔
3443
    }
3444
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3445
      break;
×
3446
    }
3447
    case STRIGGER_PULL_TSDB_DATA: {
524✔
3448
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
524✔
3449
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,048!
3450
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,048!
3451
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,048!
3452
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,048!
3453
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
524!
3454
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,048!
3455
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,048!
3456
      break;
524✔
3457
    }
3458
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3459
      break;
×
3460
    }
3461
    case STRIGGER_PULL_WAL_META: {
13,319✔
3462
      SSTriggerWalMetaRequest* pRequest = (SSTriggerWalMetaRequest*)pReq;
13,319✔
3463
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
26,638!
3464
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
26,638!
3465
      break;
13,319✔
3466
    }
3467
    case STRIGGER_PULL_WAL_TS_DATA:
11,675✔
3468
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3469
    case STRIGGER_PULL_WAL_CALC_DATA:
3470
    case STRIGGER_PULL_WAL_DATA: {
3471
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
11,675✔
3472
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
23,350!
3473
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
23,350!
3474
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
23,350!
3475
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
23,350!
3476
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
11,675!
3477
      break;
11,675✔
3478
    }
3479
    case STRIGGER_PULL_GROUP_COL_VALUE: {
1,036✔
3480
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
1,036✔
3481
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,072!
3482
      break;
1,036✔
3483
    }
3484
    case STRIGGER_PULL_VTABLE_INFO: {
108✔
3485
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
108✔
3486
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
108!
3487
      break;
108✔
3488
    }
3489
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
976✔
3490
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
976✔
3491
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,952!
3492
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
976!
3493
      break;
976✔
3494
    }
3495
    case STRIGGER_PULL_OTABLE_INFO: {
56✔
3496
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
56✔
3497
      int32_t size = taosArrayGetSize(pRequest->cols);
56✔
3498
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
56!
3499
      for (int32_t i = 0; i < size; ++i) {
196✔
3500
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
140✔
3501
        if (oInfo == NULL) {
140!
3502
          uError("col id is NULL at index %d", i);
×
3503
          code = TSDB_CODE_INVALID_PARA;
×
3504
          goto _exit;
×
3505
        }
3506
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
280!
3507
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
280!
3508
      }
3509
      break; 
56✔
3510
    }
3511
    default: {
×
3512
      uError("unknown pull type %d", pReq->type);
×
3513
      code = TSDB_CODE_INVALID_PARA;
×
3514
      break;
×
3515
    }
3516
  }
3517

3518
  tEndEncode(&encoder);
357,690✔
3519

3520
_exit:
357,686✔
3521
  if (code != TSDB_CODE_SUCCESS) {
357,686!
3522
    tlen = code;
×
3523
  } else {
3524
    tlen = encoder.pos;
357,686✔
3525
  }
3526
  tEncoderClear(&encoder);
357,686✔
3527
  return tlen;
357,681✔
3528
}
3529

3530

3531
// Deserializes a trigger pull request from buf (bufLen bytes) into the union
// pReq. The common header (type, streamId, readerTaskId, sessionId) is read
// into pReq->base; the type then selects which union member receives the
// remaining fields.
// Arrays created here (uids, cids, cols) stay attached to pReq, including on
// failure, and are expected to be released with tDestroySTriggerPullRequest.
// An unrecognized type is logged and reported as TSDB_CODE_INVALID_PARA.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step; the decoder
// is cleared on every path.
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Common header.
  int32_t type = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
  SSTriggerPullRequest* pBase = &(pReq->base);
  pBase->type = type;
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));

  switch (type) {
    // Header-only requests: nothing more to read.
    case STRIGGER_PULL_LAST_TS:
    case STRIGGER_PULL_TSDB_META_NEXT:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      break;

    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
      int32_t size = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
      // Each element holds a pair of int64_t values.
      pRequest->uids = taosArrayInit(size, 2 * sizeof(int64_t));
      if (pRequest->uids == NULL) {
        code = terrno;
        uError("failed to allocate memory for uids, size: %d, errno: %d", size, code);
        goto _exit;
      }
      for (int32_t i = 0; i < size; ++i) {
        int64_t* uid = taosArrayReserve(pRequest->uids, 1);
        if (uid == NULL) {
          code = terrno;
          uError("failed to reserve memory for uid, size: %d, errno: %d", size, code);
          goto _exit;
        }
        // EXIT (not RETURN) so a decode failure still clears the decoder.
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, uid));
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, uid + 1));
      }
      break;
    }

    case STRIGGER_PULL_FIRST_TS: {
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_TSDB_META: {
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_TSDB_TS_DATA: {
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_TSDB_CALC_DATA: {
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      break;
    }

    case STRIGGER_PULL_WAL_META: {
      SSTriggerWalMetaRequest* pRequest = &(pReq->walMetaReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
      break;
    }

    // The four WAL data requests share one layout and one union member.
    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA: {
      SSTriggerWalDataRequest* pRequest = &(pReq->walDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }

    case STRIGGER_PULL_GROUP_COL_VALUE: {
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      break;
    }

    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }

    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }

    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
      int32_t size = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
      if (pRequest->cols == NULL) {
        code = terrno;
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
        goto _exit;
      }
      for (int32_t i = 0; i < size; ++i) {
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
        if (oInfo == NULL) {
          code = terrno;
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
          goto _exit;
        }
        // EXIT (not RETURN) so a decode failure still clears the decoder.
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refTableName));
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refColName));
      }
      break;
    }

    default: {
      uError("unknown pull type %d", type);
      code = TSDB_CODE_INVALID_PARA;
      break;
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3705

3706
// Encodes an array of SSTriggerCalcParam: element count (i32), then per
// element prevTs, currentTs, nextTs, wstart, wend, wduration, wrownum,
// prevLocalTime, nextLocalTime and triggerTime (all i64).
// Unless ignoreNotificationInfo is set, each element is followed by
// notifyType (i32) and extraNotifyContent as a binary blob whose length
// includes the trailing NUL (length 0 when the pointer is NULL).
// Returns 0 on success or the error code of the failing step.
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo) {
  int32_t size = taosArrayGetSize(pParams);
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      // Always bail out here: relying on terrno alone would fall through to
      // the dereferences below whenever terrno does not hold an error.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto _exit;
    }

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevTs));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->currentTs));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextTs));

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wstart));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wend));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wduration));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->wrownum));

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->prevLocalTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->nextLocalTime));

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, param->triggerTime));
    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
3739

3740
void tDestroySSTriggerCalcParam(void* ptr) {
110,903,652✔
3741
  SSTriggerCalcParam* pParam = ptr;
110,903,652✔
3742
  if (pParam && pParam->extraNotifyContent != NULL) {
110,903,652!
3743
    taosMemoryFreeClear(pParam->extraNotifyContent);
140!
3744
  }
3745
  if (pParam && pParam->resultNotifyContent != NULL) {
110,903,652!
3746
    taosMemoryFreeClear(pParam->resultNotifyContent);
47!
3747
  }
3748
}
110,903,652✔
3749

3750
void tDestroySStreamGroupValue(void* ptr) {
60,565✔
3751
  SStreamGroupValue* pValue = ptr;
60,565✔
3752
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
60,565!
3753
    taosMemoryFreeClear(pValue->data.pData);
51,796!
3754
    pValue->data.nData = 0;
51,796✔
3755
  }
3756
}
60,565✔
3757

3758
// Decodes an array of SSTriggerCalcParam written by tSerializeSTriggerCalcParam.
// *ppParams is assigned a newly created array; elements are appended as they
// are decoded. Unless ignoreNotificationInfo is set, notifyType and an
// allocated copy of extraNotifyContent are read for each element.
// On failure the array may be left partially filled in *ppParams.
// Returns 0 on success or the error code of the failing step.
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
  int32_t size = 0, code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    // Always bail out: continuing with a NULL array must not depend on terrno
    // happening to hold an error.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      // Same reasoning as above: never fall through to the writes below.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevTs));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->currentTs));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextTs));

    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wstart));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wend));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wduration));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->wrownum));

    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->prevLocalTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->nextLocalTime));

    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &param->triggerTime));
    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
3793

3794
/*
 * Encode the SStreamGroupValue elements of pGroupColVals.
 *
 * Wire layout: an int32 element count, then for each element an isNull flag.
 * Non-null elements continue with an isTbname flag, (uid, vgId) when isTbname
 * is set, the int8 data type, and finally either the binary payload (for
 * IS_VAR_DATA_TYPE types) or the 64-bit value.
 *
 * When vgId is not -1 it is stored into the vgId field of every table-name
 * element before that field is encoded, i.e. the input array is modified.
 *
 * Returns `code`: 0 when no step bailed out, otherwise the value stored by
 * the failing TAOS_CHECK_EXIT step.
 */
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numVals));

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (NULL == pVal) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (pVal->isNull) {
      // A null element carries nothing beyond its flag.
      continue;
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
      // Caller-supplied vgId (anything but -1) replaces the stored one.
      if (vgId != -1) {
        pVal->vgId = vgId;
      }
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
    }

    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
    if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
    } else {
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
    }
  }

_exit:
  return code;
}
3826

3827
/*
 * Decode group column values into *ppGroupColVals.
 *
 * The int32 element count is read first. The existing array (if any) is then
 * emptied with tDestroySStreamGroupValue as the element destructor. When the
 * count is positive the array is created if *ppGroupColVals is NULL, or has
 * its capacity ensured otherwise. Each element is decoded as: isNull flag;
 * and for non-null elements an isTbname flag, (uid, vgId) when isTbname is
 * set, the int8 data type, then either an allocated binary payload (for
 * IS_VAR_DATA_TYPE types) or the 64-bit value.
 *
 * Returns `code`: 0 when no step bailed out, otherwise the value stored by
 * the failing TAOS_CHECK_EXIT step. No cleanup is done here on failure.
 *
 * NOTE(review): the destructor defined just above this function appears to
 * free data.pData for TSDB_DATA_TYPE_DECIMAL as well, whereas this decoder
 * fills only data.val for every non-variable-length type -- confirm decimal
 * values cannot reach this path.
 */
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numVals));

  // Drop whatever a previous decode left behind before refilling.
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (numVals > 0 && NULL == *ppGroupColVals) {
    *ppGroupColVals = taosArrayInit(numVals, sizeof(SStreamGroupValue));
    if (NULL == *ppGroupColVals) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else if (numVals > 0) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, numVals));
  }

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (NULL == pVal) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (pVal->isNull) {
      // Null elements have no further fields on the wire.
      continue;
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
    }

    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
    if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
    } else {
      uint64_t payloadLen = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &payloadLen));
      pVal->data.nData = payloadLen;
    }
  }

_exit:
  return code;
}
3870

3871
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
1,036✔
3872
  SEncoder encoder = {0};
1,036✔
3873
  int32_t  code = TSDB_CODE_SUCCESS;
1,036✔
3874
  int32_t  lino = 0;
1,036✔
3875
  int32_t  tlen = 0;
1,036✔
3876

3877
  tEncoderInit(&encoder, buf, bufLen);
1,036✔
3878
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,036!
3879

3880
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
1,036!
3881

3882
  tEndEncode(&encoder);
1,036✔
3883

3884
_exit:
1,036✔
3885
  if (code != TSDB_CODE_SUCCESS) {
1,036!
3886
    tlen = code;
×
3887
  } else {
3888
    tlen = encoder.pos;
1,036✔
3889
  }
3890
  tEncoderClear(&encoder);
1,036✔
3891
  return tlen;
1,036✔
3892
}
3893

3894
/*
 * Deserialize group column values from buf into gInfo->gInfo.
 *
 * @param buf     encoded message handed to tDecoderInit
 * @param bufLen  size of buf
 * @param gInfo   destination; its gInfo array is filled by
 *                tDeserializeStriggerGroupColVals
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step
 */
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  // The decoder is cleared on both the success and the failure path.
  tDecoderClear(&decoder);
  return code;
}
3911

3912
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
1,902✔
3913
  SEncoder encoder = {0};
1,902✔
3914
  int32_t  code = TSDB_CODE_SUCCESS;
1,902✔
3915
  int32_t  lino = 0;
1,902✔
3916
  int32_t  tlen = 0;
1,902✔
3917

3918
  tEncoderInit(&encoder, buf, bufLen);
1,902✔
3919
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,902!
3920

3921
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
3,804!
3922
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
3,804!
3923
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
3,804!
3924
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
3,804!
3925
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
3,804!
3926

3927
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false));
1,902!
3928
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
1,902!
3929
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
3,804!
3930

3931
  tEndEncode(&encoder);
1,902✔
3932

3933
_exit:
1,902✔
3934
  if (code != TSDB_CODE_SUCCESS) {
1,902!
3935
    tlen = code;
×
3936
  } else {
3937
    tlen = encoder.pos;
1,902✔
3938
  }
3939
  tEncoderClear(&encoder);
1,902✔
3940
  return tlen;
1,902✔
3941
}
3942

3943
/*
 * Deserialize a trigger calc request from buf into pReq.
 *
 * Fields are read in this order: streamId, runnerTaskId, sessionId,
 * triggerType, gid, the calc params (including notification info), the group
 * column values and createTable (int8).
 *
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step. pReq is not cleaned up here on
 *         failure, so it may hold partially decoded arrays.
 */
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Fixed-size header fields.
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // Variable-size sections.
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));

  tEndDecode(&decoder);

_exit:
  // The decoder is cleared on both the success and the failure path.
  tDecoderClear(&decoder);
  return code;
}
3967

3968
/*
 * Release everything owned by a trigger calc request.
 *
 * The params and groupColVals arrays are destroyed together with their
 * elements, and pOutBlock is destroyed via blockDataDestroy. Every released
 * pointer is reset to NULL so that a second call on the same request does not
 * hand a stale pointer to a destructor. The request structure itself is not
 * freed. A NULL pReq is ignored.
 */
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq == NULL) {
    return;
  }

  if (pReq->params != NULL) {
    taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
    pReq->params = NULL;
  }

  if (pReq->groupColVals != NULL) {
    taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
    pReq->groupColVals = NULL;
  }

  blockDataDestroy(pReq->pOutBlock);
  // Clear the block pointer as well, matching the two arrays above.
  pReq->pOutBlock = NULL;
}
2,911✔
3981

3982
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
23,254✔
3983
  SEncoder encoder = {0};
23,254✔
3984
  int32_t  code = TSDB_CODE_SUCCESS;
23,254✔
3985
  int32_t  lino = 0;
23,254✔
3986
  int32_t  tlen = 0;
23,254✔
3987

3988
  tEncoderInit(&encoder, buf, bufLen);
23,254✔
3989
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
23,254!
3990

3991
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
46,508!
3992
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
46,508!
3993
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
46,508!
3994
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
46,508!
3995

3996
  tEndEncode(&encoder);
23,254✔
3997

3998
_exit:
23,253✔
3999
  if (code != TSDB_CODE_SUCCESS) {
23,253!
4000
    tlen = code;
×
4001
  } else {
4002
    tlen = encoder.pos;
23,253✔
4003
  }
4004
  tEncoderClear(&encoder);
23,253✔
4005
  return tlen;
23,253✔
4006
}
4007

4008
/*
 * Deserialize a trigger control request from buf into pReq.
 *
 * The type is decoded into a local int32 first and then assigned to
 * pReq->type; streamId, taskId and sessionId are decoded in place.
 *
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step
 */
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  rawType = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &rawType));
  pReq->type = rawType;

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));

  tEndDecode(&decoder);

_exit:
  // The decoder is cleared on both the success and the failure path.
  tDecoderClear(&decoder);
  return code;
}
4029

4030
/*
 * Encode stream runtime function info with an already started encoder.
 *
 * Field order on the wire: the pseudo-function values (calc params written
 * with the ignore-notification flag set), the partition column values (vgId
 * argument -1), groupId, curIdx, sessionId, withExternalWindow, triggerType.
 *
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step
 */
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // Array sections first.
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // Scalar fields.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));

_exit:
  return code;
}
4042

4043
/*
 * Decode stream runtime function info with an already started decoder.
 *
 * Fields are read in this order: the pseudo-function values (calc params read
 * with the ignore-notification flag set), the partition column values,
 * groupId, curIdx, sessionId, withExternalWindow, triggerType.
 *
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step. pInfo is not cleaned up here on
 *         failure.
 */
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // Array sections first.
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));

  // Scalar fields.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));

_exit:
  return code;
}
4056

4057
/*
 * Release the arrays owned by a stream runtime function info.
 *
 * Both the pseudo-function value array and the partition column value array
 * are destroyed together with their elements and reset to NULL. The structure
 * itself is not freed. A NULL pInfo is accepted.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }

  return TSDB_CODE_SUCCESS;
}
4069

4070
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
107✔
4071
  SEncoder encoder = {0};
107✔
4072
  int32_t  code = TSDB_CODE_SUCCESS;
107✔
4073
  int32_t  lino = 0;
107✔
4074
  int32_t  tlen = 0;
107✔
4075

4076
  tEncoderInit(&encoder, buf, bufLen);
107✔
4077
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
106!
4078

4079
  int32_t size = taosArrayGetSize(pRsp->infos);
107✔
4080
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
106!
4081
  for (int32_t i = 0; i < size; ++i) {
341✔
4082
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
235✔
4083
    if (info == NULL) {
235✔
4084
      TAOS_CHECK_EXIT(terrno);
2!
4085
    }
4086
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
466!
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
466!
4088
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
233!
4089
  }
4090

4091
  tEndEncode(&encoder);
106✔
4092

4093
_exit:
108✔
4094
  if (code != TSDB_CODE_SUCCESS) {
108!
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
108✔
4098
  }
4099
  tEncoderClear(&encoder);
108✔
4100
  return tlen;
108✔
4101
}
4102

4103
/*
 * Deserialize virtual table infos from buf into vTableInfo.
 *
 * The int32 element count is read first and a new array is stored in
 * vTableInfo->infos (a previous value is overwritten, not released). Each
 * element is decoded as gId, uid and a column reference wrapper (via
 * tDecodeSColRefWrapperEx with its last argument false).
 *
 * @return `code`: 0 when no step bailed out, otherwise the value stored by the
 *         failing TAOS_CHECK_EXIT step. vTableInfo is not cleaned up here on
 *         failure.
 */
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numInfos = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numInfos));

  vTableInfo->infos = taosArrayInit(numInfos, sizeof(VTableInfo));
  if (NULL == vTableInfo->infos) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numInfos; ++idx) {
    VTableInfo* pVtb = taosArrayReserve(vTableInfo->infos, 1);
    if (NULL == pVtb) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pVtb->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pVtb->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &pVtb->cols, false));
  }

  tEndDecode(&decoder);

_exit:
  // The decoder is cleared on both the success and the failure path.
  tDecoderClear(&decoder);
  return code;
}
4133

4134

4135
void tDestroyVTableInfo(void *ptr) {
238✔
4136
  if (NULL == ptr) {
238!
4137
    return;
×
4138
  }
4139
  VTableInfo* pTable = (VTableInfo*)ptr;
238✔
4140
  taosMemoryFree(pTable->cols.pColRef);
238!
4141
}
4142

4143
/*
 * Destroy the infos array of a virtual table info message, running
 * tDestroyVTableInfo on each element, and reset the pointer to NULL.
 * The message structure itself is not freed. A NULL ptr is ignored.
 */
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4148

4149
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,216✔
4150
  SEncoder encoder = {0};
1,216✔
4151
  int32_t  code = TSDB_CODE_SUCCESS;
1,216✔
4152
  int32_t  lino = 0;
1,216✔
4153
  int32_t  tlen = 0;
1,216✔
4154

4155
  tEncoderInit(&encoder, buf, bufLen);
1,216✔
4156
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,216!
4157

4158
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,432!
4159
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,216✔
4160
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,217!
4161
  for (int32_t i = 0; i < size; ++i) {
2,705✔
4162
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,490✔
4163
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
2,976!
4164
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
2,976!
4165
  }
4166

4167
  tEndEncode(&encoder);
1,215✔
4168

4169
_exit:
1,216✔
4170
  if (code != TSDB_CODE_SUCCESS) {
1,216!
4171
    tlen = code;
×
4172
  } else {
4173
    tlen = encoder.pos;
1,216✔
4174
  }
4175
  tEncoderClear(&encoder);
1,216✔
4176
  return tlen;
1,216✔
4177
}
4178

4179
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
607✔
4180
  SDecoder decoder = {0};
607✔
4181
  int32_t  code = TSDB_CODE_SUCCESS;
607✔
4182
  int32_t  lino = 0;
607✔
4183
  SSDataBlock *pResBlock = pBlock;
607✔
4184

4185
  tDecoderInit(&decoder, buf, bufLen);
607✔
4186
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
607!
4187

4188
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,214!
4189
  int32_t numOfCols = 2;
607✔
4190
  if (pResBlock->pDataBlock == NULL) {
607!
4191
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
607✔
4192
    if (pResBlock->pDataBlock == NULL) {
607!
4193
      TAOS_CHECK_EXIT(terrno);
×
4194
    }
4195
    for (int32_t i = 0; i< numOfCols; ++i) {
1,821✔
4196
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,214✔
4197
      if (pColInfoData == NULL) {
1,214!
4198
        TAOS_CHECK_EXIT(terrno);
×
4199
      }
4200
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,214✔
4201
      pColInfoData->info.bytes = sizeof(int64_t);
1,214✔
4202
    }
4203
  }
4204
  int32_t numOfRows = 0;
607✔
4205
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
607!
4206
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
607!
4207
  for (int32_t i = 0; i < numOfRows; ++i) {
1,352✔
4208
    for (int32_t j = 0; j < numOfCols; ++j) {
2,235✔
4209
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,490✔
4210
      if (pColInfoData == NULL) {
1,490!
4211
        TAOS_CHECK_EXIT(terrno);
×
4212
      }
4213
      int64_t value = 0;
1,490✔
4214
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,490!
4215
      colDataSetInt64(pColInfoData, i, &value);
1,490✔
4216
    }
4217
  }
4218

4219
  pResBlock->info.dataLoad = 1;
607✔
4220
  pResBlock->info.rows = numOfRows;
607✔
4221

4222
  tEndDecode(&decoder);
607✔
4223

4224
_exit:
607✔
4225
  tDecoderClear(&decoder);
607✔
4226
  return code;
607✔
4227
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc