• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4996

19 Mar 2026 02:16AM UTC coverage: 72.069% (+0.07%) from 71.996%
#4996

push

travis-ci

web-flow
feat: SQL firewall black/white list (#34798)

461 of 618 new or added lines in 4 files covered. (74.6%)

380 existing lines in 128 files now uncovered.

245359 of 340448 relevant lines covered (72.07%)

135732617.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.27
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26

27
/*
 * Serializes a stream management request.
 *
 * Written in order: reqId (i64), type (i32), then a type-dependent payload:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: entry count (i32), then dbFName
 *     and tbName of every SStreamDbTableName entry.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: entry count (i32), then for every
 *     SStreamOReaderDeployReq its execId (i32), uid (i64), vgId count (i32)
 *     and each vgId (i32).
 * A NULL request array is written as a count of 0.
 *
 * Returns 0 on success, the first encoder error otherwise, or
 * TSDB_CODE_STREAM_INVALID_TASK_TYPE for any other type (reqId and type have
 * already been written at that point).
 */
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    int32_t nameNum = 0;
    if (pReq->cont.pReqs) {
      nameNum = taosArrayGetSize(pReq->cont.pReqs);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nameNum));
    for (int32_t idx = 0; idx < nameNum; ++idx) {
      SStreamDbTableName* pEntry = taosArrayGet(pReq->cont.pReqs, idx);
      TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->dbFName, strlen(pEntry->dbFName) + 1));
      TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->tbName, strlen(pEntry->tbName) + 1));
    }
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    int32_t deployNum = 0;
    if (pReq->cont.pReqs) {
      deployNum = taosArrayGetSize(pReq->cont.pReqs);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
    for (int32_t idx = 0; idx < deployNum; ++idx) {
      SStreamOReaderDeployReq* pEntry = taosArrayGet(pReq->cont.pReqs, idx);
      int32_t                  vgCnt = taosArrayGetSize(pEntry->vgIds);

      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->execId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->uid));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgCnt));
      for (int32_t v = 0; v < vgCnt; ++v) {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pEntry->vgIds, v)));
      }
    }
  } else {
    code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
  }

_exit:

  return code;
}
75

76
void tFreeRunnerOReaderDeployReq(void* param) {
7,755✔
77
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
7,755✔
78
  if (pReq) {
7,755✔
79
    taosArrayDestroy(pReq->vgIds);
7,755✔
80
  }
81
}
7,755✔
82

83
/*
 * Releases the request array owned by a management request, according to its
 * type. The SStreamMgmtReq struct itself is NOT freed here. NULL is accepted;
 * unknown types are left untouched.
 */
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    /* flat entries: no per-element cleanup */
    taosArrayDestroy(pReq->cont.pReqs);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    /* each entry owns a vgIds array that must be destroyed too */
    taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
  }
}
99

100

101
/*
 * Deep-copies a management request into a newly allocated SStreamMgmtReq.
 *
 * *ppDst is always reset to NULL first. A NULL pSrc yields TSDB_CODE_SUCCESS
 * with *ppDst left NULL. Otherwise the struct is copied and, when pSrc has a
 * request array, that array is duplicated according to the request type
 * (for RUNNER requests every entry's vgIds array is duplicated as well).
 *
 * On failure the partial copy is released, *ppDst is cleared, the error is
 * logged and the error code is returned.
 */
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppDst = NULL;
  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  /* shallow copy first; the array pointer is replaced below when it is set */
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
  if (NULL == pSrc->cont.pReqs) {
    goto _exit;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pSrc->type) {
    (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
    TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pSrc->type) {
    int32_t entryNum = taosArrayGetSize(pSrc->cont.pReqs);

    (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), entryNum);
    TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);

    for (int32_t k = 0; k < entryNum; ++k) {
      SStreamOReaderDeployReq* pTo = taosArrayGet((*ppDst)->cont.pReqs, k);
      SStreamOReaderDeployReq* pFrom = taosArrayGet(pSrc->cont.pReqs, k);

      pTo->vgIds = taosArrayDup(pFrom->vgIds, NULL);
      TSDB_CHECK_NULL(pTo->vgIds, code, lino, _exit, terrno);
      pTo->execId = pFrom->execId;
      pTo->uid = pFrom->uid;
    }
  }

_exit:

  if (code) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
148

149

150
/*
 * Deserializes a stream management request written by tEncodeSStreamMgmtReq.
 *
 * Reads reqId (i64) and type (i32), then the type-dependent payload:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: entry count, then dbFName and
 *     tbName for each SStreamDbTableName entry.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: entry count, then per entry
 *     execId, uid, vgId count and the vgIds.
 * Arrays are only allocated when the decoded count is positive.
 *
 * Returns 0 on success, a decoder/allocation error otherwise, or
 * TSDB_CODE_STREAM_INVALID_TASK_TYPE for an unknown type. On failure, arrays
 * already attached to pReq are left in place; this function does not free them.
 */
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
          /* do not decode into the slot unless the reservation succeeded */
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      int32_t num = 0, vgIdNum = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
          if (vgIdNum > 0) {
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
          }
          /* loop body only runs when vgIdNum > 0, i.e. after vgIds was allocated */
          for (int32_t n = 0; n < vgIdNum; ++n) {
            int32_t* vgId = taosArrayGet(p->vgIds, n);
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
          }
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
203

204
/*
 * Serializes the common SStreamTask header.
 *
 * Field order: type, streamId, taskId, flags, seriousId, deployId, nodeId,
 * taskIdx, status, detailStatus, errorCode, then an i32 presence flag (1/0)
 * for pMgmtReq, followed by the management request itself when present.
 * The session id is not written.
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  /* identity */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  /* placement and state */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  /* optional management request, preceded by its presence flag */
  if (NULL == pTask->pMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
  } else {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
232

233

234
/*
 * Deserializes the common SStreamTask header in the order written by
 * tEncodeStreamTask. When the trailing presence flag is non-zero, a
 * SStreamMgmtReq is allocated into pTask->pMgmtReq and decoded.
 *
 * Returns 0 on success, or the first decoder/allocation error. On a decode
 * failure of the management request the allocated pMgmtReq stays attached to
 * pTask; this function does not free it.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t hasMgmtReq = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasMgmtReq));
  if (0 == hasMgmtReq) {
    goto _exit;
  }

  pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));

_exit:

  return code;
}
263

264
/*
 * Serializes one recalculation progress record:
 * recalcId (i64), progress (i32), start (i64), end (i64).
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));

  /* [start, end] as two consecutive i64 values */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:

  return code;
}
277

278
/*
 * Deserializes one recalculation progress record in the order
 * recalcId (i64), progress (i32), start (i64), end (i64).
 * Returns 0 on success or the first decoder error.
 */
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));

  /* [start, end] as two consecutive i64 values */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:

  return code;
}
291

292

293
/*
 * Serializes a trigger runtime status.
 *
 * Field order (all i32): autoRecalcNum, realtimeSessionNum, historySessionNum,
 * recalcSessionNum, histroyProgress, then the number of user recalc records
 * followed by each record (see tEncodeSSTriggerRecalcProgress).
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  int32_t userRecalcCnt = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, userRecalcCnt));

  for (int32_t k = 0; k < userRecalcCnt; ++k) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
  }

_exit:

  return code;
}
314

315
/*
 * Deserializes a trigger runtime status written by
 * tEncodeSSTriggerRuntimeStatus.
 *
 * Field order (all i32): autoRecalcNum, realtimeSessionNum, historySessionNum,
 * recalcSessionNum, histroyProgress, then the user recalc record count and the
 * records themselves. The userRecalcs array is only allocated when the count
 * is positive.
 *
 * Returns 0 on success, or the first decoder/allocation error. On failure an
 * already allocated userRecalcs array stays attached to pStatus.
 */
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  /* The encoder writes recalcSessionNum in this slot; decoding it into
   * realtimeSessionNum would clobber that field and leave this one unset. */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    if (NULL == pStatus->userRecalcs) {
      code = terrno;
      goto _exit;
    }
  }

  /* only iterates when recalcNum > 0, i.e. after userRecalcs was allocated */
  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
344

345

346
/*
 * Serializes a stream heartbeat message between tStartEncode/tEndEncode.
 *
 * Written in order: dnodeId, streamGId, snodeId, runnerThreadNum (i32 each),
 * then four counted lists: vgroup leader ids (i32), task status entries
 * (encoded via tEncodeStreamTask), request indexes (i32) and trigger runtime
 * statuses (tEncodeSSTriggerRuntimeStatus).
 *
 * Returns the encoder position (pEncoder->pos) on success, or the first
 * encoder error code on failure.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  /* vgroup leader ids */
  int32_t leaderCnt = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, leaderCnt));
  for (int32_t k = 0; k < leaderCnt; ++k) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  /* task status entries; only the leading SStreamTask part is encoded */
  int32_t taskCnt = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskCnt));
  for (int32_t k = 0; k < taskCnt; ++k) {
    SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pEntry));
  }

  /* request indexes */
  int32_t reqCnt = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqCnt));
  for (int32_t k = 0; k < reqCnt; ++k) {
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
  }

  /* trigger runtime statuses */
  int32_t trigCnt = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, trigCnt));
  for (int32_t k = 0; k < trigCnt; ++k) {
    SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pRuntime));
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  }

  return pEncoder->pos;
}
393

394
/*
 * Deserializes a stream heartbeat message between tStartDecode/tEndDecode, in
 * the order written by tEncodeStreamHbMsg: four i32 header fields, then the
 * counted lists of vgroup leader ids, task status entries, request indexes
 * and trigger runtime statuses. Each array is only allocated when its decoded
 * count is positive.
 *
 * Returns 0 on success, or the first decoder/allocation error. Arrays
 * allocated before a failure stay attached to pReq; this function does not
 * free them.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  /* vgroup leader ids: pushed one by one into a pre-sized array */
  int32_t leaderCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderCnt));
  if (leaderCnt > 0) {
    pReq->pVgLeaders = taosArrayInit(leaderCnt, sizeof(int32_t));
    TSDB_CHECK_NULL(pReq->pVgLeaders, code, lino, _exit, terrno);

    for (int32_t k = 0; k < leaderCnt; ++k) {
      int32_t leaderVgId = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
      TSDB_CHECK_NULL(taosArrayPush(pReq->pVgLeaders, &leaderVgId), code, lino, _exit, terrno);
    }
  }

  /* task status entries: decoded in place through their SStreamTask prefix */
  int32_t taskCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskCnt));
  if (taskCnt > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), taskCnt);
    TSDB_CHECK_NULL(pReq->pStreamStatus, code, lino, _exit, terrno);

    for (int32_t k = 0; k < taskCnt; ++k) {
      SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
      TSDB_CHECK_NULL(pEntry, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pEntry));
    }
  }

  /* request indexes */
  int32_t reqCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqCnt));
  if (reqCnt > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqCnt);
    TSDB_CHECK_NULL(pReq->pStreamReq, code, lino, _exit, terrno);

    for (int32_t k = 0; k < reqCnt; ++k) {
      int32_t* pSlot = taosArrayGet(pReq->pStreamReq, k);
      TSDB_CHECK_NULL(pSlot, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pSlot));
    }
  }

  /* trigger runtime statuses */
  int32_t trigCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &trigCnt));
  if (trigCnt > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), trigCnt);
    TSDB_CHECK_NULL(pReq->pTriggerStatus, code, lino, _exit, terrno);

    for (int32_t k = 0; k < trigCnt; ++k) {
      SSTriggerRuntimeStatus* pRuntime = taosArrayGet(pReq->pTriggerStatus, k);
      TSDB_CHECK_NULL(pRuntime, code, lino, _exit, terrno);
      TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pRuntime));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
485

486
void tFreeSSTriggerRuntimeStatus(void* param) {
4,307,720✔
487
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
4,307,720✔
488
  if (NULL == pStatus) {
4,307,720✔
489
    return;
×
490
  }
491
  taosArrayDestroy(pStatus->userRecalcs);
4,307,720✔
492
}
493

494
/*
 * Releases the arrays owned by a heartbeat message. The SStreamHbMsg struct
 * itself is not freed. A NULL pMsg is ignored.
 *
 * When deepClean is set, each index stored in pStreamReq is used to look up
 * the matching pStreamStatus entry, whose pMgmtReq is released (contents and
 * struct) before the arrays are destroyed. Indexes with no matching entry
 * are skipped.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (NULL == pMsg) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);

  if (deepClean) {
    int32_t total = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t k = 0; k < total; ++k) {
      int32_t*           pIdx = taosArrayGet(pMsg->pStreamReq, k);
      SStmTaskStatusMsg* pEntry = taosArrayGet(pMsg->pStreamStatus, *pIdx);
      if (pEntry != NULL) {
        tFreeSStreamMgmtReq(pEntry->pMgmtReq);
        taosMemoryFree(pEntry->pMgmtReq);
      }
    }
  }

  taosArrayDestroy(pMsg->pStreamReq);
  taosArrayDestroy(pMsg->pStreamStatus);
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
}
517

518
/*
 * Serializes the trigger-side part of a reader deploy message.
 *
 * Written in order: triggerTblName (binary), triggerTblUid (i64),
 * triggerTblSuid (i64), triggerTblType, isTriggerTblVirt, deleteReCalc,
 * deleteOutTbl (i8 each), then partitionCols, triggerCols, triggerScanPlan
 * and calcCacheScanPlan as binaries. Each binary is written with length
 * strlen + 1, or length 0 when the pointer is NULL.
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t len = 0;  /* length of the binary about to be written */

  len = (NULL == pMsg->triggerTblName) ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pMsg->triggerTblName, len));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  len = (NULL == pMsg->partitionCols) ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, len));

  len = (NULL == pMsg->triggerCols) ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, len));

  /* triggerPrevFilter is not written here (its encode call is disabled in this function) */

  len = (NULL == pMsg->triggerScanPlan) ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, len));

  len = (NULL == pMsg->calcCacheScanPlan) ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, len));

_exit:

  return code;
}
539

540
/*
 * Serializes the calc-side part of a reader deploy message:
 * execReplica (i32) followed by calcScanPlan as a binary of length
 * strlen + 1, or length 0 when the plan pointer is NULL.
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  int32_t planLen = 0;
  if (pMsg->calcScanPlan != NULL) {
    planLen = (int32_t)strlen(pMsg->calcScanPlan) + 1;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:

  return code;
}
551

552

553
/*
 * Serializes a reader deploy message: the triggerReader flag (i8) followed by
 * the trigger variant when the flag is set, or the calc variant otherwise.
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:

  return code;
}
568

569
/*
 * Serializes a task address: taskId (i64), nodeId (i32) and the endpoint set
 * (via tEncodeSEpSet).
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  /* which task, on which node */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));

  /* endpoint set */
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:

  return code;
}
581

582
/*
 * Serializes a runner target: its task address (via tEncodeSStreamTaskAddr)
 * followed by execReplica (i32).
 *
 * Returns 0 on success or the first encoder error.
 */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  /* address first, replica count second */
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:

  return code;
}
593

594

595
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
373,770✔
596
  int32_t code = 0;
373,770✔
597
  int32_t lino;
598

599
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
747,540✔
600
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
747,540✔
601
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
747,540✔
602
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
747,540✔
603
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
747,540✔
604
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
747,540✔
605
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
747,540✔
606
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
747,540✔
607
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));
747,540✔
608
  int32_t partitionColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
373,770✔
609
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partitionColsLen));
747,540✔
610

611
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
373,770✔
612
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
373,770✔
613
  for (int32_t i = 0; i < addrSize; ++i) {
486,606✔
614
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
112,836✔
615
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
225,672✔
616
  }
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
747,540✔
618
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
747,540✔
619
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
747,540✔
620

621
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
747,540✔
622
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
747,540✔
623
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
747,540✔
624
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
747,540✔
625

626
  switch (pMsg->triggerType) {
373,770✔
627
    case WINDOW_TYPE_SESSION: {
16,714✔
628
      // session trigger
629
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
33,428✔
630
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
33,428✔
631
      break;
16,714✔
632
    }
633
    case WINDOW_TYPE_STATE: {
129,642✔
634
      // state trigger
635
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
259,284✔
636
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
259,284✔
637
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForType));
259,284✔
638
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForCount));
259,284✔
639
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
259,284✔
640
      int32_t stateWindowZerothLen = 
129,642✔
641
          pMsg->trigger.stateWin.zeroth == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.zeroth) + 1;
129,642✔
642
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.zeroth, stateWindowZerothLen));
259,284✔
643
      int32_t stateWindowExprLen =
129,642✔
644
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
129,642✔
645
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateWindowExprLen));
259,284✔
646
      break;
129,642✔
647
    }
648
    case WINDOW_TYPE_INTERVAL: {
133,456✔
649
      // slide trigger
650
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
266,912✔
651
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
266,912✔
652
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
266,912✔
653
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
266,912✔
654
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
266,912✔
655
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
266,912✔
656
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
266,912✔
657
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
266,912✔
658
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
266,912✔
659
      break;
133,456✔
660
    }
661
    case WINDOW_TYPE_EVENT: {
49,776✔
662
      // event trigger
663
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
49,776✔
664
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
49,776✔
665

666
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
99,552✔
667
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
99,552✔
668
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForType));
99,552✔
669
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForCount));
99,552✔
670
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
99,552✔
671
      break;
49,776✔
672
    }
673
    case WINDOW_TYPE_COUNT: {
29,588✔
674
      // count trigger
675
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
29,588✔
676
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
59,176✔
677

678
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
59,176✔
679
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
59,176✔
680
      break;
29,588✔
681
    }
682
    case WINDOW_TYPE_PERIOD: {
14,594✔
683
      // period trigger
684
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
29,188✔
685
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
29,188✔
686
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
29,188✔
687
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
29,188✔
688
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
29,188✔
689
      break;
14,594✔
690
    }
691
    default:
×
692
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
693
      break;
×
694
  }
695

696
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
747,540✔
697
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
747,540✔
698
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
747,540✔
699
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
747,540✔
700
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId));
747,540✔
701
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId));
747,540✔
702
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
373,770✔
703
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
747,540✔
704
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
373,770✔
705
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
747,540✔
706
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
373,770✔
707
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
747,540✔
708

709
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
373,770✔
710
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
373,770✔
711
  for (int32_t i = 0; i < readerNum; ++i) {
839,206✔
712
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
465,436✔
713
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
465,436✔
714
  }
715

716
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
373,770✔
717
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
373,770✔
718
  for (int32_t i = 0; i < runnerNum; ++i) {
1,474,962✔
719
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,101,192✔
720
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
1,101,192✔
721
  }
722

723
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
747,540✔
724
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
747,540✔
725
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));
747,540✔
726

727
_exit:
373,770✔
728

729
  return code;
373,770✔
730
}
731

732

733
/*
 * Writes one SFieldWithOptions to the encoder.
 *
 * Wire order: name (C string), type (u8), flags (i8), bytes (i32),
 * compress (u32), typeMod (i32).
 *
 * Returns `code`, which starts at 0; each step goes through TAOS_CHECK_EXIT
 * (defined elsewhere), which presumably records the failing status in `code`
 * and jumps to _exit.
 */
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
  int32_t lino = 0;
  int32_t code = 0;

  /* identity of the field */
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));

  /* size and option words */
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
748

749

750
/*
 * Serializes a runner-task deploy message.
 *
 * Wire order (must match the decoder): execReplica, streamName, pPlan,
 * outDBFName, outTblName, outTblType, calcNotifyOnly, topPlan,
 * notify URL list (count + strings), addOptions, outCols (count + fields),
 * outTags (count + fields), outStbUid, outStbSversion, subTblNameExpr,
 * tagValueExpr, forceOutCols (count + {expr, type, precision, scale, bytes}),
 * lowLatencyCalc, colCids (count + i16 each), tagCids (count + i16 each).
 *
 * Every string is written as a binary blob whose length includes the
 * terminating NUL; a NULL string is written with length 0.
 *
 * Returns `code`, which starts at 0; each step goes through TAOS_CHECK_EXIT
 * (defined elsewhere), which presumably records the failing status in `code`
 * and jumps to _exit.
 */
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  // streamName gets the same NULL guard as every other string below:
  // calling strlen() on a NULL pointer is undefined behavior.
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, NULL == pMsg->streamName ? 0 : (int32_t)strlen(pMsg->streamName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // notification URLs: count followed by one blob per URL
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));

  // output columns
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // output tags
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));

  // forced output columns: expression blob plus the four type descriptors
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));

  // colCids and tagCids - always encode size (0 if NULL) for compatibility.
  // The loops run zero times for an empty list, so no extra size test is needed.
  int32_t colCidsSize = (int32_t)taosArrayGetSize(pMsg->colCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, colCidsSize));
  for (int32_t i = 0; i < colCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->colCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

  int32_t tagCidsSize = (int32_t)taosArrayGetSize(pMsg->tagCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tagCidsSize));
  for (int32_t i = 0; i < tagCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->tagCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

_exit:

  return code;
}
829

830
/*
 * Serializes one task deployment: the common task header followed by the
 * type-specific deploy message (reader, trigger or runner).
 *
 * Any other task type is routed through
 * TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  /* shared task header comes first */
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  /* then the payload that matches the task type */
  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
  } else {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
854

855

856
/*
 * Serializes the deployment of one stream.
 *
 * Wire order: streamId, reader task count + reader tasks, a 0/1 marker for
 * the trigger task followed by the task itself when present, runner task
 * count + runner tasks.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  /* reader tasks */
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (int32_t idx = 0; idx < nReaders; ++idx) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  /* optional trigger task, announced by a 0/1 count */
  int32_t hasTrigger = pStream->triggerTask ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (pStream->triggerTask) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  /* runner tasks */
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (int32_t idx = 0; idx < nRunners; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
885

886
/*
 * Writes the common stream message header, which consists of the message
 * type only (encoded as i32).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));

_exit:
  return code;
}
895

896
/*
 * Reads the common stream message header: one i32 that is stored into
 * pMsg->msgType.
 *
 * The value is decoded into a plain int32_t first and then assigned, so
 * msgType is left untouched when the decode step bails out.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;
  int32_t rawType = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
907

908
/*
 * Serializes a start-task message; only its common header is written.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
918

919
/*
 * Serializes a task-start entry: the task header followed by its
 * start message.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  /* task header */
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  /* start payload */
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
930

931
/*
 * Serializes an undeploy-task message.
 *
 * Wire order: common header, doCheckpoint (i8), doCleanup (i8).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  /* the two undeploy flags */
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
943

944
/*
 * Serializes a task-undeploy entry: the task header followed by its
 * undeploy message.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  /* task header */
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  /* undeploy payload */
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
955

956

957
/*
 * Serializes one recalculation request as three i64 values:
 * recalcId, start, end.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  /* requested range */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
969

970
/*
 * Serializes the body of a management response; the layout depends on msgType:
 *
 *   STREAM_MSG_ORIGTBL_READER_INFO   vgId count + i32 vgIds, then reader
 *                                    count + reader task addresses
 *   STREAM_MSG_UPDATE_RUNNER         runner count + runner targets
 *   STREAM_MSG_USER_RECALC           recalc count + recalc requests
 *   STREAM_MSG_RUNNER_ORIGTBL_READER rsp count + {execId, vg count, vg task
 *                                    addresses} per entry
 *
 * Any other msgType writes nothing and leaves `code` at 0.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      /* vgroup id list */
      int32_t nVg = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVg));
      for (int32_t k = 0; k < nVg; ++k) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      /* reader task addresses */
      int32_t nReader = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReader));
      for (int32_t k = 0; k < nReader; ++k) {
        SStreamTaskAddr* pReaderAddr = taosArrayGet(pRsp->readerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReaderAddr));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunner = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunner));
      for (int32_t k = 0; k < nRunner; ++k) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalc = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalc));
      for (int32_t k = 0; k < nRecalc; ++k) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nExecRsp = taosArrayGetSize(pRsp->execRspList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nExecRsp));
      for (int32_t k = 0; k < nExecRsp; ++k) {
        SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pRsp->execRspList, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pExecRsp->execId));

        /* nested list of task addresses for this exec id */
        int32_t nAddr = taosArrayGetSize(pExecRsp->vgList);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nAddr));
        for (int32_t m = 0; m < nAddr; ++m) {
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pExecRsp->vgList, m)));
        }
      }
      break;
    }

    default:
      break;
  }

_exit:
  return code;
}
1036

1037
/*
 * Serializes one management response.
 *
 * Wire order: common header, reqId (i64), code (i32), task header, then the
 * msgType-specific body written by tEncodeSStreamMgmtRspCont using
 * pRsp->header.msgType.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  /* fixed part */
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  /* type-dependent part */
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));

_exit:
  return code;
}
1051

1052

1053
/*
 * Serializes a stream heartbeat response inside one tStartEncode/tEndEncode
 * frame.
 *
 * Wire order: streamGId, deploy stream count + streams, start task count +
 * tasks, undeployAll flag, (only when undeployAll is 0) undeploy task count +
 * tasks, management response count + responses.
 *
 * tEndEncode is called only when tStartEncode succeeded, so the start/end
 * calls stay paired even if opening the frame fails. It is still called when
 * a later step fails, as before.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  int8_t  frameOpened = 0;  // set once tStartEncode has succeeded

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  frameOpened = 1;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // streams to deploy
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
  for (int32_t i = 0; i < deployNum; ++i) {
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
  }

  // tasks to start
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
  for (int32_t i = 0; i < startNum; ++i) {
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
  }

  // tasks to undeploy; the explicit list is omitted when undeployAll is set
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
    for (int32_t i = 0; i < undeployNum; ++i) {
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
    }
  }

  // management responses
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
  for (int32_t i = 0; i < rspNum; ++i) {
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
  }

_exit:

  if (frameOpened) {
    tEndEncode(pEncoder);
  }

  return code;
}
1096

1097
/*
 * Decodes the trigger-side variant of a reader deploy message.
 *
 * Wire order: triggerTblName, triggerTblUid, triggerTblSuid, triggerTblType,
 * isTriggerTblVirt, deleteReCalc, deleteOutTbl, partitionCols, triggerCols,
 * triggerScanPlan, calcCacheScanPlan.
 *
 * The string/plan fields are read with tDecodeBinaryAlloc, which stores a
 * pointer into pMsg (presumably a fresh allocation owned by pMsg, judging by
 * the helper's name; confirm who frees it).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  /* trigger table identity */
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));

  /* flags */
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  /* column lists and plans */
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1117

1118

1119
/*
 * Decodes the calc-side variant of a reader deploy message:
 * execReplica (i32) followed by calcScanPlan (binary, read with
 * tDecodeBinaryAlloc).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));

_exit:
  return code;
}
1130

1131

1132
/*
 * Decodes a reader deploy message. The leading triggerReader byte selects
 * which payload follows: the calc variant when it is 0, the trigger variant
 * otherwise.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1147

1148

1149
/*
 * Decodes one task address: taskId (i64), nodeId (i32), epset.
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  /* ids */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));

  /* endpoint set */
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1161

1162

1163
/*
 * Decodes one runner target: a task address followed by execReplica (i32).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1174

1175

1176
/*
 * Decodes a trigger-task deploy message into pMsg.
 *
 * Wire order:
 *   1. nine i8 flags (triggerType ... isTriggerTblStb) and partitionCols
 *   2. notify URL count + URLs, notifyEventTypes, addOptions, notifyHistory
 *   3. maxDelay, fillHistoryStartTime, watermark, expiredTime
 *   4. the window-specific section selected by pMsg->triggerType
 *   5. eventTypes, placeHolderBitmap, four slot ids, triggerPrevFilter,
 *      triggerScanPlan, calcCacheScanPlan
 *   6. reader address count + addresses, runner target count + targets
 *   7. leaderSnodeId, streamName, and precision if the decoder is not at
 *      its end
 *
 * Arrays are created only for positive counts; with a count <= 0 the array
 * pointer in pMsg is left as it was. Strings and plans are read with
 * tDecodeBinaryAlloc. On failure the function returns immediately, leaving
 * whatever was already stored in pMsg in place (NOTE(review): cleanup is
 * presumably the caller's job; confirm).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT and TSDB_CHECK_NULL
 * (defined elsewhere) presumably record the failing status in `code` and
 * jump to _exit.
 */
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t urlCnt = 0;
  int32_t readerCnt = 0;
  int32_t runnerCnt = 0;

  /* 1. general flags */
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));

  /* 2. notification settings */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &urlCnt));
  if (urlCnt > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, urlCnt);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < urlCnt; ++idx) {
      /* each array slot receives the pointer produced by the decoder */
      const char **ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, idx);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  /* 3. timing options */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));

  /* 4. window-specific section */
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.zeroth, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }
    default: {
      // unknown trigger type
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

  /* 5. event mask, placeholders, slot ids and plans */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  /* 6a. reader task addresses */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerCnt));
  if (readerCnt > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerCnt);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < readerCnt; ++idx) {
      SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
    }
  }

  /* 6b. runner targets */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerCnt));
  if (runnerCnt > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerCnt);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < runnerCnt; ++idx) {
      SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
    }
  }

  /* 7. trailer */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  // precision is read only when the decoder reports more data
  // (presumably so that messages written without this field still decode; confirm)
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
  }

_exit:
  return code;
}
1312

1313

1314

1315
/*
 * Reads one SFieldWithOptions from the decoder.
 *
 * Wire order: name (C string copied into pField->name via tDecodeCStrTo),
 * type (u8), flags (i8), bytes (i32), compress (u32), typeMod (i32).
 *
 * Returns `code`, which starts at 0; TAOS_CHECK_EXIT (defined elsewhere)
 * presumably records the failing status in `code` and jumps to _exit.
 */
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t lino = 0;
  int32_t code = 0;

  /* identity of the field */
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));

  /* size and option words */
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1330

1331
void destroySStreamOutCols(void* p){
68,271✔
1332
  if (p == NULL) return;
68,271✔
1333
  SStreamOutCol* col = (SStreamOutCol*)p;
68,271✔
1334
  taosMemoryFreeClear(col->expr);
68,271✔
1335
}
1336

1337
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
618,915✔
1338
  int32_t code = 0;
618,915✔
1339
  int32_t lino;
1340

1341
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1,237,830✔
1342
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
1,237,830✔
1343
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
1,237,830✔
1344
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
1,237,830✔
1345
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
1,237,830✔
1346
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
1,237,830✔
1347
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
1,237,830✔
1348
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));
1,237,830✔
1349

1350
  int32_t addrSize = 0;
618,915✔
1351
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
618,915✔
1352
  if (addrSize > 0) {
618,915✔
1353
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
159,195✔
1354
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
159,195✔
1355
  }
1356
  for (int32_t i = 0; i < addrSize; ++i) {
778,110✔
1357
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
159,195✔
1358
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
159,195✔
1359
  }
1360
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
1,237,830✔
1361

1362
  int32_t outColNum = 0;
618,915✔
1363
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
618,915✔
1364
  if (outColNum > 0) {
618,915✔
1365
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
618,915✔
1366
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
618,915✔
1367
  }
1368
  for (int32_t i = 0; i < outColNum; ++i) {
3,258,642✔
1369
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
2,639,727✔
1370
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
2,639,727✔
1371
  }
1372

1373
  int32_t outTagNum = 0;
618,915✔
1374
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
618,915✔
1375
  if (outTagNum > 0) {
618,915✔
1376
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
266,616✔
1377
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
266,616✔
1378
  }
1379
  for (int32_t i = 0; i < outTagNum; ++i) {
1,170,420✔
1380
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
551,505✔
1381
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
551,505✔
1382
  }
1383

1384
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
1,237,830✔
1385
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
1,237,830✔
1386

1387
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
1,237,830✔
1388
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));
1,237,830✔
1389

1390
  int32_t forceOutColsSize = 0;
618,915✔
1391
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
618,915✔
1392
  if (forceOutColsSize > 0) {
618,915✔
1393
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
12,354✔
1394
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
12,354✔
1395
  }
1396
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
687,186✔
1397
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
68,271✔
1398

1399
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
136,542✔
1400
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
136,542✔
1401
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
136,542✔
1402
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
136,542✔
1403
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
136,542✔
1404
  }
1405

1406
  if (!tDecodeIsEnd(pDecoder)) {
618,915✔
1407
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
1,237,830✔
1408
  }
1409

1410
  // colCids and tagCids - always decode size, create array only if size > 0
1411
  // For backward compatibility, check if there's more data before decoding
1412
  if (!tDecodeIsEnd(pDecoder)) {
618,915✔
1413
    int32_t colCidsSize = 0;
618,915✔
1414
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colCidsSize));
618,915✔
1415
    if (colCidsSize > 0 && colCidsSize <= TSDB_MAX_COLUMNS) {  // Sanity check
618,915✔
1416
      pMsg->colCids = taosArrayInit(colCidsSize, sizeof(int16_t));
8,856✔
1417
      TSDB_CHECK_NULL(pMsg->colCids, code, lino, _exit, terrno);
8,856✔
1418
      for (int32_t i = 0; i < colCidsSize; ++i) {
36,339✔
1419
        int16_t cid = 0;
27,483✔
1420
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
27,483✔
1421
        if (taosArrayPush(pMsg->colCids, &cid) == NULL) {
54,966✔
1422
          TAOS_CHECK_EXIT(terrno);
×
1423
        }
1424
      }
1425
    }
1426
  }
1427
  // Try to decode tagCids if there's more data
1428
  if (!tDecodeIsEnd(pDecoder)) {
618,915✔
1429
    int32_t tagCidsSize = 0;
618,915✔
1430
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagCidsSize));
618,915✔
1431
    if (tagCidsSize > 0 && tagCidsSize <= TSDB_MAX_TAGS) {  // Sanity check
618,915✔
1432
      pMsg->tagCids = taosArrayInit(tagCidsSize, sizeof(int16_t));
6,765✔
1433
      TSDB_CHECK_NULL(pMsg->tagCids, code, lino, _exit, terrno);
6,765✔
1434
      for (int32_t i = 0; i < tagCidsSize; ++i) {
17,949✔
1435
        int16_t cid = 0;
11,184✔
1436
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
11,184✔
1437
        if (taosArrayPush(pMsg->tagCids, &cid) == NULL) {
22,368✔
1438
          TAOS_CHECK_EXIT(terrno);
×
1439
        }
1440
      }
1441
    }
1442
  }
1443

1444
_exit:
617,193✔
1445

1446
  return code;
618,915✔
1447
}
1448

1449
/*
 * Decode one task deployment entry.
 *
 * The common task header is decoded first; its type then selects which
 * payload decoder (reader / trigger / runner) fills pTask->msg.
 * Any other task type yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    // unknown task type
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1473

1474

1475
/*
 * Decode the deployment description of one stream.
 *
 * Wire order: streamId, reader task count + reader tasks, trigger flag
 * (+ trigger task when the flag is non-zero), runner task count + runner tasks.
 * Arrays are created only for positive counts; the trigger task is heap
 * allocated with taosMemoryCalloc.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nReaders; ++idx) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // optional trigger task, announced by a non-zero flag
  int32_t hasTrigger = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nRunners; ++idx) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1515

1516

1517
/*
 * Decode a start-task message; only its header is read here.
 * Returns 0 on success, otherwise the header decode error.
 */
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:
  return code;
}
1527

1528

1529
/*
 * Decode one "start task" entry: the task header followed by its start message.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the start payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1540

1541

1542
/*
 * Decode an undeploy-task message: header, then the doCheckpoint and
 * doCleanup flags (one int8 each).
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // two int8 flags follow the header
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1554

1555

1556
/*
 * Decode one "undeploy task" entry: the task header followed by its
 * undeploy message.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the undeploy payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1567

1568
/*
 * Decode a recalculation request: three int64 values read in the order
 * recalcId, start, end.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // requested range
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1580

1581
/*
 * Decode the type-specific content of a management response.
 *
 * msgType selects the layout:
 *   STREAM_MSG_ORIGTBL_READER_INFO   - vgId list, then reader address list
 *   STREAM_MSG_UPDATE_RUNNER         - runner target list
 *   STREAM_MSG_USER_RECALC           - recalc request list
 *   STREAM_MSG_RUNNER_ORIGTBL_READER - list of (execId, vg address list)
 * Any other type reads nothing and returns 0.
 *
 * Every list is encoded as an int32 count followed by its elements; an array
 * is created only when the count is positive.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t lino = 0;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t vgCnt = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgCnt));
      if (vgCnt > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), vgCnt);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
        for (int32_t k = 0; k < vgCnt; ++k) {
          int32_t* pVgId = taosArrayGet(pCont->vgIds, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      int32_t readerCnt = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerCnt));
      if (readerCnt > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerCnt);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
        for (int32_t k = 0; k < readerCnt; ++k) {
          SStreamTaskAddr* pReaderAddr = taosArrayGet(pCont->readerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t runnerCnt = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerCnt));
      if (runnerCnt > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerCnt);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
        for (int32_t k = 0; k < runnerCnt; ++k) {
          SStreamRunnerTarget* pRunnerTarget = taosArrayGet(pCont->runnerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunnerTarget));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t recalcCnt = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcCnt));
      if (recalcCnt > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), recalcCnt);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
        for (int32_t k = 0; k < recalcCnt; ++k) {
          SStreamRecalcReq* pRecalc = taosArrayGet(pCont->recalcList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalc));
        }
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t rspCnt = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspCnt));
      if (rspCnt > 0) {
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), rspCnt);
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);
        for (int32_t k = 0; k < rspCnt; ++k) {
          SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pCont->execRspList, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pExecRsp->execId));

          // each response carries its own vg address list
          int32_t addrCnt = 0;
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrCnt));
          if (addrCnt > 0) {
            pExecRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), addrCnt);
            TSDB_CHECK_NULL(pExecRsp->vgList, code, lino, _exit, terrno);
            for (int32_t m = 0; m < addrCnt; ++m) {
              SStreamTaskAddr* pVgAddr = taosArrayGet(pExecRsp->vgList, m);
              TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pVgAddr));
            }
          }
        }
      }
      break;
    }

    default:
      // no content is decoded for other message types
      break;
  }

_exit:
  return code;
}
1666

1667

1668
/*
 * Decode one management response.
 *
 * Wire order: message header, reqId, code, task, then the content whose
 * layout is chosen by the msgType found in the header.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // the header must be decoded before this call: it supplies msgType
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
1682

1683
void tFreeSStreamOReaderDeployRsp(void* param) {
5,170✔
1684
  if (NULL == param) {
5,170✔
1685
    return;
×
1686
  }
1687

1688
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
5,170✔
1689
  taosArrayDestroy(pRsp->vgList);
5,170✔
1690
}
1691

1692
void tFreeSStreamMgmtRsp(void* param) {
141,498✔
1693
  if (NULL == param) {
141,498✔
1694
    return;
×
1695
  }
1696
  
1697
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
141,498✔
1698

1699
  taosArrayDestroy(pRsp->cont.vgIds);
141,498✔
1700
  taosArrayDestroy(pRsp->cont.readerList);
141,498✔
1701
  taosArrayDestroy(pRsp->cont.runnerList);
141,498✔
1702
  taosArrayDestroy(pRsp->cont.recalcList);
141,498✔
1703
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
141,498✔
1704
}
1705

1706
/*
 * Free the buffers owned by a reader deploy message.
 *
 * The triggerReader flag selects which variant of pReader->msg is in use:
 * the trigger variant owns five buffers, the calc variant owns calcScanPlan
 * only. A NULL argument is ignored; pReader itself is not freed.
 */
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
1723

1724
void tFreeStreamNotifyUrl(void* param) {
×
1725
  if (NULL == param) {
×
1726
    return;
×
1727
  }
1728

1729
  taosMemoryFree(*(void**)param);
×
1730
}
1731

1732
/*
 * Free everything owned by a trigger deploy message.
 *
 * Releases the notify URL array, the buffers of the window-specific trigger
 * variant selected by triggerType (state, event or count; other types own
 * nothing here), the common plan/filter buffers, the reader and runner lists
 * and the stream name. A NULL argument is ignored; pTrigger itself is not freed.
 */
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

  // window-specific buffers
  if (pTrigger->triggerType == WINDOW_TYPE_STATE) {
    taosMemoryFree(pTrigger->trigger.stateWin.zeroth);
    taosMemoryFree(pTrigger->trigger.stateWin.expr);
  } else if (pTrigger->triggerType == WINDOW_TYPE_EVENT) {
    taosMemoryFree(pTrigger->trigger.event.startCond);
    taosMemoryFree(pTrigger->trigger.event.endCond);
  } else if (pTrigger->triggerType == WINDOW_TYPE_COUNT) {
    taosMemoryFree(pTrigger->trigger.count.condCols);
  }

  // buffers common to all trigger types
  taosMemoryFree(pTrigger->partitionCols);
  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
1763

1764
void tFreeSStreamOutCol(void* param) {
×
1765
  if (NULL == param) {
×
1766
    return;
×
1767
  }
1768

1769
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1770
  taosMemoryFree(pOut->expr);
×
1771
}
1772

1773
/*
 * Free the buffers and arrays owned by a runner deploy message.
 * A NULL argument is ignored; pRunner itself is not freed.
 *
 * NOTE(review): tDecodeSStreamRunnerDeployMsg also allocates colCids and
 * tagCids, which are not released here - confirm that their ownership is
 * handed over elsewhere, otherwise they leak.
 */
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // names and plan
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // notify URLs own one buffer per slot; column/tag arrays are flat
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  // expressions
  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
1791

1792
void tFreeSStmTaskDeploy(void* param) {
1,717,040✔
1793
  if (NULL == param) {
1,717,040✔
1794
    return;
327,804✔
1795
  }
1796

1797
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,389,236✔
1798
  switch (pTask->task.type)  {
1,389,236✔
1799
    case STREAM_READER_TASK:
583,368✔
1800
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
583,368✔
1801
      break;
583,368✔
1802
    case STREAM_TRIGGER_TASK:
186,953✔
1803
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
186,953✔
1804
      break;
186,953✔
1805
    case STREAM_RUNNER_TASK:
618,915✔
1806
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
618,915✔
1807
      break;
618,915✔
1808
    default:
×
1809
      break;
×
1810
  }
1811
}
1812

1813

1814
void tFreeSStmStreamDeploy(void* param) {
257,325✔
1815
  if (NULL == param) {
257,325✔
1816
    return;
×
1817
  }
1818
  
1819
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
257,325✔
1820
  int32_t readerNum = taosArrayGetSize(pDeploy->readerTasks);
257,325✔
1821
  for (int32_t i = 0; i < readerNum; ++i) {
840,676✔
1822
    SStmTaskDeploy* pReader = taosArrayGet(pDeploy->readerTasks, i);
583,351✔
1823
    if (!pReader->msg.reader.triggerReader && pReader->msg.reader.msg.calc.freeScanPlan) {
583,351✔
1824
      taosMemoryFreeClear(pReader->msg.reader.msg.calc.calcScanPlan);
284,564✔
1825
    }
1826
  }
1827
  taosArrayDestroy(pDeploy->readerTasks);
257,325✔
1828

1829
  if (pDeploy->triggerTask) {
257,325✔
1830
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
186,885✔
1831
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
186,885✔
1832
    taosMemoryFree(pDeploy->triggerTask);
186,885✔
1833
  }
1834

1835
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
257,325✔
1836
  for (int32_t i = 0; i < runnerNum; ++i) {
876,023✔
1837
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
618,698✔
1838
    taosMemoryFree(pRunner->msg.runner.pPlan);
618,698✔
1839
  }
1840
  taosArrayDestroy(pDeploy->runnerTasks);
257,325✔
1841
}
1842

1843
void tDeepFreeSStmStreamDeploy(void* param) {
514,757✔
1844
  if (NULL == param) {
514,757✔
1845
    return;
×
1846
  }
1847
  
1848
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
514,757✔
1849
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
514,757✔
1850
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
514,757✔
1851
  taosMemoryFree(pDeploy->triggerTask);
514,757✔
1852
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
514,757✔
1853
}
1854

1855

1856
/*
 * Release the lists of a heartbeat response, using the partial per-stream
 * destructor tFreeSStmStreamDeploy for the deploy list.
 * A NULL argument is ignored; pRsp itself is not freed.
 */
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp == NULL) {
    return;
  }

  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
  taosArrayDestroy(pRsp->start.taskList);
  taosArrayDestroy(pRsp->undeploy.taskList);
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
}
1865

1866
/*
 * Release the lists of a heartbeat response, using the full per-stream
 * destructor tDeepFreeSStmStreamDeploy for the deploy list.
 * A NULL argument is ignored; pRsp itself is not freed.
 */
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp == NULL) {
    return;
  }

  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
  taosArrayDestroy(pRsp->start.taskList);
  taosArrayDestroy(pRsp->undeploy.taskList);
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
}
1875

1876

1877

1878
/*
 * Decode a stream heartbeat response.
 *
 * Wire order inside the tStartDecode/tEndDecode frame:
 *   streamGId,
 *   deploy count + stream deployments,
 *   start count + start tasks,
 *   undeployAll flag (+ undeploy count and tasks when the flag is 0),
 *   response count + management responses.
 * Arrays are created only for positive counts. tEndDecode is called only
 * when every step succeeded.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  int32_t nDeploy = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploy));
  if (nDeploy > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploy);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nDeploy; ++idx) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, idx);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  int32_t nStart = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStart));
  if (nStart > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStart);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nStart; ++idx) {
      SStreamTaskStart* pStartTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStartTask));
    }
  }

  // tasks to undeploy; the per-task list is present only when undeployAll is 0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploy = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploy));
    if (nUndeploy > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploy);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
      for (int32_t idx = 0; idx < nUndeploy; ++idx) {
        SStreamTaskUndeploy* pUndeployTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, idx);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeployTask));
      }
    }
  }

  // management responses
  int32_t nRsp = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsp));
  if (nRsp > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nRsp);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nRsp; ++idx) {
      SStreamMgmtRsp* pOneRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pOneRsp));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
1936

1937
/*
 * Encode a task run request: streamId, taskId and reqType wrapped in a
 * tStartEncode/tEndEncode frame. tEndEncode is called only when every field
 * was written.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1950

1951
/*
 * Decode a task run request: streamId, taskId and reqType read inside a
 * tStartDecode/tEndDecode frame. tEndDecode is called only when every field
 * was read.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1964

1965
/*
 * Encode a task stop request: a single streamId wrapped in a
 * tStartEncode/tEndEncode frame.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  // frame is closed only on the success path
  tEndEncode(pEncoder);

_exit:
  return code;
}
1976

1977
/*
 * Decode a task stop request: a single streamId read inside a
 * tStartDecode/tEndDecode frame.
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  // frame is closed only on the success path
  tEndDecode(pDecoder);

_exit:
  return code;
}
1989

1990

1991
/*
 * Encode a create-stream request as a single length-prefixed string.
 *
 * The request is first rendered by scmCreateStreamReqToJson, the resulting
 * buffer is written with tEncodeCStrWithLen, and the buffer is released on
 * every path.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t jsonLen = 0;
  char*   json = NULL;

  TAOS_CHECK_EXIT(scmCreateStreamReqToJson(pReq, false, &json, &jsonLen));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, json, jsonLen));

_exit:
  // the temporary buffer is no longer needed once it has been encoded
  taosMemoryFreeClear(json);
  return code;
}
2008

2009
/*
 * Serialize a create-stream request into buf.
 *
 * buf     destination buffer handed to tEncoderInit
 * bufLen  size of buf in bytes
 * pReq    request to serialize
 *
 * Returns the encoder position (number of bytes produced) on success, or the
 * non-zero error code of the failing step. The encoder is cleared on every
 * path before returning.
 */
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, bufLen);
  int32_t code = 0;
  int32_t lino = 0;
  int32_t ret = 0;

  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));

  tEndEncode(&encoder);

_exit:
  // encoder.pos must be read before tEncoderClear
  ret = code;
  if (ret == 0) {
    ret = encoder.pos;
  }
  tEncoderClear(&encoder);
  return ret;
}
2032

2033
// Old version deserialization for backward compatibility,
2034
// especially for stream version number 7
2035
int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) {
×
2036
  int32_t code = 0;
×
2037
  int32_t lino;
2038
  pReq->calcPkSlotId = -1;
×
2039
  pReq->triPkSlotId = -1;
×
2040

2041
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2042

2043
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
×
2044
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
×
2045
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
×
2046
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
×
2047
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
×
2048
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
×
2049
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
×
2050

2051
  int32_t calcDbSize = 0;
×
2052
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
×
2053
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
×
2054
  if (pReq->calcDB == NULL) {
×
2055
    TAOS_CHECK_EXIT(terrno);
×
2056
  }
2057
  for (int32_t i = 0; i < calcDbSize; ++i) {
×
2058
    char *calcDb = NULL;
×
2059
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
×
2060
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
×
2061
    if (calcDb == NULL) {
×
2062
      TAOS_CHECK_EXIT(terrno);
×
2063
    }
2064
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
×
2065
      taosMemoryFree(calcDb);
×
2066
      TAOS_CHECK_EXIT(terrno);
×
2067
    }
2068
  }
2069

2070
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
×
2071
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
×
2072
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
×
2073
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
×
2074
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
×
2075
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
×
2076
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
×
2077
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
×
2078
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
×
2079
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
×
2080

2081
  int32_t addrSize = 0;
×
2082
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
×
2083
  if (addrSize > 0) {
×
2084
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
×
2085
    if (pReq->pNotifyAddrUrls == NULL) {
×
2086
      TAOS_CHECK_EXIT(terrno);
×
2087
    }
2088
  }
2089
  for (int32_t i = 0; i < addrSize; ++i) {
×
2090
    char *url = NULL;
×
2091
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
2092
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
2093
    if (url == NULL) {
×
2094
      TAOS_CHECK_EXIT(terrno);
×
2095
    }
2096
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
×
2097
      taosMemoryFree(url);
×
2098
      TAOS_CHECK_EXIT(terrno);
×
2099
    }
2100
  }
2101
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
×
2102
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
×
2103
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
×
2104

2105
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
×
2106
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
×
2107
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
×
2108

2109
  int32_t outColSize = 0;
×
2110
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
×
2111
  if (outColSize > 0) {
×
2112
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
×
2113
    if (pReq->outCols == NULL) {
×
2114
      TAOS_CHECK_EXIT(terrno);
×
2115
    }
2116

2117
    for (int32_t i = 0; i < outColSize; ++i) {
×
2118
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
×
2119
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
×
2120
    }
2121
  }
2122

2123
  int32_t outTagSize = 0;
×
2124
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
×
2125
  if (outTagSize > 0) {
×
2126
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
×
2127
    if (pReq->outTags == NULL) {
×
2128
      TAOS_CHECK_EXIT(terrno);
×
2129
    }
2130

2131
    for (int32_t i = 0; i < outTagSize; ++i) {
×
2132
      SFieldWithOptions field = {0};
×
2133
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
×
2134
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
×
2135
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
×
2136
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
×
2137
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
×
2138
        TAOS_CHECK_EXIT(terrno);
×
2139
      }
2140
    }
2141
  }
2142

2143
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
×
2144
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
×
2145
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
×
2146
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
×
2147

2148
  switch (pReq->triggerType) {
×
2149
    case WINDOW_TYPE_SESSION: {
×
2150
      // session trigger
2151
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
×
2152
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
×
2153
      break;
×
2154
    }
2155
      case WINDOW_TYPE_STATE: {
×
2156
        // state trigger
2157
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
×
2158
        pReq->trigger.stateWin.extend = 0;
×
2159
        pReq->trigger.stateWin.trueForType = 0;
×
2160
        pReq->trigger.stateWin.trueForCount = 0;
×
2161
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
×
2162
        break;
×
2163
      }
2164
      case WINDOW_TYPE_INTERVAL: {
×
2165
        // slide trigger
2166
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
×
2167
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
×
2168
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
×
2169
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
×
2170
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
×
2171
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
×
2172
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
×
2173
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
×
2174
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
×
2175
        break;
×
2176
      }
2177
      case WINDOW_TYPE_EVENT: {
×
2178
        // event trigger
2179
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
×
2180
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
×
2181
        pReq->trigger.event.trueForType = 0;
×
2182
        pReq->trigger.event.trueForCount = 0;
×
2183
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
×
2184
        break;
×
2185
      }
2186
      case WINDOW_TYPE_COUNT: {
×
2187
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
×
2188

2189
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
×
2190
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
×
2191
        break;
×
2192
      }
2193
      case WINDOW_TYPE_PERIOD: {
×
2194
        // period trigger
2195
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
×
2196
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
×
2197
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
×
2198
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
×
2199
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
×
2200
        break;
×
2201
      }
2202
      default:
×
2203
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2204
  }
2205

2206
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
×
2207
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
×
2208
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
×
2209
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
×
2210
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
×
2211
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
×
2212
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
×
2213
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
×
2214
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
×
2215
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
×
2216
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
×
2217
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
×
2218
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
×
2219
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
×
2220

2221
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
×
2222
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
×
2223

2224
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
×
2225

2226
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
×
2227
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
×
2228

2229
  int32_t calcScanPlanListSize = 0;
×
2230
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
×
2231
  if (calcScanPlanListSize > 0) {
×
2232
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
×
2233
    if (pReq->calcScanPlanList == NULL) {
×
2234
      TAOS_CHECK_EXIT(terrno);
×
2235
    }
2236
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
×
2237
      SStreamCalcScan calcScan = {0};
×
2238
      int32_t         vgListSize = 0;
×
2239
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
×
2240
      if (vgListSize > 0) {
×
2241
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
×
2242
        if (calcScan.vgList == NULL) {
×
2243
          TAOS_CHECK_EXIT(terrno);
×
2244
        }
2245
        for (int32_t j = 0; j < vgListSize; ++j) {
×
2246
          int32_t vgId = 0;
×
2247
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
×
2248
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
×
2249
            TAOS_CHECK_EXIT(terrno);
×
2250
          }
2251
        }
2252
      }
2253
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
×
2254
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
×
2255
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
×
2256
        TAOS_CHECK_EXIT(terrno);
×
2257
      }
2258
    }
2259
  }
2260

2261
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
×
2262
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
×
2263
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
×
2264
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
×
2265

2266
  int32_t forceOutColsSize = 0;
×
2267
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
×
2268
  if (forceOutColsSize > 0) {
×
2269
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
×
2270
    if (pReq->forceOutCols == NULL) {
×
2271
      TAOS_CHECK_EXIT(terrno);
×
2272
    }
2273
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
×
2274
      SStreamOutCol outCol = {0};
×
2275
      int64_t       exprLen = 0;
×
2276
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
×
2277
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
×
2278
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
×
2279
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
×
2280
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
×
2281
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
×
2282
        TAOS_CHECK_EXIT(terrno);
×
2283
      }
2284
    }
2285
  }
2286

2287
  // LeftBytes is the size of all fields at the tail of SStreamObj.
2288
  // If there are more data in the buffer, then it means
2289
  // the new fields are added in SStreamObj, need to decode them.
2290
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2291
    switch (pReq->triggerType) {
×
2292
      case WINDOW_TYPE_STATE: {
×
2293
        // state trigger
2294
        if (!tDecodeIsEnd(pDecoder)) {
×
2295
          TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
×
2296
        }
2297
        if (!tDecodeIsEnd(pDecoder)) {
×
2298
          TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
×
2299
        }
2300
        break;
×
2301
      }
2302
      case WINDOW_TYPE_INTERVAL: {
×
2303
        if (!tDecodeIsEnd(pDecoder)) {
×
2304
          TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
×
2305
        }
2306
        break;
×
2307
      }
2308
      default:
×
2309
        break;
×
2310
    }
2311
  }
2312

2313
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2314
    if (!tDecodeIsEnd(pDecoder)) {
×
2315
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pReq->triggerPrec));
×
2316
    }
2317
  }
2318

2319
_exit:
×
2320

2321
  return code;
×
2322
}
2323

2324
// New deserialization using JSON
2325
// start from taosd ver-3.3.8.6, stream version number 8
2326
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
436,185✔
2327
  int32_t code = 0;
436,185✔
2328
  int32_t lino;
2329

2330
  char* json = NULL;
436,185✔
2331
  SJson* pJson = NULL;
436,185✔
2332
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &json));
436,185✔
2333
  pJson = tjsonParse(json);
436,185✔
2334
  if (pJson == NULL) {
436,185✔
2335
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INVALID_JSON);
×
2336
  }
2337
  TAOS_CHECK_EXIT(jsonToSCMCreateStreamReq(pJson, pReq));
436,185✔
2338

2339
_exit:
436,185✔
2340
  taosMemoryFreeClear(json);
436,185✔
2341
  if (NULL != pJson) {
436,185✔
2342
    tjsonDelete(pJson);
436,185✔
2343
  }
2344

2345
  return code;
436,185✔
2346
}
2347

2348

2349
/**
 * Deserialize a create-stream request from a buffer.
 *
 * The JSON based format is tried first. Only when that attempt reports
 * TSDB_CODE_MND_STREAM_INVALID_JSON is the decoder re-initialised on the same
 * buffer and the legacy binary layout decoded instead (with leftBytes = 0).
 * Any other result of the JSON attempt is returned unchanged.
 *
 * @param buf    encoded request
 * @param bufLen length of buf in bytes
 * @param pReq   request to fill
 * @return 0 on success, otherwise an error code
 */
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  code = tDeserializeSCMCreateStreamReqImpl(&decoder, pReq);
  if (code == TSDB_CODE_MND_STREAM_INVALID_JSON) {
    uError("invalid json for stream create request, try old deserialization");
    // restart from the beginning of the buffer for the legacy layout
    tDecoderClear(&decoder);
    tDecoderInit(&decoder, buf, bufLen);
    TAOS_CHECK_EXIT(tStartDecode(&decoder));
    TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImplOld(&decoder, pReq, 0));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2374

2375

2376
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
45,190✔
2377
  int32_t  code = 0;
45,190✔
2378
  int32_t  lino;
2379
  int32_t  tlen;
2380
  SEncoder encoder = {0};
45,190✔
2381
  tEncoderInit(&encoder, buf, bufLen);
45,190✔
2382

2383
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
45,190✔
2384

2385
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->count));
90,380✔
2386
  for (int32_t i = 0; i < pReq->count; i++) {
96,932✔
2387
    int32_t nameLen = pReq->name[i] == NULL ? 0 : (int32_t)strlen(pReq->name[i]) + 1;
51,742✔
2388
    TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name[i], nameLen));
103,484✔
2389
  }
2390
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
90,380✔
2391

2392
  tEndEncode(&encoder);
45,190✔
2393

2394
_exit:
45,190✔
2395
  if (code) {
45,190✔
2396
    tlen = code;
×
2397
  } else {
2398
    tlen = encoder.pos;
45,190✔
2399
  }
2400
  tEncoderClear(&encoder);
45,190✔
2401
  return tlen;
45,190✔
2402
}
2403

2404
/**
 * Deserialize a drop-stream request.
 *
 * Reads count, allocates a zeroed array of `count` name pointers when count
 * is positive, decodes each name into its own allocation, then reads
 * igNotExists. On failure the names decoded so far stay attached to pReq.
 *
 * @param buf    encoded request
 * @param bufLen length of buf in bytes
 * @param pReq   request to fill
 * @return 0 on success, terrno when the name array cannot be allocated,
 *         otherwise the code recorded for the failing decode step
 */
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->count));

  if (pReq->count > 0) {
    pReq->name = taosMemoryCalloc(pReq->count, sizeof(char*));
    if (NULL == pReq->name) {
      code = terrno;
      goto _exit;
    }

    for (int32_t idx = 0; idx < pReq->count; ++idx) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name[idx], NULL));
    }
  }

  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2430

2431
/**
 * Release the stream names owned by a drop-stream request.
 *
 * Frees each of the first pReq->count entries and then the name array itself,
 * leaving pReq->name NULL. The request struct is not freed. A NULL request or
 * a NULL name array is a no-op.
 */
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (pReq == NULL || pReq->name == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pReq->count; ++idx) {
    taosMemoryFreeClear(pReq->name[idx]);
  }
  taosMemoryFreeClear(pReq->name);
}
2442

2443
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
1,840,240✔
2444
  if (pScan == NULL) {
1,840,240✔
2445
    return;
×
2446
  }
2447
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
1,840,240✔
2448
  taosArrayDestroy(pCalcScan->vgList);
1,840,240✔
2449
  taosMemoryFreeClear(pCalcScan->scanPlan);
1,840,240✔
2450
}
2451

2452
void tFreeStreamOutCol(void* pCol) {
78,038✔
2453
  if (pCol == NULL) {
78,038✔
2454
    return;
×
2455
  }
2456
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
78,038✔
2457
  taosMemoryFreeClear(pOutCol->expr);
78,038✔
2458
}
2459

2460

2461

2462
/**
 * Release every heap member owned by a create-stream request.
 *
 * Each freed pointer/array member is reset to NULL afterwards. The request
 * struct itself is NOT freed. NULL is a no-op.
 *
 * Which members of the `trigger` union are released depends on
 * pReq->triggerType, so triggerType must still be valid when this is called.
 *
 * @param pReq request whose members are released
 */
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }

  // identification strings
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // arrays of owned strings
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  // arrays of plain structs
  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  // trigger-specific heap members
  switch (pReq->triggerType) {
    case WINDOW_TYPE_STATE:
      taosMemoryFreeClear(pReq->trigger.stateWin.zeroth);
      taosMemoryFreeClear(pReq->trigger.stateWin.expr);
      break;
    case WINDOW_TYPE_EVENT:
      taosMemoryFreeClear(pReq->trigger.event.startCond);
      taosMemoryFreeClear(pReq->trigger.event.endCond);
      break;
    case WINDOW_TYPE_COUNT:
      // condCols is heap-allocated by tDeserializeSCMCreateStreamReqImplOld
      // and by tCloneStreamCreateDeployPointers; it was previously leaked.
      taosMemoryFreeClear(pReq->trigger.count.condCols);
      break;
    default:
      break;
  }

  // plans and expressions
  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
  taosArrayDestroy(pReq->colCids);
  pReq->colCids = NULL;
  taosArrayDestroy(pReq->tagCids);
  pReq->tagCids = NULL;
}
2516

2517
/**
 * Deep-copy the subset of a create-stream request that this function selects
 * (names, notify URLs, column lists, trigger definition, plans, expressions
 * and a few scalar flags) into a newly allocated request.
 *
 * @param pSrc  source request; when NULL nothing is done and 0 is returned
 *              (*ppDst is left untouched)
 * @param ppDst receives the newly allocated copy
 * @return 0 on success, otherwise the error code of the failing step
 *
 * On failure the members copied so far are released with
 * tFreeSCMCreateStreamReq(), but the struct stored in *ppDst is neither freed
 * nor reset here.
 * NOTE(review): presumably the caller releases *ppDst on error - confirm.
 */
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t code = 0, lino = 0;
  if (NULL == pSrc) {
    return code;
  }

  // Everything _exit looks at is declared and initialised before the first
  // possible jump. pDst used to be initialised after the allocation check, so
  // an allocation failure passed an indeterminate pointer to the free routine.
  SCMCreateStreamReq* pDst = NULL;
  void*               p = NULL;     // URL copy not yet owned by an array
  int32_t             num = 0;
  SStreamCalcScan     dscan = {0};  // scan element not yet owned by an array
  SStreamOutCol       dcol = {0};   // out-col element not yet owned by an array

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
      p = NULL;  // ownership moved into the array
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  // triggerType is set before the union is populated so that the cleanup in
  // _exit releases the matching union members.
  pDst->triggerType = pSrc->triggerType;

  switch (pSrc->triggerType) {
    case WINDOW_TYPE_STATE:
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
      pDst->trigger.stateWin.trueForType = pSrc->trigger.stateWin.trueForType;
      pDst->trigger.stateWin.trueForCount = pSrc->trigger.stateWin.trueForCount;
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
      if (pSrc->trigger.stateWin.zeroth) {
        pDst->trigger.stateWin.zeroth = COPY_STR(pSrc->trigger.stateWin.zeroth);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.zeroth, code, lino, _exit, terrno);
      }
      if (pSrc->trigger.stateWin.expr) {
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
      }
      break;
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForType = pSrc->trigger.event.trueForType;
      pDst->trigger.event.trueForCount = pSrc->trigger.event.trueForCount;
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    case WINDOW_TYPE_COUNT:
      pDst->trigger.count.countVal = pSrc->trigger.count.countVal;
      pDst->trigger.count.sliding = pSrc->trigger.count.sliding;
      if (pSrc->trigger.count.condCols) {
        pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols);
        TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno);
      }
      break;
    default:
      // remaining trigger kinds are copied by plain struct assignment
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);

      // dscan is all-zero here: initially, and again after every push
      dscan.readFromCache = sscan->readFromCache;

      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);

      dscan.scanPlan = COPY_STR(sscan->scanPlan);
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // ownership moved into the array
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);

      // dcol is all-zero here: initially, and again after every push
      dcol.type = scol->type;

      dcol.expr = COPY_STR(scol->expr);
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // ownership moved into the array
    }
  }

  if (pSrc->colCids) {
    pDst->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL(pDst->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids) {
    pDst->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL(pDst->tagCids, code, lino, _exit, terrno);
  }

  // scalar members
  pDst->triggerTblUid = pSrc->triggerTblUid;
  pDst->triggerTblType = pSrc->triggerTblType;
  pDst->triggerPrec = pSrc->triggerPrec;
  pDst->deleteReCalc = pSrc->deleteReCalc;
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
  pDst->flags = pSrc->flags;

_exit:

  if (code) {
    // release temporaries that never reached their destination array
    taosMemoryFreeClear(p);
    tFreeStreamCalcScan(&dscan);
    tFreeStreamOutCol(&dcol);

    tFreeSCMCreateStreamReq(pDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2717

2718

2719
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
6,142✔
2720
  int32_t  code = 0;
6,142✔
2721
  int32_t  lino;
2722
  int32_t  tlen;
2723
  SEncoder encoder = {0};
6,142✔
2724
  tEncoderInit(&encoder, buf, bufLen);
6,142✔
2725
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
6,142✔
2726

2727
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
6,142✔
2728
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
12,284✔
2729
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
12,284✔
2730
  tEndEncode(&encoder);
6,142✔
2731

2732
_exit:
6,142✔
2733
  if (code) {
6,142✔
2734
    tlen = code;
×
2735
  } else {
2736
    tlen = encoder.pos;
6,142✔
2737
  }
2738
  tEncoderClear(&encoder);
6,142✔
2739
  return tlen;
6,142✔
2740
}
2741

2742
/**
 * Deserialize a pause-stream request.
 *
 * Reads the stream name into a new allocation stored in pReq->name, then
 * igNotExists.
 *
 * @param buf    encoded request
 * @param bufLen length of buf in bytes
 * @param pReq   request to fill
 * @return 0 on success, otherwise the code recorded for the failing step
 */
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2757

2758
/*
 * Release the memory owned by a pause-stream request.
 *
 * Frees pReq->name and resets it (taosMemoryFreeClear). The request struct
 * itself is not freed. A NULL pReq is accepted and ignored, matching the
 * NULL handling of tDestroySTriggerPullRequest in this file.
 */
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
3,071✔
2761

2762
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
5,402✔
2763
  SEncoder encoder = {0};
5,402✔
2764
  int32_t  code = 0;
5,402✔
2765
  int32_t  lino;
2766
  int32_t  tlen;
2767
  tEncoderInit(&encoder, buf, bufLen);
5,402✔
2768
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,402✔
2769
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
5,402✔
2770
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
10,804✔
2771
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
10,804✔
2772
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
10,804✔
2773
  tEndEncode(&encoder);
5,402✔
2774

2775
_exit:
5,402✔
2776
  if (code) {
5,402✔
2777
    tlen = code;
×
2778
  } else {
2779
    tlen = encoder.pos;
5,402✔
2780
  }
2781
  tEncoderClear(&encoder);
5,402✔
2782
  return tlen;
5,402✔
2783
}
2784

2785
/*
 * Deserialize a resume-stream request from buf into pReq.
 *
 * Reads the name blob, then igNotExists, then igUntreated, mirroring the
 * order used by tSerializeSMResumeStreamReq. pReq->name is filled in by
 * tDecodeBinaryAlloc; tFreeMResumeStreamReq releases it.
 *
 * Returns 0 on success or the error code of the first failing decode step.
 * The decoder is cleared on every path.
 */
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2801

2802
/*
 * Release the memory owned by a resume-stream request.
 *
 * Frees pReq->name and resets it (taosMemoryFreeClear). The request struct
 * itself is not freed. A NULL pReq is accepted and ignored, matching the
 * NULL handling of tDestroySTriggerPullRequest in this file.
 */
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
2,701✔
2805

2806
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
24,524✔
2807
  SEncoder encoder = {0};
24,524✔
2808
  int32_t  code = 0;
24,524✔
2809
  int32_t  lino;
2810
  int32_t  tlen;
2811
  tEncoderInit(&encoder, buf, bufLen);
24,524✔
2812
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
24,524✔
2813
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
24,524✔
2814
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
49,048✔
2815
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
49,048✔
2816
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
49,048✔
2817
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
49,048✔
2818
  tEndEncode(&encoder);
24,524✔
2819

2820
_exit:
24,524✔
2821
  if (code) {
24,524✔
2822
    tlen = code;
×
2823
  } else {
2824
    tlen = encoder.pos;
24,524✔
2825
  }
2826
  tEncoderClear(&encoder);
24,524✔
2827
  return tlen;
24,524✔
2828
}
2829

2830
/*
 * Deserialize a recalc-stream request from buf into pReq.
 *
 * Reads the name blob, calcAll, timeRange.skey and timeRange.ekey, mirroring
 * the order used by tSerializeSMRecalcStreamReq. pReq->name is filled in by
 * tDecodeBinaryAlloc; tFreeMRecalcStreamReq releases it.
 *
 * Returns 0 on success or the error code of the first failing decode step.
 * The decoder is cleared on every path.
 */
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2848

2849
/*
 * Release the memory owned by a recalc-stream request.
 *
 * Frees pReq->name and resets it (taosMemoryFreeClear). The request struct
 * itself is not freed. A NULL pReq is accepted and ignored, matching the
 * NULL handling of tDestroySTriggerPullRequest in this file.
 */
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
24,524✔
2852

2853
/*
 * Write the body of a stream-progress request to an already started encoder.
 *
 * Field order: streamId (int64), taskId (int64), fetchIdx (int32).
 * Returns 0 on success or the error code of the first failing encode step.
 */
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t lino;
  int32_t code = 0;

  // identifiers first, then the fetch index
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
2864

2865
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
13,160✔
2866
  SEncoder encoder = {0};
13,160✔
2867
  int32_t  code = 0;
13,160✔
2868
  int32_t  lino;
2869
  int32_t  tlen;
2870
  tEncoderInit(&encoder, buf, bufLen);
13,160✔
2871

2872
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
13,160✔
2873
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
13,160✔
2874

2875
  tEndEncode(&encoder);
13,160✔
2876

2877
_exit:
13,160✔
2878
  if (code) {
13,160✔
2879
    tlen = code;
×
2880
  } else {
2881
    tlen = encoder.pos;
13,160✔
2882
  }
2883
  tEncoderClear(&encoder);
13,160✔
2884
  return tlen;
13,160✔
2885
}
2886

2887
/*
 * Read the body of a stream-progress request from an already started decoder.
 *
 * Field order: streamId (int64), taskId (int64), fetchIdx (int32), the same
 * order tEncodeStreamProgressReq writes them in.
 * Returns 0 on success or the error code of the first failing decode step.
 */
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t lino;
  int32_t code = 0;

  // identifiers first, then the fetch index
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
2898

2899
/*
 * Deserialize a stream-progress request from buf into pReq.
 *
 * Wraps tDecodeStreamProgressReq between tStartDecode and tEndDecode.
 *
 * Returns 0 on success or the error code of the first failing step.
 * The decoder is cleared on every path.
 */
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, (char *)buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2915

2916
/*
 * Write the body of a stream-progress response to an already started encoder.
 *
 * Field order: streamId (int64), fillHisFinished (int8), progressDelay
 * (int64), fetchIdx (int32).
 * Returns 0 on success or the error code of the first failing encode step.
 */
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
2928

2929
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
12,878✔
2930
  SEncoder encoder = {0};
12,878✔
2931
  int32_t  code = 0;
12,878✔
2932
  int32_t  lino;
2933
  int32_t  tlen;
2934
  tEncoderInit(&encoder, buf, bufLen);
12,878✔
2935

2936
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
12,878✔
2937
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
12,878✔
2938

2939
  tEndEncode(&encoder);
12,878✔
2940

2941
_exit:
12,878✔
2942
  if (code) {
12,878✔
2943
    tlen = code;
×
2944
  } else {
2945
    tlen = encoder.pos;
12,878✔
2946
  }
2947
  tEncoderClear(&encoder);
12,878✔
2948
  return tlen;
12,878✔
2949
}
2950

2951
/*
 * Read the body of a stream-progress response from an already started decoder.
 *
 * Field order: streamId (int64), fillHisFinished (int8), progressDelay
 * (int64), fetchIdx (int32), the same order tEncodeStreamProgressRsp writes
 * them in.
 * Returns 0 on success or the error code of the first failing decode step.
 */
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  // fillHisFinished is decoded through an int8_t view of the field
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
2963

2964
/*
 * Deserialize a stream-progress response from buf into pRsp.
 *
 * Wraps tDecodeStreamProgressRsp between tStartDecode and tEndDecode.
 *
 * Returns 0 on success or the error code of the first failing step.
 * The decoder is cleared on every path.
 */
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&dec, pRsp));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2980

2981
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
268,692✔
2982
  SEncoder encoder = {0};
268,692✔
2983
  int32_t  code = TSDB_CODE_SUCCESS;
268,692✔
2984
  int32_t  lino = 0;
268,692✔
2985
  int32_t  tlen = 0;
268,692✔
2986

2987
  tEncoderInit(&encoder, buf, bufLen);
268,692✔
2988
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
268,692✔
2989

2990
  int32_t size = taosArrayGetSize(pRsp->cols);
268,692✔
2991
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
268,692✔
2992
  for (int32_t i = 0; i < size; ++i) {
996,324✔
2993
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
727,632✔
2994
    if (oInfo == NULL) {
727,632✔
2995
      uError("col id is NULL at index %d", i);
×
2996
      code = TSDB_CODE_INVALID_PARA;
×
2997
      goto _exit;
×
2998
    }
2999
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
1,455,264✔
3000
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
1,455,023✔
3001
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
1,455,023✔
3002
  }
3003

3004
  tEndEncode(&encoder);
268,692✔
3005

3006
_exit:
268,692✔
3007
  if (code != TSDB_CODE_SUCCESS) {
268,692✔
3008
    tlen = code;
×
3009
  } else {
3010
    tlen = encoder.pos;
268,692✔
3011
  }
3012
  tEncoderClear(&encoder);
268,692✔
3013
  return tlen;
268,692✔
3014
}
3015

3016
/*
 * Deserialize an original-table-info response from buf into pRsp.
 *
 * Layout read: an int32 element count, followed for each element by suid
 * (int64), uid (int64) and cid (int16). pRsp->cols is created here with
 * taosArrayInit; it is left in place when a later step fails, so the caller
 * is expected to release it (see tDestroySTriggerOrigTableInfoRsp).
 *
 * Returns TSDB_CODE_SUCCESS on success or the error code of the failing step.
 *
 * Every failure path goes through _exit so that tDecoderClear always runs;
 * the per-element decodes previously returned directly and skipped it.
 */
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    // reserve one slot and decode straight into it
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3050

3051
/*
 * Release the column array owned by an original-table-info response.
 *
 * The response struct itself is not freed. A NULL pRsp is ignored, and the
 * cols pointer is reset after destruction so that calling this twice on the
 * same response does not destroy the array a second time.
 */
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) {
    return;
  }
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
26,605,261✔
3054

3055
/*
 * Release the heap members owned by a decoded trigger pull request.
 *
 * Which members exist depends on pReq->base.type; request kinds not listed
 * below own nothing and are left untouched. The union itself is not freed.
 * A NULL pReq is ignored.
 */
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      // versions/ranges are destroyed but, unlike the cases below, not reset
      SSTriggerWalDataNewRequest* pWal = (SSTriggerWalDataNewRequest*)pReq;
      taosArrayDestroy(pWal->versions);
      tSimpleHashCleanup(pWal->ranges);
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pTsdb = (SSTriggerTsdbDataRequest*)pReq;
      if (pTsdb->cids != NULL) {
        taosArrayDestroy(pTsdb->cids);
        pTsdb->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pVtb = (SSTriggerVirTableInfoRequest*)pReq;
      if (pVtb->cids != NULL) {
        taosArrayDestroy(pVtb->cids);
        pVtb->cids = NULL;
      }
      if (pVtb->uids != NULL) {
        taosArrayDestroy(pVtb->uids);
        pVtb->uids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pPseudo = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pPseudo->cids != NULL) {
        taosArrayDestroy(pPseudo->cids);
        pPseudo->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pOrig = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pOrig->cols != NULL) {
        taosArrayDestroy(pOrig->cols);
        pOrig->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pSetTbl = (SSTriggerSetTableRequest*)pReq;
      tSimpleHashCleanup(pSetTbl->uidInfoTrigger);
      tSimpleHashCleanup(pSetTbl->uidInfoCalc);
      break;
    }
    default:
      // nothing heap-allocated to release for the remaining request kinds
      break;
  }
}
3095

3096
/*
 * Encode the raw element storage of pArr as a single binary blob.
 *
 * When the array has at least one element, the blob is the array's data
 * buffer and its length is elementCount * elemSize bytes. Otherwise a NULL
 * buffer with length 0 is passed to tEncodeBinary.
 *
 * Returns TSDB_CODE_SUCCESS or the error code from tEncodeBinary.
 */
int32_t encodePlainArray(SEncoder *encoder, SArray *pArr) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint8_t* payload = NULL;
  int32_t  payloadLen = 0;
  int32_t  count = taosArrayGetSize(pArr);

  // pArr is only dereferenced when it reports a positive size
  if (count > 0) {
    payload = TARRAY_DATA(pArr);
    payloadLen = count * pArr->elemSize;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(encoder, payload, payloadLen));

_exit:
  return code;
}
3107

3108
/*
 * Decode a binary blob written by encodePlainArray into a new SArray.
 *
 * When the decoded blob is empty, *ppArr is left untouched. Otherwise a new
 * array with the given elemSize is created and the decoded buffer is swapped
 * in as its data buffer; size and capacity are both set to
 * blobLength / elemSize (integer division).
 *
 * Returns TSDB_CODE_SUCCESS, the error from tDecodeBinaryAlloc, or terrno
 * when the array cannot be created.
 */
int32_t decodePlainArray(SDecoder* decoder, SArray** ppArr, uint32_t elemSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint64_t nBytes = 0;
  void*    raw = NULL;

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(decoder, &raw, &nBytes));
  if (nBytes == 0) {
    goto _exit;
  }

  *ppArr = taosArrayInit(0, elemSize);
  TSDB_CHECK_NULL(*ppArr, code, lino, _exit, terrno);

  // after the swap the array owns the decoded bytes and raw holds whatever
  // buffer the fresh array started with, which is released below
  TSWAP((*ppArr)->pData, raw);
  (*ppArr)->size = (*ppArr)->capacity = nBytes / elemSize;

_exit:
  if (raw != NULL) {
    taosMemoryFree(raw);
  }
  return code;
}
3128

3129
/*
 * Encode a two-level table map used by the set-table pull request.
 *
 * Layout: the number of outer entries (int32); for each outer entry the two
 * int64 values that make up its key, the number of inner entries (int32),
 * and for each inner entry its int16 key (slot) followed by its int16 value
 * (cid). The outer value is a pointer to the inner SSHashObj.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t outerIter = 0;

  int32_t nOuter = tSimpleHashGetSize(pInfo);
  TAOS_CHECK_EXIT(tEncodeI32(encoder, nOuter));

  for (void* pOuter = tSimpleHashIterate(pInfo, NULL, &outerIter); pOuter != NULL;
       pOuter = tSimpleHashIterate(pInfo, pOuter, &outerIter)) {
    // the outer key is read as two consecutive int64 values
    int64_t* idPair = tSimpleHashGetKey(pOuter, NULL);
    TAOS_CHECK_EXIT(tEncodeI64(encoder, idPair[0]));
    TAOS_CHECK_EXIT(tEncodeI64(encoder, idPair[1]));

    SSHashObj* pInner = *(SSHashObj**)pOuter;
    int32_t    nInner = tSimpleHashGetSize(pInner);
    TAOS_CHECK_EXIT(tEncodeI32(encoder, nInner));

    int32_t innerIter = 0;
    for (void* pEntry = tSimpleHashIterate(pInner, NULL, &innerIter); pEntry != NULL;
         pEntry = tSimpleHashIterate(pInner, pEntry, &innerIter)) {
      int16_t* pSlot = tSimpleHashGetKey(pEntry, NULL);
      int16_t* pCid = (int16_t*)pEntry;
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pSlot));
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pCid));
    }
  }

_exit:
  return code;
}
3160

3161
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
72,002,074✔
3162
  SEncoder encoder = {0};
72,002,074✔
3163
  int32_t  code = TSDB_CODE_SUCCESS;
72,002,317✔
3164
  int32_t  lino = 0;
72,002,317✔
3165
  int32_t  tlen = 0;
72,002,317✔
3166

3167
  tEncoderInit(&encoder, buf, bufLen);
72,002,317✔
3168
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
71,998,008✔
3169

3170
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
144,000,270✔
3171
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
143,996,859✔
3172
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
143,995,391✔
3173
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
143,998,625✔
3174

3175
  switch (pReq->type) {
72,001,123✔
3176
    case STRIGGER_PULL_SET_TABLE: {
268,692✔
3177
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
268,692✔
3178
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
268,692✔
3179
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
268,692✔
3180
      break;
268,692✔
3181
    }
3182
    case STRIGGER_PULL_LAST_TS: {
567,086✔
3183
      break;
567,086✔
3184
    }
3185
    case STRIGGER_PULL_FIRST_TS: {
515,112✔
3186
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
515,112✔
3187
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,030,225✔
3188
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
1,030,466✔
3189
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,030,706✔
3190
      break;
515,353✔
3191
    }
3192
    case STRIGGER_PULL_TSDB_META: {
1,387,952✔
3193
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,387,952✔
3194
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,775,904✔
3195
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,775,904✔
3196
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,775,904✔
3197
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,775,904✔
3198
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,775,904✔
3199
      break;
1,387,952✔
3200
    }
3201
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3202
      break;
×
3203
    }
3204
    case STRIGGER_PULL_TSDB_TS_DATA: {
207,264✔
3205
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
207,264✔
3206
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
414,528✔
3207
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
414,528✔
3208
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
414,528✔
3209
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
414,528✔
3210
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
414,528✔
3211
      break;
207,264✔
3212
    }
3213
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
117,610✔
3214
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
117,610✔
3215
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
235,220✔
3216
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
235,220✔
3217
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
235,220✔
3218
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
235,220✔
3219
      break;
117,610✔
3220
    }
3221
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
117,112✔
3222
      break;
117,112✔
3223
    }
3224
    case STRIGGER_PULL_TSDB_CALC_DATA: {
13,572,798✔
3225
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
13,572,798✔
3226
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
27,145,596✔
3227
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
27,145,596✔
3228
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
27,145,596✔
3229
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
27,145,596✔
3230
      break;
13,572,798✔
3231
    }
3232
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3233
      break;
×
3234
    }
3235
    case STRIGGER_PULL_TSDB_DATA: {
1,104,192✔
3236
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
1,104,192✔
3237
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
2,208,384✔
3238
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
2,208,384✔
3239
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
2,208,384✔
3240
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
2,208,384✔
3241
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
1,104,192✔
3242
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,208,384✔
3243
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,208,384✔
3244
      break;
1,104,192✔
3245
    }
3246
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3247
      break;
×
3248
    }
3249
    case STRIGGER_PULL_WAL_META_NEW: {
25,025,703✔
3250
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
25,025,703✔
3251
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
50,050,385✔
3252
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
50,051,592✔
3253
      break;
25,026,910✔
3254
    }
3255
    case STRIGGER_PULL_WAL_DATA_NEW:
18,925,067✔
3256
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3257
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
18,925,067✔
3258
      int32_t                     nVersion = taosArrayGetSize(pRequest->versions);
18,925,067✔
3259
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
18,924,129✔
3260
      for (int32_t i = 0; i < nVersion; i++) {
36,555,697✔
3261
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
17,631,086✔
3262
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
17,631,568✔
3263
      }
3264
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
18,924,611✔
3265
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
18,925,551✔
3266
      int32_t iter = 0;
18,925,551✔
3267
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
18,925,551✔
3268
      while (px != NULL) {
26,090,101✔
3269
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
7,165,277✔
3270
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
14,330,072✔
3271
        int64_t* key = (int64_t*)px;
7,164,795✔
3272
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
14,330,072✔
3273
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));
14,330,072✔
3274

3275
        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
7,164,795✔
3276
      }
3277
      break;
18,924,824✔
3278
    }
3279
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
7,107,739✔
3280
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
7,107,739✔
3281
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
14,216,100✔
3282
      break;
7,108,361✔
3283
    }
3284
    case STRIGGER_PULL_GROUP_COL_VALUE: {
551,332✔
3285
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
551,332✔
3286
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,102,664✔
3287
      break;
551,332✔
3288
    }
3289
    case STRIGGER_PULL_VTABLE_INFO: {
151,702✔
3290
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
151,702✔
3291
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
151,702✔
3292
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
151,702✔
3293
      TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
151,702✔
3294
      break;
151,702✔
3295
    }
3296
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
2,112,076✔
3297
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
2,112,076✔
3298
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
4,224,152✔
3299
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
2,112,076✔
3300
      break;
2,112,076✔
3301
    }
3302
    case STRIGGER_PULL_OTABLE_INFO: {
268,692✔
3303
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
268,692✔
3304
      int32_t size = taosArrayGetSize(pRequest->cols);
268,692✔
3305
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
268,692✔
3306
      for (int32_t i = 0; i < size; ++i) {
996,324✔
3307
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
727,632✔
3308
        if (oInfo == NULL) {
727,632✔
3309
          uError("col id is NULL at index %d", i);
×
3310
          code = TSDB_CODE_INVALID_PARA;
×
3311
          goto _exit;
×
3312
        }
3313
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
1,455,264✔
3314
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
1,455,264✔
3315
      }
3316
      break; 
268,692✔
3317
    }
UNCOV
3318
    default: {
×
UNCOV
3319
      uError("unknown pull type %d", pReq->type);
×
3320
      code = TSDB_CODE_INVALID_PARA;
×
3321
      break;
×
3322
    }
3323
  }
3324

3325
  tEndEncode(&encoder);
72,001,956✔
3326

3327
_exit:
71,994,135✔
3328
  if (code != TSDB_CODE_SUCCESS) {
71,995,180✔
3329
    tlen = code;
×
3330
  } else {
3331
    tlen = encoder.pos;
71,995,180✔
3332
  }
3333
  tEncoderClear(&encoder);
71,995,180✔
3334
  return tlen;
71,996,108✔
3335
}
3336

3337
static void destroyHash(void* data){
336,845✔
3338
  if (data){
336,845✔
3339
    SSHashObj* tmp = *(SSHashObj**)data;
336,845✔
3340
    tSimpleHashCleanup(tmp);
336,845✔
3341
  }
3342
}
336,845✔
3343

3344
/*
 * Decode a two-level table map written by encodeSetTableMapInfo.
 *
 * Layout read: the number of outer entries (int32); for each outer entry two
 * int64 values forming the key, the number of inner entries (int32), and for
 * each inner entry an int16 key (slot id) followed by an int16 value (cid).
 * Each inner map is stored by pointer in *ppInfo, whose free callback
 * (destroyHash) cleans it up together with the outer map.
 *
 * On failure everything built so far is released and *ppInfo is set to NULL.
 * This includes an inner map that was created but could not be inserted into
 * the outer map: the outer map does not own it yet, so it is tracked
 * separately and released explicitly instead of being leaked.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    size = 0;
  SSHashObj* pUnowned = NULL;  // inner map created but not yet stored in *ppInfo

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (*ppInfo == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);

  for (int32_t i = 0; i < size; ++i) {
    int64_t id[2] = {0};
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id+1));
    int32_t len = 0;
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));

    pUnowned = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
    if (pUnowned == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    // if this insert fails, pUnowned is still set and is released at _exit
    TAOS_CHECK_EXIT(tSimpleHashPut(*ppInfo, id, sizeof(id), &pUnowned, POINTER_BYTES));

    // ownership has moved to the outer map
    SSHashObj* tmp = pUnowned;
    pUnowned = NULL;

    for (int32_t j = 0; j < len; ++j) {
      int16_t slotId = 0;
      int16_t cid = 0;
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
    }
  }
_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(pUnowned);
    tSimpleHashCleanup(*ppInfo);
    *ppInfo = NULL;
  }
  return code;
}
3382

3383
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
35,818,291✔
3384
  SDecoder decoder = {0};
35,818,291✔
3385
  int32_t  code = TSDB_CODE_SUCCESS;
35,818,756✔
3386
  int32_t  lino = 0;
35,818,756✔
3387

3388
  tDecoderInit(&decoder, buf, bufLen);
35,818,756✔
3389
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
35,818,256✔
3390

3391
  int32_t type = 0;
35,822,595✔
3392
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
35,821,575✔
3393
  SSTriggerPullRequest* pBase = &(pReq->base);
35,821,575✔
3394
  pBase->type = type;
35,818,704✔
3395
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
71,641,715✔
3396
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
71,642,736✔
3397
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
71,642,829✔
3398

3399
  switch (type) {
35,819,749✔
3400
    case STRIGGER_PULL_SET_TABLE: {
134,346✔
3401
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
134,346✔
3402
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
134,346✔
3403
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
134,346✔
3404
      break;
134,346✔
3405
    }
3406
    case STRIGGER_PULL_LAST_TS: {
283,455✔
3407
      break;
283,455✔
3408
    }
3409
    case STRIGGER_PULL_FIRST_TS: {
254,710✔
3410
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
254,710✔
3411
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
509,663✔
3412
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
509,663✔
3413
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
509,663✔
3414
      break;
254,953✔
3415
    }
3416
    case STRIGGER_PULL_TSDB_META: {
693,902✔
3417
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
693,902✔
3418
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
1,387,804✔
3419
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
1,387,804✔
3420
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,387,804✔
3421
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,387,804✔
3422
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,387,804✔
3423
      break;
693,902✔
3424
    }
3425
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3426
      break;
×
3427
    }
3428
    case STRIGGER_PULL_TSDB_TS_DATA: {
103,632✔
3429
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
103,632✔
3430
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
207,264✔
3431
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
207,264✔
3432
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
207,264✔
3433
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
207,264✔
3434
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
207,264✔
3435
      break;
103,632✔
3436
    }
3437
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
58,805✔
3438
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
58,805✔
3439
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
117,610✔
3440
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
117,610✔
3441
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
117,610✔
3442
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
117,610✔
3443
      break;
58,805✔
3444
    }
3445
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
58,556✔
3446
      break;
58,556✔
3447
    }
3448
    case STRIGGER_PULL_TSDB_CALC_DATA: {
6,784,841✔
3449
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
6,784,841✔
3450
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
13,570,130✔
3451
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
13,570,578✔
3452
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
13,570,578✔
3453
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
13,570,578✔
3454
      break;
6,785,289✔
3455
    }
3456
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3457
      break;
×
3458
    }
3459
    case STRIGGER_PULL_TSDB_DATA: {
550,540✔
3460
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
550,540✔
3461
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
1,101,080✔
3462
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
1,101,080✔
3463
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
1,101,080✔
3464
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
1,101,080✔
3465
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
550,540✔
3466
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,101,080✔
3467
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,101,080✔
3468
      break;
550,540✔
3469
    }
3470
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3471
      break;
×
3472
    }
3473
    case STRIGGER_PULL_WAL_META_NEW: {
12,412,841✔
3474
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
12,412,841✔
3475
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
24,828,535✔
3476
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
24,830,159✔
3477
      break;
12,415,185✔
3478
    }
3479
    case STRIGGER_PULL_WAL_DATA_NEW:
9,462,192✔
3480
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3481
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
9,462,192✔
3482
      int32_t                     nVersion = 0;
9,461,951✔
3483
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
9,462,915✔
3484
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
9,462,915✔
3485
      for (int32_t i = 0; i < nVersion; i++) {
18,279,701✔
3486
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
8,817,268✔
3487
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
8,817,268✔
3488
      }
3489
      int32_t nRanges = 0;
9,462,433✔
3490
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
9,462,137✔
3491
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
9,462,137✔
3492
      if (pRequest->ranges == NULL) {
9,461,414✔
3493
        TAOS_CHECK_EXIT(terrno);
×
3494
      }
3495
      for (int32_t i = 0; i < nRanges; i++) {
13,044,488✔
3496
        uint64_t gid = 0;
3,582,537✔
3497
        int64_t pRange[2] = {0};
3,582,537✔
3498
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
3,582,537✔
3499
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
3,582,537✔
3500
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
3,582,537✔
3501
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
3,582,537✔
3502
      }
3503
      break;
9,461,951✔
3504
    }
3505
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3,479,815✔
3506
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
3,479,815✔
3507
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
6,961,836✔
3508
      break;
3,481,158✔
3509
    }
3510
    case STRIGGER_PULL_GROUP_COL_VALUE: {
275,447✔
3511
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
275,447✔
3512
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
550,894✔
3513
      break;
275,447✔
3514
    }
3515
    case STRIGGER_PULL_VTABLE_INFO: {
75,395✔
3516
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
75,395✔
3517
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
75,395✔
3518
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
75,395✔
3519
      TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
75,395✔
3520
      break;
75,395✔
3521
    }
3522
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
1,055,260✔
3523
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
1,055,260✔
3524
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
2,110,520✔
3525
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
1,055,260✔
3526
      break;
1,055,260✔
3527
    }
3528
    case STRIGGER_PULL_OTABLE_INFO: {
134,346✔
3529
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
134,346✔
3530
      int32_t size = 0;
134,346✔
3531
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
134,346✔
3532
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
134,346✔
3533
      if (pRequest->cols == NULL) {
134,346✔
3534
        code = terrno;
×
3535
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3536
        goto _exit;
×
3537
      }
3538
      for (int32_t i = 0; i < size; ++i) {
498,162✔
3539
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
363,816✔
3540
        if (oInfo == NULL) {
363,816✔
3541
          code = terrno;
×
3542
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3543
          goto _exit;
×
3544
        }
3545
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
363,816✔
3546
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
363,816✔
3547
      }
3548
      break;
134,346✔
3549
    }
3550
    default: {
1,666✔
3551
      uError("unknown pull type %d", type);
1,666✔
3552
      code = TSDB_CODE_INVALID_PARA;
×
3553
      break;
×
3554
    }
3555
  }
3556

3557
  tEndDecode(&decoder);
35,822,702✔
3558

3559
_exit:
35,818,651✔
3560
  tDecoderClear(&decoder);
35,820,080✔
3561
  return code;
35,819,685✔
3562
}
3563

3564
// Encodes an array of SSTriggerCalcParam into pEncoder.
//
// Wire layout: an int32 element count, then for every element the raw bytes of
// the struct up to (but not including) `notifyType`. Unless
// ignoreNotificationInfo is set, each element is followed by `notifyType`
// (int32) and `extraNotifyContent` as a binary blob whose length includes the
// terminating NUL (length 0 when the pointer is NULL).
//
//   pEncoder                target encoder; when pEncoder->data is NULL the raw
//                           part is not copied and only pEncoder->pos advances
//   pParams                 array of SSTriggerCalcParam
//   ignoreNotificationInfo  true to leave out notifyType/extraNotifyContent
//   full                    false to write a count of 0 and no elements
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
  int32_t       size = full ? taosArrayGetSize(pParams) : 0;
  int32_t       code = 0;
  int32_t       lino = 0;
  const int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (pEncoder->data) {
      // The raw copy bypasses the tEncode* helpers, so the remaining capacity
      // has to be verified here before writing into the buffer.
      // NOTE(review): TSDB_CODE_OUT_OF_RANGE is assumed to be the code used for
      // coder overruns elsewhere - confirm.
      if ((int64_t)pEncoder->size - (int64_t)pEncoder->pos < plainFieldSize) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
      }
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
    }
    pEncoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      // The length counts the trailing NUL so the decoder gets a ready-to-use C string.
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
3589

3590
void tDestroySSTriggerCalcParam(void* ptr) {
2,147,483,647✔
3591
  SSTriggerCalcParam* pParam = ptr;
2,147,483,647✔
3592
  if (pParam && pParam->extraNotifyContent != NULL) {
2,147,483,647✔
3593
    taosMemoryFreeClear(pParam->extraNotifyContent);
268,247✔
3594
  }
3595
  if (pParam && pParam->resultNotifyContent != NULL) {
2,147,483,647✔
3596
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
3597
  }
3598
}
2,147,483,647✔
3599

3600
void tDestroySStreamGroupValue(void* ptr) {
17,448,168✔
3601
  SStreamGroupValue* pValue = ptr;
17,448,168✔
3602
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
17,448,168✔
3603
    taosMemoryFreeClear(pValue->data.pData);
13,666,540✔
3604
    pValue->data.nData = 0;
13,666,278✔
3605
  }
3606
}
17,447,790✔
3607

3608
// Decodes an array of SSTriggerCalcParam written by tSerializeSTriggerCalcParam.
//
// Reads an int32 element count, allocates *ppParams, and for every element
// copies the raw bytes of the struct up to (but not including) `notifyType`.
// Unless ignoreNotificationInfo is set, `notifyType` and an allocated copy of
// `extraNotifyContent` follow each element.
//
//   pDecoder                source decoder
//   ppParams                receives the newly created array; any previous value
//                           is overwritten without being released
//   ignoreNotificationInfo  must match the flag used when encoding
//
// Returns 0 on success, otherwise the error code of the failing step. On
// failure *ppParams may hold a partially filled array.
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
  int32_t       size = 0, code = 0, lino = 0;
  const int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < size; ++i) {
    // The raw copy bypasses the tDecode* helpers, so make sure the message
    // really contains the bytes before reserving an element and reading them.
    // NOTE(review): TSDB_CODE_OUT_OF_RANGE is assumed to be the code used for
    // coder overruns elsewhere - confirm.
    if ((int64_t)pDecoder->size - (int64_t)pDecoder->pos < plainFieldSize) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
    }

    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
    pDecoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
3634

3635
// Encodes an array of SStreamGroupValue.
//
// Wire layout: int32 count, then per value: isNull (bool). For non-null values:
// isTbname (bool); when isTbname is set, uid (int64) and vgId (int32); then the
// data type (int8) followed by either a binary blob (variable-length types) or
// the int64 `val`.
//
// When vgId is not -1, it is stored into every table-name value before that
// value's vgId is written, i.e. the input array is modified.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t numVals = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numVals));

  for (int32_t idx = 0; idx < numVals; idx++) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (!pVal->isNull) {
      // Null values carry nothing beyond the flag above.
      TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
        if (vgId != -1) {
          pVal->vgId = vgId;
        }
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
      }

      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
      if (IS_VAR_DATA_TYPE(pVal->data.type)) {
        TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
      } else {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
      }
    }
  }

_exit:
  return code;
}
3667

3668
// Decodes an array of SStreamGroupValue written by
// tSerializeStriggerGroupColVals.
//
// Any values already in *ppGroupColVals are cleared first (their buffers are
// released through tDestroySStreamGroupValue). When the message holds at least
// one value, the array is created if it does not exist yet, or grown to the
// decoded count otherwise.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t numVals = 0;
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numVals));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (numVals > 0 && *ppGroupColVals != NULL) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, numVals));
  } else if (numVals > 0) {
    *ppGroupColVals = taosArrayInit(numVals, sizeof(SStreamGroupValue));
    if (*ppGroupColVals == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  for (int32_t idx = 0; idx < numVals; idx++) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (!pVal->isNull) {
      // Only non-null values carry a payload after the flag.
      TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
      }

      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
      if (IS_VAR_DATA_TYPE(pVal->data.type)) {
        uint64_t blobLen = 0;
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &blobLen));
        pVal->data.nData = blobLen;
      } else {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
      }
    }
  }
_exit:
  return code;
}
3711

3712
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
550,894✔
3713
  SEncoder encoder = {0};
550,894✔
3714
  int32_t  code = TSDB_CODE_SUCCESS;
550,894✔
3715
  int32_t  lino = 0;
550,894✔
3716
  int32_t  tlen = 0;
550,894✔
3717

3718
  tEncoderInit(&encoder, buf, bufLen);
550,894✔
3719
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
550,894✔
3720

3721
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
550,894✔
3722

3723
  tEndEncode(&encoder);
550,894✔
3724

3725
_exit:
550,894✔
3726
  if (code != TSDB_CODE_SUCCESS) {
550,894✔
3727
    tlen = code;
×
3728
  } else {
3729
    tlen = encoder.pos;
550,894✔
3730
  }
3731
  tEncoderClear(&encoder);
550,894✔
3732
  return tlen;
550,894✔
3733
}
3734

3735
// Deserializes group values from `buf` (length `bufLen`) into gInfo->gInfo.
//
// Returns 0 on success, otherwise the error code of the failing step. The
// decoder is always cleared before returning.
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3752

3753
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
5,402,931✔
3754
  SEncoder encoder = {0};
5,402,931✔
3755
  int32_t  code = TSDB_CODE_SUCCESS;
5,402,931✔
3756
  int32_t  lino = 0;
5,402,931✔
3757
  int32_t  tlen = 0;
5,402,931✔
3758

3759
  tEncoderInit(&encoder, buf, bufLen);
5,402,931✔
3760
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,403,166✔
3761

3762
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
10,806,332✔
3763
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
10,806,332✔
3764
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
10,806,332✔
3765
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
10,805,829✔
3766
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
10,805,829✔
3767

3768
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
5,403,166✔
3769
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
5,402,226✔
3770
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
10,805,627✔
3771
  TAOS_CHECK_EXIT(tEncodeBool(&encoder, pReq->isWindowTrigger));
5,402,931✔
3772
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision));
10,805,621✔
3773

3774
  tEndEncode(&encoder);
5,402,455✔
3775

3776
_exit:
5,402,931✔
3777
  if (code != TSDB_CODE_SUCCESS) {
5,402,931✔
3778
    tlen = code;
×
3779
  } else {
3780
    tlen = encoder.pos;
5,402,931✔
3781
  }
3782
  tEncoderClear(&encoder);
5,402,931✔
3783
  return tlen;
5,402,663✔
3784
}
3785

3786
// Deserializes a trigger calc request from `buf` (length `bufLen`) into pReq.
//
// isWindowTrigger and precision are read only when the message has bytes left
// after createTable; otherwise those two fields of pReq are left untouched.
//
// Returns 0 on success, otherwise the error code of the failing step. The
// decoder is always cleared before returning.
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  int32_t  lino = 0;
  int32_t  code = TSDB_CODE_SUCCESS;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  // Identification of the request.
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->gid));

  // Payload arrays, followed by the trailing scalar options.
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&dec, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&dec, &pReq->groupColVals));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->createTable));

  // The last two fields are optional on the wire.
  if (!tDecodeIsEnd(&dec)) {
    TAOS_CHECK_EXIT(tDecodeBool(&dec, &pReq->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->precision));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3814

3815
// Releases everything owned by a trigger calc request: the params array, the
// group column values and the output block. The request struct itself is not
// freed. Every released member is reset to NULL so that calling this function
// again on the same request is harmless. NULL is accepted.
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq != NULL) {
    if (pReq->params != NULL) {
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
      pReq->params = NULL;
    }
    if (pReq->groupColVals != NULL) {
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
      pReq->groupColVals = NULL;
    }
    blockDataDestroy(pReq->pOutBlock);
    // Do not leave a dangling pointer behind, matching the two members above.
    pReq->pOutBlock = NULL;
  }
}
5,823,669✔
3828

3829
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
×
3830
  SEncoder encoder = {0};
×
3831
  int32_t  code = TSDB_CODE_SUCCESS;
×
3832
  int32_t  lino = 0;
×
3833
  int32_t  tlen = 0;
×
3834

3835
  tEncoderInit(&encoder, buf, bufLen);
×
3836
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3837

3838
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3839
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
×
3840
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3841
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
×
3842

3843
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
×
3844

3845
  tEndEncode(&encoder);
×
3846

3847
_exit:
×
3848
  if (code != TSDB_CODE_SUCCESS) {
×
3849
    tlen = code;
×
3850
  } else {
3851
    tlen = encoder.pos;
×
3852
  }
3853
  tEncoderClear(&encoder);
×
3854
  return tlen;
×
3855
}
3856

3857
// Deserializes a trigger drop-table request from `buf` (length `bufLen`).
//
// Field order: streamId, runnerTaskId, sessionId, gid, group column values.
//
// Returns 0 on success, otherwise the error code of the failing step. The
// decoder is always cleared before returning.
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
  int32_t  lino = 0;
  int32_t  code = TSDB_CODE_SUCCESS;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->gid));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&dec, &pReq->groupColVals));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3878

3879
// Releases the group column values owned by a drop request and resets the
// member to NULL. The request struct itself is not freed; NULL is accepted.
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
  if (pReq == NULL || pReq->groupColVals == NULL) {
    return;
  }

  taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
  pReq->groupColVals = NULL;
}
×
3887

3888
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
38,404,622✔
3889
  SEncoder encoder = {0};
38,404,622✔
3890
  int32_t  code = TSDB_CODE_SUCCESS;
38,404,622✔
3891
  int32_t  lino = 0;
38,404,622✔
3892
  int32_t  tlen = 0;
38,404,622✔
3893

3894
  tEncoderInit(&encoder, buf, bufLen);
38,404,622✔
3895
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
38,404,622✔
3896

3897
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
76,809,244✔
3898
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
76,809,244✔
3899
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
76,809,244✔
3900
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
76,809,244✔
3901

3902
  tEndEncode(&encoder);
38,404,622✔
3903

3904
_exit:
38,404,622✔
3905
  if (code != TSDB_CODE_SUCCESS) {
38,404,622✔
3906
    tlen = code;
×
3907
  } else {
3908
    tlen = encoder.pos;
38,404,622✔
3909
  }
3910
  tEncoderClear(&encoder);
38,404,622✔
3911
  return tlen;
38,404,622✔
3912
}
3913

3914
// Deserializes a trigger control request from `buf` (length `bufLen`).
//
// Field order: type (int32), streamId, taskId, sessionId (int64 each).
//
// Returns 0 on success, otherwise the error code of the failing step. The
// decoder is always cleared before returning.
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  int32_t  lino = 0;
  int32_t  code = TSDB_CODE_SUCCESS;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  // The type is decoded through a plain int32 temporary and then assigned.
  int32_t rawType = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &rawType));
  pReq->type = rawType;

  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->sessionId));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3935

3936
// Encodes a stream runtime function info into an already started encoder.
//
// The pseudo-function values are written without notification info; `full`
// is forwarded to tSerializeSTriggerCalcParam. They are followed by the
// partition column values and the scalar members in the order below.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool full) {
  int32_t code = 0;
  int32_t lino = 0;

  // Array members.
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, full));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // Current window and position.
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));

  // Remaining scalar options.
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->isWindowTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->precision));

_exit:
  return code;
}
3952

3953
// Decodes a stream runtime function info from an already started decoder.
//
// The pseudo-function values are read without notification info.
// isWindowTrigger and precision are read only when the message has bytes left
// after triggerType; otherwise those two members of pInfo are left untouched.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
  // The last two fields are optional on the wire.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->precision));
  }
_exit:
  return code;
}
3972

3973
// Releases the two arrays owned by a stream runtime function info and resets
// both members to NULL. The struct itself is not freed; NULL is accepted.
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }
}
3984

3985
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
150,790✔
3986
  SEncoder encoder = {0};
150,790✔
3987
  int32_t  code = TSDB_CODE_SUCCESS;
150,790✔
3988
  int32_t  lino = 0;
150,790✔
3989
  int32_t  tlen = 0;
150,790✔
3990

3991
  tEncoderInit(&encoder, buf, bufLen);
150,790✔
3992
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
150,790✔
3993

3994
  int32_t size = taosArrayGetSize(pRsp->infos);
150,790✔
3995
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
150,790✔
3996
  for (int32_t i = 0; i < size; ++i) {
528,648✔
3997
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
377,858✔
3998
    if (info == NULL) {
377,858✔
3999
      TAOS_CHECK_EXIT(terrno);
×
4000
    }
4001
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
755,716✔
4002
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
755,716✔
4003
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
377,858✔
4004
  }
4005

4006
  tEndEncode(&encoder);
150,790✔
4007

4008
_exit:
150,560✔
4009
  if (code != TSDB_CODE_SUCCESS) {
150,560✔
4010
    tlen = code;
×
4011
  } else {
4012
    tlen = encoder.pos;
150,560✔
4013
  }
4014
  tEncoderClear(&encoder);
150,560✔
4015
  return tlen;
150,790✔
4016
}
4017

4018
// Deserializes a list of VTableInfo from `buf` (length `bufLen`).
//
// vTableInfo->infos is replaced by a newly created array; one entry is
// appended for every decoded element (gId, uid, column reference wrapper).
//
// Returns 0 on success, otherwise the error code of the failing step. The
// decoder is always cleared before returning.
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
  int32_t  numInfos = 0;
  int32_t  lino = 0;
  int32_t  code = TSDB_CODE_SUCCESS;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeI32(&dec, &numInfos));
  vTableInfo->infos = taosArrayInit(numInfos, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numInfos; idx++) {
    VTableInfo* pEntry = taosArrayReserve(vTableInfo->infos, 1);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeI64(&dec, &pEntry->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&dec, &pEntry->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&dec, &pEntry->cols, false));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
4048

4049

4050
void tDestroyVTableInfo(void *ptr) {
377,858✔
4051
  if (NULL == ptr) {
377,858✔
4052
    return;
×
4053
  }
4054
  VTableInfo* pTable = (VTableInfo*)ptr;
377,858✔
4055
  taosMemoryFree(pTable->cols.pColRef);
377,858✔
4056
}
4057

4058
// Destroys the `infos` array (releasing every element through
// tDestroyVTableInfo) and resets the member to NULL. The struct itself is not
// freed; NULL is accepted.
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4063

4064
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,076,575✔
4065
  SEncoder encoder = {0};
1,076,575✔
4066
  int32_t  code = TSDB_CODE_SUCCESS;
1,076,575✔
4067
  int32_t  lino = 0;
1,076,575✔
4068
  int32_t  tlen = 0;
1,076,575✔
4069

4070
  tEncoderInit(&encoder, buf, bufLen);
1,076,575✔
4071
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,075,852✔
4072

4073
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,152,185✔
4074
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,075,851✔
4075
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,075,436✔
4076
  for (int32_t i = 0; i < size; ++i) {
2,348,328✔
4077
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,273,859✔
4078
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
2,547,233✔
4079
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
2,546,509✔
4080
  }
4081

4082
  tEndEncode(&encoder);
1,074,469✔
4083

4084
_exit:
1,076,346✔
4085
  if (code != TSDB_CODE_SUCCESS) {
1,076,346✔
4086
    tlen = code;
×
4087
  } else {
4088
    tlen = encoder.pos;
1,076,346✔
4089
  }
4090
  tEncoderClear(&encoder);
1,076,346✔
4091
  return tlen;
1,075,864✔
4092
}
4093

4094
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
538,584✔
4095
  SDecoder decoder = {0};
538,584✔
4096
  int32_t  code = TSDB_CODE_SUCCESS;
538,584✔
4097
  int32_t  lino = 0;
538,584✔
4098
  SSDataBlock *pResBlock = pBlock;
538,584✔
4099

4100
  tDecoderInit(&decoder, buf, bufLen);
538,584✔
4101
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
538,584✔
4102

4103
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,077,168✔
4104
  int32_t numOfCols = 2;
538,584✔
4105
  if (pResBlock->pDataBlock == NULL) {
538,584✔
4106
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
538,584✔
4107
    if (pResBlock->pDataBlock == NULL) {
538,584✔
4108
      TAOS_CHECK_EXIT(terrno);
×
4109
    }
4110
    for (int32_t i = 0; i< numOfCols; ++i) {
1,615,752✔
4111
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,077,168✔
4112
      if (pColInfoData == NULL) {
1,077,168✔
4113
        TAOS_CHECK_EXIT(terrno);
×
4114
      }
4115
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,077,168✔
4116
      pColInfoData->info.bytes = sizeof(int64_t);
1,077,168✔
4117
    }
4118
  }
4119
  int32_t numOfRows = 0;
538,584✔
4120
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
538,584✔
4121
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
538,584✔
4122
  for (int32_t i = 0; i < numOfRows; ++i) {
1,175,880✔
4123
    for (int32_t j = 0; j < numOfCols; ++j) {
1,911,888✔
4124
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,274,592✔
4125
      if (pColInfoData == NULL) {
1,274,592✔
4126
        TAOS_CHECK_EXIT(terrno);
×
4127
      }
4128
      int64_t value = 0;
1,274,592✔
4129
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,274,592✔
4130
      colDataSetInt64(pColInfoData, i, &value);
1,274,592✔
4131
    }
4132
  }
4133

4134
  pResBlock->info.dataLoad = 1;
538,584✔
4135
  pResBlock->info.rows = numOfRows;
538,584✔
4136

4137
  tEndDecode(&decoder);
538,584✔
4138

4139
_exit:
538,350✔
4140
  tDecoderClear(&decoder);
538,350✔
4141
  return code;
538,584✔
4142
}
4143

4144
// Appends an encoded data block, optionally followed by a table index.
//
// When encoder->data is NULL only the size is accounted for (via
// blockGetEncodeSize); otherwise the block is encoded in place into the
// remaining buffer. When indexHash is not NULL, a 4-byte slot is reserved for
// the table count, one record (uid, gId, startRowIdx, numRows) is written per
// hash entry whose gId is not -1, and the slot is then filled in with the
// number of records written.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t blockLen = 0;

  if (encoder->data != NULL) {
    blockLen = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos,
                           blockDataGetNumOfCols(pBlock));
    if (blockLen < 0) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else {
    blockLen = blockGetEncodeSize(pBlock);
  }
  encoder->pos += blockLen;

  if (indexHash != NULL) {
    // Remember where the count goes; it is only known after the loop.
    uint32_t countPos = encoder->pos;
    encoder->pos += sizeof(uint32_t);  // reserve space for tables

    int32_t numTables = 0;
    int32_t iter = 0;
    for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
         pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
      SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
      if (pSlice->gId == -1) {
        continue;
      }
      int64_t uid = *(int64_t*)(tSimpleHashGetKey(pEntry, NULL));
      TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
      TAOS_CHECK_EXIT(tEncodeU64(encoder, pSlice->gId));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->startRowIdx));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->numRows));
      numTables++;
    }

    // Rewind to the reserved slot, write the count, then restore the end position.
    uint32_t endPos = encoder->pos;
    encoder->pos = countPos;
    TAOS_CHECK_EXIT(tEncodeI32(encoder, numTables));
    encoder->pos = endPos;
  }

_exit:
  return code;
}
4187
 
4188
/*
 * Encode an optional block: a one-byte presence flag, followed by the block
 * payload (see encodeData) when the flag is 1.
 *
 * The flag is 1 only when block is non-NULL and holds at least one row;
 * otherwise a single 0 byte is emitted and nothing else.
 */
static int32_t encodeBlock(SEncoder* encoder, void* block, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int8_t present = 0;
  if (block != NULL && ((SSDataBlock*)block)->info.rows > 0) {
    present = 1;
  }

  TAOS_CHECK_EXIT(tEncodeI8(encoder, present));
  if (present) {
    TAOS_CHECK_EXIT(encodeData(encoder, block, indexHash));
  }

_exit:
  return code;
}
4201

4202
/*
 * Serialize a WAL data response into buf.
 *
 * Order of the encoded fields: data block (with the slice index built from
 * rsp->indexHash), meta block, delete block, table block, ver, verTime. The
 * whole payload is bracketed by tStartEncode()/tEndEncode().
 *
 * Returns the encoder position after encoding on success, otherwise the error
 * code of the step that failed.
 */
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  tlen = 0;
  SEncoder encoder = {0};

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  // only the data block carries a per-table slice index
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->dataBlock, rsp->indexHash));

  // the remaining blocks are encoded without an index, in this fixed order
  void* plainBlocks[] = {rsp->metaBlock, rsp->deleteBlock, rsp->tableBlock};
  for (size_t i = 0; i < sizeof(plainBlocks) / sizeof(plainBlocks[0]); ++i) {
    TAOS_CHECK_EXIT(encodeBlock(&encoder, plainBlocks[i], NULL));
  }

  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
  tEndEncode(&encoder);

_exit:
  tlen = (code == TSDB_CODE_SUCCESS) ? (int32_t)encoder.pos : code;
  tEncoderClear(&encoder);
  return tlen;
}
4229

4230
/*
 * Decode an optional block written by encodeBlock without a slice index.
 *
 * Reads the one-byte presence flag. When it is zero, pBlock (if supplied) is
 * emptied via blockDataEmpty(). When it is non-zero, pBlock must be non-NULL
 * (TSDB_CODE_INVALID_PARA otherwise); the block is decoded in place and the
 * decoder position is moved to the end position reported by blockDecode().
 */
static int32_t decodeBlock(SDecoder* decoder, void* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  present = false;

  TAOS_CHECK_EXIT(tDecodeI8(decoder, &present));

  if (!present) {
    if (pBlock != NULL) {
      blockDataEmpty(pBlock);
    }
    goto _exit;
  }

  if (pBlock == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
  }

  const char* pEnd = NULL;
  TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder->data + decoder->pos, &pEnd));
  // resume decoding right after the bytes consumed by blockDecode
  decoder->pos = (uint8_t*)pEnd - decoder->data;

_exit:
  return code;
}
4248

4249
/*
 * Deserialize a WAL data response from buf into pRsp and pSlices.
 *
 * Fields are read in this order: data block (followed by its slice index when
 * the block is present), meta block, delete block, table block, ver, verTime.
 *
 * For every slice, one element made of three int64_t values is pushed to pSlices:
 *   [0] gid
 *   [1] uid
 *   [2] startIdx in the upper 32 bits, endIdx (= startIdx + numRows) in the lower 32 bits
 * NOTE(review): assumes pSlices was created with an element size of
 * 3 * sizeof(int64_t) - confirm against the callers.
 *
 * pSlices is cleared before the slices are appended, and also when the data
 * block is absent but pRsp->dataBlock is non-NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed (also logged).
 */
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices){
  SDecoder     decoder = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // decode data block
  int8_t hasData = false;
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
  pBlock = pRsp->dataBlock;
  if (hasData) {
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    const char* pEndPos = NULL;
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
    decoder.pos = (uint8_t*)pEndPos - decoder.data;

    int32_t nSlices = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
    // The count comes from the wire: a negative value would be converted to a
    // huge unsigned capacity request below, so reject it up front.
    TAOS_CHECK_EXIT(nSlices >= 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
    taosArrayClear(pSlices);

    int64_t  uid = 0;
    uint64_t gid = 0;
    int32_t  startIdx = 0;
    int32_t  numRows = 0;
    for (int32_t i = 0; i < nSlices; i++) {
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));

      // Pack [startIdx | endIdx] using unsigned arithmetic: this avoids signed
      // overflow in the sum, a left shift of a negative value, and sign
      // extension of endIdx into the startIdx half. The result is bit-identical
      // to the plain signed expression for non-negative inputs.
      uint32_t endIdx = (uint32_t)startIdx + (uint32_t)numRows;
      uint64_t range = ((uint64_t)(uint32_t)startIdx << 32) | endIdx;
      int64_t  value[3] = {(int64_t)gid, uid, (int64_t)range};
      void*    px = taosArrayPush(pSlices, value);
      if (px == NULL) {
        code = terrno;
        goto _exit;
      }
    }
  } else if (pBlock != NULL) {
    blockDataEmpty(pBlock);
    taosArrayClear(pSlices);
  }

  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->metaBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->deleteBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->tableBlock));

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));

  tEndDecode(&decoder);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc