• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5006

29 Mar 2026 04:32AM UTC coverage: 72.274% (+0.1%) from 72.152%
#5006

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253711 of 351039 relevant lines covered (72.27%)

131490495.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.31
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26

27
/**
 * Serialize a stream management request into pEncoder.
 *
 * Wire layout: reqId (i64), type (i32), then a payload selected by type:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: item count (i32), then for each
 *     item the dbFName and tbName C strings (length includes the trailing NUL).
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: item count (i32), then for each
 *     item execId (i32), uid (i64), vgId count (i32) and each vgId (i32).
 * A NULL request array is written as an item count of 0.
 *
 * @param pEncoder destination encoder
 * @param pReq     request to serialize
 * @return status of the encode helpers (encoding stops at the first helper
 *         rejected by TAOS_CHECK_EXIT), or TSDB_CODE_STREAM_INVALID_TASK_TYPE
 *         when pReq->type is not one of the two handled types.
 */
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      if (NULL == pReq->cont.pReqs) {
        // no table list attached: emit an empty list
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t tblNum = taosArrayGetSize(pReq->cont.pReqs);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tblNum));
      for (int32_t t = 0; t < tblNum; ++t) {
        SStreamDbTableName* pTbl = taosArrayGet(pReq->cont.pReqs, t);
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->dbFName, strlen(pTbl->dbFName) + 1));
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->tbName, strlen(pTbl->tbName) + 1));
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      if (NULL == pReq->cont.pReqs) {
        // no deploy list attached: emit an empty list
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t deployNum = taosArrayGetSize(pReq->cont.pReqs);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
      for (int32_t d = 0; d < deployNum; ++d) {
        SStreamOReaderDeployReq* pItem = taosArrayGet(pReq->cont.pReqs, d);
        int32_t                  vgNum = taosArrayGetSize(pItem->vgIds);

        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pItem->execId));
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pItem->uid));
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
        for (int32_t v = 0; v < vgNum; ++v) {
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pItem->vgIds, v)));
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
75

76
void tFreeRunnerOReaderDeployReq(void* param) {
7,953✔
77
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
7,953✔
78
  if (pReq) {
7,953✔
79
    taosArrayDestroy(pReq->vgIds);
7,953✔
80
  }
81
}
7,953✔
82

83
/**
 * Release the request array owned by a stream management request.
 *
 * Only cont.pReqs is destroyed; the SStreamMgmtReq struct itself is left for
 * the caller to free. Runner requests also release each element's vgIds
 * array. Unknown request types and a NULL pReq are ignored.
 *
 * @param pReq request whose payload should be released (may be NULL)
 */
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    // elements are plain name structs: nothing nested to free
    taosArrayDestroy(pReq->cont.pReqs);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    // each element owns a vgIds array
    taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
  }
}
99

100

101
/**
 * Deep-copy a stream management request.
 *
 * *ppDst is always reset to NULL first. A NULL pSrc is not an error and
 * leaves *ppDst NULL. Otherwise a new SStreamMgmtReq is allocated, the source
 * struct is copied into it, and a non-NULL cont.pReqs is duplicated according
 * to the request type (runner requests also duplicate each element's vgIds).
 * For a request type not handled below, the copied struct keeps the source's
 * cont.pReqs pointer as-is.
 *
 * On failure the partially built copy is released, *ppDst is cleared and the
 * failure is logged.
 *
 * @param pSrc  request to clone (may be NULL)
 * @param ppDst receives the newly allocated clone, or NULL
 * @return 0 / TSDB_CODE_SUCCESS on success, otherwise the value of terrno at
 *         the failing allocation
 */
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppDst = NULL;
  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  // start from a flat copy, then replace the array pointer with a private copy
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
  if (NULL == pSrc->cont.pReqs) {
    goto _exit;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pSrc->type) {
    (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
    TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pSrc->type) {
    int32_t itemNum = taosArrayGetSize(pSrc->cont.pReqs);

    (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), itemNum);
    TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);

    for (int32_t k = 0; k < itemNum; ++k) {
      SStreamOReaderDeployReq* pTo = taosArrayGet((*ppDst)->cont.pReqs, k);
      SStreamOReaderDeployReq* pFrom = taosArrayGet(pSrc->cont.pReqs, k);

      pTo->vgIds = taosArrayDup(pFrom->vgIds, NULL);
      TSDB_CHECK_NULL(pTo->vgIds, code, lino, _exit, terrno);
      pTo->execId = pFrom->execId;
      pTo->uid = pFrom->uid;
    }
  }

_exit:

  if (code) {
    // undo the partial clone so the caller never sees a half-built request
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
148

149

150
/**
 * Deserialize a stream management request written by tEncodeSStreamMgmtReq.
 *
 * Reads reqId (i64) and type (i32), then the type-specific payload:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: item count, then dbFName/tbName
 *     strings per item.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: item count, then execId, uid,
 *     vgId count and the vgIds per item.
 * cont.pReqs (and each vgIds) is only allocated when the decoded count is
 * positive; otherwise the field is left untouched.
 *
 * On failure, arrays allocated so far stay attached to pReq; nothing is freed
 * here, so the caller is expected to release them (e.g. via
 * tFreeSStreamMgmtReq).
 *
 * @param pDecoder source decoder
 * @param pReq     request to fill in
 * @return status of the decode helpers, terrno on allocation failure, or
 *         TSDB_CODE_STREAM_INVALID_TASK_TYPE for an unknown type.
 */
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
          // the reserved slot must be validated before it is written through
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          int32_t                  vgIdNum = 0;
          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
          if (vgIdNum > 0) {
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
          }
          // loop body is skipped when vgIdNum <= 0, so p->vgIds is never
          // indexed unless it was allocated above
          for (int32_t n = 0; n < vgIdNum; ++n) {
            int32_t* vgId = taosArrayGet(p->vgIds, n);
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
          }
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
203

204
/**
 * Serialize the common stream task header into pEncoder.
 *
 * Field order: type, streamId, taskId, flags, seriousId, deployId, nodeId,
 * taskIdx, status, detailStatus, errorCode, then an i32 presence flag
 * (1 or 0) followed by the management request when pTask->pMgmtReq is set.
 * The session id is not serialized.
 *
 * @param pEncoder destination encoder
 * @param pTask    task to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t hasMgmtReq = (NULL != pTask->pMgmtReq) ? 1 : 0;

  // identity
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  // placement and state
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  // optional management request, announced by a presence flag
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasMgmtReq));
  if (hasMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
232

233

234
/**
 * Deserialize the common stream task header written by tEncodeStreamTask.
 *
 * After the fixed fields, an i32 presence flag is read; when it is non-zero a
 * SStreamMgmtReq is allocated into pTask->pMgmtReq and decoded. If decoding
 * that request fails, the allocation stays attached to pTask (it is not freed
 * here).
 *
 * @param pDecoder source decoder
 * @param pTask    task to fill in
 * @return status of the decode helpers, or terrno when the management request
 *         cannot be allocated.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t hasMgmtReq = 0;

  // identity
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  // placement and state
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  // optional management request
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasMgmtReq));
  if (0 != hasMgmtReq) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
263

264
/**
 * Serialize one recalculation progress record.
 *
 * Field order: recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * @param pEncoder  destination encoder
 * @param pProgress record to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
  // [start, end] pair of the record
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:

  return code;
}
277

278
/**
 * Deserialize one recalculation progress record written by
 * tEncodeSSTriggerRecalcProgress.
 *
 * Field order: recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * @param pDecoder  source decoder
 * @param pProgress record to fill in
 * @return status of the decode helpers (stops at the first failure).
 */
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
  // [start, end] pair of the record
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:

  return code;
}
291

292

293
/**
 * Serialize a trigger runtime status.
 *
 * Field order: autoRecalcNum, realtimeSessionNum, historySessionNum,
 * recalcSessionNum, histroyProgress (all i32), then the number of user
 * recalculation records (i32) followed by each record.
 *
 * @param pEncoder destination encoder
 * @param pStatus  status to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  // scalar counters
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  // length-prefixed list of user recalculation records
  int32_t nUserRecalc = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nUserRecalc));
  for (int32_t r = 0; r < nUserRecalc; ++r) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, r);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
  }

_exit:

  return code;
}
314

315
/**
 * Deserialize a trigger runtime status written by
 * tEncodeSSTriggerRuntimeStatus.
 *
 * Field order (must mirror the encoder): autoRecalcNum, realtimeSessionNum,
 * historySessionNum, recalcSessionNum, histroyProgress (all i32), then the
 * number of user recalculation records (i32) followed by each record.
 * pStatus->userRecalcs is only allocated when that count is positive.
 *
 * On failure, an already allocated userRecalcs array stays attached to
 * pStatus; it is not freed here.
 *
 * @param pDecoder source decoder
 * @param pStatus  status to fill in
 * @return status of the decode helpers, or terrno when the record array
 *         cannot be allocated.
 */
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  // 4th field is recalcSessionNum on the wire; it was previously decoded into
  // realtimeSessionNum a second time, clobbering that value
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    if (NULL == pStatus->userRecalcs) {
      code = terrno;
      goto _exit;
    }
  }

  // skipped entirely when recalcNum <= 0, so userRecalcs is never indexed
  // unless it was allocated above
  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
344

345

346
/**
 * Serialize a stream heartbeat message.
 *
 * Layout inside a tStartEncode/tEndEncode frame: dnodeId, streamGId, snodeId,
 * runnerThreadNum (i32 each), then four length-prefixed lists:
 *   1. vgroup leader ids (i32 each)
 *   2. task status entries (each encoded via tEncodeStreamTask)
 *   3. request indices (i32 each)
 *   4. trigger runtime statuses
 * tEndEncode is skipped when any step fails.
 *
 * @param pEncoder destination encoder
 * @param pReq     heartbeat message to serialize
 * @return pEncoder->pos on success; otherwise the non-zero status left in
 *         `code` by the failing step.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // fixed header
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  // 1. vgroup leaders
  int32_t nLeaders = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nLeaders));
  for (int32_t k = 0; k < nLeaders; ++k) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  // 2. task statuses (each entry is encoded through its SStreamTask header)
  int32_t nStatus = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nStatus));
  for (int32_t k = 0; k < nStatus; ++k) {
    SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pEntry));
  }

  // 3. request indices
  int32_t nReq = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReq));
  for (int32_t k = 0; k < nReq; ++k) {
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
  }

  // 4. trigger runtime statuses
  int32_t nTrigger = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nTrigger));
  for (int32_t k = 0; k < nTrigger; ++k) {
    SSTriggerRuntimeStatus* pRt = taosArrayGet(pReq->pTriggerStatus, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pRt));
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }

  // success: report the encoder position
  return pEncoder->pos;
}
393

394
/**
 * Deserialize a stream heartbeat message written by tEncodeStreamHbMsg.
 *
 * Reads the fixed header (dnodeId, streamGId, snodeId, runnerThreadNum) and
 * then four length-prefixed lists: vgroup leader ids, task statuses, request
 * indices and trigger runtime statuses. Each list's array is only allocated
 * when its decoded count is positive.
 *
 * On failure the function returns immediately without calling tEndDecode and
 * without freeing what was already allocated into pReq.
 *
 * @param pDecoder source decoder
 * @param pReq     heartbeat message to fill in
 * @return status of the decode helpers, or terrno on an array failure.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // fixed header
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  // 1. vgroup leaders: capacity is reserved up front, ids are appended
  int32_t vgLeaderNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLeaderNum));
  if (vgLeaderNum > 0) {
    pReq->pVgLeaders = taosArrayInit(vgLeaderNum, sizeof(int32_t));
    TSDB_CHECK_NULL(pReq->pVgLeaders, code, lino, _exit, terrno);

    for (int32_t k = 0; k < vgLeaderNum; ++k) {
      int32_t leaderVgId = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
      TSDB_CHECK_NULL(taosArrayPush(pReq->pVgLeaders, &leaderVgId), code, lino, _exit, terrno);
    }
  }

  // 2. task statuses: array is pre-sized, entries are decoded in place
  int32_t statusNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
  if (statusNum > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
    TSDB_CHECK_NULL(pReq->pStreamStatus, code, lino, _exit, terrno);

    for (int32_t k = 0; k < statusNum; ++k) {
      SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
      TSDB_CHECK_NULL(pEntry, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pEntry));
    }
  }

  // 3. request indices
  int32_t reqNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
  if (reqNum > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
    TSDB_CHECK_NULL(pReq->pStreamReq, code, lino, _exit, terrno);

    for (int32_t k = 0; k < reqNum; ++k) {
      int32_t* pSlot = taosArrayGet(pReq->pStreamReq, k);
      TSDB_CHECK_NULL(pSlot, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pSlot));
    }
  }

  // 4. trigger runtime statuses
  int32_t triggerNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
  if (triggerNum > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
    TSDB_CHECK_NULL(pReq->pTriggerStatus, code, lino, _exit, terrno);

    for (int32_t k = 0; k < triggerNum; ++k) {
      SSTriggerRuntimeStatus* pRt = taosArrayGet(pReq->pTriggerStatus, k);
      TSDB_CHECK_NULL(pRt, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pRt));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
485

486
void tFreeSSTriggerRuntimeStatus(void* param) {
4,507,922✔
487
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
4,507,922✔
488
  if (NULL == pStatus) {
4,507,922✔
489
    return;
×
490
  }
491
  taosArrayDestroy(pStatus->userRecalcs);
4,507,922✔
492
}
493

494
/**
 * Release every array owned by a heartbeat message.
 *
 * The SStreamHbMsg struct itself is not freed. When deepClean is true, the
 * management request of each task referenced from pStreamReq is released
 * before the arrays are destroyed.
 *
 * @param pMsg      message to clean up (NULL is ignored)
 * @param deepClean also free the pMgmtReq attached to tasks listed in
 *                  pStreamReq
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (pMsg == NULL) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);
  if (deepClean) {
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t i = 0; i < reqNum; ++i) {
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
      if (NULL == idx) {
        continue;
      }

      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
      if (NULL == pTask) {
        continue;
      }

      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      // Clear the pointer after freeing: if two entries of pStreamReq name the
      // same task index, the second pass then sees NULL instead of freeing the
      // same request twice.
      // NOTE(review): repeated indices look possible after a partially decoded
      // message, assuming taosArrayInit_s zero-fills pStreamReq — confirm.
      taosMemoryFreeClear(pTask->pMgmtReq);
    }
  }
  taosArrayDestroy(pMsg->pStreamReq);
  taosArrayDestroy(pMsg->pStreamStatus);
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
}
517

518
/**
 * Serialize the trigger-side part of a reader deploy message.
 *
 * Field order: triggerTblName (binary), triggerTblUid (i64), triggerTblSuid
 * (i64), triggerTblType, isTriggerTblVirt, deleteReCalc, deleteOutTbl (i8
 * each), then partitionCols, triggerCols, triggerScanPlan and
 * calcCacheScanPlan as binaries. Every binary is written with length
 * strlen + 1 (trailing NUL included), or length 0 when the pointer is NULL.
 *
 * NOTE: triggerPrevFilter is not written by this function; the encode call
 * for it was already commented out in the original source.
 *
 * @param pEncoder destination encoder
 * @param pMsg     message part to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // trigger table identity
  int32_t tblNameLen = (NULL != pMsg->triggerTblName) ? (int32_t)strlen(pMsg->triggerTblName) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pMsg->triggerTblName, tblNameLen));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));

  // one-byte options
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  // NUL-terminated text blobs
  int32_t partColsLen = (NULL != pMsg->partitionCols) ? (int32_t)strlen(pMsg->partitionCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partColsLen));

  int32_t trigColsLen = (NULL != pMsg->triggerCols) ? (int32_t)strlen(pMsg->triggerCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, trigColsLen));

  int32_t trigPlanLen = (NULL != pMsg->triggerScanPlan) ? (int32_t)strlen(pMsg->triggerScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, trigPlanLen));

  int32_t cachePlanLen = (NULL != pMsg->calcCacheScanPlan) ? (int32_t)strlen(pMsg->calcCacheScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, cachePlanLen));

_exit:

  return code;
}
539

540
/**
 * Serialize the calc-side part of a reader deploy message.
 *
 * Field order: execReplica (i32), then calcScanPlan as a binary of length
 * strlen + 1 (trailing NUL included), or length 0 when it is NULL.
 *
 * @param pEncoder destination encoder
 * @param pMsg     message part to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  int32_t scanPlanLen = (NULL != pMsg->calcScanPlan) ? (int32_t)strlen(pMsg->calcScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, scanPlanLen));

_exit:

  return code;
}
551

552

553
/**
 * Serialize a reader deploy message.
 *
 * Writes the triggerReader flag (i8) and then exactly one payload: the
 * trigger variant when the flag is non-zero, the calc variant otherwise.
 *
 * @param pEncoder destination encoder
 * @param pMsg     message to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // the flag tells the reader which member of pMsg->msg follows
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));
  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:

  return code;
}
568

569
/**
 * Serialize a stream task address.
 *
 * Field order: taskId (i64), nodeId (i32), then the endpoint set via
 * tEncodeSEpSet.
 *
 * @param pEncoder destination encoder
 * @param pMsg     address to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // which task, on which node
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
  // endpoint set of the task
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:

  return code;
}
581

582
/**
 * Serialize a runner target.
 *
 * Field order: the task address (via tEncodeSStreamTaskAddr), then
 * execReplica (i32).
 *
 * @param pEncoder destination encoder
 * @param pMsg     runner target to serialize
 * @return status of the encode helpers (stops at the first failure).
 */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // address first, replica count second
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:

  return code;
}
593

594

595
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
416,902✔
596
  int32_t code = 0;
416,902✔
597
  int32_t lino;
598

599
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
833,804✔
600
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
833,804✔
601
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
833,804✔
602
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
833,804✔
603
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
833,804✔
604
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
833,804✔
605
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
833,804✔
606
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
833,804✔
607
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));
833,804✔
608
  int32_t partitionColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
416,902✔
609
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partitionColsLen));
833,804✔
610

611
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
416,902✔
612
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
416,902✔
613
  for (int32_t i = 0; i < addrSize; ++i) {
531,526✔
614
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
114,624✔
615
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
229,248✔
616
  }
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
833,804✔
618
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
833,804✔
619
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
833,804✔
620

621
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
833,804✔
622
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
833,804✔
623
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
833,804✔
624
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
833,804✔
625
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->idleTimeoutMs));
833,804✔
626

627
  switch (pMsg->triggerType) {
416,902✔
628
    case WINDOW_TYPE_SESSION: {
17,508✔
629
      // session trigger
630
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
35,016✔
631
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
35,016✔
632
      break;
17,508✔
633
    }
634
    case WINDOW_TYPE_STATE: {
136,078✔
635
      // state trigger
636
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
272,156✔
637
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
272,156✔
638
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForType));
272,156✔
639
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForCount));
272,156✔
640
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
272,156✔
641
      int32_t stateWindowZerothLen = 
136,078✔
642
          pMsg->trigger.stateWin.zeroth == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.zeroth) + 1;
136,078✔
643
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.zeroth, stateWindowZerothLen));
272,156✔
644
      int32_t stateWindowExprLen =
136,078✔
645
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
136,078✔
646
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateWindowExprLen));
272,156✔
647
      break;
136,078✔
648
    }
649
    case WINDOW_TYPE_INTERVAL: {
159,150✔
650
      // slide trigger
651
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
318,300✔
652
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
318,300✔
653
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
318,300✔
654
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
318,300✔
655
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
318,300✔
656
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
318,300✔
657
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
318,300✔
658
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
318,300✔
659
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
318,300✔
660
      break;
159,150✔
661
    }
662
    case WINDOW_TYPE_EVENT: {
50,820✔
663
      // event trigger
664
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
50,820✔
665
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
50,820✔
666

667
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
101,640✔
668
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
101,640✔
669
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForType));
101,640✔
670
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForCount));
101,640✔
671
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
101,640✔
672
      break;
50,820✔
673
    }
674
    case WINDOW_TYPE_COUNT: {
38,112✔
675
      // count trigger
676
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
38,112✔
677
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
76,224✔
678

679
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
76,224✔
680
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
76,224✔
681
      break;
38,112✔
682
    }
683
    case WINDOW_TYPE_PERIOD: {
15,234✔
684
      // period trigger
685
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
30,468✔
686
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
30,468✔
687
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
30,468✔
688
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
30,468✔
689
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
30,468✔
690
      break;
15,234✔
691
    }
692
    default:
×
693
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
694
      break;
×
695
  }
696

697
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
833,804✔
698
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
833,804✔
699
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
833,804✔
700
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
833,804✔
701
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId));
833,804✔
702
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId));
833,804✔
703
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
416,902✔
704
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
833,804✔
705
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
416,902✔
706
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
833,804✔
707
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
416,902✔
708
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
833,804✔
709

710
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
416,902✔
711
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
416,902✔
712
  for (int32_t i = 0; i < readerNum; ++i) {
937,432✔
713
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
520,530✔
714
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
520,530✔
715
  }
716

717
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
416,902✔
718
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
416,902✔
719
  for (int32_t i = 0; i < runnerNum; ++i) {
1,647,172✔
720
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,230,270✔
721
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
1,230,270✔
722
  }
723

724
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
833,804✔
725
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
833,804✔
726
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));
833,804✔
727

728
_exit:
416,902✔
729

730
  return code;
416,902✔
731
}
732

733

734
/*
 * Write one SFieldWithOptions to pEncoder.
 *
 * Fields are emitted in this fixed order: name, type, flags, bytes, compress, typeMod.
 * Each step goes through TAOS_CHECK_EXIT, which is expected to record the result in
 * `code` and jump to `_exit` on failure (macro defined outside this section).
 *
 * Returns the value left in `code`.
 */
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
  int32_t code = 0;
  int32_t lino = 0;

  // column name, then its type descriptor
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));

  // per-column options
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
749

750

751
/*
 * Encode an optional NUL-terminated string through tEncodeBinary.
 * A NULL pointer is written with length 0; otherwise the length is strlen + 1 so the
 * terminating NUL is part of the payload. Keeps strlen() from ever seeing NULL.
 */
static int32_t stmEncodeOptCStr(SEncoder* pEncoder, const void* str) {
  int32_t len = (NULL == str) ? 0 : (int32_t)strlen((const char*)str) + 1;
  return tEncodeBinary(pEncoder, str, len);
}

/*
 * Encode a runner-task deploy message.
 *
 * Wire order (must stay in sync with tDecodeSStreamRunnerDeployMsg):
 *   execReplica, streamName, pPlan, outDBFName, outTblName, outTblType, calcNotifyOnly,
 *   topPlan, notify URL list, addOptions, outCols, outTags, outStbUid, outStbSversion,
 *   subTblNameExpr, tagValueExpr, forceOutCols, lowLatencyCalc, colCids, tagCids.
 *
 * Every string field, streamName included, is NULL-tolerant: a NULL string is written as a
 * zero-length binary. Array sizes are always written, so a missing array is encoded as 0
 * elements (taosArrayGetSize is relied on to report 0 for NULL, as the original comment on
 * colCids/tagCids states).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT (expected: 0 on success, the first
 * failing step's code otherwise).
 */
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  // streamName previously went straight into strlen(); it now shares the NULL guard
  // used by every other string in this message.
  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->streamName));
  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->pPlan));
  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->outDBFName));
  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->outTblName));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // notification URLs: count followed by one optional string per entry
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, url));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));

  // output columns
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions* pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // output tags
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions* pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->subTblNameExpr));
  TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pMsg->tagValueExpr));

  // forced output columns: expression string plus its data type descriptor
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol* pOutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);

    TAOS_CHECK_EXIT(stmEncodeOptCStr(pEncoder, pOutCol->expr));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pOutCol->type.bytes));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));

  // colCids and tagCids - always encode size (0 if NULL) for compatibility
  int32_t colCidsSize = (int32_t)taosArrayGetSize(pMsg->colCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, colCidsSize));
  for (int32_t i = 0; i < colCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->colCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

  int32_t tagCidsSize = (int32_t)taosArrayGetSize(pMsg->tagCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tagCidsSize));
  for (int32_t i = 0; i < tagCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->tagCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

_exit:
  return code;
}
830

831
/*
 * Encode one task deployment: the common task header followed by the deploy message
 * that matches the task type (reader, trigger or runner).
 *
 * Any other task type is routed through TAOS_CHECK_EXIT with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * Returns the value left in `code`.
 */
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // common header first; the cast drops the const qualifier inherited from pTask
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  // type-specific payload
  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
  } else {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
855

856

857
/*
 * Encode the deployment of one stream.
 *
 * Wire order: streamId, reader task count + reader tasks, a 0/1 flag telling whether a
 * trigger task follows (and the task itself when present), runner task count + runner tasks.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  // reader tasks
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (int32_t idx = 0; idx < nReaders; ++idx) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  // optional trigger task, announced by a presence flag
  int32_t hasTrigger = pStream->triggerTask ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (pStream->triggerTask) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (int32_t idx = 0; idx < nRunners; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
886

887
/*
 * Encode the common stream message header, which consists of the message type only
 * (written as a 32-bit integer).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));

_exit:
  return code;
}
896

897
/*
 * Decode the common stream message header.
 *
 * The message type is read as a 32-bit integer into a temporary and then assigned to
 * pMsg->msgType; when the read is rejected by TAOS_CHECK_EXIT, pMsg->msgType is left
 * untouched.
 *
 * Returns the value left in `code`.
 */
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t rawType = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
908

909
/*
 * Encode a start-task message. Only its common header is written.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
919

920
/*
 * Encode a task-start entry: the task header followed by its start message.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // the casts strip the const qualifier inherited from pTask
  SStreamTask*         pHeader = (SStreamTask*)&pTask->task;
  SStreamStartTaskMsg* pStartMsg = (SStreamStartTaskMsg*)&pTask->startMsg;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, pHeader));
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, pStartMsg));

_exit:
  return code;
}
931

932
/*
 * Encode an undeploy-task message: common header, then the doCheckpoint and doCleanup
 * flags as 8-bit integers.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  // undeploy options
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
944

945
/*
 * Encode a task-undeploy entry: the task header followed by its undeploy message.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // the casts strip the const qualifier inherited from pTask
  SStreamTask*            pHeader = (SStreamTask*)&pTask->task;
  SStreamUndeployTaskMsg* pUndeployMsg = (SStreamUndeployTaskMsg*)&pTask->undeployMsg;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, pHeader));
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, pUndeployMsg));

_exit:
  return code;
}
956

957

958
/*
 * Encode one recalculation request as three 64-bit integers: recalcId, start, end.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  // requested range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
970

971
/*
 * Encode the type-dependent body of a stream management response.
 *
 *   STREAM_MSG_ORIGTBL_READER_INFO   : vgId list, then reader address list
 *   STREAM_MSG_UPDATE_RUNNER         : runner target list
 *   STREAM_MSG_USER_RECALC           : recalc request list
 *   STREAM_MSG_RUNNER_ORIGTBL_READER : per-exec responses, each with execId and a list
 *                                      of task addresses
 *
 * Every list is preceded by its element count as a 32-bit integer. Any other message type
 * writes nothing and leaves `code` at 0.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      // vgroup ids
      int32_t nVg = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVg));
      for (int32_t k = 0; k < nVg; ++k) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      // reader task addresses
      int32_t nReaders = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
      for (int32_t k = 0; k < nReaders; ++k) {
        SStreamTaskAddr* pReaderAddr = taosArrayGet(pRsp->readerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReaderAddr));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
      for (int32_t k = 0; k < nRunners; ++k) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalc = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalc));
      for (int32_t k = 0; k < nRecalc; ++k) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nExecRsp = taosArrayGetSize(pRsp->execRspList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nExecRsp));
      for (int32_t k = 0; k < nExecRsp; ++k) {
        SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pRsp->execRspList, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pExecRsp->execId));

        // addresses belonging to this exec response
        int32_t nAddr = taosArrayGetSize(pExecRsp->vgList);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nAddr));
        for (int32_t m = 0; m < nAddr; ++m) {
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pExecRsp->vgList, m)));
        }
      }
      break;
    }

    default:
      // no body is written for other message types
      break;
  }

_exit:
  return code;
}
1037

1038
/*
 * Encode a complete stream management response.
 *
 * Wire order: common header, reqId, code, task header, then the body selected by the
 * header's msgType (see tEncodeSStreamMgmtRspCont).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  // body layout depends on the message type carried in the header
  SStreamMgmtRspCont* pBody = (SStreamMgmtRspCont*)&pRsp->cont;
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, pBody));

_exit:
  return code;
}
1052

1053

1054
/*
 * Encode a stream heartbeat response between tStartEncode and tEndEncode.
 *
 * Wire order:
 *   streamGId,
 *   deploy list   (count + SStmStreamDeploy entries),
 *   start list    (count + SStreamTaskStart entries),
 *   undeployAll flag, and only when it is zero the undeploy list
 *                 (count + SStreamTaskUndeploy entries),
 *   response list (count + SStreamMgmtRsp entries).
 *
 * tEndEncode sits after the `_exit` label, so it runs on the normal path and on every
 * path that jumps to `_exit`.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // streams to deploy
  int32_t nDeploy = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nDeploy));
  for (int32_t k = 0; k < nDeploy; ++k) {
    SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, k);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pDeploy));
  }

  // tasks to start
  int32_t nStart = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nStart));
  for (int32_t k = 0; k < nStart; ++k) {
    SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pStart));
  }

  // tasks to undeploy; the explicit list is omitted when undeployAll is set
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploy = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nUndeploy));
    for (int32_t k = 0; k < nUndeploy; ++k) {
      SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pUndeploy));
    }
  }

  // management responses
  int32_t nMgmtRsp = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nMgmtRsp));
  for (int32_t k = 0; k < nMgmtRsp; ++k) {
    SStreamMgmtRsp* pOneRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pOneRsp));
  }

_exit:
  tEndEncode(pEncoder);
  return code;
}
1097

1098
/*
 * Decode the trigger-side variant of a reader deploy message.
 *
 * Wire order: triggerTblName, triggerTblUid, triggerTblSuid, triggerTblType,
 * isTriggerTblVirt, deleteReCalc, deleteOutTbl, partitionCols, triggerCols,
 * triggerScanPlan, calcCacheScanPlan.
 *
 * The binary fields are read with tDecodeBinaryAlloc, which fills in the pointer members
 * of pMsg (presumably with newly allocated buffers the caller must release — confirm
 * against the decoder API).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // trigger table identity
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));

  // delete handling flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  // column descriptions and scan plans
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1118

1119

1120
/*
 * Decode the calc-side variant of a reader deploy message: execReplica followed by the
 * calcScanPlan binary (read with tDecodeBinaryAlloc).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));

_exit:
  return code;
}
1131

1132

1133
/*
 * Decode a reader deploy message.
 *
 * The leading triggerReader flag selects which variant follows: non-zero means the
 * trigger-side payload (pMsg->msg.trigger), zero means the calc-side payload
 * (pMsg->msg.calc).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1148

1149

1150
/*
 * Decode a task address: taskId (64-bit), nodeId (32-bit), then the endpoint set.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // identifiers
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));

  // endpoint set
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1162

1163

1164
/*
 * Decode a runner target: its task address followed by execReplica (32-bit).
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1175

1176

1177
/*
 * Decode a trigger-task deploy message.
 *
 * Layout, in wire order:
 *   1. nine 8-bit flags (triggerType ... isTriggerTblStb) and partitionCols
 *   2. notification URL list, notifyEventTypes, addOptions, notifyHistory
 *   3. timing values: maxDelay, fillHistoryStartTime, watermark, expiredTime, idleTimeoutMs
 *   4. window parameters, whose shape is selected by triggerType
 *   5. eventTypes, placeHolderBitmap, slot ids, filter and scan plans
 *   6. reader address list and runner target list
 *   7. leaderSnodeId, streamName, and - only if the decoder still has data - precision
 *
 * Arrays are created only when the decoded count is positive; with a count of zero or
 * less the corresponding pMsg member is not assigned here. An unrecognised triggerType is
 * routed through TAOS_CHECK_EXIT with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT / TSDB_CHECK_NULL.
 */
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // ---- general flags ----
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));

  // ---- notification settings ----
  int32_t nUrls = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUrls));
  if (nUrls > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, nUrls);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    // each array slot holds a string pointer that the decoder fills in place
    for (int32_t k = 0; k < nUrls; ++k) {
      void** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  // ---- timing values ----
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->idleTimeoutMs));

  // ---- window parameters, one layout per trigger type ----
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }

    case WINDOW_TYPE_STATE: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.zeroth, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
      break;
    }

    case WINDOW_TYPE_INTERVAL: {
      // sliding/interval trigger: five unit bytes, then four 64-bit values
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }

    case WINDOW_TYPE_EVENT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }

    case WINDOW_TYPE_COUNT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }

    case WINDOW_TYPE_PERIOD: {
      // periodUnit / offsetUnit are read through an int8_t* cast of their storage
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }

    default:
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

  // ---- event mask, placeholders, slot ids, plans ----
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  // ---- reader addresses ----
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nReaders; ++k) {
      SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
    }
  }

  // ---- runner targets ----
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRunners; ++k) {
      SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
    }
  }

  // ---- trailer ----
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));

  // precision is read only when data remains; presumably this tolerates messages from
  // senders that do not append it - confirm against the encoder's history
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
  }

_exit:
  return code;
}
1314

1315

1316

1317
/*
 * Read one SFieldWithOptions from pDecoder.
 *
 * Fields are consumed in this fixed order: name (copied into pField->name by
 * tDecodeCStrTo), type, flags, bytes, compress, typeMod.
 *
 * Returns the value left in `code` by TAOS_CHECK_EXIT.
 */
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t code = 0;
  int32_t lino = 0;

  // column name, then its type descriptor
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));

  // per-column options
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1332

1333
void destroySStreamOutCols(void* p){
70,062✔
1334
  if (p == NULL) return;
70,062✔
1335
  SStreamOutCol* col = (SStreamOutCol*)p;
70,062✔
1336
  taosMemoryFreeClear(col->expr);
70,062✔
1337
}
1338

1339
// Decode a runner-task deploy message from pDecoder into pMsg.
//
// Fields are read in wire order. Strings and arrays are allocated while
// decoding and are NOT released here on failure; a partially filled pMsg is
// left for the caller to clean up.
//
// The trailing fields (lowLatencyCalc, colCids, tagCids) are optional: each is
// read only if the decoder has not reached the end, so shorter messages that
// lack them still decode.
//
// Returns 0 on success; on failure, the error code of the step that failed.
// A colCids/tagCids count that is negative or exceeds TSDB_MAX_COLUMNS /
// TSDB_MAX_TAGS is rejected with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // Notification URLs: count followed by one allocated string per entry.
  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  if (addrSize > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < addrSize; ++i) {
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));

  // Output columns.
  int32_t outColNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
  if (outColNum > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
  }

  // Output tags.
  int32_t outTagNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
  if (outTagNum > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
  }

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // Forced output columns: expression string plus its data type description.
  int32_t forceOutColsSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
  if (forceOutColsSize > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);

    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
  }

  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  }

  // colCids and tagCids - always decode size, create array only if size > 0
  // For backward compatibility, check if there's more data before decoding
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t colCidsSize = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colCidsSize));
    // An out-of-range count must fail the decode: skipping it would leave the
    // cid payload unread and every following field would be misaligned.
    if (colCidsSize < 0 || colCidsSize > TSDB_MAX_COLUMNS) {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
    if (colCidsSize > 0) {
      pMsg->colCids = taosArrayInit(colCidsSize, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->colCids, code, lino, _exit, terrno);
      for (int32_t i = 0; i < colCidsSize; ++i) {
        int16_t cid = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
        if (taosArrayPush(pMsg->colCids, &cid) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }
  // Try to decode tagCids if there's more data
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t tagCidsSize = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagCidsSize));
    // Same reasoning as for colCids: reject instead of silently skipping.
    if (tagCidsSize < 0 || tagCidsSize > TSDB_MAX_TAGS) {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
    if (tagCidsSize > 0) {
      pMsg->tagCids = taosArrayInit(tagCidsSize, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->tagCids, code, lino, _exit, terrno);
      for (int32_t i = 0; i < tagCidsSize; ++i) {
        int16_t cid = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
        if (taosArrayPush(pMsg->tagCids, &cid) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

_exit:

  return code;
}
1450

1451
// Decode one task deployment: the common task header first, then the
// type-specific deploy message selected by the decoded task type.
// An unrecognized task type yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (STREAM_READER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (STREAM_TRIGGER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (STREAM_RUNNER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    // Unknown task type: the remaining payload cannot be interpreted.
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1475

1476

1477
// Decode the deployment description of one stream: its id, the reader tasks,
// an optional trigger task and the runner tasks, in that order.
// Task containers are allocated only when the corresponding count is positive.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // Reader tasks.
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);

    for (int32_t r = 0; r < nReaders; ++r) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, r);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // Trigger task: a non-zero marker means one task follows.
  int32_t hasTrigger = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // Runner tasks.
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);

    for (int32_t r = 0; r < nRunners; ++r) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, r);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1517

1518

1519
// Decode a start-task message. Only the stream message header is read here.
// Returns the result of decoding that header.
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  return tDecodeSStreamMsg(pDecoder, &pStart->header);
}
1529

1530

1531
// Decode one "start task" entry: the task header followed by its start message.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // Which task is addressed.
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // What to tell it.
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1542

1543

1544
// Decode an undeploy-task message: the stream message header followed by the
// doCheckpoint and doCleanup flags.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // Flags controlling how the task is torn down.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1556

1557

1558
// Decode one "undeploy task" entry: the task header followed by its undeploy
// message.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // Which task is addressed.
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // How it should be undeployed.
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1569

1570
// Decode a recalculation request: recalcId, start and end, each a 64-bit
// integer, in that order.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // Requested range.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1582

1583
// Decode the body of a management response. The layout depends on msgType:
//   STREAM_MSG_ORIGTBL_READER_INFO   - vgroup id list, then reader address list
//   STREAM_MSG_UPDATE_RUNNER         - runner target list
//   STREAM_MSG_USER_RECALC           - recalculation request list
//   STREAM_MSG_RUNNER_ORIGTBL_READER - per-exec responses, each with an address list
// Any other message type carries no body and decodes nothing.
// Every list is count-prefixed; its array is allocated only for a positive count.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t nVgroups = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVgroups));
      if (nVgroups > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), nVgroups);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nVgroups; ++k) {
          int32_t* pVgId = taosArrayGet(pCont->vgIds, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      int32_t nReaders = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
      if (nReaders > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nReaders; ++k) {
          SStreamTaskAddr* pReaderAddr = taosArrayGet(pCont->readerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
      if (nRunners > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRunners; ++k) {
          SStreamRunnerTarget* pRunner = taosArrayGet(pCont->runnerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRecalcs));
      if (nRecalcs > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), nRecalcs);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRecalcs; ++k) {
          SStreamRecalcReq* pRecalc = taosArrayGet(pCont->recalcList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalc));
        }
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nRsps = 0;
      int32_t nAddrs = 0;  // re-read from the stream for every response entry

      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsps));
      if (nRsps > 0) {
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), nRsps);
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < nRsps; ++k) {
          SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pCont->execRspList, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pExecRsp->execId));

          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nAddrs));
          if (nAddrs > 0) {
            pExecRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), nAddrs);
            TSDB_CHECK_NULL(pExecRsp->vgList, code, lino, _exit, terrno);

            for (int32_t m = 0; m < nAddrs; ++m) {
              SStreamTaskAddr* pVgAddr = taosArrayGet(pExecRsp->vgList, m);
              TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pVgAddr));
            }
          }
        }
      }
      break;
    }

    default:
      break;
  }

_exit:
  return code;
}
1668

1669

1670
// Decode one management response: message header, request id, result code,
// the task it refers to, and finally the body whose layout is chosen by the
// message type found in the header.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  // Envelope.
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // Body; must come after the header because it depends on header.msgType.
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
1684

1685
void tFreeSStreamOReaderDeployRsp(void* param) {
5,302✔
1686
  if (NULL == param) {
5,302✔
1687
    return;
×
1688
  }
1689

1690
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
5,302✔
1691
  taosArrayDestroy(pRsp->vgList);
5,302✔
1692
}
1693

1694
void tFreeSStreamMgmtRsp(void* param) {
144,610✔
1695
  if (NULL == param) {
144,610✔
1696
    return;
×
1697
  }
1698
  
1699
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
144,610✔
1700

1701
  taosArrayDestroy(pRsp->cont.vgIds);
144,610✔
1702
  taosArrayDestroy(pRsp->cont.readerList);
144,610✔
1703
  taosArrayDestroy(pRsp->cont.runnerList);
144,610✔
1704
  taosArrayDestroy(pRsp->cont.recalcList);
144,610✔
1705
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
144,610✔
1706
}
1707

1708
// Free the buffers held by a reader deploy message. Which member of
// pReader->msg is released depends on the triggerReader flag. The message
// structure itself is not freed. A NULL pReader is ignored.
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    // Calc-side reader: only the scan plan is held.
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  // Trigger-side reader.
  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
1725

1726
void tFreeStreamNotifyUrl(void* param) {
×
1727
  if (NULL == param) {
×
1728
    return;
×
1729
  }
1730

1731
  taosMemoryFree(*(void**)param);
×
1732
}
1733

1734
// Free everything held by a trigger deploy message: the notification URLs, the
// buffers specific to the trigger's window type, the plan/filter buffers, the
// reader and runner lists and the stream name. The message structure itself is
// not freed. A NULL pTrigger is ignored.
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

  // Window-type specific buffers; other window types hold none here.
  switch (pTrigger->triggerType) {
    case WINDOW_TYPE_STATE: {
      taosMemoryFree(pTrigger->trigger.stateWin.zeroth);
      taosMemoryFree(pTrigger->trigger.stateWin.expr);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      taosMemoryFree(pTrigger->trigger.event.startCond);
      taosMemoryFree(pTrigger->trigger.event.endCond);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      taosMemoryFree(pTrigger->trigger.count.condCols);
      break;
    }
    default:
      break;
  }

  // Buffers common to every trigger type.
  taosMemoryFree(pTrigger->partitionCols);
  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
1765

1766
void tFreeSStreamOutCol(void* param) {
×
1767
  if (NULL == param) {
×
1768
    return;
×
1769
  }
1770

1771
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1772
  taosMemoryFree(pOut->expr);
×
1773
}
1774

1775
// Free the buffers and arrays held by a runner deploy message. The message
// structure itself is not freed. A NULL pRunner is ignored.
//
// NOTE(review): the colCids / tagCids arrays are not released here - confirm
// that their ownership is handled elsewhere.
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // Plain buffers.
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // Arrays; URL and forced-output entries own a buffer each, so they are
  // destroyed with a per-element callback.
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
1793

1794
void tFreeSStmTaskDeploy(void* param) {
2,012,181✔
1795
  if (NULL == param) {
2,012,181✔
1796
    return;
360,961✔
1797
  }
1798

1799
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,651,220✔
1800
  switch (pTask->task.type)  {
1,651,220✔
1801
    case STREAM_READER_TASK:
713,804✔
1802
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
713,804✔
1803
      break;
713,804✔
1804
    case STREAM_TRIGGER_TASK:
206,827✔
1805
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
206,827✔
1806
      break;
206,827✔
1807
    case STREAM_RUNNER_TASK:
730,589✔
1808
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
730,589✔
1809
      break;
730,589✔
1810
    default:
×
1811
      break;
×
1812
  }
1813
}
1814

1815

1816
void tFreeSStmStreamDeploy(void* param) {
284,668✔
1817
  if (NULL == param) {
284,668✔
1818
    return;
×
1819
  }
1820
  
1821
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
284,668✔
1822
  int32_t readerNum = taosArrayGetSize(pDeploy->readerTasks);
284,668✔
1823
  for (int32_t i = 0; i < readerNum; ++i) {
1,005,082✔
1824
    SStmTaskDeploy* pReader = taosArrayGet(pDeploy->readerTasks, i);
720,414✔
1825
    if (!pReader->msg.reader.triggerReader && pReader->msg.reader.msg.calc.freeScanPlan) {
720,414✔
1826
      taosMemoryFreeClear(pReader->msg.reader.msg.calc.calcScanPlan);
385,931✔
1827
    }
1828
  }
1829
  taosArrayDestroy(pDeploy->readerTasks);
284,668✔
1830

1831
  if (pDeploy->triggerTask) {
284,668✔
1832
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
208,451✔
1833
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
208,451✔
1834
    taosMemoryFree(pDeploy->triggerTask);
208,451✔
1835
  }
1836

1837
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
284,668✔
1838
  for (int32_t i = 0; i < runnerNum; ++i) {
1,020,087✔
1839
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
735,419✔
1840
    taosMemoryFree(pRunner->msg.runner.pPlan);
735,419✔
1841
  }
1842
  taosArrayDestroy(pDeploy->runnerTasks);
284,668✔
1843
}
1844

1845
void tDeepFreeSStmStreamDeploy(void* param) {
567,788✔
1846
  if (NULL == param) {
567,788✔
1847
    return;
×
1848
  }
1849
  
1850
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
567,788✔
1851
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
567,788✔
1852
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
567,788✔
1853
  taosMemoryFree(pDeploy->triggerTask);
567,788✔
1854
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
567,788✔
1855
}
1856

1857

1858
// Release the lists of a heartbeat response, using the shallow per-stream
// destructor (tFreeSStmStreamDeploy) for the deploy list. The response
// structure itself is not freed. A NULL pRsp is ignored.
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp == NULL) {
    return;
  }

  // Lists whose elements need per-element cleanup.
  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);

  // Flat lists.
  taosArrayDestroy(pRsp->start.taskList);
  taosArrayDestroy(pRsp->undeploy.taskList);

  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
}
1867

1868
// Release the lists of a heartbeat response, using the deep per-stream
// destructor (tDeepFreeSStmStreamDeploy) for the deploy list. The response
// structure itself is not freed. A NULL pRsp is ignored.
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp == NULL) {
    return;
  }

  // Lists whose elements need per-element cleanup.
  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);

  // Flat lists.
  taosArrayDestroy(pRsp->start.taskList);
  taosArrayDestroy(pRsp->undeploy.taskList);

  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
}
1877

1878

1879

1880
// Decode a stream heartbeat response. Wire order: stream group id, deploy
// list, start list, the undeployAll flag (followed by an undeploy list only
// when the flag is zero), and the management response list.
// Every list is count-prefixed; its array is allocated only for a positive
// count. Nothing is released here on failure.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // Streams to deploy.
  int32_t nDeploys = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploys));
  if (nDeploys > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploys);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nDeploys; ++k) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, k);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // Tasks to start.
  int32_t nStarts = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStarts));
  if (nStarts > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStarts);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nStarts; ++k) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // Tasks to undeploy; the explicit list is present only without undeployAll.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploys = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploys));
    if (nUndeploys > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploys);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);

      for (int32_t k = 0; k < nUndeploys; ++k) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // Management responses.
  int32_t nMgmtRsps = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nMgmtRsps));
  if (nMgmtRsps > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nMgmtRsps);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nMgmtRsps; ++k) {
      SStreamMgmtRsp* pMgmt = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmt));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
1938

1939
// Encode a task run request as one framed record holding streamId, taskId and
// reqType, in that order.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // Payload.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1952

1953
// Decode a task run request: one framed record holding streamId, taskId and
// reqType, in that order.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // Payload.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1966

1967
// Encode a task stop request as one framed record holding only the streamId.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // Payload.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1978

1979
// Decode a task stop request: one framed record holding only the streamId.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // Payload.
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1991

1992

1993
// Serialize a create-stream request into pEncoder: the request is first turned
// into a JSON string, which is then written as a length-prefixed string.
// The temporary JSON buffer is always released before returning.
// Returns 0 on success; on failure, the error code of the step that failed.
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t jsonSize = 0;
  char*   jsonText = NULL;

  TAOS_CHECK_EXIT(scmCreateStreamReqToJson(pReq, false, &jsonText, &jsonSize));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, jsonText, jsonSize));

_exit:
  taosMemoryFreeClear(jsonText);
  return code;
}
2010

2011
// Serialize a create-stream request into buf (capacity bufLen) as one framed
// record.
//
// Returns the encoder position after encoding when it succeeds, or the
// non-zero status code of the failing step otherwise. The encoder is cleared
// on every path.
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, bufLen);
  int32_t code = 0;
  int32_t lino = 0;
  int32_t tlen = 0;

  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));

  tEndEncode(&encoder);

_exit:
  // The position must be read before the encoder is cleared.
  if (code == 0) {
    tlen = encoder.pos;
  }
  tEncoderClear(&encoder);

  return code ? code : tlen;
}
2034

2035
// Old version deserialization for backward compatibility,
2036
// especially for stream version number 7
2037
int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) {
×
2038
  int32_t code = 0;
×
2039
  int32_t lino;
2040
  pReq->calcPkSlotId = -1;
×
2041
  pReq->triPkSlotId = -1;
×
2042

2043
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2044

2045
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
×
2046
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
×
2047
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
×
2048
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
×
2049
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
×
2050
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
×
2051
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
×
2052

2053
  int32_t calcDbSize = 0;
×
2054
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
×
2055
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
×
2056
  if (pReq->calcDB == NULL) {
×
2057
    TAOS_CHECK_EXIT(terrno);
×
2058
  }
2059
  for (int32_t i = 0; i < calcDbSize; ++i) {
×
2060
    char *calcDb = NULL;
×
2061
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
×
2062
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
×
2063
    if (calcDb == NULL) {
×
2064
      TAOS_CHECK_EXIT(terrno);
×
2065
    }
2066
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
×
2067
      taosMemoryFree(calcDb);
×
2068
      TAOS_CHECK_EXIT(terrno);
×
2069
    }
2070
  }
2071

2072
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
×
2073
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
×
2074
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
×
2075
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
×
2076
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
×
2077
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
×
2078
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
×
2079
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
×
2080
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
×
2081
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
×
2082

2083
  int32_t addrSize = 0;
×
2084
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
×
2085
  if (addrSize > 0) {
×
2086
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
×
2087
    if (pReq->pNotifyAddrUrls == NULL) {
×
2088
      TAOS_CHECK_EXIT(terrno);
×
2089
    }
2090
  }
2091
  for (int32_t i = 0; i < addrSize; ++i) {
×
2092
    char *url = NULL;
×
2093
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
2094
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
2095
    if (url == NULL) {
×
2096
      TAOS_CHECK_EXIT(terrno);
×
2097
    }
2098
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
×
2099
      taosMemoryFree(url);
×
2100
      TAOS_CHECK_EXIT(terrno);
×
2101
    }
2102
  }
2103
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
×
2104
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
×
2105
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
×
2106

2107
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
×
2108
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
×
2109
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
×
2110

2111
  int32_t outColSize = 0;
×
2112
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
×
2113
  if (outColSize > 0) {
×
2114
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
×
2115
    if (pReq->outCols == NULL) {
×
2116
      TAOS_CHECK_EXIT(terrno);
×
2117
    }
2118

2119
    for (int32_t i = 0; i < outColSize; ++i) {
×
2120
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
×
2121
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
×
2122
    }
2123
  }
2124

2125
  int32_t outTagSize = 0;
×
2126
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
×
2127
  if (outTagSize > 0) {
×
2128
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
×
2129
    if (pReq->outTags == NULL) {
×
2130
      TAOS_CHECK_EXIT(terrno);
×
2131
    }
2132

2133
    for (int32_t i = 0; i < outTagSize; ++i) {
×
2134
      SFieldWithOptions field = {0};
×
2135
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
×
2136
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
×
2137
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
×
2138
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
×
2139
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
×
2140
        TAOS_CHECK_EXIT(terrno);
×
2141
      }
2142
    }
2143
  }
2144

2145
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
×
2146
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
×
2147
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
×
2148
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
×
2149

2150
  switch (pReq->triggerType) {
×
2151
    case WINDOW_TYPE_SESSION: {
×
2152
      // session trigger
2153
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
×
2154
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
×
2155
      break;
×
2156
    }
2157
      case WINDOW_TYPE_STATE: {
×
2158
        // state trigger
2159
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
×
2160
        pReq->trigger.stateWin.extend = 0;
×
2161
        pReq->trigger.stateWin.trueForType = 0;
×
2162
        pReq->trigger.stateWin.trueForCount = 0;
×
2163
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
×
2164
        break;
×
2165
      }
2166
      case WINDOW_TYPE_INTERVAL: {
×
2167
        // slide trigger
2168
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
×
2169
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
×
2170
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
×
2171
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
×
2172
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
×
2173
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
×
2174
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
×
2175
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
×
2176
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
×
2177
        break;
×
2178
      }
2179
      case WINDOW_TYPE_EVENT: {
×
2180
        // event trigger
2181
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
×
2182
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
×
2183
        pReq->trigger.event.trueForType = 0;
×
2184
        pReq->trigger.event.trueForCount = 0;
×
2185
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
×
2186
        break;
×
2187
      }
2188
      case WINDOW_TYPE_COUNT: {
×
2189
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
×
2190

2191
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
×
2192
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
×
2193
        break;
×
2194
      }
2195
      case WINDOW_TYPE_PERIOD: {
×
2196
        // period trigger
2197
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
×
2198
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
×
2199
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
×
2200
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
×
2201
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
×
2202
        break;
×
2203
      }
2204
      default:
×
2205
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2206
  }
2207

2208
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
×
2209
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
×
2210
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
×
2211
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
×
2212
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
×
2213
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
×
2214
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
×
2215
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
×
2216
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
×
2217
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
×
2218
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
×
2219
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
×
2220
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
×
2221
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
×
2222

2223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
×
2224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
×
2225

2226
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
×
2227

2228
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
×
2229
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
×
2230

2231
  int32_t calcScanPlanListSize = 0;
×
2232
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
×
2233
  if (calcScanPlanListSize > 0) {
×
2234
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
×
2235
    if (pReq->calcScanPlanList == NULL) {
×
2236
      TAOS_CHECK_EXIT(terrno);
×
2237
    }
2238
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
×
2239
      SStreamCalcScan calcScan = {0};
×
2240
      int32_t         vgListSize = 0;
×
2241
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
×
2242
      if (vgListSize > 0) {
×
2243
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
×
2244
        if (calcScan.vgList == NULL) {
×
2245
          TAOS_CHECK_EXIT(terrno);
×
2246
        }
2247
        for (int32_t j = 0; j < vgListSize; ++j) {
×
2248
          int32_t vgId = 0;
×
2249
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
×
2250
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
×
2251
            TAOS_CHECK_EXIT(terrno);
×
2252
          }
2253
        }
2254
      }
2255
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
×
2256
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
×
2257
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
×
2258
        TAOS_CHECK_EXIT(terrno);
×
2259
      }
2260
    }
2261
  }
2262

2263
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
×
2264
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
×
2265
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
×
2266
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
×
2267

2268
  int32_t forceOutColsSize = 0;
×
2269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
×
2270
  if (forceOutColsSize > 0) {
×
2271
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
×
2272
    if (pReq->forceOutCols == NULL) {
×
2273
      TAOS_CHECK_EXIT(terrno);
×
2274
    }
2275
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
×
2276
      SStreamOutCol outCol = {0};
×
2277
      int64_t       exprLen = 0;
×
2278
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
×
2279
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
×
2280
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
×
2281
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
×
2282
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
×
2283
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
×
2284
        TAOS_CHECK_EXIT(terrno);
×
2285
      }
2286
    }
2287
  }
2288

2289
  // LeftBytes is the size of all fields at the tail of SStreamObj.
2290
  // If there are more data in the buffer, then it means
2291
  // the new fields are added in SStreamObj, need to decode them.
2292
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2293
    switch (pReq->triggerType) {
×
2294
      case WINDOW_TYPE_STATE: {
×
2295
        // state trigger
2296
        if (!tDecodeIsEnd(pDecoder)) {
×
2297
          TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
×
2298
        }
2299
        if (!tDecodeIsEnd(pDecoder)) {
×
2300
          TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
×
2301
        }
2302
        break;
×
2303
      }
2304
      case WINDOW_TYPE_INTERVAL: {
×
2305
        if (!tDecodeIsEnd(pDecoder)) {
×
2306
          TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
×
2307
        }
2308
        break;
×
2309
      }
2310
      default:
×
2311
        break;
×
2312
    }
2313
  }
2314

2315
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2316
    if (!tDecodeIsEnd(pDecoder)) {
×
2317
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pReq->triggerPrec));
×
2318
    }
2319
  }
2320

2321
_exit:
×
2322

2323
  return code;
×
2324
}
2325

2326
// New deserialization using JSON
2327
// start from taosd ver-3.3.8.6, stream version number 8
2328
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
493,051✔
2329
  int32_t code = 0;
493,051✔
2330
  int32_t lino;
2331

2332
  char* json = NULL;
493,051✔
2333
  SJson* pJson = NULL;
493,051✔
2334
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &json));
493,051✔
2335
  pJson = tjsonParse(json);
493,051✔
2336
  if (pJson == NULL) {
493,051✔
2337
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INVALID_JSON);
×
2338
  }
2339
  TAOS_CHECK_EXIT(jsonToSCMCreateStreamReq(pJson, pReq));
493,051✔
2340

2341
_exit:
493,051✔
2342
  taosMemoryFreeClear(json);
493,051✔
2343
  if (NULL != pJson) {
493,051✔
2344
    tjsonDelete(pJson);
493,051✔
2345
  }
2346

2347
  return code;
493,051✔
2348
}
2349

2350

2351
// Deserialize a create-stream request from a raw buffer.
//
// The JSON based decoder is tried first. Only when it reports
// TSDB_CODE_MND_STREAM_INVALID_JSON is the decoder re-created over the same
// buffer and the old field-by-field decoder run instead (leftBytes = 0).
//
// buf, bufLen: encoded request, handed to tDecoderInit
// pReq:        request to fill
// returns:     the value of `code` after the last executed step
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0, lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  code = tDeserializeSCMCreateStreamReqImpl(&decoder, pReq);
  if (code == TSDB_CODE_MND_STREAM_INVALID_JSON) {
    uError("invalid json for stream create request, try old deserialization");

    // start over from the beginning of the buffer with a fresh decoder
    tDecoderClear(&decoder);
    tDecoderInit(&decoder, buf, bufLen);
    TAOS_CHECK_EXIT(tStartDecode(&decoder));
    TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImplOld(&decoder, pReq, 0));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2376

2377

2378
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
56,470✔
2379
  int32_t  code = 0;
56,470✔
2380
  int32_t  lino;
2381
  int32_t  tlen;
2382
  SEncoder encoder = {0};
56,470✔
2383
  tEncoderInit(&encoder, buf, bufLen);
56,470✔
2384

2385
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
56,470✔
2386

2387
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->count));
112,940✔
2388
  for (int32_t i = 0; i < pReq->count; i++) {
119,612✔
2389
    int32_t nameLen = pReq->name[i] == NULL ? 0 : (int32_t)strlen(pReq->name[i]) + 1;
63,142✔
2390
    TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name[i], nameLen));
126,284✔
2391
  }
2392
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
112,940✔
2393

2394
  tEndEncode(&encoder);
56,470✔
2395

2396
_exit:
56,470✔
2397
  if (code) {
56,470✔
2398
    tlen = code;
×
2399
  } else {
2400
    tlen = encoder.pos;
56,470✔
2401
  }
2402
  tEncoderClear(&encoder);
56,470✔
2403
  return tlen;
56,470✔
2404
}
2405

2406
// Deserialize a drop-stream request from a raw buffer.
//
// Reads count, allocates a zeroed array of `count` name pointers when count
// is positive, fills each slot with an allocated copy of the encoded name and
// finally reads igNotExists. Names decoded before a failure stay in pReq.
//
// buf, bufLen: encoded request, handed to tDecoderInit
// pReq:        request to fill
// returns:     the value of `code` (terrno when the name array cannot be allocated)
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0, lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->count));
  if (pReq->count > 0) {
    pReq->name = taosMemoryCalloc(pReq->count, sizeof(char*));
    if (NULL == pReq->name) {
      code = terrno;
      goto _exit;
    }
    for (int32_t idx = 0; idx < pReq->count; ++idx) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name[idx], NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2432

2433
// Release the name strings and the name array of a drop-stream request.
// The request struct itself is not freed. A NULL request or a NULL name
// array is ignored.
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (pReq == NULL || pReq->name == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pReq->count; ++idx) {
    taosMemoryFreeClear(pReq->name[idx]);
  }
  taosMemoryFreeClear(pReq->name);
}
2444

2445
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,335,216✔
2446
  if (pScan == NULL) {
2,335,216✔
2447
    return;
×
2448
  }
2449
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,335,216✔
2450
  taosArrayDestroy(pCalcScan->vgList);
2,335,216✔
2451
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,335,216✔
2452
}
2453

2454
void tFreeStreamOutCol(void* pCol) {
79,700✔
2455
  if (pCol == NULL) {
79,700✔
2456
    return;
×
2457
  }
2458
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
79,700✔
2459
  taosMemoryFreeClear(pOutCol->expr);
79,700✔
2460
}
2461

2462

2463

2464
// Release every heap member owned by a create-stream request.
//
// Strings are freed with taosMemoryFreeClear, arrays are destroyed and their
// pointers reset to NULL. The request struct itself is not freed. A NULL
// request is ignored.
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // arrays of heap-allocated strings
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  // arrays of by-value elements
  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  // trigger-specific heap members
  switch (pReq->triggerType) {
    case WINDOW_TYPE_STATE:
      taosMemoryFreeClear(pReq->trigger.stateWin.zeroth);
      taosMemoryFreeClear(pReq->trigger.stateWin.expr);
      break;
    case WINDOW_TYPE_EVENT:
      taosMemoryFreeClear(pReq->trigger.event.startCond);
      taosMemoryFreeClear(pReq->trigger.event.endCond);
      break;
    case WINDOW_TYPE_COUNT:
      // condCols is allocated by tDeserializeSCMCreateStreamReqImplOld
      // (tDecodeBinaryAlloc) and by tCloneStreamCreateDeployPointers
      // (COPY_STR), so it has to be released here as well
      taosMemoryFreeClear(pReq->trigger.count.condCols);
      break;
    default:
      break;
  }

  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
  taosArrayDestroy(pReq->colCids);
  pReq->colCids = NULL;
  taosArrayDestroy(pReq->tagCids);
  pReq->tagCids = NULL;
}
2518

2519
// Create a partial deep copy of a create-stream request.
//
// A zeroed SCMCreateStreamReq is allocated into *ppDst and the members listed
// below are duplicated from pSrc; every other member stays zero. Strings are
// copied with COPY_STR/taosStrdup and arrays with taosArrayDup or element by
// element.
//
// When pSrc is NULL the function returns 0 without touching *ppDst.
//
// On failure the members already copied are released through
// tFreeSCMCreateStreamReq and the error is logged; the top-level struct is
// left in *ppDst, as before (NOTE(review): confirm that callers free it).
//
// pSrc:    request to copy from (may be NULL)
// ppDst:   receives the newly allocated copy
// returns: the value of `code` (0 when every step succeeded)
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t code = 0, lino = 0;
  if (NULL == pSrc) {
    return code;
  }

  // Everything that `_exit` looks at is initialized before the first check
  // that can jump there, so the cleanup never reads an indeterminate value.
  int32_t             num = 0;
  void*               p = NULL;      // URL copy not yet stored in pDst
  SCMCreateStreamReq* pDst = NULL;
  SStreamCalcScan     dscan = {0};   // calc scan entry not yet stored in pDst
  SStreamOutCol       dcol = {0};    // forced output column not yet stored in pDst

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
      p = NULL;  // the array holds the copy now
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  pDst->triggerType = pSrc->triggerType;

  // trigger payload: variants with heap members are copied member by member,
  // the remaining variants are copied by plain assignment
  switch (pSrc->triggerType) {
    case WINDOW_TYPE_STATE:
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
      pDst->trigger.stateWin.trueForType = pSrc->trigger.stateWin.trueForType;
      pDst->trigger.stateWin.trueForCount = pSrc->trigger.stateWin.trueForCount;
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
      if (pSrc->trigger.stateWin.zeroth) {
        pDst->trigger.stateWin.zeroth = COPY_STR(pSrc->trigger.stateWin.zeroth);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.zeroth, code, lino, _exit, terrno);
      }
      if (pSrc->trigger.stateWin.expr) {
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
      }
      break;
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForType = pSrc->trigger.event.trueForType;
      pDst->trigger.event.trueForCount = pSrc->trigger.event.trueForCount;
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    case WINDOW_TYPE_COUNT:
      pDst->trigger.count.countVal = pSrc->trigger.count.countVal;
      pDst->trigger.count.sliding = pSrc->trigger.count.sliding;
      if (pSrc->trigger.count.condCols) {
        pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols);
        TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno);
      }
      break;
    default:
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);

      // dscan is all-zero here: either freshly initialized or reset below
      dscan.readFromCache = sscan->readFromCache;

      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);

      dscan.scanPlan = COPY_STR(sscan->scanPlan);
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // the array holds the entry now
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);

      // dcol is all-zero here: either freshly initialized or reset below
      dcol.type = scol->type;

      dcol.expr = COPY_STR(scol->expr);
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // the array holds the entry now
    }
  }

  if (pSrc->colCids) {
    pDst->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL(pDst->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids) {
    pDst->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL(pDst->tagCids, code, lino, _exit, terrno);
  }

  pDst->triggerTblUid = pSrc->triggerTblUid;
  pDst->triggerTblType = pSrc->triggerTblType;
  pDst->triggerPrec = pSrc->triggerPrec;
  pDst->deleteReCalc = pSrc->deleteReCalc;
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
  pDst->flags = pSrc->flags;

_exit:

  // Anything still held by a temporary was never handed over to pDst and
  // would otherwise be lost. All three are empty after a completed copy.
  taosMemoryFreeClear(p);
  tFreeStreamCalcScan(&dscan);
  tFreeStreamOutCol(&dcol);

  if (code) {
    // pDst is NULL here when the initial allocation failed
    tFreeSCMCreateStreamReq(pDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2719

2720

2721
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
6,436✔
2722
  int32_t  code = 0;
6,436✔
2723
  int32_t  lino;
2724
  int32_t  tlen;
2725
  SEncoder encoder = {0};
6,436✔
2726
  tEncoderInit(&encoder, buf, bufLen);
6,436✔
2727
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
6,436✔
2728

2729
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
6,436✔
2730
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
12,872✔
2731
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
12,872✔
2732
  tEndEncode(&encoder);
6,436✔
2733

2734
_exit:
6,436✔
2735
  if (code) {
6,436✔
2736
    tlen = code;
×
2737
  } else {
2738
    tlen = encoder.pos;
6,436✔
2739
  }
2740
  tEncoderClear(&encoder);
6,436✔
2741
  return tlen;
6,436✔
2742
}
2743

2744
// Deserialize a pause-stream request from a raw buffer.
//
// Reads the name into a newly allocated buffer stored in pReq->name, then
// igNotExists.
//
// buf, bufLen: encoded request, handed to tDecoderInit
// pReq:        request to fill
// returns:     the value of `code` (0 when every step succeeded)
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  code = 0, lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2759

2760
/*
 * Release the name buffer held by a pause-stream request and reset the field.
 * Calling it with a NULL request is a no-op.
 */
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
3,218✔
2763

2764
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
5,692✔
2765
  SEncoder encoder = {0};
5,692✔
2766
  int32_t  code = 0;
5,692✔
2767
  int32_t  lino;
2768
  int32_t  tlen;
2769
  tEncoderInit(&encoder, buf, bufLen);
5,692✔
2770
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,692✔
2771
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
5,692✔
2772
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
11,384✔
2773
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
11,384✔
2774
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
11,384✔
2775
  tEndEncode(&encoder);
5,692✔
2776

2777
_exit:
5,692✔
2778
  if (code) {
5,692✔
2779
    tlen = code;
×
2780
  } else {
2781
    tlen = encoder.pos;
5,692✔
2782
  }
2783
  tEncoderClear(&encoder);
5,692✔
2784
  return tlen;
5,692✔
2785
}
2786

2787
/*
 * Decode a resume-stream request from buf (bufLen bytes) into pReq.
 *
 * Read order mirrors the serializer: name, igNotExists, igUntreated. The name
 * is obtained through tDecodeBinaryAlloc and stored in pReq->name.
 * Returns 0 on success, otherwise the code of the first failing decode step;
 * the decoder is cleared on every path.
 */
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2803

2804
/*
 * Release the name buffer held by a resume-stream request and reset the field.
 * Calling it with a NULL request is a no-op.
 */
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
2,846✔
2807

2808
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
24,914✔
2809
  SEncoder encoder = {0};
24,914✔
2810
  int32_t  code = 0;
24,914✔
2811
  int32_t  lino;
2812
  int32_t  tlen;
2813
  tEncoderInit(&encoder, buf, bufLen);
24,914✔
2814
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
24,914✔
2815
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
24,914✔
2816
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
49,828✔
2817
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
49,828✔
2818
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
49,828✔
2819
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
49,828✔
2820
  tEndEncode(&encoder);
24,914✔
2821

2822
_exit:
24,914✔
2823
  if (code) {
24,914✔
2824
    tlen = code;
×
2825
  } else {
2826
    tlen = encoder.pos;
24,914✔
2827
  }
2828
  tEncoderClear(&encoder);
24,914✔
2829
  return tlen;
24,914✔
2830
}
2831

2832
/*
 * Decode a recalc-stream request from buf (bufLen bytes) into pReq.
 *
 * Read order mirrors the serializer: name, calcAll, timeRange.skey,
 * timeRange.ekey. The name is obtained through tDecodeBinaryAlloc and stored
 * in pReq->name.
 * Returns 0 on success, otherwise the code of the first failing decode step;
 * the decoder is cleared on every path.
 */
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2850

2851
/*
 * Release the name buffer held by a recalc-stream request and reset the field.
 * Calling it with a NULL request is a no-op.
 */
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
24,910✔
2854

2855
/*
 * Write the body of a stream-progress request to an already started encoder.
 * Wire order: streamId (int64), taskId (int64), fetchIdx (int32).
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
2866

2867
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
15,040✔
2868
  SEncoder encoder = {0};
15,040✔
2869
  int32_t  code = 0;
15,040✔
2870
  int32_t  lino;
2871
  int32_t  tlen;
2872
  tEncoderInit(&encoder, buf, bufLen);
15,040✔
2873

2874
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
15,040✔
2875
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
15,040✔
2876

2877
  tEndEncode(&encoder);
15,040✔
2878

2879
_exit:
15,040✔
2880
  if (code) {
15,040✔
2881
    tlen = code;
×
2882
  } else {
2883
    tlen = encoder.pos;
15,040✔
2884
  }
2885
  tEncoderClear(&encoder);
15,040✔
2886
  return tlen;
15,040✔
2887
}
2888

2889
/*
 * Read the body of a stream-progress request from an already started decoder.
 * Read order: streamId (int64), taskId (int64), fetchIdx (int32).
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
2900

2901
/*
 * Decode a stream-progress request from buf (bufLen bytes) into pReq.
 *
 * The payload is read by tDecodeStreamProgressReq; this wrapper brackets it
 * with tStartDecode/tEndDecode and clears the decoder on every path.
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2917

2918
/*
 * Write the body of a stream-progress response to an already started encoder.
 * Wire order: streamId (int64), fillHisFinished (int8), progressDelay (int64),
 * fetchIdx (int32).
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
2930

2931
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
14,946✔
2932
  SEncoder encoder = {0};
14,946✔
2933
  int32_t  code = 0;
14,946✔
2934
  int32_t  lino;
2935
  int32_t  tlen;
2936
  tEncoderInit(&encoder, buf, bufLen);
14,946✔
2937

2938
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
14,946✔
2939
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
14,946✔
2940

2941
  tEndEncode(&encoder);
14,946✔
2942

2943
_exit:
14,946✔
2944
  if (code) {
14,946✔
2945
    tlen = code;
×
2946
  } else {
2947
    tlen = encoder.pos;
14,946✔
2948
  }
2949
  tEncoderClear(&encoder);
14,946✔
2950
  return tlen;
14,946✔
2951
}
2952

2953
/*
 * Read the body of a stream-progress response from an already started decoder.
 * Read order: streamId (int64), fillHisFinished (one byte, decoded through an
 * int8_t view of the field), progressDelay (int64), fetchIdx (int32).
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
2965

2966
/*
 * Decode a stream-progress response from buf (bufLen bytes) into pRsp.
 *
 * The payload is read by tDecodeStreamProgressRsp; this wrapper brackets it
 * with tStartDecode/tEndDecode and clears the decoder on every path.
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&dec, pRsp));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2982

2983
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
277,380✔
2984
  SEncoder encoder = {0};
277,380✔
2985
  int32_t  code = TSDB_CODE_SUCCESS;
277,380✔
2986
  int32_t  lino = 0;
277,380✔
2987
  int32_t  tlen = 0;
277,380✔
2988

2989
  tEncoderInit(&encoder, buf, bufLen);
277,380✔
2990
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
277,380✔
2991

2992
  int32_t size = taosArrayGetSize(pRsp->cols);
277,380✔
2993
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
277,380✔
2994
  for (int32_t i = 0; i < size; ++i) {
1,021,152✔
2995
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
743,772✔
2996
    if (oInfo == NULL) {
743,772✔
2997
      uError("col id is NULL at index %d", i);
×
2998
      code = TSDB_CODE_INVALID_PARA;
×
2999
      goto _exit;
×
3000
    }
3001
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
1,487,544✔
3002
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
1,487,544✔
3003
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
1,487,544✔
3004
  }
3005

3006
  tEndEncode(&encoder);
277,380✔
3007

3008
_exit:
277,380✔
3009
  if (code != TSDB_CODE_SUCCESS) {
277,380✔
3010
    tlen = code;
×
3011
  } else {
3012
    tlen = encoder.pos;
277,380✔
3013
  }
3014
  tEncoderClear(&encoder);
277,380✔
3015
  return tlen;
277,380✔
3016
}
3017

3018
/*
 * Decode an original-table-info response from buf (bufLen bytes) into pRsp.
 *
 * Reads the entry count (int32), creates pRsp->cols, then appends one
 * OTableInfoRsp per entry, read as suid (int64), uid (int64), cid (int16).
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step. On failure
 * pRsp->cols may already exist and hold partially decoded entries; it is not
 * released here.
 *
 * Every failure path leaves through _exit so that tDecoderClear always runs;
 * returning directly from inside the loop would skip it.
 */
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    // Grow the array by one slot and decode straight into it.
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3052

3053
/*
 * Destroy the cols array of an original-table-info response and reset the
 * field so a repeated call does not free it twice. A NULL response is ignored.
 */
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) {
    return;
  }
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
26,171,632✔
3056

3057
/*
 * Release the containers owned by a decoded trigger pull request.
 *
 * Which members are released depends on pReq->base.type:
 *   - WAL_DATA_NEW / WAL_CALC_DATA_NEW : versions array, ranges hash
 *   - TSDB_DATA                        : cids array
 *   - VTABLE_INFO                      : cids and uids arrays
 *   - VTABLE_PSEUDO_COL                : cids array
 *   - OTABLE_INFO                      : cols array
 *   - SET_TABLE                        : uidInfoTrigger and uidInfoCalc hashes
 * Other request types own nothing that is released here. A NULL request is
 * ignored.
 *
 * Every released member is reset to NULL, so calling this twice on the same
 * request does not free anything a second time.
 */
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pWal = (SSTriggerWalDataNewRequest*)pReq;
      taosArrayDestroy(pWal->versions);
      pWal->versions = NULL;
      tSimpleHashCleanup(pWal->ranges);
      pWal->ranges = NULL;
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pTsdb = (SSTriggerTsdbDataRequest*)pReq;
      if (pTsdb->cids != NULL) {
        taosArrayDestroy(pTsdb->cids);
        pTsdb->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pVtInfo = (SSTriggerVirTableInfoRequest*)pReq;
      if (pVtInfo->cids != NULL) {
        taosArrayDestroy(pVtInfo->cids);
        pVtInfo->cids = NULL;
      }
      if (pVtInfo->uids != NULL) {
        taosArrayDestroy(pVtInfo->uids);
        pVtInfo->uids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pPseudo = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pPseudo->cids != NULL) {
        taosArrayDestroy(pPseudo->cids);
        pPseudo->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pOrig = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pOrig->cols != NULL) {
        taosArrayDestroy(pOrig->cols);
        pOrig->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pSetTbl = (SSTriggerSetTableRequest*)pReq;
      tSimpleHashCleanup(pSetTbl->uidInfoTrigger);
      pSetTbl->uidInfoTrigger = NULL;
      tSimpleHashCleanup(pSetTbl->uidInfoCalc);
      pSetTbl->uidInfoCalc = NULL;
      break;
    }
    default:
      // Remaining request types carry only plain scalar fields.
      break;
  }
}
3097

3098
/*
 * Encode the raw element storage of pArr as one binary blob.
 *
 * The blob length is element count * pArr->elemSize. When taosArrayGetSize
 * reports no elements, a zero-length blob with a NULL data pointer is written
 * and pArr's storage is not touched.
 * Returns the result of tEncodeBinary.
 */
int32_t encodePlainArray(SEncoder *encoder, SArray *pArr) {
  int32_t  nElems = taosArrayGetSize(pArr);
  uint8_t* pBytes = NULL;
  int32_t  nBytes = 0;

  if (nElems > 0) {
    pBytes = TARRAY_DATA(pArr);
    nBytes = nElems * pArr->elemSize;
  }

  return tEncodeBinary(encoder, pBytes, nBytes);
}
3109

3110
/*
 * Decode a binary blob written by encodePlainArray into a new SArray.
 *
 * When the blob is empty, *ppArr is left untouched and success is returned.
 * Otherwise a fresh array is created and its pData is swapped with the decoded
 * buffer, so the array takes over the decoded bytes; the storage the array was
 * created with ends up in the local pointer and is freed before returning.
 * size and capacity are both set to blob length / elemSize.
 *
 * Returns TSDB_CODE_SUCCESS, the decode error, or terrno when the array cannot
 * be created (in which case the decoded buffer is freed here).
 */
int32_t decodePlainArray(SDecoder* decoder, SArray** ppArr, uint32_t elemSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint64_t nBytes = 0;
  void*    pBlob = NULL;

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(decoder, &pBlob, &nBytes));
  if (nBytes == 0) {
    goto _exit;
  }

  *ppArr = taosArrayInit(0, elemSize);
  TSDB_CHECK_NULL(*ppArr, code, lino, _exit, terrno);

  // After the swap pBlob refers to the array's former storage.
  TSWAP((*ppArr)->pData, pBlob);
  (*ppArr)->capacity = nBytes / elemSize;
  (*ppArr)->size = (*ppArr)->capacity;

_exit:
  if (pBlob != NULL) {
    taosMemoryFree(pBlob);
  }
  return code;
}
3130

3131
/*
 * Encode a two-level map used by the set-table pull request.
 *
 * Wire layout: outer entry count (int32); for each outer entry the two int64
 * values that make up its key, the inner entry count (int32), and for each
 * inner entry its int16 key followed by its int16 value.
 * The outer value is read as a pointer to the inner SSHashObj.
 * Returns 0 on success or the code of the first failing encode step.
 */
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t nOuter = tSimpleHashGetSize(pInfo);
  TAOS_CHECK_EXIT(tEncodeI32(encoder, nOuter));

  int32_t outerIter = 0;
  for (void* pOuter = tSimpleHashIterate(pInfo, NULL, &outerIter); pOuter != NULL;
       pOuter = tSimpleHashIterate(pInfo, pOuter, &outerIter)) {
    int64_t* pKey = tSimpleHashGetKey(pOuter, NULL);
    TAOS_CHECK_EXIT(tEncodeI64(encoder, pKey[0]));
    TAOS_CHECK_EXIT(tEncodeI64(encoder, pKey[1]));

    SSHashObj* pInner = *(SSHashObj**)pOuter;
    int32_t    nInner = tSimpleHashGetSize(pInner);
    TAOS_CHECK_EXIT(tEncodeI32(encoder, nInner));

    int32_t innerIter = 0;
    for (void* pSlotEntry = tSimpleHashIterate(pInner, NULL, &innerIter); pSlotEntry != NULL;
         pSlotEntry = tSimpleHashIterate(pInner, pSlotEntry, &innerIter)) {
      int16_t* pSlotId = tSimpleHashGetKey(pSlotEntry, NULL);
      int16_t* pColId = (int16_t*)pSlotEntry;
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pSlotId));
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pColId));
    }
  }

_exit:
  return code;
}
3162

3163
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
69,931,220✔
3164
  SEncoder encoder = {0};
69,931,220✔
3165
  int32_t  code = TSDB_CODE_SUCCESS;
69,931,220✔
3166
  int32_t  lino = 0;
69,931,220✔
3167
  int32_t  tlen = 0;
69,931,220✔
3168

3169
  tEncoderInit(&encoder, buf, bufLen);
69,931,220✔
3170
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
69,932,212✔
3171

3172
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
139,862,414✔
3173
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
139,842,686✔
3174
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
139,843,889✔
3175
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
139,863,637✔
3176

3177
  switch (pReq->type) {
69,932,014✔
3178
    case STRIGGER_PULL_SET_TABLE: {
277,380✔
3179
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
277,380✔
3180
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
277,380✔
3181
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
277,380✔
3182
      break;
277,380✔
3183
    }
3184
    case STRIGGER_PULL_LAST_TS: {
613,514✔
3185
      break;
613,514✔
3186
    }
3187
    case STRIGGER_PULL_FIRST_TS: {
469,345✔
3188
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
469,345✔
3189
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
938,937✔
3190
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
939,184✔
3191
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
939,184✔
3192
      break;
469,592✔
3193
    }
3194
    case STRIGGER_PULL_TSDB_META: {
901,678✔
3195
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
901,678✔
3196
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,803,356✔
3197
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
1,803,356✔
3198
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,803,356✔
3199
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,803,356✔
3200
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,803,356✔
3201
      break;
901,678✔
3202
    }
3203
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3204
      break;
×
3205
    }
3206
    case STRIGGER_PULL_TSDB_TS_DATA: {
380,618✔
3207
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
380,618✔
3208
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
761,236✔
3209
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
761,236✔
3210
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
761,236✔
3211
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
761,236✔
3212
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
761,236✔
3213
      break;
380,618✔
3214
    }
3215
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
119,802✔
3216
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
119,802✔
3217
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
239,604✔
3218
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
239,604✔
3219
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
239,604✔
3220
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
239,604✔
3221
      break;
119,802✔
3222
    }
3223
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
119,802✔
3224
      break;
119,802✔
3225
    }
3226
    case STRIGGER_PULL_TSDB_CALC_DATA: {
13,345,758✔
3227
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
13,345,758✔
3228
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
26,691,516✔
3229
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
26,691,516✔
3230
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
26,691,516✔
3231
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
26,691,516✔
3232
      break;
13,345,758✔
3233
    }
3234
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3235
      break;
×
3236
    }
3237
    case STRIGGER_PULL_TSDB_DATA: {
817,896✔
3238
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
817,896✔
3239
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,635,545✔
3240
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,635,545✔
3241
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,635,792✔
3242
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,635,792✔
3243
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
817,896✔
3244
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,635,792✔
3245
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,635,792✔
3246
      break;
817,896✔
3247
    }
3248
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3249
      break;
×
3250
    }
3251
    case STRIGGER_PULL_WAL_META_NEW: {
25,010,203✔
3252
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
25,010,203✔
3253
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
50,020,820✔
3254
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
50,022,272✔
3255
      break;
25,011,655✔
3256
    }
3257
    case STRIGGER_PULL_WAL_DATA_NEW:
18,030,098✔
3258
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3259
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
18,030,098✔
3260
      int32_t                     nVersion = taosArrayGetSize(pRequest->versions);
18,030,098✔
3261
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
18,030,888✔
3262
      for (int32_t i = 0; i < nVersion; i++) {
35,950,476✔
3263
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
17,919,835✔
3264
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
17,919,588✔
3265
      }
3266
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
18,030,641✔
3267
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
18,030,888✔
3268
      int32_t iter = 0;
18,030,888✔
3269
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
18,030,888✔
3270
      while (px != NULL) {
22,249,797✔
3271
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
4,219,946✔
3272
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
8,439,892✔
3273
        int64_t* key = (int64_t*)px;
4,219,946✔
3274
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
8,439,892✔
3275
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));
8,439,892✔
3276

3277
        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
4,219,946✔
3278
      }
3279
      break;
18,029,851✔
3280
    }
3281
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
7,381,968✔
3282
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
7,381,968✔
3283
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
14,762,378✔
3284
      break;
7,380,410✔
3285
    }
3286
    case STRIGGER_PULL_GROUP_COL_VALUE: {
651,523✔
3287
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
651,523✔
3288
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,303,291✔
3289
      break;
651,768✔
3290
    }
3291
    case STRIGGER_PULL_VTABLE_INFO: {
157,930✔
3292
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
157,930✔
3293
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
157,930✔
3294
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
157,930✔
3295
      TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
157,930✔
3296
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
315,860✔
3297
      break;
157,930✔
3298
    }
3299
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
1,375,162✔
3300
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
1,375,162✔
3301
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
2,750,324✔
3302
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
1,375,162✔
3303
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,750,324✔
3304
      break;
1,375,162✔
3305
    }
3306
    case STRIGGER_PULL_OTABLE_INFO: {
277,380✔
3307
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
277,380✔
3308
      int32_t size = taosArrayGetSize(pRequest->cols);
277,380✔
3309
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
277,380✔
3310
      for (int32_t i = 0; i < size; ++i) {
1,021,152✔
3311
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
743,772✔
3312
        if (oInfo == NULL) {
743,772✔
3313
          uError("col id is NULL at index %d", i);
×
3314
          code = TSDB_CODE_INVALID_PARA;
×
3315
          goto _exit;
×
3316
        }
3317
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
1,487,544✔
3318
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
1,487,544✔
3319
      }
3320
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
554,760✔
3321
      break; 
277,380✔
3322
    }
3323
    default: {
×
3324
      uError("unknown pull type %d", pReq->type);
×
3325
      code = TSDB_CODE_INVALID_PARA;
×
3326
      break;
×
3327
    }
3328
  }
3329

3330
  tEndEncode(&encoder);
69,930,196✔
3331

3332
_exit:
69,927,955✔
3333
  if (code != TSDB_CODE_SUCCESS) {
69,928,114✔
3334
    tlen = code;
×
3335
  } else {
3336
    tlen = encoder.pos;
69,928,114✔
3337
  }
3338
  tEncoderClear(&encoder);
69,928,114✔
3339
  return tlen;
69,929,543✔
3340
}
3341

3342
static void destroyHash(void* data){
344,046✔
3343
  if (data){
344,046✔
3344
    SSHashObj* tmp = *(SSHashObj**)data;
344,046✔
3345
    tSimpleHashCleanup(tmp);
344,046✔
3346
  }
3347
}
344,046✔
3348

3349
/*
 * Decode a two-level map written by encodeSetTableMapInfo.
 *
 * Reads the outer entry count, then for each outer entry: two int64 values
 * forming the 16-byte key, the inner entry count, and that many
 * (int16 slot id, int16 column id) pairs. The outer map stores a pointer to
 * each inner map and has destroyHash installed as its value destructor.
 *
 * On success *ppInfo holds the new map. On failure everything built here is
 * released and *ppInfo is NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t size = 0;

  // Start from a known state so the failure path below only ever cleans up
  // a map created by this function, never the caller's incoming value.
  *ppInfo = NULL;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (*ppInfo == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);

  for (int32_t i = 0; i < size; ++i) {
    int64_t id[2] = {0};
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id+1));
    int32_t len = 0;
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));
    SSHashObj* tmp = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
    if (tmp == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // Until this put succeeds the outer map does not reference tmp, so a
    // failure here must release tmp explicitly or it is leaked.
    code = tSimpleHashPut(*ppInfo, id, sizeof(id), &tmp, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      tSimpleHashCleanup(tmp);
      lino = __LINE__;
      goto _exit;
    }

    // From here on tmp is reachable through *ppInfo and is released with it.
    for (int32_t j = 0; j < len; ++j) {
      int16_t slotId = 0;
      int16_t cid = 0;
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
    }
  }
_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(*ppInfo);
    *ppInfo = NULL;
  }
  return code;
}
3387

3388
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
34,765,955✔
3389
  SDecoder decoder = {0};
34,765,955✔
3390
  int32_t  code = TSDB_CODE_SUCCESS;
34,765,955✔
3391
  int32_t  lino = 0;
34,765,955✔
3392

3393
  tDecoderInit(&decoder, buf, bufLen);
34,765,955✔
3394
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
34,765,614✔
3395

3396
  int32_t type = 0;
34,766,349✔
3397
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
34,767,349✔
3398
  SSTriggerPullRequest* pBase = &(pReq->base);
34,767,349✔
3399
  pBase->type = type;
34,765,766✔
3400
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
69,536,329✔
3401
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
69,535,876✔
3402
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
69,535,295✔
3403

3404
  switch (type) {
34,767,830✔
3405
    case STRIGGER_PULL_SET_TABLE: {
138,690✔
3406
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
138,690✔
3407
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
138,690✔
3408
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
138,690✔
3409
      break;
138,690✔
3410
    }
3411
    case STRIGGER_PULL_LAST_TS: {
306,707✔
3412
      break;
306,707✔
3413
    }
3414
    case STRIGGER_PULL_FIRST_TS: {
232,975✔
3415
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
232,975✔
3416
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
465,950✔
3417
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
465,950✔
3418
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
465,950✔
3419
      break;
232,975✔
3420
    }
3421
    case STRIGGER_PULL_TSDB_META: {
450,765✔
3422
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
450,765✔
3423
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
901,530✔
3424
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
901,530✔
3425
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
901,530✔
3426
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
901,530✔
3427
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
901,530✔
3428
      break;
450,765✔
3429
    }
3430
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3431
      break;
×
3432
    }
3433
    case STRIGGER_PULL_TSDB_TS_DATA: {
190,309✔
3434
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
190,309✔
3435
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
380,618✔
3436
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
380,618✔
3437
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
380,618✔
3438
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
380,618✔
3439
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
380,618✔
3440
      break;
190,309✔
3441
    }
3442
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
59,901✔
3443
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
59,901✔
3444
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
119,802✔
3445
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
119,802✔
3446
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
119,802✔
3447
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
119,802✔
3448
      break;
59,901✔
3449
    }
3450
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
59,901✔
3451
      break;
59,901✔
3452
    }
3453
    case STRIGGER_PULL_TSDB_CALC_DATA: {
6,671,181✔
3454
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
6,671,181✔
3455
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
13,342,580✔
3456
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
13,342,798✔
3457
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
13,342,798✔
3458
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
13,342,798✔
3459
      break;
6,671,399✔
3460
    }
3461
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3462
      break;
×
3463
    }
3464
    case STRIGGER_PULL_TSDB_DATA: {
407,368✔
3465
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
407,368✔
3466
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
814,736✔
3467
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
814,736✔
3468
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
814,736✔
3469
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
814,736✔
3470
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
407,368✔
3471
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
814,736✔
3472
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
814,736✔
3473
      break;
407,368✔
3474
    }
3475
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3476
      break;
×
3477
    }
3478
    case STRIGGER_PULL_WAL_META_NEW: {
12,393,060✔
3479
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
12,393,060✔
3480
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
24,787,866✔
3481
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
24,788,348✔
3482
      break;
12,393,789✔
3483
    }
3484
    case STRIGGER_PULL_WAL_DATA_NEW:
9,015,148✔
3485
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3486
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
9,015,148✔
3487
      int32_t                     nVersion = 0;
9,015,148✔
3488
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
9,014,930✔
3489
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
9,014,930✔
3490
      for (int32_t i = 0; i < nVersion; i++) {
17,973,708✔
3491
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
8,959,597✔
3492
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
8,959,597✔
3493
      }
3494
      int32_t nRanges = 0;
9,014,111✔
3495
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
9,015,148✔
3496
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
9,015,148✔
3497
      if (pRequest->ranges == NULL) {
9,014,358✔
3498
        TAOS_CHECK_EXIT(terrno);
×
3499
      }
3500
      for (int32_t i = 0; i < nRanges; i++) {
11,124,035✔
3501
        uint64_t gid = 0;
2,109,677✔
3502
        int64_t pRange[2] = {0};
2,109,677✔
3503
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
2,109,677✔
3504
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
2,109,677✔
3505
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
2,109,677✔
3506
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
2,109,677✔
3507
      }
3508
      break;
9,014,358✔
3509
    }
3510
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3,609,241✔
3511
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
3,609,241✔
3512
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
7,222,010✔
3513
      break;
3,611,571✔
3514
    }
3515
    case STRIGGER_PULL_GROUP_COL_VALUE: {
325,837✔
3516
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
325,837✔
3517
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
651,674✔
3518
      break;
325,837✔
3519
    }
3520
    case STRIGGER_PULL_VTABLE_INFO: {
78,501✔
3521
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
78,501✔
3522
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
78,501✔
3523
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
78,501✔
3524
      TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
78,501✔
3525
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
157,002✔
3526
      break;
78,501✔
3527
    }
3528
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
686,001✔
3529
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
686,001✔
3530
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
1,372,002✔
3531
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
686,001✔
3532
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,372,002✔
3533
      break;
686,001✔
3534
    }
3535
    case STRIGGER_PULL_OTABLE_INFO: {
138,690✔
3536
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
138,690✔
3537
      int32_t size = 0;
138,690✔
3538
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
138,690✔
3539
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
138,690✔
3540
      if (pRequest->cols == NULL) {
138,690✔
3541
        code = terrno;
×
3542
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3543
        goto _exit;
×
3544
      }
3545
      for (int32_t i = 0; i < size; ++i) {
510,576✔
3546
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
371,886✔
3547
        if (oInfo == NULL) {
371,886✔
3548
          code = terrno;
×
3549
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3550
          goto _exit;
×
3551
        }
3552
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
371,886✔
3553
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
371,886✔
3554
      }
3555
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
277,380✔
3556

3557
      break;
138,690✔
3558
    }
3559
    default: {
3,555✔
3560
      uError("unknown pull type %d", type);
3,555✔
3561
      code = TSDB_CODE_INVALID_PARA;
×
3562
      break;
×
3563
    }
3564
  }
3565

3566
  tEndDecode(&decoder);
34,767,316✔
3567

3568
_exit:
34,765,174✔
3569
  tDecoderClear(&decoder);
34,766,897✔
3570
  return code;
34,763,761✔
3571
}
3572

3573
/*
 * Encode an array of SSTriggerCalcParam.
 *
 * Layout: an int32 element count, then for each element the leading bytes of
 * the struct up to (not including) `notifyType`, copied verbatim. Unless
 * `ignoreNotificationInfo` is set, `notifyType` and `extraNotifyContent`
 * (NUL terminator included, or a zero-length binary when NULL) follow.
 *
 * pEncoder               target encoder; when pEncoder->data is NULL only the
 *                        position is advanced and nothing is written
 * pParams                array of SSTriggerCalcParam
 * ignoreNotificationInfo skip the notification fields of every element
 * full                   when false an empty array (count 0) is encoded
 *
 * Returns 0 on success or a negative error code.
 */
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
  int32_t size = full ? taosArrayGetSize(pParams) : 0;
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    if (pEncoder->data) {
      // The raw copy bypasses the tEncode* helpers, so the capacity check
      // has to be done here to avoid writing past the end of the buffer.
      if ((int64_t)pEncoder->pos + plainFieldSize > (int64_t)pEncoder->size) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
      }
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
    }
    pEncoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
3598

3599
void tDestroySSTriggerCalcParam(void* ptr) {
2,147,483,647✔
3600
  SSTriggerCalcParam* pParam = ptr;
2,147,483,647✔
3601
  if (pParam && pParam->extraNotifyContent != NULL) {
2,147,483,647✔
3602
    taosMemoryFreeClear(pParam->extraNotifyContent);
286,097✔
3603
  }
3604
  if (pParam && pParam->resultNotifyContent != NULL) {
2,147,483,647✔
3605
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
3606
  }
3607
}
2,147,483,647✔
3608

3609
void tDestroySStreamGroupValue(void* ptr) {
28,249,427✔
3610
  SStreamGroupValue* pValue = ptr;
28,249,427✔
3611
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
28,249,427✔
3612
    taosMemoryFreeClear(pValue->data.pData);
18,642,212✔
3613
    pValue->data.nData = 0;
18,642,459✔
3614
  }
3615
}
28,249,427✔
3616

3617
/*
 * Decode an array of SSTriggerCalcParam written by the matching serializer.
 *
 * A new array is always allocated into *ppParams. For each element the bytes
 * up to (not including) `notifyType` are copied verbatim from the decoder
 * buffer; unless `ignoreNotificationInfo` is set, `notifyType` and an
 * allocated copy of `extraNotifyContent` follow.
 *
 * On failure *ppParams may hold a partially filled array that the caller
 * still has to destroy.
 *
 * Returns 0 on success or a negative error code.
 */
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
  int32_t size = 0, code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  if (size < 0) {
    // A negative count can only come from a corrupted message.
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
  }
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    // The raw copy bypasses the tDecode* helpers, so make sure the message
    // really contains that many bytes before reading them.
    if ((int64_t)pDecoder->pos + plainFieldSize > (int64_t)pDecoder->size) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
    }
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
    pDecoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
3643

3644
/*
 * Encode an array of SStreamGroupValue.
 *
 * Layout: int32 count, then per element: isNull; for non-null values
 * isTbname, optionally uid + vgId (tbname values only), the data type, and
 * either a binary blob (variable-length types) or an int64 value.
 *
 * When `vgId` is not -1 it is stored into every tbname element of the input
 * array before being encoded.
 *
 * Returns 0 on success or a negative error code.
 */
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));

  for (int32_t idx = 0; idx < size; ++idx) {
    SStreamGroupValue* pValue = taosArrayGet(pGroupColVals, idx);
    if (pValue == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isNull));
    if (!pValue->isNull) {
      TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isTbname));
      if (pValue->isTbname) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->uid));
        if (vgId != -1) {
          pValue->vgId = vgId;
        }
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pValue->vgId));
      }

      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pValue->data.type));
      if (IS_VAR_DATA_TYPE(pValue->data.type)) {
        TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pValue->data.pData, pValue->data.nData));
      } else {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->data.val));
      }
    }
  }

_exit:
  return code;
}
3676

3677
/*
 * Decode an array of SStreamGroupValue written by the matching serializer.
 *
 * Any elements already in *ppGroupColVals are cleared first. The array is
 * created when it is NULL and the decoded count is positive; an existing
 * array only has its capacity grown.
 *
 * Returns 0 on success or a negative error code.
 */
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (size > 0 && *ppGroupColVals == NULL) {
    *ppGroupColVals = taosArrayInit(size, sizeof(SStreamGroupValue));
    if (*ppGroupColVals == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else if (size > 0) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, size));
  }

  for (int32_t idx = 0; idx < size; ++idx) {
    SStreamGroupValue* pValue = taosArrayReserve(*ppGroupColVals, 1);
    if (pValue == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isNull));
    if (!pValue->isNull) {
      TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isTbname));
      if (pValue->isTbname) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->uid));
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pValue->vgId));
      }

      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pValue->data.type));
      if (IS_VAR_DATA_TYPE(pValue->data.type)) {
        uint64_t len = 0;
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pValue->data.pData, &len));
        pValue->data.nData = len;
      } else {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->data.val));
      }
    }
  }

_exit:
  return code;
}
3720

3721
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
651,427✔
3722
  SEncoder encoder = {0};
651,427✔
3723
  int32_t  code = TSDB_CODE_SUCCESS;
651,427✔
3724
  int32_t  lino = 0;
651,427✔
3725
  int32_t  tlen = 0;
651,427✔
3726

3727
  tEncoderInit(&encoder, buf, bufLen);
651,427✔
3728
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
651,674✔
3729

3730
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
651,674✔
3731

3732
  tEndEncode(&encoder);
650,935✔
3733

3734
_exit:
650,946✔
3735
  if (code != TSDB_CODE_SUCCESS) {
650,946✔
3736
    tlen = code;
×
3737
  } else {
3738
    tlen = encoder.pos;
650,946✔
3739
  }
3740
  tEncoderClear(&encoder);
650,946✔
3741
  return tlen;
651,182✔
3742
}
3743

3744
/*
 * Deserialize group column values from `buf` into gInfo->gInfo.
 *
 * Returns 0 on success or a negative error code. The decoder is always
 * cleared before returning.
 */
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3761

3762
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
5,465,653✔
3763
  SEncoder encoder = {0};
5,465,653✔
3764
  int32_t  code = TSDB_CODE_SUCCESS;
5,465,653✔
3765
  int32_t  lino = 0;
5,465,653✔
3766
  int32_t  tlen = 0;
5,465,653✔
3767

3768
  tEncoderInit(&encoder, buf, bufLen);
5,465,653✔
3769
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,465,894✔
3770

3771
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
10,931,061✔
3772
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
10,930,820✔
3773
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
10,930,579✔
3774
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
10,930,575✔
3775
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
10,931,057✔
3776

3777
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
5,465,649✔
3778
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
5,465,894✔
3779
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
10,930,825✔
3780
  TAOS_CHECK_EXIT(tEncodeBool(&encoder, pReq->isWindowTrigger));
5,465,894✔
3781
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision));
10,930,579✔
3782

3783
  tEndEncode(&encoder);
5,465,167✔
3784

3785
_exit:
5,464,689✔
3786
  if (code != TSDB_CODE_SUCCESS) {
5,464,689✔
3787
    tlen = code;
×
3788
  } else {
3789
    tlen = encoder.pos;
5,464,689✔
3790
  }
3791
  tEncoderClear(&encoder);
5,464,689✔
3792
  return tlen;
5,465,408✔
3793
}
3794

3795
/*
 * Deserialize a trigger calc request from `buf` into `pReq`.
 *
 * `isWindowTrigger` and `precision` are only read when the decoder has not
 * reached its end after `createTable`.
 *
 * Returns 0 on success or a negative error code. The decoder is always
 * cleared before returning.
 */
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // identifiers
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // nested arrays
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));

  // optional tail: present only if bytes remain in the message
  if (!tDecodeIsEnd(&decoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pReq->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->precision));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3823

3824
/*
 * Release everything owned by a trigger calc request: the params array, the
 * group column values and the output block. Every released member is reset
 * to NULL so that calling this function again on the same request is
 * harmless. The request structure itself is not freed. NULL is ignored.
 */
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq != NULL) {
    if (pReq->params != NULL) {
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
      pReq->params = NULL;
    }
    if (pReq->groupColVals != NULL) {
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
      pReq->groupColVals = NULL;
    }
    blockDataDestroy(pReq->pOutBlock);
    // Clear the pointer like the other members to avoid a dangling reference.
    pReq->pOutBlock = NULL;
  }
}
8,906,274✔
3837

3838
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
×
3839
  SEncoder encoder = {0};
×
3840
  int32_t  code = TSDB_CODE_SUCCESS;
×
3841
  int32_t  lino = 0;
×
3842
  int32_t  tlen = 0;
×
3843

3844
  tEncoderInit(&encoder, buf, bufLen);
×
3845
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3846

3847
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3848
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
×
3849
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3850
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
×
3851

3852
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
×
3853

3854
  tEndEncode(&encoder);
×
3855

3856
_exit:
×
3857
  if (code != TSDB_CODE_SUCCESS) {
×
3858
    tlen = code;
×
3859
  } else {
3860
    tlen = encoder.pos;
×
3861
  }
3862
  tEncoderClear(&encoder);
×
3863
  return tlen;
×
3864
}
3865

3866
/*
 * Deserialize a trigger drop-table request from `buf` into `pReq`.
 *
 * Returns 0 on success or a negative error code. The decoder is always
 * cleared before returning.
 */
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3887

3888
/*
 * Release the group column values owned by a drop request and reset the
 * pointer. The request structure itself is not freed. NULL is ignored.
 */
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
  if (pReq == NULL || pReq->groupColVals == NULL) {
    return;
  }

  taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
  pReq->groupColVals = NULL;
}
×
3896

3897
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
41,363,050✔
3898
  SEncoder encoder = {0};
41,363,050✔
3899
  int32_t  code = TSDB_CODE_SUCCESS;
41,363,050✔
3900
  int32_t  lino = 0;
41,363,050✔
3901
  int32_t  tlen = 0;
41,363,050✔
3902

3903
  tEncoderInit(&encoder, buf, bufLen);
41,363,050✔
3904
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
41,363,050✔
3905

3906
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
82,726,100✔
3907
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
82,726,100✔
3908
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
82,726,100✔
3909
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
82,726,100✔
3910

3911
  tEndEncode(&encoder);
41,363,050✔
3912

3913
_exit:
41,363,050✔
3914
  if (code != TSDB_CODE_SUCCESS) {
41,363,050✔
3915
    tlen = code;
×
3916
  } else {
3917
    tlen = encoder.pos;
41,363,050✔
3918
  }
3919
  tEncoderClear(&encoder);
41,363,050✔
3920
  return tlen;
41,363,050✔
3921
}
3922

3923
/*
 * Deserialize a trigger control request from `buf` into `pReq`.
 *
 * The type is decoded into a temporary int32 and then assigned to
 * pReq->type.
 *
 * Returns 0 on success or a negative error code. The decoder is always
 * cleared before returning.
 */
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  type = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
  pReq->type = type;

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3944

3945
/*
 * Encode stream runtime function info into an already started encoder.
 *
 * The pseudo-function values are encoded without notification info; `full`
 * is forwarded to the calc-param serializer. The partition column values
 * follow, then the scalar fields in the order written below.
 *
 * Returns 0 on success or a negative error code.
 */
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool full) {
  int32_t code = 0;
  int32_t lino = 0;

  // nested arrays
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, full));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // current window and position
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));

  // trigger attributes
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->isWindowTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->precision));

_exit:
  return code;
}
3961

3962
/*
 * Decode stream runtime function info from an already started decoder.
 *
 * The pseudo-function values are decoded without notification info.
 * `isWindowTrigger` and `precision` are only read when the decoder has not
 * reached its end after `triggerType`.
 *
 * Returns 0 on success or a negative error code.
 */
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
  // optional tail: present only if bytes remain in the message
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->precision));
  }
_exit:
  return code;
}
3981

3982
/*
 * Release the two arrays owned by a stream runtime function info and reset
 * their pointers. The structure itself is not freed. NULL is ignored.
 */
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo) {
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }
}
3993

3994
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
157,002✔
3995
  SEncoder encoder = {0};
157,002✔
3996
  int32_t  code = TSDB_CODE_SUCCESS;
157,002✔
3997
  int32_t  lino = 0;
157,002✔
3998
  int32_t  tlen = 0;
157,002✔
3999

4000
  tEncoderInit(&encoder, buf, bufLen);
157,002✔
4001
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
157,002✔
4002

4003
  int32_t size = taosArrayGetSize(pRsp->infos);
157,002✔
4004
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
157,002✔
4005
  for (int32_t i = 0; i < size; ++i) {
543,136✔
4006
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
386,134✔
4007
    if (info == NULL) {
386,134✔
4008
      TAOS_CHECK_EXIT(terrno);
×
4009
    }
4010
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
772,268✔
4011
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
772,268✔
4012
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
386,134✔
4013
  }
4014

4015
  tEndEncode(&encoder);
157,002✔
4016

4017
_exit:
157,002✔
4018
  if (code != TSDB_CODE_SUCCESS) {
157,002✔
4019
    tlen = code;
×
4020
  } else {
4021
    tlen = encoder.pos;
157,002✔
4022
  }
4023
  tEncoderClear(&encoder);
157,002✔
4024
  return tlen;
157,002✔
4025
}
4026

4027
/*
 * Deserialize a virtual-table info response from `buf`.
 *
 * A new array is always allocated into vTableInfo->infos. On failure the
 * array may be partially filled and still has to be destroyed by the caller.
 *
 * Returns 0 on success or a negative error code. The decoder is always
 * cleared before returning.
 */
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  size = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));

  vTableInfo->infos = taosArrayInit(size, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < size; ++idx) {
    VTableInfo* info = taosArrayReserve(vTableInfo->infos, 1);
    if (info == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &info->cols, false));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4057

4058

4059
void tDestroyVTableInfo(void *ptr) {
386,134✔
4060
  if (NULL == ptr) {
386,134✔
4061
    return;
×
4062
  }
4063
  VTableInfo* pTable = (VTableInfo*)ptr;
386,134✔
4064
  taosMemoryFree(pTable->cols.pColRef);
386,134✔
4065
}
4066

4067
/*
 * Destroy the `infos` array of a virtual-table info message, releasing each
 * element with tDestroyVTableInfo, and reset the pointer. NULL is ignored.
 */
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4072

4073
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,079,128✔
4074
  SEncoder encoder = {0};
1,079,128✔
4075
  int32_t  code = TSDB_CODE_SUCCESS;
1,079,364✔
4076
  int32_t  lino = 0;
1,079,364✔
4077
  int32_t  tlen = 0;
1,079,364✔
4078

4079
  tEncoderInit(&encoder, buf, bufLen);
1,079,364✔
4080
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,078,625✔
4081

4082
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,156,522✔
4083
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,078,618✔
4084
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,078,882✔
4085
  for (int32_t i = 0; i < size; ++i) {
2,478,533✔
4086
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,400,388✔
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
2,799,792✔
4088
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
2,799,792✔
4089
  }
4090

4091
  tEndEncode(&encoder);
1,078,145✔
4092

4093
_exit:
1,077,397✔
4094
  if (code != TSDB_CODE_SUCCESS) {
1,077,397✔
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
1,077,397✔
4098
  }
4099
  tEncoderClear(&encoder);
1,077,397✔
4100
  return tlen;
1,078,379✔
4101
}
4102

4103
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
539,782✔
4104
  SDecoder decoder = {0};
539,782✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
539,782✔
4106
  int32_t  lino = 0;
539,782✔
4107
  SSDataBlock *pResBlock = pBlock;
539,782✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
539,782✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
539,782✔
4111

4112
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,079,564✔
4113
  int32_t numOfCols = 2;
539,782✔
4114
  if (pResBlock->pDataBlock == NULL) {
539,782✔
4115
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
539,782✔
4116
    if (pResBlock->pDataBlock == NULL) {
539,782✔
4117
      TAOS_CHECK_EXIT(terrno);
×
4118
    }
4119
    for (int32_t i = 0; i< numOfCols; ++i) {
1,619,346✔
4120
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,079,564✔
4121
      if (pColInfoData == NULL) {
1,079,564✔
4122
        TAOS_CHECK_EXIT(terrno);
×
4123
      }
4124
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,079,564✔
4125
      pColInfoData->info.bytes = sizeof(int64_t);
1,079,564✔
4126
    }
4127
  }
4128
  int32_t numOfRows = 0;
539,782✔
4129
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
539,782✔
4130
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
539,782✔
4131
  for (int32_t i = 0; i < numOfRows; ++i) {
1,240,589✔
4132
    for (int32_t j = 0; j < numOfCols; ++j) {
2,102,421✔
4133
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,401,614✔
4134
      if (pColInfoData == NULL) {
1,401,614✔
4135
        TAOS_CHECK_EXIT(terrno);
×
4136
      }
4137
      int64_t value = 0;
1,401,614✔
4138
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,401,614✔
4139
      colDataSetInt64(pColInfoData, i, &value);
1,401,614✔
4140
    }
4141
  }
4142

4143
  pResBlock->info.dataLoad = 1;
539,782✔
4144
  pResBlock->info.rows = numOfRows;
539,782✔
4145

4146
  tEndDecode(&decoder);
539,782✔
4147

4148
_exit:
539,782✔
4149
  tDecoderClear(&decoder);
539,782✔
4150
  return code;
539,782✔
4151
}
4152

4153
/*
 * Append an encoded data block to the encoder, optionally followed by a
 * per-table slice index.
 *
 * When the encoder has no buffer only the block's encoded size is added to
 * the position. When `indexHash` is non-NULL, a 4-byte slot is reserved for
 * the table count, every slice whose gId is not -1 is written as
 * (uid, gId, startRowIdx, numRows), and the count is then written back into
 * the reserved slot.
 *
 * Returns 0 on success or a negative error code.
 */
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t len = 0;

  if (encoder->data != NULL) {
    len = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos,
                      blockDataGetNumOfCols(pBlock));
    if (len < 0) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else {
    len = blockGetEncodeSize(pBlock);
  }
  encoder->pos += len;

  if (indexHash != NULL) {
    // remember where the table count goes and skip over it for now
    uint32_t countPos = encoder->pos;
    encoder->pos += sizeof(uint32_t);

    int32_t tables = 0;
    int32_t iter = 0;
    void*   pe = NULL;
    while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
      SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
      if (pInfo->gId == -1) {
        continue;
      }

      int64_t uid = *(int64_t*)(tSimpleHashGetKey(pe, NULL));
      TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
      TAOS_CHECK_EXIT(tEncodeU64(encoder, pInfo->gId));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pInfo->startRowIdx));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pInfo->numRows));
      tables++;
    }

    // go back, fill in the count, then restore the end position
    uint32_t endPos = encoder->pos;
    encoder->pos = countPos;
    TAOS_CHECK_EXIT(tEncodeI32(encoder, tables));
    encoder->pos = endPos;
  }

_exit:
  return code;
}
4196
 
4197
/*
 * Encode one optional block: a one-byte presence flag, followed by the block
 * payload (see encodeData) when the flag is 1.
 *
 * The flag is 1 only when block is non-NULL and its info.rows is greater than
 * zero; otherwise a single 0 byte is emitted and nothing else.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing code from the encoder calls.
 */
static int32_t encodeBlock(SEncoder* encoder, void* block, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const int8_t hasRows = (block != NULL && ((SSDataBlock*)block)->info.rows > 0) ? 1 : 0;

  TAOS_CHECK_EXIT(tEncodeI8(encoder, hasRows));
  if (hasRows) {
    TAOS_CHECK_EXIT(encodeData(encoder, block, indexHash));
  }

_exit:
  return code;
}
4210

4211
/*
 * Serialize a WAL data response into buf.
 *
 * Encoded order: dataBlock (with the slice index built from rsp->indexHash),
 * metaBlock, deleteBlock, tableBlock, ver, verTime. Each block is written
 * through encodeBlock, i.e. prefixed by a presence flag.
 *
 * Returns the encoder position after encoding on success, or the failing
 * error code otherwise.
 */
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  result = 0;
  SEncoder encoder = {0};

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  // Only the data block carries a slice index.
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->dataBlock, rsp->indexHash));

  // The remaining blocks are encoded without an index, in this fixed order.
  void* plainBlocks[] = {rsp->metaBlock, rsp->deleteBlock, rsp->tableBlock};
  for (int32_t i = 0; i < (int32_t)(sizeof(plainBlocks) / sizeof(plainBlocks[0])); ++i) {
    TAOS_CHECK_EXIT(encodeBlock(&encoder, plainBlocks[i], NULL));
  }

  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
  tEndEncode(&encoder);

_exit:
  // Read the position before the encoder is cleared.
  result = (code == TSDB_CODE_SUCCESS) ? (int32_t)encoder.pos : code;
  tEncoderClear(&encoder);
  return result;
}
4238

4239
/*
 * Decode one optional block written by encodeBlock without a slice index.
 *
 * Reads the presence flag. If it is zero, pBlock (when non-NULL) is reset with
 * blockDataEmpty(). If it is non-zero, pBlock must be non-NULL (otherwise
 * TSDB_CODE_INVALID_PARA is returned); the payload is decoded with
 * blockDecode() and the decoder position is moved to the end reported by it.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing code.
 */
static int32_t decodeBlock(SDecoder* decoder, void* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  present = false;

  TAOS_CHECK_EXIT(tDecodeI8(decoder, &present));

  if (!present) {
    if (pBlock != NULL) {
      blockDataEmpty(pBlock);
    }
  } else {
    if (pBlock == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
    }

    const char* pNext = NULL;
    char*       pPayload = (char*)decoder->data + decoder->pos;
    TAOS_CHECK_EXIT(blockDecode(pBlock, pPayload, &pNext));

    // blockDecode reports where it stopped; resume decoding from there.
    decoder->pos = (uint8_t*)pNext - decoder->data;
  }

_exit:
  return code;
}
4257

4258
/*
 * Deserialize a WAL data response produced by tSerializeSStreamWalDataResponse.
 *
 * Decoded order: data block (+ slice index), meta block, delete block, table
 * block, ver, verTime.
 *
 * Data block handling:
 *   - flag set:   pRsp->dataBlock must be non-NULL; the block is decoded and
 *                 pSlices is cleared and refilled with one entry per slice.
 *   - flag unset: if pRsp->dataBlock is non-NULL it is emptied and pSlices is
 *                 cleared; otherwise both are left untouched.
 *
 * Each pushed slice entry is int64_t[3]:
 *   [0] gid, [1] uid, [2] (startIdx << 32) | (startIdx + numRows).
 * NOTE(review): pSlices is presumably created with an element size of
 * 3 * sizeof(int64_t) - confirm against the caller.
 *
 * The slice count, start index and row count come from the message and are
 * validated before use: negative values, or a start/row pair whose sum does
 * not fit in int32_t, are rejected with TSDB_CODE_INVALID_PARA. Without this,
 * the end-index addition could overflow and the 64-bit packing would shift a
 * negative value or smear sign bits over the start index.
 *
 * Returns TSDB_CODE_SUCCESS, or the first failing code (also logged).
 */
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices) {
  SDecoder     decoder = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // decode data block
  int8_t hasData = false;
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
  pBlock = pRsp->dataBlock;
  if (hasData) {
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    const char* pEndPos = NULL;
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
    decoder.pos = (uint8_t*)pEndPos - decoder.data;

    int32_t nSlices = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
    // A negative count can only come from a corrupt message; reject it before
    // it is used as a capacity.
    TAOS_CHECK_EXIT(nSlices >= 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
    taosArrayClear(pSlices);

    for (int32_t i = 0; i < nSlices; i++) {
      int64_t  uid = 0;
      uint64_t gid = 0;
      int32_t  startIdx = 0;
      int32_t  numRows = 0;
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));

      // Both halves of the packed range must be non-negative and the end index
      // must be representable, otherwise the packing below is not well defined.
      bool validRange = (startIdx >= 0) && (numRows >= 0) && (numRows <= INT32_MAX - startIdx);
      TAOS_CHECK_EXIT(validRange ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);

      int32_t endIdx = startIdx + numRows;
      int64_t value[3] = {(int64_t)gid, uid, ((int64_t)startIdx << 32) | (int64_t)endIdx};
      void*   px = taosArrayPush(pSlices, value);
      if (px == NULL) {
        code = terrno;
        lino = __LINE__;  // so the failure log below points at this push
        goto _exit;
      }
    }
  } else if (pBlock != NULL) {
    blockDataEmpty(pBlock);
    taosArrayClear(pSlices);
  }

  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->metaBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->deleteBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->tableBlock));

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));

  tEndDecode(&decoder);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc