• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5008

29 Mar 2026 04:32AM UTC coverage: 72.241% (-0.009%) from 72.25%
#5008

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253593 of 351039 relevant lines covered (72.24%)

130989730.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.21
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26

27
/*
 * Serialize a stream management request.
 *
 * Layout written: reqId (i64), type (i32), then a payload selected by type.
 * Both known payloads start with an i32 element count, which is 0 when
 * cont.pReqs is NULL:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: for each SStreamDbTableName,
 *     dbFName and tbName written with their terminating NUL.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: for each SStreamOReaderDeployReq,
 *     execId (i32), uid (i64), vgId count (i32), then every vgId (i32).
 *
 * Returns `code`: 0 when every step succeeded, the value left by the failing
 * TAOS_CHECK_EXIT step otherwise, or TSDB_CODE_STREAM_INVALID_TASK_TYPE when
 * the type is not one of the two above (reqId and type are already written
 * by then).
 */
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    // A missing array is encoded exactly like an empty one: count 0, no items.
    int32_t nameNum = 0;
    if (pReq->cont.pReqs) {
      nameNum = taosArrayGetSize(pReq->cont.pReqs);
    }
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nameNum));

    for (int32_t idx = 0; idx < nameNum; ++idx) {
      SStreamDbTableName* pTbl = taosArrayGet(pReq->cont.pReqs, idx);
      TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->dbFName, strlen(pTbl->dbFName) + 1));
      TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTbl->tbName, strlen(pTbl->tbName) + 1));
    }
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    int32_t deployNum = 0;
    if (pReq->cont.pReqs) {
      deployNum = taosArrayGetSize(pReq->cont.pReqs);
    }
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));

    for (int32_t idx = 0; idx < deployNum; ++idx) {
      SStreamOReaderDeployReq* pItem = taosArrayGet(pReq->cont.pReqs, idx);
      int32_t                  vgCnt = taosArrayGetSize(pItem->vgIds);

      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pItem->execId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pItem->uid));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgCnt));
      for (int32_t v = 0; v < vgCnt; ++v) {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pItem->vgIds, v)));
      }
    }
  } else {
    code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
  }

_exit:

  return code;
}
75

76
void tFreeRunnerOReaderDeployReq(void* param) {
7,953✔
77
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
7,953✔
78
  if (pReq) {
7,953✔
79
    taosArrayDestroy(pReq->vgIds);
7,953✔
80
  }
81
}
7,953✔
82

83
/*
 * Release the payload array owned by a management request.
 *
 * Only cont.pReqs is destroyed; the SStreamMgmtReq struct itself is left for
 * the caller to free. Trigger requests hold plain elements, runner requests
 * hold elements that each own a vgIds array and therefore need the
 * per-element destructor. Unknown types and a NULL request are no-ops.
 */
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    taosArrayDestroy(pReq->cont.pReqs);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
  }
}
99

100

101
/*
 * Deep-copy a management request.
 *
 * *ppDst is always reset to NULL first. A NULL pSrc yields success with
 * *ppDst == NULL. Otherwise a new SStreamMgmtReq is allocated, the scalar
 * fields are copied, and the payload array is duplicated according to the
 * request type (runner payloads also get their own copy of each vgIds array).
 *
 * On failure the partially built copy is released, *ppDst is cleared, the
 * error is logged and the error code is returned.
 */
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  *ppDst = NULL;

  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0, lino = 0;
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  // Shallow copy first; the owned array pointer is replaced below.
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
  if (pSrc->cont.pReqs) {
    switch (pSrc->type) {
      case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
        (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
        break;
      case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
        int32_t reqNum = taosArrayGetSize(pSrc->cont.pReqs);
        (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), reqNum);
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < reqNum; ++i) {
          SStreamOReaderDeployReq* pNew = taosArrayGet((*ppDst)->cont.pReqs, i);
          SStreamOReaderDeployReq* pReq = taosArrayGet(pSrc->cont.pReqs, i);
          pNew->execId = pReq->execId;
          pNew->uid = pReq->uid;
          // tDecodeSStreamMgmtReq leaves vgIds NULL when the encoded count is
          // 0, so a NULL source list is a valid "empty" value. Only duplicate
          // (and treat a NULL result as failure) when there is a list to copy.
          if (pReq->vgIds) {
            pNew->vgIds = taosArrayDup(pReq->vgIds, NULL);
            TSDB_CHECK_NULL(pNew->vgIds, code, lino, _exit, terrno);
          }
        }
        break;
      }
      default:
        // NOTE(review): for other types cont.pReqs stays a shallow alias of
        // the source pointer; tFreeSStreamMgmtReq does not free it for such
        // types, so no double free results from this function alone.
        break;
    }
  }

_exit:

  if (code) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
148

149

150
/*
 * Deserialize a management request written by tEncodeSStreamMgmtReq.
 *
 * Reads reqId and type, then the type-specific payload. cont.pReqs is only
 * allocated when the encoded element count is positive; likewise each runner
 * element's vgIds is only allocated for a positive vgId count.
 *
 * Returns 0 on success, an error code from a failing step, or
 * TSDB_CODE_STREAM_INVALID_TASK_TYPE for an unknown type. Nothing is freed
 * here on failure: whatever was already attached to pReq stays attached.
 */
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
          // The reserved slot is written through immediately below, so a NULL
          // result must be rejected like every other allocation in this
          // function.
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      int32_t num = 0, vgIdNum = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
          if (vgIdNum > 0) {
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
          }
          // Not entered for vgIdNum <= 0, so p->vgIds is never read while NULL.
          for (int32_t n = 0; n < vgIdNum; ++n) {
            int32_t* vgId = taosArrayGet(p->vgIds, n);
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
          }
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
203

204
/*
 * Serialize the common header of a stream task.
 *
 * Fields are written in a fixed order (type, streamId, taskId, flags,
 * seriousId, deployId, nodeId, taskIdx, status, detailStatus, errorCode),
 * followed by an i32 presence flag and, when the flag is 1, the attached
 * management request. The session id is deliberately not written.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0, lino = 0;

  // identity
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  // placement and state
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  // optional management request, announced by a 0/1 marker
  int32_t hasMgmtReq = (NULL != pTask->pMgmtReq) ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasMgmtReq));
  if (hasMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
232

233

234
/*
 * Deserialize the common header of a stream task (mirror of
 * tEncodeStreamTask).
 *
 * After the fixed fields an i32 presence flag is read; when it is non-zero a
 * SStreamMgmtReq is allocated, attached to pTask->pMgmtReq and decoded.
 * pTask->pMgmtReq is not touched when the flag is 0.
 *
 * If decoding the management request fails, the request allocated here is
 * released and pTask->pMgmtReq is reset to NULL, so the failed task does not
 * keep a half-built request that nothing else would free.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  int32_t req = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
  if (req) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);

    code = tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
      // Same release sequence tCloneSStreamMgmtReq uses on its error path:
      // drop the payload, then the struct, and clear the pointer.
      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      taosMemoryFreeClear(pTask->pMgmtReq);
      goto _exit;
    }
  }

_exit:

  return code;
}
263

264
/*
 * Serialize one recalculation progress record as:
 * recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0, lino = 0;

  // which recalculation, and how far along it is
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));

  // range bounds of the recalculation
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:

  return code;
}
277

278
/*
 * Deserialize one recalculation progress record, reading the fields in the
 * order tEncodeSSTriggerRecalcProgress writes them:
 * recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0, lino = 0;

  // which recalculation, and how far along it is
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));

  // range bounds of the recalculation
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:

  return code;
}
291

292

293
/*
 * Serialize a trigger's runtime status.
 *
 * Writes five i32 counters (autoRecalcNum, realtimeSessionNum,
 * historySessionNum, recalcSessionNum, histroyProgress), then the number of
 * user recalculation entries (i32) followed by each entry.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  // user-requested recalculations: count first, then the records
  int32_t userRecalcCnt = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, userRecalcCnt));

  for (int32_t k = 0; k < userRecalcCnt; ++k) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
  }

_exit:

  return code;
}
314

315
/*
 * Deserialize a trigger's runtime status (mirror of
 * tEncodeSSTriggerRuntimeStatus).
 *
 * Reads five i32 counters in the encoder's order: autoRecalcNum,
 * realtimeSessionNum, historySessionNum, recalcSessionNum, histroyProgress.
 * Then reads the number of user recalculation entries; userRecalcs is only
 * allocated when that number is positive.
 *
 * Returns 0 on success or the code of the first failing step. Nothing is
 * freed here on failure.
 */
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  // The encoder writes recalcSessionNum as the fourth field. Storing it into
  // realtimeSessionNum again would overwrite that counter and leave
  // recalcSessionNum unset.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    if (NULL == pStatus->userRecalcs) {
      code = terrno;
      goto _exit;
    }
  }

  // Not entered for recalcNum <= 0, so userRecalcs is never read while NULL.
  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
344

345

346
/*
 * Serialize a stream heartbeat message.
 *
 * Between tStartEncode/tEndEncode it writes dnodeId, streamGId, snodeId and
 * runnerThreadNum, then four counted lists (each preceded by its i32 length):
 * vgroup leader ids, task statuses, indices of tasks carrying requests, and
 * trigger runtime statuses.
 *
 * Return value is NOT a plain status: on success it is pEncoder->pos, on
 * failure it is the error code of the failing step.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  // vgroup leaders
  int32_t leaderCnt = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, leaderCnt));
  for (int32_t k = 0; k < leaderCnt; ++k) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  // task statuses (only the SStreamTask header part of each entry is written)
  int32_t taskCnt = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskCnt));
  for (int32_t k = 0; k < taskCnt; ++k) {
    SStmTaskStatusMsg* pTaskStatus = taosArrayGet(pReq->pStreamStatus, k);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pTaskStatus));
  }

  // indices into the status list of tasks that carry a management request
  int32_t reqIdxCnt = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqIdxCnt));
  for (int32_t k = 0; k < reqIdxCnt; ++k) {
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, k);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pIdx));
  }

  // trigger runtime statuses
  int32_t trigCnt = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, trigCnt));
  for (int32_t k = 0; k < trigCnt; ++k) {
    SSTriggerRuntimeStatus* pTrig = taosArrayGet(pReq->pTriggerStatus, k);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrig));
  }

  tEndEncode(pEncoder);

_exit:
  if (0 == code) {
    return pEncoder->pos;
  }
  return code;
}
393

394
/*
 * Deserialize a stream heartbeat message (mirror of tEncodeStreamHbMsg).
 *
 * Reads the four scalar header fields, then four counted lists: vgroup leader
 * ids, task statuses, request indices and trigger runtime statuses. Each list
 * is only allocated when its encoded count is positive; otherwise the
 * corresponding pReq pointer is left as the caller provided it.
 *
 * Returns 0 on success or the error code of the failing step. Nothing is
 * released here on failure; lists allocated so far remain attached to pReq.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  // vgroup leaders: array starts empty and is filled by pushing each id
  int32_t leaderCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderCnt));
  if (leaderCnt > 0) {
    pReq->pVgLeaders = taosArrayInit(leaderCnt, sizeof(int32_t));
    if (NULL == pReq->pVgLeaders) {
      code = terrno;
      goto _exit;
    }
    for (int32_t k = 0; k < leaderCnt; ++k) {
      int32_t leaderVgId = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
      if (NULL == taosArrayPush(pReq->pVgLeaders, &leaderVgId)) {
        code = terrno;
        goto _exit;
      }
    }
  }

  // task statuses: array is pre-sized, entries are decoded in place
  int32_t taskCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskCnt));
  if (taskCnt > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), taskCnt);
    if (NULL == pReq->pStreamStatus) {
      code = terrno;
      goto _exit;
    }
    for (int32_t k = 0; k < taskCnt; ++k) {
      SStmTaskStatusMsg* pEntry = taosArrayGet(pReq->pStreamStatus, k);
      if (NULL == pEntry) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pEntry));
    }
  }

  // indices of tasks that carry a management request
  int32_t reqIdxCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqIdxCnt));
  if (reqIdxCnt > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqIdxCnt);
    if (NULL == pReq->pStreamReq) {
      code = terrno;
      goto _exit;
    }
    for (int32_t k = 0; k < reqIdxCnt; ++k) {
      int32_t* pSlot = taosArrayGet(pReq->pStreamReq, k);
      if (NULL == pSlot) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pSlot));
    }
  }

  // trigger runtime statuses
  int32_t trigCnt = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &trigCnt));
  if (trigCnt > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), trigCnt);
    if (NULL == pReq->pTriggerStatus) {
      code = terrno;
      goto _exit;
    }
    for (int32_t k = 0; k < trigCnt; ++k) {
      SSTriggerRuntimeStatus* pTrig = taosArrayGet(pReq->pTriggerStatus, k);
      if (NULL == pTrig) {
        code = terrno;
        goto _exit;
      }
      TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pTrig));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
485

486
void tFreeSSTriggerRuntimeStatus(void* param) {
2,344,618✔
487
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
2,344,618✔
488
  if (NULL == pStatus) {
2,344,618✔
489
    return;
×
490
  }
491
  taosArrayDestroy(pStatus->userRecalcs);
2,344,618✔
492
}
493

494
/*
 * Release everything a heartbeat message owns. The SStreamHbMsg struct itself
 * is not freed. A NULL message is ignored.
 *
 * With deepClean set, the management request of every task referenced from
 * pStreamReq is released as well; those requests are reached through the
 * index list, so this must happen before the lists themselves are destroyed.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (pMsg == NULL) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);
  if (deepClean) {
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t i = 0; i < reqNum; ++i) {
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
      if (NULL == idx) {
        continue;
      }
      // An index with no matching status entry yields NULL and is skipped.
      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
      if (NULL == pTask) {
        continue;
      }

      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      // Clear the pointer after freeing: the index list is decoded from the
      // message, and if the same index appears twice the second pass must see
      // NULL instead of freeing the same request again.
      taosMemoryFreeClear(pTask->pMgmtReq);
    }
  }
  taosArrayDestroy(pMsg->pStreamReq);
  taosArrayDestroy(pMsg->pStreamStatus);
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
}
517

518
/*
 * Serialize the trigger-side part of a reader deploy message.
 *
 * String fields are written as binaries whose length includes the
 * terminating NUL; a NULL string is written with length 0.
 * triggerPrevFilter is not serialized by this function (its encode call is
 * disabled in the original code).
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0, lino = 0;

  // trigger table identity
  int32_t tblNameLen = (pMsg->triggerTblName == NULL) ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pMsg->triggerTblName, tblNameLen));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));

  // flags
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  // column lists
  int32_t partColsLen = (pMsg->partitionCols == NULL) ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partColsLen));
  int32_t trigColsLen = (pMsg->triggerCols == NULL) ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, trigColsLen));

  // scan plans
  int32_t trigPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, trigPlanLen));
  int32_t cachePlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, cachePlanLen));

_exit:

  return code;
}
539

540
/*
 * Serialize the calc-side part of a reader deploy message:
 * execReplica (i32) followed by calcScanPlan as a binary whose length
 * includes the terminating NUL (length 0 when the plan is NULL).
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  int32_t planLen = (pMsg->calcScanPlan == NULL) ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:

  return code;
}
551

552

553
/*
 * Serialize a reader deploy message: the triggerReader discriminator (i8)
 * followed by the variant it selects — msg.trigger when non-zero, msg.calc
 * when zero.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:

  return code;
}
568

569
/*
 * Serialize a task address: taskId (i64), nodeId (i32), then the endpoint
 * set via tEncodeSEpSet.
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0, lino = 0;

  // who the task is and where it lives
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));

  // how to reach it
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:

  return code;
}
581

582
/*
 * Serialize a runner target: its task address followed by execReplica (i32).
 *
 * Returns 0 on success or the code of the first failing step.
 */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0, lino = 0;

  // address first, replica count second
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:

  return code;
}
593

594

595
/*
 * Serialize a trigger task deploy message.
 *
 * Sections, in wire order:
 *   1. nine i8 option flags, starting with triggerType
 *   2. partition columns, notification URLs and notification options
 *   3. five i64 timing values
 *   4. window parameters, whose layout depends on triggerType
 *   5. event/placeholder bitmaps, slot ids, filter and scan plans
 *   6. reader and runner address lists
 *   7. leader snode id, stream name, precision
 *
 * Optional strings are written as binaries whose length includes the
 * terminating NUL; a NULL string is written with length 0.
 *
 * Returns 0 on success or the code of the first failing step. An
 * unrecognized triggerType is passed to TAOS_CHECK_EXIT as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0, lino = 0;

  // ---- 1. option flags ----
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));

  // ---- 2. partition columns and notification settings ----
  int32_t partColsSz = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partColsSz));

  int32_t urlCnt = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, urlCnt));
  for (int32_t u = 0; u < urlCnt; ++u) {
    const char* pUrl = taosArrayGetP(pMsg->pNotifyAddrUrls, u);
    int32_t     urlSz = (NULL == pUrl) ? 0 : (int32_t)strlen(pUrl) + 1;
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pUrl, urlSz));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));

  // ---- 3. timing values ----
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->idleTimeoutMs));

  // ---- 4. window parameters, keyed by trigger type ----
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForType));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForCount));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));

      int32_t zerothSz =
          pMsg->trigger.stateWin.zeroth == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.zeroth) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.zeroth, zerothSz));

      int32_t stateExprSz =
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateExprSz));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // units and precision first, then the four i64 values
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      int32_t startCondSz =
          pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
      int32_t endCondSz =
          pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;

      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, startCondSz));
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, endCondSz));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForType));
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForCount));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      int32_t condColsSz =
          pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, condColsSz));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
      break;
    }
    default:
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

  // ---- 5. bitmaps, slot ids, filter and plans ----
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId));

  int32_t prevFilterSz = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, prevFilterSz));
  int32_t trigPlanSz = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, trigPlanSz));
  int32_t cachePlanSz = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, cachePlanSz));

  // ---- 6. reader and runner lists ----
  int32_t readerCnt = taosArrayGetSize(pMsg->readerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerCnt));
  for (int32_t r = 0; r < readerCnt; ++r) {
    SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, r);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReader));
  }

  int32_t runnerCnt = taosArrayGetSize(pMsg->runnerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerCnt));
  for (int32_t r = 0; r < runnerCnt; ++r) {
    SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, r);
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
  }

  // ---- 7. trailer ----
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
  // NOTE(review): unlike the other strings above, streamName is handed to
  // strlen without a NULL guard — confirm callers always set it.
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));

_exit:

  return code;
}
732

733

734
// Encode one SFieldWithOptions into pEncoder.
// Wire order: name (C string), type (u8), flags (i8), bytes (i32), compress (u32), typeMod (i32).
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
  int32_t lino = 0;
  int32_t code = 0;

  // field identity
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));

  // storage attributes
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
749

750

751
// Encode an optional NUL-terminated string as a length-prefixed binary.
// A NULL string is written with length 0; otherwise the length includes the terminator.
static int32_t stmRunnerEncodeOptCStr(SEncoder* pEncoder, const void* str) {
  int32_t len = (str == NULL) ? 0 : (int32_t)strlen((const char*)str) + 1;
  return tEncodeBinary(pEncoder, str, len);
}

// Encode a runner deploy message into pEncoder.
//
// Wire order (must stay in sync with tDecodeSStreamRunnerDeployMsg):
//   execReplica, streamName, pPlan, outDBFName, outTblName, outTblType, calcNotifyOnly, topPlan,
//   notify URL count + URLs, addOptions, outCols count + fields, outTags count + fields,
//   outStbUid, outStbSversion, subTblNameExpr, tagValueExpr, forceOutCols count + entries,
//   lowLatencyCalc, colCids count + ids, tagCids count + ids.
//
// Every string field, including streamName, is NULL-tolerant: NULL is written as a
// zero-length binary instead of being passed to strlen().
//
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->streamName));
  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->pPlan));
  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->outDBFName));
  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->outTblName));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // notification URLs: count followed by each URL
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char* url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, url));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));

  // output columns
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions* pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // output tags
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions* pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->subTblNameExpr));
  TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pMsg->tagValueExpr));

  // forced output columns: expression plus its data type descriptor
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol* pOutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);

    TAOS_CHECK_EXIT(stmRunnerEncodeOptCStr(pEncoder, pOutCol->expr));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pOutCol->type.bytes));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));

  // colCids and tagCids - always encode size (0 if NULL) for compatibility
  int32_t colCidsSize = (int32_t)taosArrayGetSize(pMsg->colCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, colCidsSize));
  for (int32_t i = 0; i < colCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->colCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

  int32_t tagCidsSize = (int32_t)taosArrayGetSize(pMsg->tagCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tagCidsSize));
  for (int32_t i = 0; i < tagCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->tagCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

_exit:
  return code;
}
830

831
// Encode one task deployment: the common SStreamTask header first, then the
// reader / trigger / runner payload chosen by pTask->task.type.
// Any other task type is reported through TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
  } else {
    // unknown task type
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
855

856

857
// Encode the deployment of one stream.
// Wire order: streamId, reader task count + tasks, trigger-present flag (0/1)
// followed by the trigger task when present, runner task count + tasks.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  // reader tasks
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (int32_t idx = 0; idx < nReaders; ++idx) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  // optional trigger task, announced by a 0/1 marker
  int32_t hasTrigger = 0;
  if (pStream->triggerTask) {
    hasTrigger = 1;
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (pStream->triggerTask) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (int32_t idx = 0; idx < nRunners; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
886

887
// Encode the common stream message header, which consists only of msgType (i32).
// Returns `code`: 0 unless the TAOS_CHECK_EXIT step bailed out to _exit.
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));

_exit:
  return code;
}
896

897
// Decode the common stream message header.
// The message type is read as an i32 into a temporary and only stored into
// pMsg->msgType once the read went through.
// Returns `code`: 0 unless the TAOS_CHECK_EXIT step bailed out to _exit.
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;
  int32_t rawType = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
908

909
// Encode a start-task message; only its common header is written.
// Returns `code`: 0 unless the TAOS_CHECK_EXIT step bailed out to _exit.
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
919

920
// Encode a task-start entry: the SStreamTask header followed by its start message.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the start payload
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
931

932
// Encode an undeploy-task message.
// Wire order: common header, doCheckpoint (i8), doCleanup (i8).
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  // the two undeploy flags
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
944

945
// Encode a task-undeploy entry: the SStreamTask header followed by its undeploy message.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the undeploy payload
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
956

957

958
// Encode one recalculation request as three i64 values: recalcId, start, end.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  // requested range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
970

971
// Encode the type-specific content of a management response.
//
// What is written depends on msgType:
//   STREAM_MSG_ORIGTBL_READER_INFO   - vgId count + vgIds, then reader count + reader addresses
//   STREAM_MSG_UPDATE_RUNNER         - runner count + runner targets
//   STREAM_MSG_USER_RECALC           - recalc count + recalc requests
//   STREAM_MSG_RUNNER_ORIGTBL_READER - rsp count, and per rsp: execId, vg count + task addresses
// Any other msgType writes nothing and leaves `code` at 0.
//
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      // vgroup id list
      int32_t nVgIds = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVgIds));
      for (int32_t k = 0; k < nVgIds; ++k) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      // reader address list
      int32_t nReaders = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
      for (int32_t k = 0; k < nReaders; ++k) {
        SStreamTaskAddr* pReaderAddr = taosArrayGet(pRsp->readerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReaderAddr));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
      for (int32_t k = 0; k < nRunners; ++k) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalcs));
      for (int32_t k = 0; k < nRecalcs; ++k) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nExecRsps = taosArrayGetSize(pRsp->execRspList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nExecRsps));
      for (int32_t k = 0; k < nExecRsps; ++k) {
        SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pRsp->execRspList, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pExecRsp->execId));

        // task addresses attached to this exec response
        int32_t nVgs = taosArrayGetSize(pExecRsp->vgList);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVgs));
        for (int32_t v = 0; v < nVgs; ++v) {
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pExecRsp->vgList, v)));
        }
      }
      break;
    }

    default:
      // no content is written for other message types
      break;
  }

_exit:
  return code;
}
1037

1038
// Encode one management response.
// Wire order: common header, reqId (i64), code (i32), task, then the content
// selected by pRsp->header.msgType.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  // fixed part
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  // type-dependent part
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));

_exit:
  return code;
}
1052

1053

1054
// Encode a stream heartbeat response between tStartEncode/tEndEncode.
//
// Wire order: streamGId, deploy stream count + streams, start task count + tasks,
// undeployAll flag (i8) followed - only when the flag is zero - by the undeploy
// task count + tasks, and finally the management response count + responses.
//
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // streams to deploy
  int32_t nDeploy = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nDeploy));
  for (int32_t k = 0; k < nDeploy; ++k) {
    SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, k);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pDeploy));
  }

  // tasks to start
  int32_t nStart = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nStart));
  for (int32_t k = 0; k < nStart; ++k) {
    SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pStart));
  }

  // tasks to undeploy; the explicit list is omitted when undeployAll is set
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploy = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nUndeploy));
    for (int32_t k = 0; k < nUndeploy; ++k) {
      SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pUndeploy));
    }
  }

  // management responses
  int32_t nMgmtRsp = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nMgmtRsp));
  for (int32_t k = 0; k < nMgmtRsp; ++k) {
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
  }

_exit:
  // NOTE(review): tEndEncode also runs on the failure path, including when
  // tStartEncode itself failed - confirm this is intended.
  tEndEncode(pEncoder);
  return code;
}
1097

1098
// Decode the trigger-side reader deploy payload into pMsg.
// Wire order: triggerTblName, triggerTblUid, triggerTblSuid, triggerTblType,
// isTriggerTblVirt, deleteReCalc, deleteOutTbl, partitionCols, triggerCols,
// triggerScanPlan, calcCacheScanPlan.
// Binary fields are filled through tDecodeBinaryAlloc.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  // trigger table description
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));

  // delete handling flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  // column lists and plans
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1118

1119

1120
// Decode the calc-side reader deploy payload: execReplica (i32) followed by
// calcScanPlan, which is filled through tDecodeBinaryAlloc.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));

_exit:
  return code;
}
1131

1132

1133
// Decode a reader deploy message.
// The leading triggerReader flag (i8) selects which union member follows:
// non-zero -> msg.trigger, zero -> msg.calc.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    // calc reader payload
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    // trigger reader payload
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1148

1149

1150
// Decode one task address: taskId (i64), nodeId (i32), then the endpoint set.
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  // identifiers
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));

  // endpoints
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1162

1163

1164
// Decode one runner target: its task address followed by execReplica (i32).
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1175

1176

1177
// Decode a trigger deploy message into pMsg.
//
// Sections, in wire order:
//   1. general flags and partitionCols
//   2. notification URLs (count + strings), notifyEventTypes, addOptions, notifyHistory
//   3. timing options (maxDelay ... idleTimeoutMs)
//   4. the trigger union member selected by triggerType; an unknown type is
//      reported through TSDB_CODE_MND_STREAM_INTERNAL_ERROR
//   5. event/placeholder bitmaps, slot ids, filter and scan plans
//   6. reader address list and runner target list
//   7. leaderSnodeId, streamName, and precision when the decoder is not yet at its end
//
// Arrays are only created when the decoded count is positive. Binary fields are
// filled through tDecodeBinaryAlloc. Nothing is released here on failure.
//
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT / TSDB_CHECK_NULL steps
// bailed out to _exit.
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  // 1. general flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));

  // 2. notification settings
  int32_t nUrls = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUrls));
  if (nUrls > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, nUrls);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nUrls; ++k) {
      // each array slot receives the string pointer produced by the decoder
      void** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  // 3. timing options
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->idleTimeoutMs));

  // 4. trigger-type specific parameters
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.zeroth, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }
    default: {
      // unknown trigger type
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

  // 5. bitmaps, slot ids and plans
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  // 6a. reader addresses
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nReaders; ++k) {
      SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
    }
  }

  // 6b. runner targets
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRunners; ++k) {
      SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
    }
  }

  // 7. trailer
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  // precision is read only if data remains; presumably older senders omit it - TODO confirm
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
  }

_exit:
  return code;
}
1314

1315

1316

1317
// Decode one SFieldWithOptions from pDecoder.
// Wire order: name (copied into pField->name), type (u8), flags (i8),
// bytes (i32), compress (u32), typeMod (i32).
// Returns `code`: 0 unless one of the TAOS_CHECK_EXIT steps bailed out to _exit.
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t lino = 0;
  int32_t code = 0;

  // field identity
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));

  // storage attributes
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1332

1333
void destroySStreamOutCols(void* p){
72,612✔
1334
  if (p == NULL) return;
72,612✔
1335
  SStreamOutCol* col = (SStreamOutCol*)p;
72,612✔
1336
  taosMemoryFreeClear(col->expr);
72,612✔
1337
}
1338

1339
// Decode a runner-task deploy message from pDecoder into pMsg.
//
// Strings (streamName, pPlan, outDBFName, outTblName, subTblNameExpr,
// tagValueExpr, each notify URL, each forced-column expr) are read with
// tDecodeBinaryAlloc. Arrays are created only when their decoded count is
// positive. The trailing fields (lowLatencyCalc, colCids, tagCids) are read
// only while the decoder still has data, for backward compatibility.
// Every step runs under TAOS_CHECK_EXIT / TSDB_CHECK_NULL; returns `code`.
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // Leading scalar fields and allocated strings.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // Notification URLs: each array slot holds one allocated buffer pointer.
  int32_t urlNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &urlNum));
  if (urlNum > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, urlNum);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < urlNum; ++idx) {
      const char** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, idx);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));

  // Output table columns.
  int32_t colNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colNum));
  if (colNum > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), colNum);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < colNum; ++idx) {
      SFieldWithOptions* pField = taosArrayGet(pMsg->outCols, idx);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // Output table tags.
  int32_t tagNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagNum));
  if (tagNum > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), tagNum);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < tagNum; ++idx) {
      SFieldWithOptions* pField = taosArrayGet(pMsg->outTags, idx);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // Forced output columns: an expression plus its data type description.
  int32_t forcedNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forcedNum));
  if (forcedNum > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forcedNum);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < forcedNum; ++idx) {
      SStreamOutCol* pOutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, idx);

      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pOutCol->expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOutCol->type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOutCol->type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOutCol->type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pOutCol->type.bytes));
    }
  }

  // Optional trailing field: only present when the decoder has data left.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  }

  // Optional colCids: the count is always read when data remains; the array is
  // created only for a count in (0, TSDB_MAX_COLUMNS].
  // NOTE(review): for a count outside that range no ids are consumed here. If
  // an encoder ever wrote ids in that case, the fields that follow would be
  // read from the wrong offset -- confirm against the encoder.
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t cidNum = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &cidNum));
    if (cidNum > 0 && cidNum <= TSDB_MAX_COLUMNS) {
      pMsg->colCids = taosArrayInit(cidNum, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->colCids, code, lino, _exit, terrno);
      for (int32_t idx = 0; idx < cidNum; ++idx) {
        int16_t colId = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &colId));
        if (NULL == taosArrayPush(pMsg->colCids, &colId)) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  // Optional tagCids: same layout as colCids, bounded by TSDB_MAX_TAGS.
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t cidNum = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &cidNum));
    if (cidNum > 0 && cidNum <= TSDB_MAX_TAGS) {
      pMsg->tagCids = taosArrayInit(cidNum, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->tagCids, code, lino, _exit, terrno);
      for (int32_t idx = 0; idx < cidNum; ++idx) {
        int16_t tagId = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &tagId));
        if (NULL == taosArrayPush(pMsg->tagCids, &tagId)) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

_exit:

  return code;
}
1450

1451
// Decode one task deployment: the common task header first, then the
// type-specific deploy message selected by pTask->task.type.
// An unrecognised task type yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (STREAM_READER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (STREAM_TRIGGER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (STREAM_RUNNER_TASK == pTask->task.type) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:

  return code;
}
1475

1476

1477
// Decode the deployment of one stream: streamId, the reader task list, an
// optional trigger task (allocated with taosMemoryCalloc when its flag is
// non-zero), and the runner task list. Returns `code`.
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nReaders; ++idx) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // trigger task: the wire value is a presence flag
  int32_t hasTrigger = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nRunners; ++idx) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:

  return code;
}
1517

1518

1519
// Decode a start-task message; its only content is the SStreamMsg header.
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:

  return code;
}
1529

1530

1531
// Decode a task-start entry: the task header followed by its start message.
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // task identity first, then the payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:

  return code;
}
1542

1543

1544
// Decode an undeploy-task message: header, then the doCheckpoint and
// doCleanup flags (one int8 each).
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // behaviour flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:

  return code;
}
1556

1557

1558
// Decode a task-undeploy entry: the task header followed by its undeploy message.
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // task identity first, then the payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:

  return code;
}
1569

1570
// Decode a recalculation request: recalcId, start and end, each as an int64.
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // [start, end] values of the request
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:

  return code;
}
1582

1583
// Decode the body of a management response. The layout depends on msgType:
//   STREAM_MSG_ORIGTBL_READER_INFO   -> vgIds (int32 list) + readerList
//   STREAM_MSG_UPDATE_RUNNER         -> runnerList
//   STREAM_MSG_USER_RECALC           -> recalcList
//   STREAM_MSG_RUNNER_ORIGTBL_READER -> execRspList, each with an execId and a vgList
// Any other msgType decodes nothing. Arrays are created only for positive
// counts. Returns `code`.
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t nVgroups = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVgroups));
      if (nVgroups > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), nVgroups);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
        for (int32_t idx = 0; idx < nVgroups; ++idx) {
          int32_t* pVgId = taosArrayGet(pCont->vgIds, idx);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      int32_t nReaders = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
      if (nReaders > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
        for (int32_t idx = 0; idx < nReaders; ++idx) {
          SStreamTaskAddr* pReaderAddr = taosArrayGet(pCont->readerList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
      if (nRunners > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
        for (int32_t idx = 0; idx < nRunners; ++idx) {
          SStreamRunnerTarget* pRunner = taosArrayGet(pCont->runnerList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRecalcs));
      if (nRecalcs > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), nRecalcs);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
        for (int32_t idx = 0; idx < nRecalcs; ++idx) {
          SStreamRecalcReq* pRecalc = taosArrayGet(pCont->recalcList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalc));
        }
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nRsps = 0;
      int32_t nAddrs = 0;  // refilled from the wire for every response entry
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsps));
      if (nRsps > 0) {
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), nRsps);
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);
        for (int32_t idx = 0; idx < nRsps; ++idx) {
          SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pCont->execRspList, idx);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pExecRsp->execId));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nAddrs));
          if (nAddrs > 0) {
            pExecRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), nAddrs);
            TSDB_CHECK_NULL(pExecRsp->vgList, code, lino, _exit, terrno);
            for (int32_t k = 0; k < nAddrs; ++k) {
              SStreamTaskAddr* pVgAddr = taosArrayGet(pExecRsp->vgList, k);
              TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pVgAddr));
            }
          }
        }
      }
      break;
    }

    default:
      break;
  }

_exit:

  return code;
}
1668

1669

1670
// Decode a management response: header, reqId, code, task, and finally the
// body, whose layout is chosen by the msgType found in the decoded header.
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  // fixed part
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // type-dependent part
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:

  return code;
}
1684

1685
void tFreeSStreamOReaderDeployRsp(void* param) {
5,302✔
1686
  if (NULL == param) {
5,302✔
1687
    return;
×
1688
  }
1689

1690
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
5,302✔
1691
  taosArrayDestroy(pRsp->vgList);
5,302✔
1692
}
1693

1694
void tFreeSStreamMgmtRsp(void* param) {
58,972✔
1695
  if (NULL == param) {
58,972✔
1696
    return;
×
1697
  }
1698
  
1699
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
58,972✔
1700

1701
  taosArrayDestroy(pRsp->cont.vgIds);
58,972✔
1702
  taosArrayDestroy(pRsp->cont.readerList);
58,972✔
1703
  taosArrayDestroy(pRsp->cont.runnerList);
58,972✔
1704
  taosArrayDestroy(pRsp->cont.recalcList);
58,972✔
1705
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
58,972✔
1706
}
1707

1708
// Free the buffers owned by a reader deploy message. Which member of `msg`
// is released depends on the triggerReader flag. A NULL pointer is ignored.
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    // calc reader: only the scan plan is freed
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  // trigger reader
  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
1725

1726
// Element destructor for an array of URL pointers: `param` addresses one
// slot, and the buffer that slot points to is freed. NULL `param` is ignored.
void tFreeStreamNotifyUrl(void* param) {
  void** ppUrl = (void**)param;
  if (ppUrl != NULL) {
    taosMemoryFree(*ppUrl);
  }
}
1733

1734
// Free everything owned by a trigger deploy message: the notify URL array,
// the buffers specific to the window type, the shared plan/filter buffers,
// the reader and runner lists, and the stream name. NULL is ignored.
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

  // buffers that exist only for particular window types
  switch (pTrigger->triggerType) {
    case WINDOW_TYPE_STATE: {
      taosMemoryFree(pTrigger->trigger.stateWin.zeroth);
      taosMemoryFree(pTrigger->trigger.stateWin.expr);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      taosMemoryFree(pTrigger->trigger.event.startCond);
      taosMemoryFree(pTrigger->trigger.event.endCond);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      taosMemoryFree(pTrigger->trigger.count.condCols);
      break;
    }
    default:
      break;
  }

  // buffers common to every window type
  taosMemoryFree(pTrigger->partitionCols);
  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
1765

1766
void tFreeSStreamOutCol(void* param) {
×
1767
  if (NULL == param) {
×
1768
    return;
×
1769
  }
1770

1771
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1772
  taosMemoryFree(pOut->expr);
×
1773
}
1774

1775
// Free the strings and arrays owned by a runner deploy message.
// A NULL pointer is ignored.
//
// NOTE(review): tDecodeSStreamRunnerDeployMsg also allocates pRunner->colCids
// and pRunner->tagCids, and neither is released here. Confirm whether
// ownership of those arrays is taken over elsewhere; if it is not, they leak.
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // plain buffers
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // arrays; URL slots each own an allocated buffer
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  // expressions, then the forced columns, which each own an `expr` buffer
  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
1793

1794
void tFreeSStmTaskDeploy(void* param) {
1,473,404✔
1795
  if (NULL == param) {
1,473,404✔
1796
    return;
233,050✔
1797
  }
1798

1799
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,240,354✔
1800
  switch (pTask->task.type)  {
1,240,354✔
1801
    case STREAM_READER_TASK:
466,246✔
1802
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
466,246✔
1803
      break;
466,246✔
1804
    case STREAM_TRIGGER_TASK:
163,833✔
1805
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
163,833✔
1806
      break;
163,833✔
1807
    case STREAM_RUNNER_TASK:
610,275✔
1808
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
610,275✔
1809
      break;
610,275✔
1810
    default:
×
1811
      break;
×
1812
  }
1813
}
1814

1815

1816
void tFreeSStmStreamDeploy(void* param) {
199,206✔
1817
  if (NULL == param) {
199,206✔
1818
    return;
×
1819
  }
1820
  
1821
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
199,206✔
1822
  int32_t readerNum = taosArrayGetSize(pDeploy->readerTasks);
199,206✔
1823
  for (int32_t i = 0; i < readerNum; ++i) {
672,028✔
1824
    SStmTaskDeploy* pReader = taosArrayGet(pDeploy->readerTasks, i);
472,822✔
1825
    if (!pReader->msg.reader.triggerReader && pReader->msg.reader.msg.calc.freeScanPlan) {
472,822✔
1826
      taosMemoryFreeClear(pReader->msg.reader.msg.calc.calcScanPlan);
246,134✔
1827
    }
1828
  }
1829
  taosArrayDestroy(pDeploy->readerTasks);
199,206✔
1830

1831
  if (pDeploy->triggerTask) {
199,206✔
1832
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
165,438✔
1833
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
165,438✔
1834
    taosMemoryFree(pDeploy->triggerTask);
165,438✔
1835
  }
1836

1837
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
199,206✔
1838
  for (int32_t i = 0; i < runnerNum; ++i) {
814,282✔
1839
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
615,076✔
1840
    taosMemoryFree(pRunner->msg.runner.pPlan);
615,076✔
1841
  }
1842
  taosArrayDestroy(pDeploy->runnerTasks);
199,206✔
1843
}
1844

1845
void tDeepFreeSStmStreamDeploy(void* param) {
396,883✔
1846
  if (NULL == param) {
396,883✔
1847
    return;
×
1848
  }
1849
  
1850
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
396,883✔
1851
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
396,883✔
1852
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
396,883✔
1853
  taosMemoryFree(pDeploy->triggerTask);
396,883✔
1854
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
396,883✔
1855
}
1856

1857

1858
// Release a heartbeat response, using the partial tFreeSStmStreamDeploy for
// each deployed stream. Start/undeploy task lists are destroyed directly and
// management responses go through tFreeSStreamMgmtRsp. NULL is ignored.
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
1867

1868
// Release a heartbeat response, using the full tDeepFreeSStmStreamDeploy for
// each deployed stream. Start/undeploy task lists are destroyed directly and
// management responses go through tFreeSStreamMgmtRsp. NULL is ignored.
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
1877

1878

1879

1880
// Decode a stream heartbeat response between tStartDecode and tEndDecode:
// streamGId, deployed streams, tasks to start, the undeploy section (a task
// list is present only when undeployAll is zero), and management responses.
// Arrays are created only for positive counts. tEndDecode is reached only
// when every step succeeded. Returns `code`.
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  int32_t nDeploys = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploys));
  if (nDeploys > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploys);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nDeploys; ++idx) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, idx);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  int32_t nStarts = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStarts));
  if (nStarts > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStarts);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nStarts; ++idx) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // tasks to undeploy
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploys = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploys));
    if (nUndeploys > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploys);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
      for (int32_t idx = 0; idx < nUndeploys; ++idx) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, idx);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // management responses
  int32_t nMgmtRsps = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nMgmtRsps));
  if (nMgmtRsps > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nMgmtRsps);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
    for (int32_t idx = 0; idx < nMgmtRsps; ++idx) {
      SStreamMgmtRsp* pOneRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pOneRsp));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
1938

1939
// Encode a task-run request (streamId, taskId, reqType) between
// tStartEncode and tEndEncode. tEndEncode is reached only on success.
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // payload
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1952

1953
// Decode a task-run request (streamId, taskId, reqType) between
// tStartDecode and tEndDecode. tEndDecode is reached only on success.
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // payload
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1966

1967
// Encode a task-stop request, which carries only streamId, between
// tStartEncode and tEndEncode. tEndEncode is reached only on success.
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // payload
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1978

1979
// Decode a task-stop request, which carries only streamId, between
// tStartDecode and tEndDecode. tEndDecode is reached only on success.
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // payload
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1991

1992

1993
// Serialize a create-stream request into pEncoder: the request is rendered
// to JSON by scmCreateStreamReqToJson and written as one length-prefixed
// string. The temporary JSON buffer is released on every path.
// Returns `code` (0 when no step reported an error).
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t jsonLen = 0;
  char*   json = NULL;

  TAOS_CHECK_EXIT(scmCreateStreamReqToJson(pReq, false, &json, &jsonLen));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, json, jsonLen));

_exit:
  taosMemoryFreeClear(json);
  return code;
}
2010

2011
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
839,450✔
2012
  SEncoder encoder = {0};
839,450✔
2013
  tEncoderInit(&encoder, buf, bufLen);
839,450✔
2014
  int32_t code = 0;
839,450✔
2015
  int32_t lino;
2016

2017
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
839,450✔
2018

2019
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
839,450✔
2020

2021
  tEndEncode(&encoder);
839,450✔
2022

2023
_exit:
839,450✔
2024
  if (code) {
839,450✔
2025
    tEncoderClear(&encoder);
×
2026
    return code;
×
2027
  } else {
2028
    int32_t tlen = encoder.pos;
839,450✔
2029
    tEncoderClear(&encoder);
839,450✔
2030
    return tlen;
839,450✔
2031
  }
2032
  return 0;
2033
}
2034

2035
// Old version deserialization for backward compatibility,
2036
// especially for stream version number 7
2037
int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) {
×
2038
  int32_t code = 0;
×
2039
  int32_t lino;
2040
  pReq->calcPkSlotId = -1;
×
2041
  pReq->triPkSlotId = -1;
×
2042

2043
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2044

2045
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
×
2046
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
×
2047
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
×
2048
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
×
2049
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
×
2050
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
×
2051
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
×
2052

2053
  int32_t calcDbSize = 0;
×
2054
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
×
2055
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
×
2056
  if (pReq->calcDB == NULL) {
×
2057
    TAOS_CHECK_EXIT(terrno);
×
2058
  }
2059
  for (int32_t i = 0; i < calcDbSize; ++i) {
×
2060
    char *calcDb = NULL;
×
2061
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
×
2062
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
×
2063
    if (calcDb == NULL) {
×
2064
      TAOS_CHECK_EXIT(terrno);
×
2065
    }
2066
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
×
2067
      taosMemoryFree(calcDb);
×
2068
      TAOS_CHECK_EXIT(terrno);
×
2069
    }
2070
  }
2071

2072
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
×
2073
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
×
2074
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
×
2075
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
×
2076
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
×
2077
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
×
2078
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
×
2079
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
×
2080
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
×
2081
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
×
2082

2083
  int32_t addrSize = 0;
×
2084
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
×
2085
  if (addrSize > 0) {
×
2086
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
×
2087
    if (pReq->pNotifyAddrUrls == NULL) {
×
2088
      TAOS_CHECK_EXIT(terrno);
×
2089
    }
2090
  }
2091
  for (int32_t i = 0; i < addrSize; ++i) {
×
2092
    char *url = NULL;
×
2093
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
2094
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
2095
    if (url == NULL) {
×
2096
      TAOS_CHECK_EXIT(terrno);
×
2097
    }
2098
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
×
2099
      taosMemoryFree(url);
×
2100
      TAOS_CHECK_EXIT(terrno);
×
2101
    }
2102
  }
2103
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
×
2104
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
×
2105
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
×
2106

2107
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
×
2108
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
×
2109
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
×
2110

2111
  int32_t outColSize = 0;
×
2112
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
×
2113
  if (outColSize > 0) {
×
2114
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
×
2115
    if (pReq->outCols == NULL) {
×
2116
      TAOS_CHECK_EXIT(terrno);
×
2117
    }
2118

2119
    for (int32_t i = 0; i < outColSize; ++i) {
×
2120
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
×
2121
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
×
2122
    }
2123
  }
2124

2125
  int32_t outTagSize = 0;
×
2126
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
×
2127
  if (outTagSize > 0) {
×
2128
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
×
2129
    if (pReq->outTags == NULL) {
×
2130
      TAOS_CHECK_EXIT(terrno);
×
2131
    }
2132

2133
    for (int32_t i = 0; i < outTagSize; ++i) {
×
2134
      SFieldWithOptions field = {0};
×
2135
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
×
2136
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
×
2137
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
×
2138
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
×
2139
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
×
2140
        TAOS_CHECK_EXIT(terrno);
×
2141
      }
2142
    }
2143
  }
2144

2145
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
×
2146
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
×
2147
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
×
2148
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
×
2149

2150
  switch (pReq->triggerType) {
×
2151
    case WINDOW_TYPE_SESSION: {
×
2152
      // session trigger
2153
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
×
2154
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
×
2155
      break;
×
2156
    }
2157
      case WINDOW_TYPE_STATE: {
×
2158
        // state trigger
2159
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
×
2160
        pReq->trigger.stateWin.extend = 0;
×
2161
        pReq->trigger.stateWin.trueForType = 0;
×
2162
        pReq->trigger.stateWin.trueForCount = 0;
×
2163
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
×
2164
        break;
×
2165
      }
2166
      case WINDOW_TYPE_INTERVAL: {
×
2167
        // slide trigger
2168
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
×
2169
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
×
2170
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
×
2171
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
×
2172
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
×
2173
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
×
2174
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
×
2175
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
×
2176
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
×
2177
        break;
×
2178
      }
2179
      case WINDOW_TYPE_EVENT: {
×
2180
        // event trigger
2181
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
×
2182
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
×
2183
        pReq->trigger.event.trueForType = 0;
×
2184
        pReq->trigger.event.trueForCount = 0;
×
2185
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
×
2186
        break;
×
2187
      }
2188
      case WINDOW_TYPE_COUNT: {
×
2189
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
×
2190

2191
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
×
2192
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
×
2193
        break;
×
2194
      }
2195
      case WINDOW_TYPE_PERIOD: {
×
2196
        // period trigger
2197
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
×
2198
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
×
2199
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
×
2200
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
×
2201
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
×
2202
        break;
×
2203
      }
2204
      default:
×
2205
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2206
  }
2207

2208
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
×
2209
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
×
2210
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
×
2211
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
×
2212
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
×
2213
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
×
2214
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
×
2215
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
×
2216
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
×
2217
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
×
2218
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
×
2219
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
×
2220
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
×
2221
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
×
2222

2223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
×
2224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
×
2225

2226
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
×
2227

2228
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
×
2229
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
×
2230

2231
  int32_t calcScanPlanListSize = 0;
×
2232
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
×
2233
  if (calcScanPlanListSize > 0) {
×
2234
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
×
2235
    if (pReq->calcScanPlanList == NULL) {
×
2236
      TAOS_CHECK_EXIT(terrno);
×
2237
    }
2238
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
×
2239
      SStreamCalcScan calcScan = {0};
×
2240
      int32_t         vgListSize = 0;
×
2241
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
×
2242
      if (vgListSize > 0) {
×
2243
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
×
2244
        if (calcScan.vgList == NULL) {
×
2245
          TAOS_CHECK_EXIT(terrno);
×
2246
        }
2247
        for (int32_t j = 0; j < vgListSize; ++j) {
×
2248
          int32_t vgId = 0;
×
2249
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
×
2250
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
×
2251
            TAOS_CHECK_EXIT(terrno);
×
2252
          }
2253
        }
2254
      }
2255
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
×
2256
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
×
2257
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
×
2258
        TAOS_CHECK_EXIT(terrno);
×
2259
      }
2260
    }
2261
  }
2262

2263
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
×
2264
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
×
2265
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
×
2266
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
×
2267

2268
  int32_t forceOutColsSize = 0;
×
2269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
×
2270
  if (forceOutColsSize > 0) {
×
2271
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
×
2272
    if (pReq->forceOutCols == NULL) {
×
2273
      TAOS_CHECK_EXIT(terrno);
×
2274
    }
2275
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
×
2276
      SStreamOutCol outCol = {0};
×
2277
      int64_t       exprLen = 0;
×
2278
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
×
2279
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
×
2280
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
×
2281
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
×
2282
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
×
2283
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
×
2284
        TAOS_CHECK_EXIT(terrno);
×
2285
      }
2286
    }
2287
  }
2288

2289
  // LeftBytes is the size of all fields at the tail of SStreamObj.
2290
  // If there are more data in the buffer, then it means
2291
  // the new fields are added in SStreamObj, need to decode them.
2292
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2293
    switch (pReq->triggerType) {
×
2294
      case WINDOW_TYPE_STATE: {
×
2295
        // state trigger
2296
        if (!tDecodeIsEnd(pDecoder)) {
×
2297
          TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
×
2298
        }
2299
        if (!tDecodeIsEnd(pDecoder)) {
×
2300
          TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
×
2301
        }
2302
        break;
×
2303
      }
2304
      case WINDOW_TYPE_INTERVAL: {
×
2305
        if (!tDecodeIsEnd(pDecoder)) {
×
2306
          TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
×
2307
        }
2308
        break;
×
2309
      }
2310
      default:
×
2311
        break;
×
2312
    }
2313
  }
2314

2315
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2316
    if (!tDecodeIsEnd(pDecoder)) {
×
2317
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pReq->triggerPrec));
×
2318
    }
2319
  }
2320

2321
_exit:
×
2322

2323
  return code;
×
2324
}
2325

2326
// New deserialization using JSON
2327
// start from taosd ver-3.3.8.6, stream version number 8
2328
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
398,703✔
2329
  int32_t code = 0;
398,703✔
2330
  int32_t lino;
2331

2332
  char* json = NULL;
398,703✔
2333
  SJson* pJson = NULL;
398,703✔
2334
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &json));
398,703✔
2335
  pJson = tjsonParse(json);
398,703✔
2336
  if (pJson == NULL) {
398,703✔
2337
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INVALID_JSON);
×
2338
  }
2339
  TAOS_CHECK_EXIT(jsonToSCMCreateStreamReq(pJson, pReq));
398,703✔
2340

2341
_exit:
398,703✔
2342
  taosMemoryFreeClear(json);
398,703✔
2343
  if (NULL != pJson) {
398,703✔
2344
    tjsonDelete(pJson);
398,703✔
2345
  }
2346

2347
  return code;
398,703✔
2348
}
2349

2350

2351
// Deserialize a create-stream request from `buf` (`bufLen` bytes) into pReq.
//
// The JSON-based format is tried first. Only when that attempt reports
// TSDB_CODE_MND_STREAM_INVALID_JSON is the decoder rewound to the start of
// the buffer and the legacy binary format tried instead (with leftBytes = 0).
// Any other error from the JSON attempt is returned as is.
//
// Returns 0 on success, otherwise the code of the failing step.
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  code = tDeserializeSCMCreateStreamReqImpl(&decoder, pReq);
  if (code == TSDB_CODE_MND_STREAM_INVALID_JSON) {
    uError("invalid json for stream create request, try old deserialization");

    // try old deserialization for backward compatibility:
    // restart decoding from the beginning of the buffer
    tDecoderClear(&decoder);
    tDecoderInit(&decoder, buf, bufLen);

    TAOS_CHECK_EXIT(tStartDecode(&decoder));
    TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImplOld(&decoder, pReq, 0));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2376

2377

2378
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
56,156✔
2379
  int32_t  code = 0;
56,156✔
2380
  int32_t  lino;
2381
  int32_t  tlen;
2382
  SEncoder encoder = {0};
56,156✔
2383
  tEncoderInit(&encoder, buf, bufLen);
56,156✔
2384

2385
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
56,156✔
2386

2387
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->count));
112,312✔
2388
  for (int32_t i = 0; i < pReq->count; i++) {
118,960✔
2389
    int32_t nameLen = pReq->name[i] == NULL ? 0 : (int32_t)strlen(pReq->name[i]) + 1;
62,804✔
2390
    TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name[i], nameLen));
125,608✔
2391
  }
2392
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
112,312✔
2393

2394
  tEndEncode(&encoder);
56,156✔
2395

2396
_exit:
56,156✔
2397
  if (code) {
56,156✔
2398
    tlen = code;
×
2399
  } else {
2400
    tlen = encoder.pos;
56,156✔
2401
  }
2402
  tEncoderClear(&encoder);
56,156✔
2403
  return tlen;
56,156✔
2404
}
2405

2406
// Deserialize a drop-stream request from `buf` (`bufLen` bytes) into pReq.
//
// Reads count, allocates a zeroed array of `count` name pointers when the
// count is positive and decodes one allocated name per slot, then reads
// igNotExists. pReq->name is left untouched when count is not positive.
//
// Returns 0 on success, otherwise the code of the failing step (terrno when
// the name array cannot be allocated).
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->count));

  if (pReq->count > 0) {
    pReq->name = taosMemoryCalloc(pReq->count, sizeof(char*));
    if (NULL == pReq->name) {
      code = terrno;
      goto _exit;
    }

    for (int32_t idx = 0; idx < pReq->count; idx++) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name[idx], NULL));
    }
  }

  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2432

2433
// Release the name array of a drop-stream request: every name in
// [0, count) and then the array itself. The request struct is not freed.
// Does nothing when pReq or its name array is NULL.
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (pReq == NULL || pReq->name == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pReq->count; idx++) {
    taosMemoryFreeClear(pReq->name[idx]);
  }
  taosMemoryFreeClear(pReq->name);
}
2444

2445
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,003,913✔
2446
  if (pScan == NULL) {
2,003,913✔
2447
    return;
×
2448
  }
2449
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,003,913✔
2450
  taosArrayDestroy(pCalcScan->vgList);
2,003,913✔
2451
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,003,913✔
2452
}
2453

2454
void tFreeStreamOutCol(void* pCol) {
87,668✔
2455
  if (pCol == NULL) {
87,668✔
2456
    return;
×
2457
  }
2458
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
87,668✔
2459
  taosMemoryFreeClear(pOutCol->expr);
87,668✔
2460
}
2461

2462

2463

2464
// Release everything a create-stream request owns: its strings, its arrays
// (including their elements where those hold pointers) and the
// trigger-specific strings selected by triggerType. Every released member is
// reset to NULL, so calling this twice on the same request is harmless.
// The request struct itself is not freed. A NULL request is ignored.
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // arrays of allocated strings
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  // trigger payload: only the members of the active trigger type are touched
  switch (pReq->triggerType) {
    case WINDOW_TYPE_STATE:
      taosMemoryFreeClear(pReq->trigger.stateWin.zeroth);
      taosMemoryFreeClear(pReq->trigger.stateWin.expr);
      break;
    case WINDOW_TYPE_EVENT:
      taosMemoryFreeClear(pReq->trigger.event.startCond);
      taosMemoryFreeClear(pReq->trigger.event.endCond);
      break;
    case WINDOW_TYPE_COUNT:
      // condCols is allocated by the decoder and by the clone routine, so it
      // must be released here like the other trigger strings
      taosMemoryFreeClear(pReq->trigger.count.condCols);
      break;
    default:
      break;
  }

  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
  taosArrayDestroy(pReq->colCids);
  pReq->colCids = NULL;
  taosArrayDestroy(pReq->tagCids);
  pReq->tagCids = NULL;
}
2518

2519
// Create a new SCMCreateStreamReq in *ppDst holding independent copies of
// the members of pSrc listed in this function (names, column lists, trigger
// payload, plans, id arrays and a handful of scalar fields). Members that are
// not listed stay zero in the copy.
//
// pSrc   request to copy from; when NULL nothing is done and 0 is returned
//        without touching *ppDst
// ppDst  receives the newly allocated copy
//
// Returns 0 on success. On failure returns the error code (taken from
// terrno), logs the failing line, releases everything allocated so far
// including the copy itself, and sets *ppDst to NULL.
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t code = 0, lino = 0;
  if (NULL == pSrc) {
    return code;
  }

  // Everything read at _exit is declared and initialised before the first
  // jump so that no failure path sees an indeterminate value.
  SCMCreateStreamReq* pDst = NULL;
  void*               p = NULL;       // string copy not yet stored in an array
  SStreamCalcScan     dscan = {0};    // scan entry not yet stored in an array
  SStreamOutCol       dcol = {0};     // out-col entry not yet stored in an array
  int32_t             num = 0;

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
      p = NULL;  // now held by pDst->pNotifyAddrUrls
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  // The trigger type is set before the payload is copied so that the cleanup
  // at _exit releases the right trigger members if a copy below fails.
  pDst->triggerType = pSrc->triggerType;

  switch (pSrc->triggerType) {
    case WINDOW_TYPE_STATE:
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
      pDst->trigger.stateWin.trueForType = pSrc->trigger.stateWin.trueForType;
      pDst->trigger.stateWin.trueForCount = pSrc->trigger.stateWin.trueForCount;
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
      if (pSrc->trigger.stateWin.zeroth) {
        pDst->trigger.stateWin.zeroth = COPY_STR(pSrc->trigger.stateWin.zeroth);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.zeroth, code, lino, _exit, terrno);
      }
      if (pSrc->trigger.stateWin.expr) {
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
      }
      break;
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForType = pSrc->trigger.event.trueForType;
      pDst->trigger.event.trueForCount = pSrc->trigger.event.trueForCount;
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    case WINDOW_TYPE_COUNT:
      pDst->trigger.count.countVal = pSrc->trigger.count.countVal;
      pDst->trigger.count.sliding = pSrc->trigger.count.sliding;
      if (pSrc->trigger.count.condCols) {
        pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols);
        TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno);
      }
      break;
    default:
      // remaining trigger types are copied by value
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
      dscan = (SStreamCalcScan){.readFromCache = sscan->readFromCache};

      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);

      dscan.scanPlan = COPY_STR(sscan->scanPlan);
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // now held by pDst->calcScanPlanList
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
      dcol = (SStreamOutCol){.type = scol->type};

      dcol.expr = COPY_STR(scol->expr);
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // now held by pDst->forceOutCols
    }
  }

  if (pSrc->colCids) {
    pDst->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL(pDst->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids) {
    pDst->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL(pDst->tagCids, code, lino, _exit, terrno);
  }

  pDst->triggerTblUid = pSrc->triggerTblUid;
  pDst->triggerTblType = pSrc->triggerTblType;
  pDst->triggerPrec = pSrc->triggerPrec;
  pDst->deleteReCalc = pSrc->deleteReCalc;
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
  pDst->flags = pSrc->flags;

_exit:

  if (code) {
    // temporaries that never made it into the copy
    taosMemoryFreeClear(p);
    taosArrayDestroy(dscan.vgList);
    taosMemoryFreeClear(dscan.scanPlan);
    taosMemoryFreeClear(dcol.expr);

    // the partially built copy: its members first, then the struct itself
    tFreeSCMCreateStreamReq(pDst);
    taosMemoryFreeClear(pDst);
    *ppDst = NULL;

    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2719

2720

2721
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
6,472✔
2722
  int32_t  code = 0;
6,472✔
2723
  int32_t  lino;
2724
  int32_t  tlen;
2725
  SEncoder encoder = {0};
6,472✔
2726
  tEncoderInit(&encoder, buf, bufLen);
6,472✔
2727
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
6,472✔
2728

2729
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
6,472✔
2730
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
12,944✔
2731
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
12,944✔
2732
  tEndEncode(&encoder);
6,472✔
2733

2734
_exit:
6,472✔
2735
  if (code) {
6,472✔
2736
    tlen = code;
×
2737
  } else {
2738
    tlen = encoder.pos;
6,472✔
2739
  }
2740
  tEncoderClear(&encoder);
6,472✔
2741
  return tlen;
6,472✔
2742
}
2743

2744
// Deserialize a pause-stream request from `buf` (`bufLen` bytes) into pReq.
//
// Reads the stream name into a newly allocated buffer stored in pReq->name,
// then igNotExists.
//
// Returns 0 on success, otherwise the code of the failing step.
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  lino = 0;
  int32_t  code = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
2759

2760
// Release the heap-allocated stream name owned by a pause-stream request.
// Only pReq->name is touched; the request struct itself is not freed.
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  taosMemoryFreeClear(pReq->name);
}
3,236✔
2763

2764
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
5,724✔
2765
  SEncoder encoder = {0};
5,724✔
2766
  int32_t  code = 0;
5,724✔
2767
  int32_t  lino;
2768
  int32_t  tlen;
2769
  tEncoderInit(&encoder, buf, bufLen);
5,724✔
2770
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,724✔
2771
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
5,724✔
2772
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
11,448✔
2773
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
11,448✔
2774
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
11,448✔
2775
  tEndEncode(&encoder);
5,724✔
2776

2777
_exit:
5,724✔
2778
  if (code) {
5,724✔
2779
    tlen = code;
×
2780
  } else {
2781
    tlen = encoder.pos;
5,724✔
2782
  }
2783
  tEncoderClear(&encoder);
5,724✔
2784
  return tlen;
5,724✔
2785
}
2786

2787
// Decode a resume-stream request from `buf` into `pReq`.
//
// Reads, in order: the stream name (allocated by tDecodeBinaryAlloc into
// pReq->name), igNotExists and igUntreated. The decoder is cleared on every
// path. Returns 0 on success or the code of the failing step.
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2803

2804
// Release the heap-allocated stream name owned by a resume-stream request.
// Only pReq->name is touched; the request struct itself is not freed.
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  taosMemoryFreeClear(pReq->name);
}
2,862✔
2807

2808
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
25,122✔
2809
  SEncoder encoder = {0};
25,122✔
2810
  int32_t  code = 0;
25,122✔
2811
  int32_t  lino;
2812
  int32_t  tlen;
2813
  tEncoderInit(&encoder, buf, bufLen);
25,122✔
2814
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
25,122✔
2815
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
25,122✔
2816
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
50,244✔
2817
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
50,244✔
2818
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
50,244✔
2819
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
50,244✔
2820
  tEndEncode(&encoder);
25,122✔
2821

2822
_exit:
25,122✔
2823
  if (code) {
25,122✔
2824
    tlen = code;
×
2825
  } else {
2826
    tlen = encoder.pos;
25,122✔
2827
  }
2828
  tEncoderClear(&encoder);
25,122✔
2829
  return tlen;
25,122✔
2830
}
2831

2832
// Decode a recalc-stream request from `buf` into `pReq`.
//
// Reads, in order: the stream name (allocated by tDecodeBinaryAlloc into
// pReq->name), calcAll, timeRange.skey and timeRange.ekey. The decoder is
// cleared on every path. Returns 0 on success or the code of the failing step.
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2850

2851
// Release the heap-allocated stream name owned by a recalc-stream request.
// Only pReq->name is touched; the request struct itself is not freed.
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  taosMemoryFreeClear(pReq->name);
}
25,122✔
2854

2855
// Write the fields of a stream-progress request to an already-started encoder.
// Field order on the wire: streamId (i64), taskId (i64), fetchIdx (i32).
// Returns 0 on success or the code of the first encode step that failed.
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
2866

2867
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
14,664✔
2868
  SEncoder encoder = {0};
14,664✔
2869
  int32_t  code = 0;
14,664✔
2870
  int32_t  lino;
2871
  int32_t  tlen;
2872
  tEncoderInit(&encoder, buf, bufLen);
14,664✔
2873

2874
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
14,664✔
2875
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
14,664✔
2876

2877
  tEndEncode(&encoder);
14,664✔
2878

2879
_exit:
14,664✔
2880
  if (code) {
14,664✔
2881
    tlen = code;
×
2882
  } else {
2883
    tlen = encoder.pos;
14,664✔
2884
  }
2885
  tEncoderClear(&encoder);
14,664✔
2886
  return tlen;
14,664✔
2887
}
2888

2889
// Read the fields of a stream-progress request from an already-started decoder.
// Field order on the wire: streamId (i64), taskId (i64), fetchIdx (i32).
// Returns 0 on success or the code of the first decode step that failed.
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
2900

2901
// Decode a stream-progress request from `buf` into `pReq`.
// The field layout is delegated to tDecodeStreamProgressReq. The decoder is
// cleared on every path. Returns 0 on success or the code of the failing step.
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, (char *)buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2917

2918
// Write the fields of a stream-progress response to an already-started encoder.
// Field order on the wire: streamId (i64), fillHisFinished (i8),
// progressDelay (i64), fetchIdx (i32).
// Returns 0 on success or the code of the first encode step that failed.
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
2930

2931
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
14,664✔
2932
  SEncoder encoder = {0};
14,664✔
2933
  int32_t  code = 0;
14,664✔
2934
  int32_t  lino;
2935
  int32_t  tlen;
2936
  tEncoderInit(&encoder, buf, bufLen);
14,664✔
2937

2938
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
14,664✔
2939
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
14,664✔
2940

2941
  tEndEncode(&encoder);
14,664✔
2942

2943
_exit:
14,664✔
2944
  if (code) {
14,664✔
2945
    tlen = code;
×
2946
  } else {
2947
    tlen = encoder.pos;
14,664✔
2948
  }
2949
  tEncoderClear(&encoder);
14,664✔
2950
  return tlen;
14,664✔
2951
}
2952

2953
// Read the fields of a stream-progress response from an already-started decoder.
// Field order on the wire: streamId (i64), fillHisFinished (i8),
// progressDelay (i64), fetchIdx (i32).
// Returns 0 on success or the code of the first decode step that failed.
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  // fillHisFinished is decoded through an int8_t view of the field.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
2965

2966
// Decode a stream-progress response from `buf` into `pRsp`.
// The field layout is delegated to tDecodeStreamProgressRsp. The decoder is
// cleared on every path. Returns 0 on success or the code of the failing step.
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&dec, pRsp));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2982

2983
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
65,478✔
2984
  SEncoder encoder = {0};
65,478✔
2985
  int32_t  code = TSDB_CODE_SUCCESS;
65,478✔
2986
  int32_t  lino = 0;
65,478✔
2987
  int32_t  tlen = 0;
65,478✔
2988

2989
  tEncoderInit(&encoder, buf, bufLen);
65,478✔
2990
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
65,478✔
2991

2992
  int32_t size = taosArrayGetSize(pRsp->cols);
65,478✔
2993
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
65,478✔
2994
  for (int32_t i = 0; i < size; ++i) {
177,036✔
2995
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
111,558✔
2996
    if (oInfo == NULL) {
111,558✔
2997
      uError("col id is NULL at index %d", i);
×
2998
      code = TSDB_CODE_INVALID_PARA;
×
2999
      goto _exit;
×
3000
    }
3001
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
223,116✔
3002
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
223,116✔
3003
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
223,116✔
3004
  }
3005

3006
  tEndEncode(&encoder);
65,478✔
3007

3008
_exit:
65,478✔
3009
  if (code != TSDB_CODE_SUCCESS) {
65,478✔
3010
    tlen = code;
×
3011
  } else {
3012
    tlen = encoder.pos;
65,478✔
3013
  }
3014
  tEncoderClear(&encoder);
65,478✔
3015
  return tlen;
65,478✔
3016
}
3017

3018
// Decode an original-table-info response from `buf` into `pRsp`.
//
// Wire layout: an int32 entry count followed by, for each entry, suid (i64),
// uid (i64) and cid (i16). pRsp->cols is created here as an array of
// OTableInfoRsp and filled entry by entry.
//
// Returns TSDB_CODE_SUCCESS on success or the error code of the failing step.
// On failure pRsp->cols may be left allocated and partially filled.
// NOTE(review): presumably the caller releases it with
// tDestroySTriggerOrigTableInfoRsp -- confirm.
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    // These must leave through _exit, not return directly: an early return
    // here would skip tDecoderClear() below.
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3052

3053
// Destroy the `cols` array held by an original-table-info response.
// The response struct itself is not freed and `cols` is not reset.
void tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp) {
  taosArrayDestroy(pRsp->cols);
}
7,893,337✔
3056

3057
// Release the containers owned by a trigger pull request, chosen by
// pReq->base.type. A NULL request is ignored; request types that are not
// listed below are left untouched. The request object itself is not freed.
//
// For the cids/uids/cols arrays the pointer is reset to NULL after it is
// destroyed. The WAL-data and set-table members are cleaned up without being
// reset.
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) {
    return;
  }

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pWal = (SSTriggerWalDataNewRequest*)pReq;
      taosArrayDestroy(pWal->versions);
      tSimpleHashCleanup(pWal->ranges);
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pTsdb = (SSTriggerTsdbDataRequest*)pReq;
      if (pTsdb->cids != NULL) {
        taosArrayDestroy(pTsdb->cids);
        pTsdb->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pVtb = (SSTriggerVirTableInfoRequest*)pReq;
      if (pVtb->cids != NULL) {
        taosArrayDestroy(pVtb->cids);
        pVtb->cids = NULL;
      }
      if (pVtb->uids != NULL) {
        taosArrayDestroy(pVtb->uids);
        pVtb->uids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pPseudo = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pPseudo->cids != NULL) {
        taosArrayDestroy(pPseudo->cids);
        pPseudo->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pOrig = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pOrig->cols != NULL) {
        taosArrayDestroy(pOrig->cols);
        pOrig->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pSet = (SSTriggerSetTableRequest*)pReq;
      tSimpleHashCleanup(pSet->uidInfoTrigger);
      tSimpleHashCleanup(pSet->uidInfoCalc);
      break;
    }
    default:
      // Nothing to release for the remaining request types.
      break;
  }
}
3097

3098
// Encode the raw element storage of `pArr` as a single binary blob.
//
// The blob is the array's data area, `element count * elemSize` bytes long.
// When taosArrayGetSize reports no elements, a zero-length blob with a NULL
// payload is written instead.
// Returns TSDB_CODE_SUCCESS or the error code from tEncodeBinary.
int32_t encodePlainArray(SEncoder *encoder, SArray *pArr) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  count = taosArrayGetSize(pArr);
  uint8_t* payload = NULL;
  int32_t  payloadLen = 0;

  if (count > 0) {
    payload = TARRAY_DATA(pArr);
    payloadLen = count * pArr->elemSize;
  }

  TAOS_CHECK_EXIT(tEncodeBinary(encoder, payload, payloadLen));

_exit:
  return code;
}
3109

3110
// Decode a binary blob written by encodePlainArray into a new array.
//
// The blob is read with tDecodeBinaryAlloc. If it is non-empty, *ppArr is set
// to a fresh array of `elemSize`-byte elements whose data buffer is swapped
// with the decoded blob; size and capacity are set to `len / elemSize`.
// If the blob is empty, *ppArr is not written.
//
// Returns TSDB_CODE_SUCCESS, the decode error, or terrno when the array
// cannot be created.
int32_t decodePlainArray(SDecoder* decoder, SArray** ppArr, uint32_t elemSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint64_t blobLen = 0;
  void*    blob = NULL;

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(decoder, &blob, &blobLen));
  if (blobLen == 0) {
    goto _exit;
  }

  *ppArr = taosArrayInit(0, elemSize);
  TSDB_CHECK_NULL(*ppArr, code, lino, _exit, terrno);

  // After the swap the array owns the decoded bytes and `blob` holds the
  // array's initial buffer, which is freed below.
  TSWAP((*ppArr)->pData, blob);
  (*ppArr)->capacity = blobLen / elemSize;
  (*ppArr)->size = (*ppArr)->capacity;

_exit:
  if (blob != NULL) {
    taosMemoryFree(blob);
  }
  return code;
}
3130

3131
// Encode a two-level table map.
//
// Wire layout:
//   i32 outer entry count
//   per outer entry:
//     i64, i64   the two 64-bit values stored in the entry's key
//     i32        inner entry count
//     per inner entry: i16 key (slot id), i16 value (column id)
//
// Each outer value is read as a pointer to the inner hash.
// Returns TSDB_CODE_SUCCESS or the code of the first encode step that failed.
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t nOuter = tSimpleHashGetSize(pInfo);
  TAOS_CHECK_EXIT(tEncodeI32(encoder, nOuter));

  int32_t outerIter = 0;
  for (void* pOuter = tSimpleHashIterate(pInfo, NULL, &outerIter); pOuter != NULL;
       pOuter = tSimpleHashIterate(pInfo, pOuter, &outerIter)) {
    int64_t* idPair = tSimpleHashGetKey(pOuter, NULL);
    TAOS_CHECK_EXIT(tEncodeI64(encoder, idPair[0]));
    TAOS_CHECK_EXIT(tEncodeI64(encoder, idPair[1]));

    SSHashObj* pInner = *(SSHashObj**)pOuter;
    int32_t    nInner = tSimpleHashGetSize(pInner);
    TAOS_CHECK_EXIT(tEncodeI32(encoder, nInner));

    int32_t innerIter = 0;
    for (void* pSlot = tSimpleHashIterate(pInner, NULL, &innerIter); pSlot != NULL;
         pSlot = tSimpleHashIterate(pInner, pSlot, &innerIter)) {
      int16_t* pSlotId = tSimpleHashGetKey(pSlot, NULL);
      int16_t* pColId = (int16_t*)pSlot;
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pSlotId));
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *pColId));
    }
  }

_exit:
  return code;
}
3162

3163
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
30,517,351✔
3164
  SEncoder encoder = {0};
30,517,351✔
3165
  int32_t  code = TSDB_CODE_SUCCESS;
30,517,865✔
3166
  int32_t  lino = 0;
30,517,865✔
3167
  int32_t  tlen = 0;
30,517,865✔
3168

3169
  tEncoderInit(&encoder, buf, bufLen);
30,517,865✔
3170
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
30,518,559✔
3171

3172
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
61,036,552✔
3173
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
61,033,810✔
3174
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
61,034,706✔
3175
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
61,034,272✔
3176

3177
  switch (pReq->type) {
30,515,702✔
3178
    case STRIGGER_PULL_SET_TABLE: {
65,478✔
3179
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
65,478✔
3180
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
65,478✔
3181
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
65,478✔
3182
      break;
65,478✔
3183
    }
3184
    case STRIGGER_PULL_LAST_TS: {
392,070✔
3185
      break;
392,070✔
3186
    }
3187
    case STRIGGER_PULL_FIRST_TS: {
310,766✔
3188
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
310,766✔
3189
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
621,532✔
3190
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
621,284✔
3191
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
621,284✔
3192
      break;
310,766✔
3193
    }
3194
    case STRIGGER_PULL_TSDB_META: {
256,124✔
3195
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
256,124✔
3196
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
512,248✔
3197
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
512,248✔
3198
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
512,248✔
3199
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
512,248✔
3200
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
512,248✔
3201
      break;
256,124✔
3202
    }
3203
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3204
      break;
×
3205
    }
3206
    case STRIGGER_PULL_TSDB_TS_DATA: {
373,840✔
3207
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
373,840✔
3208
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
747,680✔
3209
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
747,680✔
3210
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
747,680✔
3211
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
747,680✔
3212
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
747,680✔
3213
      break;
373,840✔
3214
    }
3215
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
119,798✔
3216
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
119,798✔
3217
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
239,596✔
3218
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
239,596✔
3219
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
239,596✔
3220
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
239,596✔
3221
      break;
119,798✔
3222
    }
3223
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
119,798✔
3224
      break;
119,798✔
3225
    }
3226
    case STRIGGER_PULL_TSDB_CALC_DATA: {
12,734,272✔
3227
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
12,734,272✔
3228
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
25,468,544✔
3229
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
25,468,544✔
3230
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
25,468,544✔
3231
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
25,468,544✔
3232
      break;
12,734,272✔
3233
    }
3234
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3235
      break;
×
3236
    }
3237
    case STRIGGER_PULL_TSDB_DATA: {
141,856✔
3238
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
141,856✔
3239
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
283,712✔
3240
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
283,712✔
3241
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
283,712✔
3242
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
283,712✔
3243
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
141,856✔
3244
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
283,494✔
3245
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
283,494✔
3246
      break;
141,856✔
3247
    }
3248
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3249
      break;
×
3250
    }
3251
    case STRIGGER_PULL_WAL_META_NEW: {
6,366,078✔
3252
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
6,366,078✔
3253
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
12,732,203✔
3254
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
12,731,773✔
3255
      break;
6,365,648✔
3256
    }
3257
    case STRIGGER_PULL_WAL_DATA_NEW:
1,920,408✔
3258
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3259
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
1,920,408✔
3260
      int32_t                     nVersion = taosArrayGetSize(pRequest->versions);
1,920,408✔
3261
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
1,920,408✔
3262
      for (int32_t i = 0; i < nVersion; i++) {
6,028,596✔
3263
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
4,107,940✔
3264
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
4,108,188✔
3265
      }
3266
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
1,920,656✔
3267
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
1,920,658✔
3268
      int32_t iter = 0;
1,920,658✔
3269
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
1,920,658✔
3270
      while (px != NULL) {
2,539,256✔
3271
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
618,598✔
3272
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
1,237,196✔
3273
        int64_t* key = (int64_t*)px;
618,598✔
3274
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
1,236,928✔
3275
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));
1,236,660✔
3276

3277
        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
618,330✔
3278
      }
3279
      break;
1,920,658✔
3280
    }
3281
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
6,823,418✔
3282
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
6,823,418✔
3283
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
13,645,832✔
3284
      break;
6,822,414✔
3285
    }
3286
    case STRIGGER_PULL_GROUP_COL_VALUE: {
530,874✔
3287
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
530,874✔
3288
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,061,748✔
3289
      break;
530,874✔
3290
    }
3291
    case STRIGGER_PULL_VTABLE_INFO: {
57,688✔
3292
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
57,688✔
3293
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
57,688✔
3294
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
57,688✔
3295
      TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
57,688✔
3296
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
115,376✔
3297
      break;
57,688✔
3298
    }
3299
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
239,404✔
3300
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
239,404✔
3301
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
478,808✔
3302
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
239,404✔
3303
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
478,808✔
3304
      break;
239,404✔
3305
    }
3306
    case STRIGGER_PULL_OTABLE_INFO: {
65,478✔
3307
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
65,478✔
3308
      int32_t size = taosArrayGetSize(pRequest->cols);
65,478✔
3309
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
65,260✔
3310
      for (int32_t i = 0; i < size; ++i) {
176,818✔
3311
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
111,340✔
3312
        if (oInfo == NULL) {
111,340✔
3313
          uError("col id is NULL at index %d", i);
×
3314
          code = TSDB_CODE_INVALID_PARA;
×
3315
          goto _exit;
×
3316
        }
3317
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
222,898✔
3318
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
223,116✔
3319
      }
3320
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
130,956✔
3321
      break; 
65,478✔
3322
    }
3323
    default: {
×
3324
      uError("unknown pull type %d", pReq->type);
×
3325
      code = TSDB_CODE_INVALID_PARA;
×
3326
      break;
×
3327
    }
3328
  }
3329

3330
  tEndEncode(&encoder);
30,516,166✔
3331

3332
_exit:
30,512,389✔
3333
  if (code != TSDB_CODE_SUCCESS) {
30,513,854✔
3334
    tlen = code;
×
3335
  } else {
3336
    tlen = encoder.pos;
30,513,854✔
3337
  }
3338
  tEncoderClear(&encoder);
30,513,854✔
3339
  return tlen;
30,515,223✔
3340
}
3341

3342
static void destroyHash(void* data){
82,985✔
3343
  if (data){
82,985✔
3344
    SSHashObj* tmp = *(SSHashObj**)data;
82,985✔
3345
    tSimpleHashCleanup(tmp);
82,985✔
3346
  }
3347
}
82,985✔
3348

3349
// Decode a two-level table map written by encodeSetTableMapInfo.
//
// Wire layout:
//   i32 outer entry count
//   per outer entry:
//     i64, i64   the two 64-bit values forming the outer key
//     i32        inner entry count
//     per inner entry: i16 slot id (key), i16 column id (value)
//
// On success *ppInfo is a new hash keyed by the 16-byte id pair whose values
// are pointers to inner hashes (slot id -> column id); destroyHash is
// registered as the outer hash's free callback.
// On failure everything built so far is released and *ppInfo is NULL.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t size = 0;

  // *ppInfo is a pure output. Reset it first so the cleanup at _exit never
  // touches a stale caller value when the very first decode fails.
  *ppInfo = NULL;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (*ppInfo == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);

  for (int32_t i = 0; i < size; ++i) {
    int64_t id[2] = {0};
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id+1));
    int32_t len = 0;
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));
    SSHashObj* tmp = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
    if (tmp == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // The outer map takes ownership of `tmp` only once the put succeeds.
    // If it fails, release `tmp` here; nothing else will.
    code = tSimpleHashPut(*ppInfo, id, sizeof(id), &tmp, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      tSimpleHashCleanup(tmp);
      lino = __LINE__;
      goto _exit;
    }

    for (int32_t j = 0; j < len; ++j) {
      int16_t slotId = 0;
      int16_t cid = 0;
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
    }
  }
_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(*ppInfo);
    *ppInfo = NULL;
  }
  return code;
}
3387

3388
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
15,116,092✔
3389
  SDecoder decoder = {0};
15,116,092✔
3390
  int32_t  code = TSDB_CODE_SUCCESS;
15,117,203✔
3391
  int32_t  lino = 0;
15,117,203✔
3392

3393
  tDecoderInit(&decoder, buf, bufLen);
15,117,203✔
3394
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
15,116,061✔
3395

3396
  int32_t type = 0;
15,117,088✔
3397
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
15,117,277✔
3398
  SSTriggerPullRequest* pBase = &(pReq->base);
15,117,277✔
3399
  pBase->type = type;
15,116,786✔
3400
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
30,236,102✔
3401
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
30,235,737✔
3402
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
30,235,731✔
3403

3404
  switch (type) {
15,118,666✔
3405
    case STRIGGER_PULL_SET_TABLE: {
32,739✔
3406
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
32,739✔
3407
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
32,739✔
3408
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
32,739✔
3409
      break;
32,739✔
3410
    }
3411
    case STRIGGER_PULL_LAST_TS: {
195,461✔
3412
      break;
195,461✔
3413
    }
3414
    case STRIGGER_PULL_FIRST_TS: {
153,013✔
3415
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
153,013✔
3416
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
306,026✔
3417
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
306,026✔
3418
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
306,026✔
3419
      break;
153,013✔
3420
    }
3421
    case STRIGGER_PULL_TSDB_META: {
127,914✔
3422
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
127,914✔
3423
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
255,672✔
3424
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
255,672✔
3425
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
255,828✔
3426
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
255,828✔
3427
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
255,672✔
3428
      break;
127,758✔
3429
    }
3430
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3431
      break;
×
3432
    }
3433
    case STRIGGER_PULL_TSDB_TS_DATA: {
186,920✔
3434
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
186,920✔
3435
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
373,840✔
3436
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
373,840✔
3437
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
373,840✔
3438
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
373,840✔
3439
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
373,840✔
3440
      break;
186,920✔
3441
    }
3442
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
59,899✔
3443
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
59,899✔
3444
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
119,798✔
3445
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
119,798✔
3446
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
119,798✔
3447
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
119,798✔
3448
      break;
59,899✔
3449
    }
3450
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
59,899✔
3451
      break;
59,899✔
3452
    }
3453
    case STRIGGER_PULL_TSDB_CALC_DATA: {
6,365,435✔
3454
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
6,365,435✔
3455
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
12,731,091✔
3456
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
12,731,312✔
3457
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
12,731,312✔
3458
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
12,731,312✔
3459
      break;
6,365,656✔
3460
    }
3461
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3462
      break;
×
3463
    }
3464
    case STRIGGER_PULL_TSDB_DATA: {
70,928✔
3465
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
70,928✔
3466
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
141,856✔
3467
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
141,856✔
3468
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
141,856✔
3469
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
141,856✔
3470
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
70,928✔
3471
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
141,856✔
3472
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
141,856✔
3473
      break;
70,928✔
3474
    }
3475
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3476
      break;
×
3477
    }
3478
    case STRIGGER_PULL_WAL_META_NEW: {
3,115,740✔
3479
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
3,115,740✔
3480
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
6,231,449✔
3481
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
6,230,942✔
3482
      break;
3,114,983✔
3483
    }
3484
    case STRIGGER_PULL_WAL_DATA_NEW:
960,033✔
3485
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3486
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
960,033✔
3487
      int32_t                     nVersion = 0;
960,033✔
3488
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
960,033✔
3489
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
960,033✔
3490
      for (int32_t i = 0; i < nVersion; i++) {
3,013,552✔
3491
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
2,053,519✔
3492
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
2,053,519✔
3493
      }
3494
      int32_t nRanges = 0;
960,033✔
3495
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
960,033✔
3496
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
960,033✔
3497
      if (pRequest->ranges == NULL) {
960,033✔
3498
        TAOS_CHECK_EXIT(terrno);
×
3499
      }
3500
      for (int32_t i = 0; i < nRanges; i++) {
1,269,036✔
3501
        uint64_t gid = 0;
309,003✔
3502
        int64_t pRange[2] = {0};
309,003✔
3503
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
309,003✔
3504
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
309,003✔
3505
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
308,753✔
3506
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
308,753✔
3507
      }
3508
      break;
960,033✔
3509
    }
3510
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3,342,184✔
3511
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
3,342,184✔
3512
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
6,686,683✔
3513
      break;
3,343,767✔
3514
    }
3515
    case STRIGGER_PULL_GROUP_COL_VALUE: {
265,141✔
3516
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
265,141✔
3517
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
530,282✔
3518
      break;
265,141✔
3519
    }
3520
    case STRIGGER_PULL_VTABLE_INFO: {
28,372✔
3521
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
28,372✔
3522
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
28,372✔
3523
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
28,372✔
3524
      TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
28,372✔
3525
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
56,744✔
3526
      break;
28,372✔
3527
    }
3528
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
119,702✔
3529
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
119,702✔
3530
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
239,404✔
3531
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
119,702✔
3532
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
239,404✔
3533
      break;
119,702✔
3534
    }
3535
    case STRIGGER_PULL_OTABLE_INFO: {
32,739✔
3536
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
32,739✔
3537
      int32_t size = 0;
32,739✔
3538
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
32,739✔
3539
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
32,739✔
3540
      if (pRequest->cols == NULL) {
32,739✔
3541
        code = terrno;
×
3542
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3543
        goto _exit;
×
3544
      }
3545
      for (int32_t i = 0; i < size; ++i) {
88,518✔
3546
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
55,779✔
3547
        if (oInfo == NULL) {
55,779✔
3548
          code = terrno;
×
3549
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3550
          goto _exit;
×
3551
        }
3552
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
55,779✔
3553
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
55,779✔
3554
      }
3555
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
65,478✔
3556

3557
      break;
32,739✔
3558
    }
3559
    default: {
2,547✔
3560
      uError("unknown pull type %d", type);
2,547✔
3561
      code = TSDB_CODE_INVALID_PARA;
×
3562
      break;
×
3563
    }
3564
  }
3565

3566
  tEndDecode(&decoder);
15,117,010✔
3567

3568
_exit:
15,116,822✔
3569
  tDecoderClear(&decoder);
15,117,628✔
3570
  return code;
15,114,413✔
3571
}
3572

3573
/*
 * Encode an array of SSTriggerCalcParam.
 *
 * Wire layout: an int32 element count, then for every element the raw bytes of
 * all struct fields that precede `notifyType`, optionally followed by
 * `notifyType` (int32) and `extraNotifyContent` (binary; the length includes
 * the terminating NUL, or is 0 when the pointer is NULL).
 *
 * pEncoder               - destination encoder. When pEncoder->data is NULL the
 *                          raw copy is skipped but the position still advances.
 * pParams                - array of SSTriggerCalcParam to encode.
 * ignoreNotificationInfo - when true the notification fields are not encoded.
 * full                   - when false a count of 0 is written and no element
 *                          is encoded.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
  int32_t size = full ? taosArrayGetSize(pParams) : 0;
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    if (pEncoder->data) {
      // The raw copy bypasses the tEncode* helpers, so the remaining capacity
      // has to be verified here before writing into the buffer.
      if ((int64_t)pEncoder->pos + plainFieldSize > (int64_t)pEncoder->size) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
      }
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
    }
    pEncoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      // A NULL content is encoded as an empty binary; otherwise include the NUL.
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
3598

3599
void tDestroySSTriggerCalcParam(void* ptr) {
2,147,483,647✔
3600
  SSTriggerCalcParam* pParam = ptr;
2,147,483,647✔
3601
  if (pParam && pParam->extraNotifyContent != NULL) {
2,147,483,647✔
3602
    taosMemoryFreeClear(pParam->extraNotifyContent);
207,415✔
3603
  }
3604
  if (pParam && pParam->resultNotifyContent != NULL) {
2,147,483,647✔
3605
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
3606
  }
3607
}
2,147,483,647✔
3608

3609
void tDestroySStreamGroupValue(void* ptr) {
24,009,530✔
3610
  SStreamGroupValue* pValue = ptr;
24,009,530✔
3611
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
24,009,530✔
3612
    taosMemoryFreeClear(pValue->data.pData);
14,484,963✔
3613
    pValue->data.nData = 0;
14,484,901✔
3614
  }
3615
}
24,009,814✔
3616

3617
/*
 * Decode an array of SSTriggerCalcParam written by tSerializeSTriggerCalcParam.
 *
 * pDecoder               - source decoder.
 * ppParams               - receives a newly created array; it is assigned as
 *                          soon as the element count has been decoded, so on a
 *                          later failure it may hold a partially filled array.
 *                          NOTE(review): cleanup on failure appears to be left
 *                          to the caller - confirm.
 * ignoreNotificationInfo - must match the flag used when encoding; when true
 *                          the notification fields are not read.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
  int32_t size = 0, code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < size; ++i) {
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    // The raw copy below bypasses the tDecode* helpers, so make sure the
    // message really contains that many bytes. The check runs before the
    // element is reserved so a truncated message never adds a half-built entry.
    if ((int64_t)pDecoder->pos + plainFieldSize > (int64_t)pDecoder->size) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
    }
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
    pDecoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
3643

3644
/*
 * Encode an array of SStreamGroupValue.
 *
 * Wire layout: int32 count, then per element: isNull (bool); when not null,
 * isTbname (bool), [uid (int64) + vgId (int32) when isTbname], type (int8) and
 * either a binary payload (variable-length types) or data.val as int64.
 *
 * vgId - when different from -1 it is stored into every tbname element before
 *        that element's vgId is encoded (the input array is modified).
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numVals));

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // A null value is fully described by its flag; nothing else is written.
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (!pVal->isNull) {
      TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
        if (vgId != -1) {
          pVal->vgId = vgId;
        }
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
      }

      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
      if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
      } else {
        TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
      }
    }
  }

_exit:
  return code;
}
3676

3677
/*
 * Decode an array of SStreamGroupValue written by tSerializeStriggerGroupColVals.
 *
 * ppGroupColVals - in/out array. Existing elements are cleared first (their
 *                  payloads released via tDestroySStreamGroupValue). When the
 *                  decoded count is positive the array is created if it is
 *                  NULL, otherwise its capacity is grown to the count.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numVals));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (numVals > 0 && *ppGroupColVals == NULL) {
    *ppGroupColVals = taosArrayInit(numVals, sizeof(SStreamGroupValue));
    if (*ppGroupColVals == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else if (numVals > 0) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, numVals));
  }

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // A null value carries only its flag on the wire.
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (!pVal->isNull) {
      TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
      }

      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
      if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
      } else {
        uint64_t payloadLen = 0;
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &payloadLen));
        pVal->data.nData = payloadLen;
      }
    }
  }

_exit:
  return code;
}
3720

3721
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
530,282✔
3722
  SEncoder encoder = {0};
530,282✔
3723
  int32_t  code = TSDB_CODE_SUCCESS;
530,282✔
3724
  int32_t  lino = 0;
530,282✔
3725
  int32_t  tlen = 0;
530,282✔
3726

3727
  tEncoderInit(&encoder, buf, bufLen);
530,282✔
3728
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
530,282✔
3729

3730
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
530,282✔
3731

3732
  tEndEncode(&encoder);
530,032✔
3733

3734
_exit:
530,032✔
3735
  if (code != TSDB_CODE_SUCCESS) {
530,032✔
3736
    tlen = code;
×
3737
  } else {
3738
    tlen = encoder.pos;
530,032✔
3739
  }
3740
  tEncoderClear(&encoder);
530,032✔
3741
  return tlen;
530,032✔
3742
}
3743

3744
/*
 * Deserialize a buffer produced by tSerializeSStreamGroupInfo into gInfo.
 *
 * gInfo->gInfo is filled by tDeserializeStriggerGroupColVals, which clears any
 * existing elements and creates the array when it is NULL and the decoded
 * count is positive.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3761

3762
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
2,699,231✔
3763
  SEncoder encoder = {0};
2,699,231✔
3764
  int32_t  code = TSDB_CODE_SUCCESS;
2,699,713✔
3765
  int32_t  lino = 0;
2,699,713✔
3766
  int32_t  tlen = 0;
2,699,713✔
3767

3768
  tEncoderInit(&encoder, buf, bufLen);
2,699,713✔
3769
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,699,954✔
3770

3771
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
5,399,667✔
3772
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
5,399,667✔
3773
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
5,399,908✔
3774
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
5,399,908✔
3775
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
5,399,908✔
3776

3777
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
2,699,954✔
3778
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
2,699,954✔
3779
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
5,399,426✔
3780
  TAOS_CHECK_EXIT(tEncodeBool(&encoder, pReq->isWindowTrigger));
2,699,472✔
3781
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision));
5,399,678✔
3782

3783
  tEndEncode(&encoder);
2,699,724✔
3784

3785
_exit:
2,699,954✔
3786
  if (code != TSDB_CODE_SUCCESS) {
2,699,954✔
3787
    tlen = code;
×
3788
  } else {
3789
    tlen = encoder.pos;
2,699,954✔
3790
  }
3791
  tEncoderClear(&encoder);
2,699,954✔
3792
  return tlen;
2,699,724✔
3793
}
3794

3795
/*
 * Deserialize a buffer produced by tSerializeSTriggerCalcRequest into pReq.
 *
 * isWindowTrigger and precision are read only when the decoder has not yet
 * reached its end after createTable; otherwise they keep whatever value pReq
 * already holds.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // identification
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // array payloads
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));

  // Optional tail: present only when the message carries more data.
  if (!tDecodeIsEnd(&decoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pReq->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->precision));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3823

3824
/*
 * Release everything owned by a trigger calc request: the params array, the
 * groupColVals array and the output block. The request structure itself is
 * not freed. Every released member is reset to NULL so that calling this
 * function again on the same request is harmless. A NULL pReq is ignored.
 */
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq != NULL) {
    if (pReq->params != NULL) {
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
      pReq->params = NULL;
    }
    if (pReq->groupColVals != NULL) {
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
      pReq->groupColVals = NULL;
    }
    blockDataDestroy(pReq->pOutBlock);
    // Reset like the other members to avoid a dangling pointer / double free.
    pReq->pOutBlock = NULL;
  }
}
6,792,502✔
3837

3838
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
×
3839
  SEncoder encoder = {0};
×
3840
  int32_t  code = TSDB_CODE_SUCCESS;
×
3841
  int32_t  lino = 0;
×
3842
  int32_t  tlen = 0;
×
3843

3844
  tEncoderInit(&encoder, buf, bufLen);
×
3845
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3846

3847
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3848
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
×
3849
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3850
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
×
3851

3852
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
×
3853

3854
  tEndEncode(&encoder);
×
3855

3856
_exit:
×
3857
  if (code != TSDB_CODE_SUCCESS) {
×
3858
    tlen = code;
×
3859
  } else {
3860
    tlen = encoder.pos;
×
3861
  }
3862
  tEncoderClear(&encoder);
×
3863
  return tlen;
×
3864
}
3865

3866
/*
 * Deserialize a buffer produced by tSerializeSTriggerDropTableRequest into pReq.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // scalar header
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // group column values
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3887

3888
/*
 * Release the groupColVals array owned by a drop request and reset the member
 * to NULL. The request structure itself is not freed. A NULL pReq is ignored.
 */
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
  if (pReq == NULL || pReq->groupColVals == NULL) {
    return;
  }

  taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
  pReq->groupColVals = NULL;
}
×
3896

3897
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
22,340,690✔
3898
  SEncoder encoder = {0};
22,340,690✔
3899
  int32_t  code = TSDB_CODE_SUCCESS;
22,340,690✔
3900
  int32_t  lino = 0;
22,340,690✔
3901
  int32_t  tlen = 0;
22,340,690✔
3902

3903
  tEncoderInit(&encoder, buf, bufLen);
22,340,690✔
3904
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
22,340,690✔
3905

3906
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
44,681,380✔
3907
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
44,681,380✔
3908
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
44,681,380✔
3909
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
44,681,380✔
3910

3911
  tEndEncode(&encoder);
22,340,690✔
3912

3913
_exit:
22,340,690✔
3914
  if (code != TSDB_CODE_SUCCESS) {
22,340,690✔
3915
    tlen = code;
×
3916
  } else {
3917
    tlen = encoder.pos;
22,340,690✔
3918
  }
3919
  tEncoderClear(&encoder);
22,340,690✔
3920
  return tlen;
22,340,690✔
3921
}
3922

3923
/*
 * Deserialize a buffer produced by tSerializeSTriggerCtrlRequest into pReq.
 *
 * The type is decoded into a local int32 first and then assigned to
 * pReq->type.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  rawType = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &rawType));
  pReq->type = rawType;

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3944

3945
/*
 * Encode a SStreamRuntimeFuncInfo into an already started encoder.
 *
 * Field order: pStreamPesudoFuncVals (without notification info; empty when
 * `full` is false), pStreamPartColVals, curWindow.skey, curWindow.ekey,
 * groupId, curIdx, sessionId, withExternalWindow, triggerType,
 * isWindowTrigger, precision.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool full) {
  int32_t code = 0;
  int32_t lino = 0;

  // array payloads
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, full));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // current window and identifiers
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));

  // flags and trigger description
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->isWindowTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->precision));

_exit:
  return code;
}
3961

3962
/*
 * Decode a SStreamRuntimeFuncInfo written by tSerializeStRtFuncInfo from an
 * already started decoder.
 *
 * isWindowTrigger and precision are read only when the decoder has not yet
 * reached its end after triggerType; otherwise they keep whatever value pInfo
 * already holds.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
  // Optional tail: present only when the message carries more data.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->precision));
  }
_exit:
  return code;
}
3981

3982
/*
 * Release the two arrays owned by a SStreamRuntimeFuncInfo and reset both
 * members to NULL. The structure itself is not freed. A NULL pInfo is ignored.
 */
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }
}
3993

3994
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
56,744✔
3995
  SEncoder encoder = {0};
56,744✔
3996
  int32_t  code = TSDB_CODE_SUCCESS;
56,744✔
3997
  int32_t  lino = 0;
56,744✔
3998
  int32_t  tlen = 0;
56,744✔
3999

4000
  tEncoderInit(&encoder, buf, bufLen);
56,744✔
4001
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
56,744✔
4002

4003
  int32_t size = taosArrayGetSize(pRsp->infos);
56,744✔
4004
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
56,744✔
4005
  for (int32_t i = 0; i < size; ++i) {
146,430✔
4006
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
89,686✔
4007
    if (info == NULL) {
89,686✔
4008
      TAOS_CHECK_EXIT(terrno);
×
4009
    }
4010
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
179,372✔
4011
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
179,372✔
4012
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
89,686✔
4013
  }
4014

4015
  tEndEncode(&encoder);
56,744✔
4016

4017
_exit:
56,744✔
4018
  if (code != TSDB_CODE_SUCCESS) {
56,744✔
4019
    tlen = code;
×
4020
  } else {
4021
    tlen = encoder.pos;
56,744✔
4022
  }
4023
  tEncoderClear(&encoder);
56,744✔
4024
  return tlen;
56,744✔
4025
}
4026

4027
/*
 * Deserialize a buffer produced by tSerializeSStreamMsgVTableInfo.
 *
 * vTableInfo->infos is always replaced by a newly created array once the
 * entry count has been decoded; on a later failure it may be partially filled.
 * NOTE(review): cleanup on failure appears to be left to the caller - confirm.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numInfos = 0;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numInfos));

  vTableInfo->infos = taosArrayInit(numInfos, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numInfos; ++idx) {
    VTableInfo* pEntry = taosArrayReserve(vTableInfo->infos, 1);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pEntry->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pEntry->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &pEntry->cols, false));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
4057

4058

4059
void tDestroyVTableInfo(void *ptr) {
89,686✔
4060
  if (NULL == ptr) {
89,686✔
4061
    return;
×
4062
  }
4063
  VTableInfo* pTable = (VTableInfo*)ptr;
89,686✔
4064
  taosMemoryFree(pTable->cols.pColRef);
89,686✔
4065
}
4066

4067
/*
 * Destroy the infos array of a SStreamMsgVTableInfo (each entry is released
 * through tDestroyVTableInfo) and reset the member to NULL. The structure
 * itself is not freed. A NULL ptr is ignored.
 */
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4072

4073
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
697,004✔
4074
  SEncoder encoder = {0};
697,004✔
4075
  int32_t  code = TSDB_CODE_SUCCESS;
697,500✔
4076
  int32_t  lino = 0;
697,500✔
4077
  int32_t  tlen = 0;
697,500✔
4078

4079
  tEncoderInit(&encoder, buf, bufLen);
697,500✔
4080
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
697,252✔
4081

4082
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
1,395,496✔
4083
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
697,500✔
4084
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
696,797✔
4085
  for (int32_t i = 0; i < size; ++i) {
1,460,018✔
4086
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
763,262✔
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
1,526,028✔
4088
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
1,526,938✔
4089
  }
4090

4091
  tEndEncode(&encoder);
696,756✔
4092

4093
_exit:
697,500✔
4094
  if (code != TSDB_CODE_SUCCESS) {
697,748✔
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
697,748✔
4098
  }
4099
  tEncoderClear(&encoder);
697,748✔
4100
  return tlen;
697,004✔
4101
}
4102

4103
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
349,172✔
4104
  SDecoder decoder = {0};
349,172✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
349,172✔
4106
  int32_t  lino = 0;
349,172✔
4107
  SSDataBlock *pResBlock = pBlock;
349,172✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
349,172✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
349,172✔
4111

4112
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
698,344✔
4113
  int32_t numOfCols = 2;
349,172✔
4114
  if (pResBlock->pDataBlock == NULL) {
349,172✔
4115
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
349,172✔
4116
    if (pResBlock->pDataBlock == NULL) {
349,172✔
4117
      TAOS_CHECK_EXIT(terrno);
×
4118
    }
4119
    for (int32_t i = 0; i< numOfCols; ++i) {
1,047,516✔
4120
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
698,344✔
4121
      if (pColInfoData == NULL) {
698,344✔
4122
        TAOS_CHECK_EXIT(terrno);
×
4123
      }
4124
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
698,344✔
4125
      pColInfoData->info.bytes = sizeof(int64_t);
698,344✔
4126
    }
4127
  }
4128
  int32_t numOfRows = 0;
349,172✔
4129
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
349,172✔
4130
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
349,172✔
4131
  for (int32_t i = 0; i < numOfRows; ++i) {
731,543✔
4132
    for (int32_t j = 0; j < numOfCols; ++j) {
1,147,113✔
4133
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
764,742✔
4134
      if (pColInfoData == NULL) {
764,474✔
4135
        TAOS_CHECK_EXIT(terrno);
×
4136
      }
4137
      int64_t value = 0;
764,474✔
4138
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
764,474✔
4139
      colDataSetInt64(pColInfoData, i, &value);
764,474✔
4140
    }
4141
  }
4142

4143
  pResBlock->info.dataLoad = 1;
349,172✔
4144
  pResBlock->info.rows = numOfRows;
349,172✔
4145

4146
  tEndDecode(&decoder);
349,172✔
4147

4148
_exit:
349,172✔
4149
  tDecoderClear(&decoder);
349,172✔
4150
  return code;
349,172✔
4151
}
4152

4153
/*
 * Append an encoded data block, optionally followed by a per-table index.
 *
 * When encoder->data is NULL only the encoded size of the block is added to
 * the position; otherwise the block is encoded in place at the current
 * position.
 *
 * When indexHash is not NULL, a 4-byte slot is reserved for the table count,
 * then each hash entry whose gId is not -1 is written as uid (the hash key),
 * gId, startRowIdx and numRows. Finally the position is rewound to the
 * reserved slot to store the count and restored afterwards.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t len = 0;

  if (encoder->data != NULL) {
    len = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos, blockDataGetNumOfCols(pBlock));
    if (len < 0) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else {
    len = blockGetEncodeSize(pBlock);
  }
  encoder->pos += len;

  if (indexHash != NULL) {
    uint32_t countPos = encoder->pos;
    int32_t  tables = 0;
    void*    pe = NULL;
    int32_t  iter = 0;

    encoder->pos += sizeof(uint32_t); // reserve space for tables

    while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
      SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
      if (pSlice->gId == -1){
        continue;
      }
      int64_t uid = *(int64_t*)(tSimpleHashGetKey(pe, NULL));
      TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
      TAOS_CHECK_EXIT(tEncodeU64(encoder, pSlice->gId));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->startRowIdx));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->numRows));
      tables++;
    }

    // Go back and fill in the reserved count, then resume at the end.
    uint32_t endPos = encoder->pos;
    encoder->pos = countPos;
    TAOS_CHECK_EXIT(tEncodeI32(encoder, tables));
    encoder->pos = endPos;
  }

_exit:
  return code;
}
4196
 
4197
/*
 * Encode an optional block: a one-byte presence flag, then the block itself.
 *
 * The flag is 1 only when block is non-NULL and has at least one row; in that case
 * the payload is produced by encodeData(). Otherwise a single 0 byte is written.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing encode step.
 */
static int32_t encodeBlock(SEncoder* encoder, void* block, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const SSDataBlock* pData = (const SSDataBlock*)block;
  const int8_t       present = (pData != NULL && pData->info.rows > 0) ? 1 : 0;

  TAOS_CHECK_EXIT(tEncodeI8(encoder, present));
  if (present) {
    TAOS_CHECK_EXIT(encodeData(encoder, block, indexHash));
  }

_exit:
  return code;
}
4210

4211
/*
 * Serialize a WAL data response into buf.
 *
 * Layout, in order: dataBlock (with the slice table taken from rsp->indexHash),
 * metaBlock, deleteBlock, tableBlock, then ver and verTime as 64-bit integers.
 * Each block is written through encodeBlock(), i.e. preceded by a presence flag.
 *
 * Returns the encoder position after encoding on success, or the error code of the
 * step that failed.
 */
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp) {
  SEncoder encoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  result = 0;

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  // Only the data block carries a slice index.
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->dataBlock, rsp->indexHash));

  // The remaining blocks share the same encoding and have no index.
  void* plainBlocks[] = {rsp->metaBlock, rsp->deleteBlock, rsp->tableBlock};
  for (size_t i = 0; i < sizeof(plainBlocks) / sizeof(plainBlocks[0]); ++i) {
    TAOS_CHECK_EXIT(encodeBlock(&encoder, plainBlocks[i], NULL));
  }

  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
  tEndEncode(&encoder);

_exit:
  // Read the position before the encoder is cleared.
  result = (code == TSDB_CODE_SUCCESS) ? (int32_t)encoder.pos : code;
  tEncoderClear(&encoder);
  return result;
}
4238

4239
/*
 * Decode an optional block written as a one-byte presence flag plus payload.
 *
 * Flag == 0: pBlock, if given, is reset with blockDataEmpty().
 * Flag != 0: pBlock must be non-NULL (TSDB_CODE_INVALID_PARA otherwise); the payload
 *            is decoded with blockDecode() and decoder->pos is moved to the end
 *            position that blockDecode() reports.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t decodeBlock(SDecoder* decoder, void* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  present = 0;

  TAOS_CHECK_EXIT(tDecodeI8(decoder, &present));

  if (!present) {
    if (pBlock != NULL) {
      blockDataEmpty(pBlock);
    }
    goto _exit;
  }

  if (pBlock == NULL) {
    // Payload on the wire but nowhere to put it.
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
  }

  const char* pPayloadEnd = NULL;
  TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder->data + decoder->pos, &pPayloadEnd));
  decoder->pos = (uint8_t*)pPayloadEnd - decoder->data;

_exit:
  return code;
}
4257

4258
/*
 * Deserialize a WAL data response from buf into pRsp and pSlices.
 *
 * Expected layout, in order:
 *   - data block: presence flag; if set, the block payload followed by a slice count
 *     and that many (uid, gid, startRowIdx, numRows) records
 *   - meta, delete and table blocks, each read with decodeBlock()
 *   - ver and verTime as 64-bit integers
 *
 * pRsp->dataBlock / metaBlock / deleteBlock / tableBlock are filled in place; a block
 * that is flagged as present but whose destination is NULL yields TSDB_CODE_INVALID_PARA.
 * When the data block is absent and pRsp->dataBlock is non-NULL, the block is emptied
 * and pSlices is cleared.
 *
 * pSlices receives one element per slice, pushed as int64_t[3] = { gid, uid, packed },
 * where packed holds startRowIdx in the upper 32 bits and startRowIdx + numRows in the
 * lower 32 bits.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code (also logged with uError) on failure.
 */
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices){
  SDecoder     decoder = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // decode data block
  int8_t hasData = false;
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
  pBlock = pRsp->dataBlock;
  if (hasData) {
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    const char* pEndPos = NULL;
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
    decoder.pos = (uint8_t*)pEndPos - decoder.data;

    int32_t nSlices = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
    // The count is read from the buffer; a negative value is never valid and must not
    // be handed to taosArrayEnsureCap() as a capacity.
    TAOS_CHECK_EXIT(nSlices >= 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
    taosArrayClear(pSlices);

    int64_t  uid = 0;
    uint64_t gid = 0;
    int32_t  startIdx = 0;
    int32_t  numRows = 0;
    for (int32_t i = 0; i < nSlices; i++) {
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));

      // Compute and pack on unsigned 32-bit values. This gives the same result as before
      // for non-negative indexes, but avoids signed overflow in the sum, a left shift of
      // a negative value, and sign extension of the low half over the high half.
      uint32_t endIdx = (uint32_t)startIdx + (uint32_t)numRows;
      uint64_t range = ((uint64_t)(uint32_t)startIdx << 32) | (uint64_t)endIdx;
      int64_t  value[3] = {(int64_t)gid, uid, (int64_t)range};

      void* px = taosArrayPush(pSlices, value);
      if (px == NULL) {
        code = terrno;
        goto _exit;
      }
    }
  } else if (pBlock != NULL) {
    blockDataEmpty(pBlock);
    taosArrayClear(pSlices);
  }

  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->metaBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->deleteBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->tableBlock));

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));

  tEndDecode(&decoder);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc