• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5007

29 Mar 2026 04:32AM UTC coverage: 72.25% (-0.02%) from 72.274%
#5007

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253624 of 351039 relevant lines covered (72.25%)

132531546.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.34
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26

27
/*
 * Serialize a stream management request into pEncoder.
 *
 * Written in order: reqId (i64), type (i32), then an i32 element count
 * followed by the elements of cont.pReqs. A NULL cont.pReqs is written as a
 * count of 0.
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: every element is read as a
 *     SStreamDbTableName and written as dbFName then tbName.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: every element is read as a
 *     SStreamOReaderDeployReq and written as execId (i32), uid (i64), the
 *     vgIds count (i32) and then each vgId (i32).
 *   - any other type: code is set to TSDB_CODE_STREAM_INVALID_TASK_TYPE.
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0, lino = 0;
  SArray* pItems = pReq->cont.pReqs;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      if (NULL == pItems) {
        // no list attached: emit an empty list
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t nameNum = taosArrayGetSize(pItems);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nameNum));
      for (int32_t idx = 0; idx < nameNum; ++idx) {
        SStreamDbTableName* pEntry = taosArrayGet(pItems, idx);
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->dbFName, strlen(pEntry->dbFName) + 1));
        TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pEntry->tbName, strlen(pEntry->tbName) + 1));
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      if (NULL == pItems) {
        // no list attached: emit an empty list
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
        break;
      }

      int32_t deployNum = taosArrayGetSize(pItems);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
      for (int32_t idx = 0; idx < deployNum; ++idx) {
        SStreamOReaderDeployReq* pEntry = taosArrayGet(pItems, idx);
        int32_t                  vgNum = taosArrayGetSize(pEntry->vgIds);

        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pEntry->execId));
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pEntry->uid));
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
        for (int32_t v = 0; v < vgNum; ++v) {
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pEntry->vgIds, v)));
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
75

76
void tFreeRunnerOReaderDeployReq(void* param) {
7,194✔
77
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
7,194✔
78
  if (pReq) {
7,194✔
79
    taosArrayDestroy(pReq->vgIds);
7,194✔
80
  }
81
}
7,194✔
82

83
/*
 * Release the request list owned by a stream management request.
 *
 * The list in cont.pReqs is destroyed according to pReq->type; runner lists
 * additionally release each entry through tFreeRunnerOReaderDeployReq.
 * Requests of any other type are left untouched. pReq itself is not freed and
 * cont.pReqs is not reset. A NULL pReq is ignored.
 */
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pReq->type) {
    taosArrayDestroy(pReq->cont.pReqs);
  } else if (STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER == pReq->type) {
    taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
  }
}
99

100

101
/*
 * Deep-copy a stream management request.
 *
 * pSrc   request to copy; NULL yields *ppDst == NULL and TSDB_CODE_SUCCESS.
 * ppDst  receives a newly allocated copy on success, NULL on failure.
 *
 * The struct is copied bytewise first, then the list in cont.pReqs is
 * duplicated according to pSrc->type:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: the array is duplicated as is.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: a new array is built and each
 *     entry's execId, uid and vgIds are copied. An entry whose vgIds is NULL
 *     is copied with a NULL vgIds instead of failing the whole clone; the
 *     encoder/decoder in this file already treat a NULL vgIds as "no vgIds".
 *   - other types: cont.pReqs keeps the bytewise-copied pointer.
 *
 * Returns 0 on success, otherwise the error taken from terrno. On failure
 * everything allocated here is released and the failure is logged.
 */
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  *ppDst = NULL;

  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0, lino = 0;
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  memcpy(*ppDst, pSrc, sizeof(*pSrc));
  if (pSrc->cont.pReqs) {
    switch (pSrc->type) {
      case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
        (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
        break;
      case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
        int32_t reqNum = taosArrayGetSize(pSrc->cont.pReqs);
        // NOTE(review): the cleanup path relies on taosArrayInit_s handing back
        // zero-filled entries, so not-yet-copied entries have vgIds == NULL - confirm.
        (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), reqNum);
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < reqNum; ++i) {
          SStreamOReaderDeployReq* pNew = taosArrayGet((*ppDst)->cont.pReqs, i);
          SStreamOReaderDeployReq* pReq = taosArrayGet(pSrc->cont.pReqs, i);

          pNew->execId = pReq->execId;
          pNew->uid = pReq->uid;
          pNew->vgIds = NULL;
          if (pReq->vgIds) {
            // only duplicate when there is something to duplicate; a NULL source
            // list is a valid "no vgIds" entry, not an allocation failure
            pNew->vgIds = taosArrayDup(pReq->vgIds, NULL);
            TSDB_CHECK_NULL(pNew->vgIds, code, lino, _exit, terrno);
          }
        }
        break;
      }
      default:
        break;
    }
  }

_exit:

  if (code) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
148

149

150
/*
 * Decode a stream management request written by tEncodeSStreamMgmtReq.
 *
 * Reads reqId (i64) and type (i32), then the type-specific list:
 *   - STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: an i32 count followed by that
 *     many (dbFName, tbName) string pairs, stored as SStreamDbTableName.
 *   - STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: an i32 count followed by that
 *     many entries of execId (i32), uid (i64), vgIds count (i32) and the
 *     vgIds themselves, stored as SStreamOReaderDeployReq.
 *   - any other type: code is set to TSDB_CODE_STREAM_INVALID_TASK_TYPE.
 * A count <= 0 leaves cont.pReqs (or the entry's vgIds) unassigned.
 *
 * Returns the value left in `code`. On failure whatever was already attached
 * to pReq stays attached; nothing is released here.
 */
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
          // the slot must exist before the strings are decoded straight into it
          // NOTE(review): assumes taosArrayReserve sets terrno when it returns NULL - confirm
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
      int32_t num = 0, vgIdNum = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
          if (vgIdNum > 0) {
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
          }
          // runs zero times when vgIdNum <= 0, so p->vgIds is only read when allocated
          for (int32_t n = 0; n < vgIdNum; ++n) {
            int32_t* vgId = taosArrayGet(p->vgIds, n);
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
          }
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:

  return code;
}
203

204
/*
 * Serialize the common stream task header.
 *
 * Written in order: type, streamId, taskId, flags, seriousId, deployId,
 * nodeId, taskIdx, status, detailStatus, errorCode, then an i32 presence flag
 * (1/0) for pMgmtReq followed by the encoded request when it is present.
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0, lino = 0;

  // identity
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  // placement and state
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  // optional management request, prefixed by a presence flag
  if (NULL == pTask->pMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
  } else {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
232

233

234
/*
 * Decode the common stream task header written by tEncodeStreamTask.
 *
 * Fields are read in the same order they are encoded. When the trailing
 * presence flag is non-zero, a SStreamMgmtReq is allocated, attached to
 * pTask->pMgmtReq and decoded in place.
 *
 * Returns the value left in `code`. If decoding the management request fails,
 * the allocated request stays attached to pTask; it is not released here.
 */
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0, lino = 0;
  int32_t hasMgmtReq = 0;

  // identity
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  // placement and state
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  // optional management request, announced by a presence flag
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasMgmtReq));
  if (hasMgmtReq) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
  }

_exit:

  return code;
}
263

264
/*
 * Serialize one recalculation progress record as
 * recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));

  // covered range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:

  return code;
}
277

278
/*
 * Decode one recalculation progress record in the order
 * recalcId (i64), progress (i32), start (i64), end (i64).
 *
 * Returns the value left in `code`; decoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));

  // covered range
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:

  return code;
}
291

292

293
/*
 * Serialize a trigger runtime status.
 *
 * Written in order: autoRecalcNum, realtimeSessionNum, historySessionNum,
 * recalcSessionNum, histroyProgress (all i32), then the number of entries in
 * userRecalcs (i32) followed by each entry encoded with
 * tEncodeSSTriggerRecalcProgress.
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0, lino = 0;

  // fixed counters
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  // user recalculation list, length-prefixed
  int32_t userRecalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, userRecalcNum));

  int32_t idx = 0;
  while (idx < userRecalcNum) {
    SSTriggerRecalcProgress* pItem = taosArrayGet(pStatus->userRecalcs, idx);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pItem));
    ++idx;
  }

_exit:

  return code;
}
314

315
/*
 * Decode a trigger runtime status written by tEncodeSSTriggerRuntimeStatus.
 *
 * Read in order: autoRecalcNum, realtimeSessionNum, historySessionNum,
 * recalcSessionNum, histroyProgress (all i32), then an i32 entry count and
 * that many SSTriggerRecalcProgress records stored into a newly allocated
 * userRecalcs array. A count <= 0 leaves userRecalcs unassigned.
 *
 * Returns the value left in `code`. On failure a userRecalcs array that was
 * already allocated stays attached to pStatus; it is not released here.
 */
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  // The fourth field on the wire is recalcSessionNum (see the encoder); decoding
  // it into realtimeSessionNum again would clobber the real-time count and
  // leave recalcSessionNum unset.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    TSDB_CHECK_NULL(pStatus->userRecalcs, code, lino, _exit, terrno);
  }

  // runs zero times when recalcNum <= 0, so userRecalcs is only read when allocated
  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:

  return code;
}
344

345

346
/*
 * Serialize a stream heartbeat message.
 *
 * Inside one tStartEncode/tEndEncode frame, written in order: dnodeId,
 * streamGId, snodeId, runnerThreadNum, then four length-prefixed lists:
 * pVgLeaders (i32 each), pStreamStatus (each entry encoded as a SStreamTask),
 * pStreamReq (i32 each) and pTriggerStatus (SSTriggerRuntimeStatus each).
 *
 * Returns pEncoder->pos when `code` ends up 0, otherwise `code`. When a step
 * fails, tEndEncode is not called.
 */
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0, lino = 0;
  int32_t idx = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  // vgroup leader ids
  int32_t leaderNum = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, leaderNum));
  for (idx = 0; idx < leaderNum; ++idx) {
    int32_t* pVgId = taosArrayGet(pReq->pVgLeaders, idx);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
  }

  // per-task status entries
  int32_t taskNum = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskNum));
  for (idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatusMsg* pTaskStatus = taosArrayGet(pReq->pStreamStatus, idx);
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pTaskStatus));
  }

  // i32 values stored in pStreamReq
  int32_t reqIdxNum = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqIdxNum));
  for (idx = 0; idx < reqIdxNum; ++idx) {
    int32_t* pReqIdx = taosArrayGet(pReq->pStreamReq, idx);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pReqIdx));
  }

  // trigger runtime status entries
  int32_t trigNum = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, trigNum));
  for (idx = 0; idx < trigNum; ++idx) {
    SSTriggerRuntimeStatus* pTrigStatus = taosArrayGet(pReq->pTriggerStatus, idx);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigStatus));
  }

  tEndEncode(pEncoder);

_exit:
  if (0 == code) {
    return pEncoder->pos;
  }

  return code;
}
393

394
/*
 * Decode a stream heartbeat message written by tEncodeStreamHbMsg.
 *
 * Inside one tStartDecode/tEndDecode frame, read in order: dnodeId,
 * streamGId, snodeId, runnerThreadNum, then four length-prefixed lists stored
 * into newly allocated arrays: pVgLeaders, pStreamStatus, pStreamReq and
 * pTriggerStatus. A list whose count is <= 0 leaves its array unassigned.
 *
 * Returns the value left in `code`. When a step fails, tEndDecode is not
 * called and the arrays allocated so far stay attached to pReq; nothing is
 * released here.
 */
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  // vgroup leader ids: decoded one by one and appended
  int32_t vgLeaderNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLeaderNum));
  if (vgLeaderNum > 0) {
    pReq->pVgLeaders = taosArrayInit(vgLeaderNum, sizeof(int32_t));
    TSDB_CHECK_NULL(pReq->pVgLeaders, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < vgLeaderNum; ++k) {
    int32_t leaderVgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &leaderVgId));
    if (NULL == taosArrayPush(pReq->pVgLeaders, &leaderVgId)) {
      code = terrno;
      goto _exit;
    }
  }

  // per-task status entries: array is pre-sized, entries are decoded in place
  int32_t taskNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskNum));
  if (taskNum > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), taskNum);
    TSDB_CHECK_NULL(pReq->pStreamStatus, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < taskNum; ++k) {
    SStmTaskStatusMsg* pTaskStatus = taosArrayGet(pReq->pStreamStatus, k);
    TSDB_CHECK_NULL(pTaskStatus, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTaskStatus));
  }

  // i32 values of pStreamReq: array is pre-sized, values are decoded in place
  int32_t reqIdxNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqIdxNum));
  if (reqIdxNum > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqIdxNum);
    TSDB_CHECK_NULL(pReq->pStreamReq, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < reqIdxNum; ++k) {
    int32_t* pReqIdx = taosArrayGet(pReq->pStreamReq, k);
    TSDB_CHECK_NULL(pReqIdx, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pReqIdx));
  }

  // trigger runtime status entries: array is pre-sized, entries are decoded in place
  int32_t trigNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &trigNum));
  if (trigNum > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), trigNum);
    TSDB_CHECK_NULL(pReq->pTriggerStatus, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < trigNum; ++k) {
    SSTriggerRuntimeStatus* pTrigStatus = taosArrayGet(pReq->pTriggerStatus, k);
    TSDB_CHECK_NULL(pTrigStatus, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pTrigStatus));
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
485

486
void tFreeSSTriggerRuntimeStatus(void* param) {
4,151,853✔
487
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
4,151,853✔
488
  if (NULL == pStatus) {
4,151,853✔
489
    return;
×
490
  }
491
  taosArrayDestroy(pStatus->userRecalcs);
4,151,853✔
492
}
493

494
/*
 * Release the arrays owned by a stream heartbeat message.
 *
 * pMsg       message to clean; NULL is ignored. pMsg itself is not freed.
 * deepClean  when true, every value in pStreamReq is used as an index into
 *            pStreamStatus and the pMgmtReq of that task status is released
 *            (its contents and the request itself) before the arrays go away.
 *
 * pVgLeaders, pStreamReq, pStreamStatus and pTriggerStatus are always
 * destroyed; trigger status entries also release their own resources through
 * tFreeSSTriggerRuntimeStatus. The array pointers in pMsg are not reset.
 */
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (NULL == pMsg) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);

  if (deepClean) {
    int32_t pending = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t k = 0; k < pending; ++k) {
      int32_t*           pStatusIdx = taosArrayGet(pMsg->pStreamReq, k);
      SStmTaskStatusMsg* pStatus = taosArrayGet(pMsg->pStreamStatus, *pStatusIdx);
      if (pStatus != NULL) {
        // release the request's contents first, then the request itself
        tFreeSStreamMgmtReq(pStatus->pMgmtReq);
        taosMemoryFree(pStatus->pMgmtReq);
      }
    }
  }

  taosArrayDestroy(pMsg->pStreamReq);
  taosArrayDestroy(pMsg->pStreamStatus);
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
}
517

518
/*
 * Serialize the trigger-side part of a reader deploy message.
 *
 * Written in order: triggerTblName (binary), triggerTblUid, triggerTblSuid
 * (i64), triggerTblType, isTriggerTblVirt, deleteReCalc, deleteOutTbl (i8),
 * then partitionCols, triggerCols, triggerScanPlan and calcCacheScanPlan as
 * binaries. Every string is written with its terminating NUL included; a NULL
 * string is written with length 0. triggerPrevFilter is not encoded here.
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0, lino = 0;

  // trigger table identity
  int32_t tblNameLen = pMsg->triggerTblName == NULL ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pMsg->triggerTblName, tblNameLen));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));

  // one-byte options
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  // column lists
  int32_t partColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partColsLen));
  int32_t trigColsLen = pMsg->triggerCols == NULL ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, trigColsLen));

  // scan plans
  int32_t trigPlanLen = pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, trigPlanLen));
  int32_t cachePlanLen = pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, cachePlanLen));

_exit:

  return code;
}
539

540
/*
 * Serialize the calc-side part of a reader deploy message:
 * execReplica (i32) followed by calcScanPlan as a binary (terminating NUL
 * included; a NULL plan is written with length 0).
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

  int32_t planLen = pMsg->calcScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:

  return code;
}
551

552

553
/*
 * Serialize a reader deploy message: the triggerReader flag (i8) followed by
 * msg.trigger when the flag is non-zero, or msg.calc otherwise.
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  // the flag just written selects which payload follows
  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:

  return code;
}
568

569
/*
 * Serialize a stream task address as taskId (i64), nodeId (i32) and the
 * endpoint set (via tEncodeSEpSet).
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0, lino = 0;

  // which task, on which node
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));

  // its endpoint set
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:

  return code;
}
581

582
/*
 * Serialize a runner target: its task address (via tEncodeSStreamTaskAddr)
 * followed by execReplica (i32).
 *
 * Returns the value left in `code`; encoding stops at the first step that
 * TAOS_CHECK_EXIT treats as a failure.
 */
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0, lino = 0;

  // address first, replica count second
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:

  return code;
}
593

594

595
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
389,948✔
596
  int32_t code = 0;
389,948✔
597
  int32_t lino;
598

599
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
779,896✔
600
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
779,896✔
601
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
779,896✔
602
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
779,896✔
603
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
779,896✔
604
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
779,896✔
605
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
779,896✔
606
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
779,896✔
607
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));
779,896✔
608
  int32_t partitionColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
389,948✔
609
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partitionColsLen));
779,896✔
610

611
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
389,948✔
612
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
389,948✔
613
  for (int32_t i = 0; i < addrSize; ++i) {
499,732✔
614
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
109,784✔
615
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
219,568✔
616
  }
617
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
779,896✔
618
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
779,896✔
619
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
779,896✔
620

621
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
779,896✔
622
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
779,896✔
623
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
779,896✔
624
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
779,896✔
625
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->idleTimeoutMs));
779,896✔
626

627
  switch (pMsg->triggerType) {
389,948✔
628
    case WINDOW_TYPE_SESSION: {
16,666✔
629
      // session trigger
630
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
33,332✔
631
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
33,332✔
632
      break;
16,666✔
633
    }
634
    case WINDOW_TYPE_STATE: {
128,544✔
635
      // state trigger
636
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
257,088✔
637
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
257,088✔
638
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForType));
257,088✔
639
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForCount));
257,088✔
640
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
257,088✔
641
      int32_t stateWindowZerothLen = 
128,544✔
642
          pMsg->trigger.stateWin.zeroth == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.zeroth) + 1;
128,544✔
643
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.zeroth, stateWindowZerothLen));
257,088✔
644
      int32_t stateWindowExprLen =
128,544✔
645
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
128,544✔
646
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateWindowExprLen));
257,088✔
647
      break;
128,544✔
648
    }
649
    case WINDOW_TYPE_INTERVAL: {
140,636✔
650
      // slide trigger
651
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
281,272✔
652
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
281,272✔
653
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
281,272✔
654
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
281,272✔
655
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
281,272✔
656
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
281,272✔
657
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
281,272✔
658
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
281,272✔
659
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
281,272✔
660
      break;
140,636✔
661
    }
662
    case WINDOW_TYPE_EVENT: {
47,512✔
663
      // event trigger
664
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
47,512✔
665
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
47,512✔
666

667
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
95,024✔
668
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
95,024✔
669
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForType));
95,024✔
670
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForCount));
95,024✔
671
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
95,024✔
672
      break;
47,512✔
673
    }
674
    case WINDOW_TYPE_COUNT: {
35,676✔
675
      // count trigger
676
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
35,676✔
677
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
71,352✔
678

679
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
71,352✔
680
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
71,352✔
681
      break;
35,676✔
682
    }
683
    case WINDOW_TYPE_PERIOD: {
20,914✔
684
      // period trigger
685
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
41,828✔
686
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
41,828✔
687
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
41,828✔
688
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
41,828✔
689
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
41,828✔
690
      break;
20,914✔
691
    }
692
    default:
×
693
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
694
      break;
×
695
  }
696

697
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
779,896✔
698
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
779,896✔
699
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
779,896✔
700
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
779,896✔
701
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId));
779,896✔
702
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId));
779,896✔
703
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
389,948✔
704
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
779,896✔
705
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
389,948✔
706
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
779,896✔
707
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
389,948✔
708
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
779,896✔
709

710
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
389,948✔
711
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
389,948✔
712
  for (int32_t i = 0; i < readerNum; ++i) {
856,670✔
713
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
466,722✔
714
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
466,722✔
715
  }
716

717
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
389,948✔
718
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
389,948✔
719
  for (int32_t i = 0; i < runnerNum; ++i) {
1,540,022✔
720
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1,150,074✔
721
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
1,150,074✔
722
  }
723

724
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
779,896✔
725
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
779,896✔
726
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));
779,896✔
727

728
_exit:
389,948✔
729

730
  return code;
389,948✔
731
}
732

733

734
// Serialize one SFieldWithOptions into pEncoder.
// Wire order: name, type, flags, bytes, compress, typeMod.
// Returns 0 when every field was written; otherwise the code left by the
// step that made TAOS_CHECK_EXIT jump to _exit.
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
  int32_t code = 0;
  int32_t lino = 0;

  // identity of the field
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));

  // size and options
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
749

750

751
// Encode a runner-task deploy message into pEncoder.
//
// String members are written with tEncodeBinary as "bytes including the
// terminating NUL"; a NULL string is written with length 0. Array members are
// written as an int32 element count followed by the elements (a NULL array is
// written as count 0 via taosArrayGetSize).
//
// Returns 0 when the whole message was written; otherwise the code left by
// the step that made TAOS_CHECK_EXIT jump to _exit.
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  // streamName is NULL-guarded like every other string below, so a message
  // without a name no longer passes NULL to strlen().
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, NULL == pMsg->streamName ? 0 : (int32_t)strlen(pMsg->streamName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // notify address URLs
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));

  // output columns
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // output tags
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));

  // forced output columns: expression text followed by its data type
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));

  // colCids and tagCids - always encode size (0 if NULL) for compatibility
  int32_t colCidsSize = (int32_t)taosArrayGetSize(pMsg->colCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, colCidsSize));
  for (int32_t i = 0; i < colCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->colCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

  int32_t tagCidsSize = (int32_t)taosArrayGetSize(pMsg->tagCids);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tagCidsSize));
  for (int32_t i = 0; i < tagCidsSize; ++i) {
    int16_t* pCid = (int16_t*)taosArrayGet(pMsg->tagCids, i);
    TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
  }

_exit:

  return code;
}
830

831
// Encode one task deployment: the common task header first, then the
// type-specific deploy message selected by pTask->task.type.
// An unrecognised task type exits with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  switch (pTask->task.type) {
    case STREAM_READER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
      break;
    }
    case STREAM_RUNNER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:
  return code;
}
855

856

857
// Encode all deployments of one stream.
// Wire order: streamId, reader task count + reader tasks, a 0/1 flag telling
// whether a trigger task follows (+ that task), runner task count + runner tasks.
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t idx = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  // reader tasks
  int32_t nReaders = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
  for (idx = 0; idx < nReaders; ++idx) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  // optional trigger task, announced by a presence flag
  int32_t hasTrigger = (pStream->triggerTask != NULL) ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (hasTrigger) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
  for (idx = 0; idx < nRunners; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
886

887
// Encode the common stream message header, which consists of the message
// type written as an int32. Returns the result of that single encode step.
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  return tEncodeI32(pEncoder, pMsg->msgType);
}
896

897
// Decode the common stream message header: one int32 that is stored into
// pMsg->msgType. pMsg->msgType is only written when the read did not exit
// through TAOS_CHECK_EXIT.
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t rawType = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
908

909
// Encode a start-task message. Only the common header is written, so the
// result of tEncodeSStreamMsg is returned directly.
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  return tEncodeSStreamMsg(pEncoder, &pStart->header);
}
919

920
// Encode a task-start entry: the task descriptor followed by its start message.
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // The callees are invoked through non-const pointers, hence the casts.
  SStreamTask*         pTaskInfo = (SStreamTask*)&pTask->task;
  SStreamStartTaskMsg* pStartMsg = (SStreamStartTaskMsg*)&pTask->startMsg;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, pTaskInfo));
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, pStartMsg));

_exit:
  return code;
}
931

932
// Encode an undeploy-task message.
// Wire order: common header, doCheckpoint (int8), doCleanup (int8).
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));

  // the two undeploy options
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
944

945
// Encode a task-undeploy entry: the task descriptor followed by its
// undeploy message.
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  // The callees are invoked through non-const pointers, hence the casts.
  SStreamTask*            pTaskInfo = (SStreamTask*)&pTask->task;
  SStreamUndeployTaskMsg* pUndeployMsg = (SStreamUndeployTaskMsg*)&pTask->undeployMsg;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, pTaskInfo));
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, pUndeployMsg));

_exit:
  return code;
}
956

957

958
// Encode one recalculation request as three int64 values:
// recalcId, start, end.
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));

  // requested range
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
970

971
// Encode the type-dependent payload of a management response.
// Each handled msgType writes one or more "int32 count + elements" lists;
// any other msgType writes nothing and returns 0.
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t idx = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      // vgroup id list
      int32_t nVgroups = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVgroups));
      for (idx = 0; idx < nVgroups; ++idx) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, idx);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      // reader address list
      int32_t nReaders = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nReaders));
      for (idx = 0; idx < nReaders; ++idx) {
        SStreamTaskAddr* pReaderAddr = taosArrayGet(pRsp->readerList, idx);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReaderAddr));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRunners));
      for (idx = 0; idx < nRunners; ++idx) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, idx);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nRecalcs));
      for (idx = 0; idx < nRecalcs; ++idx) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, idx);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nExecRsps = taosArrayGetSize(pRsp->execRspList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nExecRsps));
      for (idx = 0; idx < nExecRsps; ++idx) {
        SStreamOReaderDeployRsp* pExecRsp = taosArrayGet(pRsp->execRspList, idx);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pExecRsp->execId));

        // nested list of task addresses for this exec id
        int32_t nAddrs = taosArrayGetSize(pExecRsp->vgList);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nAddrs));
        for (int32_t k = 0; k < nAddrs; ++k) {
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pExecRsp->vgList, k)));
        }
      }
      break;
    }

    default:
      // no payload is written for other message types
      break;
  }

_exit:
  return code;
}
1037

1038
// Encode a complete management response.
// Wire order: common header, reqId (int64), code (int32), task descriptor,
// then the payload selected by pRsp->header.msgType.
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  // fixed part
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));

  // type-dependent part
  SStreamMgmtRspCont* pCont = (SStreamMgmtRspCont*)&pRsp->cont;
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, pCont));

_exit:
  return code;
}
1052

1053

1054
// Encode a stream heartbeat response inside one tStartEncode/tEndEncode frame.
//
// Wire order: streamGId, deploy list, start list, undeployAll flag, undeploy
// list (only present when undeployAll is 0), management response list. Every
// list is an int32 count followed by its elements.
//
// Returns 0 when the whole response was written; otherwise the code left by
// the step that made TAOS_CHECK_EXIT jump to _exit.
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  int8_t  started = 0;  // set once tStartEncode has succeeded

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  started = 1;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  // streams to deploy
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
  for (int32_t i = 0; i < deployNum; ++i) {
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
  }

  // tasks to start
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
  for (int32_t i = 0; i < startNum; ++i) {
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
  }

  // tasks to undeploy; the explicit list is omitted when undeployAll is set
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
    for (int32_t i = 0; i < undeployNum; ++i) {
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
    }
  }

  // management responses
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
  for (int32_t i = 0; i < rspNum; ++i) {
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
  }

_exit:

  // Close the frame only if it was opened: when tStartEncode itself failed
  // there is nothing to end.
  // NOTE(review): presumably tEndEncode pops state pushed by tStartEncode - confirm.
  if (started) {
    tEndEncode(pEncoder);
  }

  return code;
}
1097

1098
// Decode the trigger-side reader deploy payload into pMsg.
// Wire order: triggerTblName, triggerTblUid, triggerTblSuid, four int8 flags
// (triggerTblType, isTriggerTblVirt, deleteReCalc, deleteOutTbl), then four
// blobs (partitionCols, triggerCols, triggerScanPlan, calcCacheScanPlan).
// Blob members are filled through tDecodeBinaryAlloc.
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // trigger table identity
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));

  // single-byte attributes
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  // column lists and plans
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1118

1119

1120
// Decode the calc-side reader deploy payload: execReplica (int32) followed
// by the calcScanPlan blob, which is filled through tDecodeBinaryAlloc.
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void**  ppPlan = (void**)&pMsg->calcScanPlan;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppPlan, NULL));

_exit:
  return code;
}
1131

1132

1133
// Decode a reader deploy message. The leading int8 triggerReader flag picks
// which payload follows: the trigger variant when non-zero, the calc variant
// otherwise.
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1148

1149

1150
// Decode a task address: taskId (int64), nodeId (int32), then the endpoint
// set via tDecodeSEpSet.
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // ids
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));

  // endpoints
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1162

1163

1164
// Decode a runner target: its task address followed by execReplica (int32).
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SStreamTaskAddr* pAddr = &pMsg->addr;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1175

1176

1177
// Decode a trigger-task deploy message into pMsg.
//
// Layout, in read order:
//   1. nine int8 flags and the partitionCols blob
//   2. notify URL list plus notify options
//   3. five int64 timing values
//   4. the window-specific section selected by pMsg->triggerType
//   5. event/placeholder bitmaps, slot ids and three plan/filter blobs
//   6. reader address list and runner target list
//   7. leaderSnodeId, streamName and, if the decoder is not yet at its end,
//      precision
//
// Lists are read as an int32 count followed by the elements; the backing
// array is created only when the count is positive. An unknown triggerType
// exits with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  // 1. flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));

  // 2. notify URLs: each array slot receives its own decoded string
  int32_t nUrls = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUrls));
  if (nUrls > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, nUrls);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nUrls; ++k) {
      const char **ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  // 3. timing values
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->idleTimeoutMs));

  // 4. window-specific section
  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.zeroth, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForType));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForCount));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

  // 5. bitmaps, slot ids, filter and plans
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  // 6a. reader addresses
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nReaders; ++k) {
      SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
    }
  }

  // 6b. runner targets
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRunners; ++k) {
      SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
    }
  }

  // 7. trailer; precision is read only when bytes remain in the decoder
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
  }

_exit:
  return code;
}
1314

1315

1316

1317
// Deserialize one SFieldWithOptions from pDecoder.
// Read order: name, type, flags, bytes, compress, typeMod.
// Returns 0 when every field was read; otherwise the code left by the step
// that made TAOS_CHECK_EXIT jump to _exit.
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t code = 0;
  int32_t lino = 0;

  // identity of the field; the name is copied straight into pField->name
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));

  // size and options
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1332

1333
void destroySStreamOutCols(void* p){
65,367✔
1334
  if (p == NULL) return;
65,367✔
1335
  SStreamOutCol* col = (SStreamOutCol*)p;
65,367✔
1336
  taosMemoryFreeClear(col->expr);
65,367✔
1337
}
1338

1339
// Decodes a runner-task deploy message from pDecoder into pMsg.
//
// Fields are consumed in wire order: scalar header fields, the notify URL list,
// addOptions, output columns, output tags, super-table uid/version, the
// sub-table-name and tag-value expressions, and the forced output columns.
// Three trailing sections (lowLatencyCalc, colCids, tagCids) are only read while
// the decoder still has data, so shorter messages without them still decode.
//
// Every buffer/array allocated here is stored in pMsg and is NOT released on
// failure inside this function; the caller keeps responsibility for pMsg.
//
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // Notify URLs: each array slot is a char* filled by tDecodeBinaryAlloc.
  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  if (addrSize > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < addrSize; ++i) {
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));

  // Output columns.
  int32_t outColNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
  if (outColNum > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
  }

  // Output tags.
  int32_t outTagNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
  if (outTagNum > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
  }

  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // Forced output columns: expression text plus its result type descriptor.
  int32_t forceOutColsSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
  if (forceOutColsSize > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
  }
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);

    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
  }

  // Optional trailing field.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  }

  // colCids and tagCids - the size is always decoded, the array is created only
  // if size > 0. For backward compatibility, check for remaining data first.
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t colCidsSize = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colCidsSize));
    if (colCidsSize < 0 || colCidsSize > TSDB_MAX_COLUMNS) {
      // An out-of-range count cannot simply be skipped: the id payload would stay
      // unread and every later field would be decoded from the wrong offset.
      // Fail with the same code tDecodeSStmTaskDeploy uses for malformed input.
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
    if (colCidsSize > 0) {
      pMsg->colCids = taosArrayInit(colCidsSize, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->colCids, code, lino, _exit, terrno);
      for (int32_t i = 0; i < colCidsSize; ++i) {
        int16_t cid = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
        if (taosArrayPush(pMsg->colCids, &cid) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

  // Try to decode tagCids if there's more data.
  if (!tDecodeIsEnd(pDecoder)) {
    int32_t tagCidsSize = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagCidsSize));
    if (tagCidsSize < 0 || tagCidsSize > TSDB_MAX_TAGS) {
      // Same reasoning as for colCids: reject instead of desynchronising.
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
    if (tagCidsSize > 0) {
      pMsg->tagCids = taosArrayInit(tagCidsSize, sizeof(int16_t));
      TSDB_CHECK_NULL(pMsg->tagCids, code, lino, _exit, terrno);
      for (int32_t i = 0; i < tagCidsSize; ++i) {
        int16_t cid = 0;
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
        if (taosArrayPush(pMsg->tagCids, &cid) == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
      }
    }
  }

_exit:

  return code;
}
1450

1451
// Decodes one task deployment: the common SStreamTask header first, then the
// type-specific deploy message selected by pTask->task.type (reader, trigger or
// runner). Any other task type yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    // unknown task type: nothing sensible can be decoded after the header
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1475

1476

1477
// Decodes the deployment of one stream: streamId, then a counted list of reader
// tasks, an optional single trigger task (guarded by an int32 flag), and a
// counted list of runner tasks. Arrays / the trigger task are only allocated
// when the corresponding count or flag is set.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  int32_t nReaders = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nReaders; ++idx) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // trigger task (present when the flag is non-zero)
  int32_t hasTrigger = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  int32_t nRunners = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nRunners; ++idx) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, idx);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1517

1518

1519
// Decodes a start-task message; only its SStreamMsg header is read here.
// Returns the status left in `code` by TAOS_CHECK_EXIT.
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:
  return code;
}
1529

1530

1531
// Decodes a task-start entry: the SStreamTask header followed by its
// start message.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the message payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1542

1543

1544
// Decodes an undeploy-task message: SStreamMsg header, then the doCheckpoint
// and doCleanup int8 flags.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // per-task undeploy options
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1556

1557

1558
// Decodes a task-undeploy entry: the SStreamTask header followed by its
// undeploy message.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  // task identity first, then the message payload
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1569

1570
// Decodes one recalculation request: recalcId, start and end, each an int64.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // [start, end] of the recalculation
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1582

1583
// Decodes the message-type specific body of a management response into pCont.
//
//   STREAM_MSG_ORIGTBL_READER_INFO   -> vgIds (int32 list) + readerList
//   STREAM_MSG_UPDATE_RUNNER         -> runnerList
//   STREAM_MSG_USER_RECALC           -> recalcList
//   STREAM_MSG_RUNNER_ORIGTBL_READER -> execRspList, each entry carrying an
//                                       execId and its own vgList
//
// Any other msgType decodes nothing. Each list is a count followed by that many
// elements; an array is only allocated when the count is positive.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t lino = 0;
  int32_t code = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t nVgs = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVgs));
      if (nVgs > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), nVgs);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);

        for (int32_t idx = 0; idx < nVgs; ++idx) {
          int32_t *pVgId = taosArrayGet(pCont->vgIds, idx);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      int32_t nReaders = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
      if (nReaders > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), nReaders);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);

        for (int32_t idx = 0; idx < nReaders; ++idx) {
          SStreamTaskAddr *pReaderAddr = taosArrayGet(pCont->readerList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReaderAddr));
        }
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t nRunners = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
      if (nRunners > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), nRunners);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);

        for (int32_t idx = 0; idx < nRunners; ++idx) {
          SStreamRunnerTarget *pRunner = taosArrayGet(pCont->runnerList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
        }
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t nRecalcs = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRecalcs));
      if (nRecalcs > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), nRecalcs);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);

        for (int32_t idx = 0; idx < nRecalcs; ++idx) {
          SStreamRecalcReq *pRecalc = taosArrayGet(pCont->recalcList, idx);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pRecalc));
        }
      }
      break;
    }

    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
      int32_t nRsps = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsps));
      if (nRsps > 0) {
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), nRsps);
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);

        for (int32_t idx = 0; idx < nRsps; ++idx) {
          SStreamOReaderDeployRsp *pExecRsp = taosArrayGet(pCont->execRspList, idx);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pExecRsp->execId));

          // every response carries its own counted list of task addresses
          int32_t nAddrs = 0;
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nAddrs));
          if (nAddrs > 0) {
            pExecRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), nAddrs);
            TSDB_CHECK_NULL(pExecRsp->vgList, code, lino, _exit, terrno);

            for (int32_t k = 0; k < nAddrs; ++k) {
              SStreamTaskAddr* pVgAddr = taosArrayGet(pExecRsp->vgList, k);
              TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pVgAddr));
            }
          }
        }
      }
      break;
    }

    default:
      // message types without a body
      break;
  }

_exit:
  return code;
}
1668

1669

1670
// Decodes one management response: SStreamMsg header, reqId, result code, the
// task it refers to, and finally the body whose layout is selected by
// pRsp->header.msgType.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  // fixed part
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // variable part; the header decoded above tells which layout to expect
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
1684

1685
void tFreeSStreamOReaderDeployRsp(void* param) {
4,796✔
1686
  if (NULL == param) {
4,796✔
1687
    return;
×
1688
  }
1689

1690
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
4,796✔
1691
  taosArrayDestroy(pRsp->vgList);
4,796✔
1692
}
1693

1694
void tFreeSStreamMgmtRsp(void* param) {
137,952✔
1695
  if (NULL == param) {
137,952✔
1696
    return;
×
1697
  }
1698
  
1699
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
137,952✔
1700

1701
  taosArrayDestroy(pRsp->cont.vgIds);
137,952✔
1702
  taosArrayDestroy(pRsp->cont.readerList);
137,952✔
1703
  taosArrayDestroy(pRsp->cont.runnerList);
137,952✔
1704
  taosArrayDestroy(pRsp->cont.recalcList);
137,952✔
1705
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
137,952✔
1706
}
1707

1708
// Frees the heap buffers referenced by a reader deploy message. Which member of
// pReader->msg is used depends on pReader->triggerReader: the trigger variant
// when set, the calc variant otherwise. pReader itself is not freed.
// A NULL pReader is ignored.
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader != NULL) {
    if (!pReader->triggerReader) {
      SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
      taosMemoryFree(pCalc->calcScanPlan);
    } else {
      SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
      taosMemoryFree(pTrig->triggerTblName);
      taosMemoryFree(pTrig->partitionCols);
      taosMemoryFree(pTrig->triggerCols);
      taosMemoryFree(pTrig->triggerScanPlan);
      taosMemoryFree(pTrig->calcCacheScanPlan);
    }
  }
}
1725

1726
// `param` points at a slot that holds a heap pointer; the pointer stored in the
// slot is freed, the slot itself is left alone. A NULL `param` is ignored.
void tFreeStreamNotifyUrl(void* param) {
  if (param != NULL) {
    taosMemoryFree(*(void**)param);
  }
}
1733

1734
// Frees everything referenced by a trigger deploy message: the notify URL
// array (and each URL), the buffers of whichever window variant triggerType
// selects (state / event / count), the plan and filter buffers, the reader and
// runner arrays, and the stream name. pTrigger itself is not freed.
// A NULL pTrigger is ignored.
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger != NULL) {
    taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);

    // only the variant matching triggerType owns buffers
    if (pTrigger->triggerType == WINDOW_TYPE_STATE) {
      taosMemoryFree(pTrigger->trigger.stateWin.zeroth);
      taosMemoryFree(pTrigger->trigger.stateWin.expr);
    } else if (pTrigger->triggerType == WINDOW_TYPE_EVENT) {
      taosMemoryFree(pTrigger->trigger.event.startCond);
      taosMemoryFree(pTrigger->trigger.event.endCond);
    } else if (pTrigger->triggerType == WINDOW_TYPE_COUNT) {
      taosMemoryFree(pTrigger->trigger.count.condCols);
    }

    taosMemoryFree(pTrigger->partitionCols);
    taosMemoryFree(pTrigger->triggerPrevFilter);
    taosMemoryFree(pTrigger->triggerScanPlan);
    taosMemoryFree(pTrigger->calcCacheScanPlan);

    taosArrayDestroy(pTrigger->readerList);
    taosArrayDestroy(pTrigger->runnerList);
    taosMemoryFree(pTrigger->streamName);
  }
}
1765

1766
void tFreeSStreamOutCol(void* param) {
×
1767
  if (NULL == param) {
×
1768
    return;
×
1769
  }
1770

1771
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1772
  taosMemoryFree(pOut->expr);
×
1773
}
1774

1775
// Frees the buffers and arrays referenced by a runner deploy message: name and
// plan strings, notify URLs, output column/tag arrays, the sub-table-name and
// tag-value expressions, and the forced output columns (each with its expr).
// pRunner itself is not freed. A NULL pRunner is ignored.
//
// NOTE(review): tDecodeSStreamRunnerDeployMsg in this file also allocates
// pRunner->colCids and pRunner->tagCids, which are not released here - confirm
// that their ownership is taken over elsewhere.
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner != NULL) {
    // plain strings
    taosMemoryFree(pRunner->streamName);
    taosMemoryFree(pRunner->pPlan);
    taosMemoryFree(pRunner->outDBFName);
    taosMemoryFree(pRunner->outTblName);

    // arrays; URL slots hold heap pointers and need the element destructor
    taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
    taosArrayDestroy(pRunner->outCols);
    taosArrayDestroy(pRunner->outTags);

    // expressions
    taosMemoryFree(pRunner->subTblNameExpr);
    taosMemoryFree(pRunner->tagValueExpr);
    taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
  }
}
1793

1794
void tFreeSStmTaskDeploy(void* param) {
1,843,942✔
1795
  if (NULL == param) {
1,843,942✔
1796
    return;
338,971✔
1797
  }
1798

1799
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1,504,971✔
1800
  switch (pTask->task.type)  {
1,504,971✔
1801
    case STREAM_READER_TASK:
658,158✔
1802
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
658,158✔
1803
      break;
658,158✔
1804
    case STREAM_TRIGGER_TASK:
191,849✔
1805
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
191,849✔
1806
      break;
191,849✔
1807
    case STREAM_RUNNER_TASK:
654,964✔
1808
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
654,964✔
1809
      break;
654,964✔
1810
    default:
×
1811
      break;
×
1812
  }
1813
}
1814

1815

1816
void tFreeSStmStreamDeploy(void* param) {
266,968✔
1817
  if (NULL == param) {
266,968✔
1818
    return;
×
1819
  }
1820
  
1821
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
266,968✔
1822
  int32_t readerNum = taosArrayGetSize(pDeploy->readerTasks);
266,968✔
1823
  for (int32_t i = 0; i < readerNum; ++i) {
928,298✔
1824
    SStmTaskDeploy* pReader = taosArrayGet(pDeploy->readerTasks, i);
661,330✔
1825
    if (!pReader->msg.reader.triggerReader && pReader->msg.reader.msg.calc.freeScanPlan) {
661,330✔
1826
      taosMemoryFreeClear(pReader->msg.reader.msg.calc.calcScanPlan);
349,313✔
1827
    }
1828
  }
1829
  taosArrayDestroy(pDeploy->readerTasks);
266,968✔
1830

1831
  if (pDeploy->triggerTask) {
266,968✔
1832
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
194,974✔
1833
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
194,974✔
1834
    taosMemoryFree(pDeploy->triggerTask);
194,974✔
1835
  }
1836

1837
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
266,968✔
1838
  for (int32_t i = 0; i < runnerNum; ++i) {
931,324✔
1839
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
664,356✔
1840
    taosMemoryFree(pRunner->msg.runner.pPlan);
664,356✔
1841
  }
1842
  taosArrayDestroy(pDeploy->runnerTasks);
266,968✔
1843
}
1844

1845
void tDeepFreeSStmStreamDeploy(void* param) {
530,820✔
1846
  if (NULL == param) {
530,820✔
1847
    return;
×
1848
  }
1849
  
1850
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
530,820✔
1851
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
530,820✔
1852
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
530,820✔
1853
  taosMemoryFree(pDeploy->triggerTask);
530,820✔
1854
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
530,820✔
1855
}
1856

1857

1858
// Releases the lists of a heartbeat response. Deployed streams are released
// with tFreeSStmStreamDeploy (the partial variant); management responses with
// tFreeSStreamMgmtRsp. pRsp itself is not freed. A NULL pRsp is ignored.
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
1867

1868
// Releases the lists of a heartbeat response. Deployed streams are released
// with tDeepFreeSStmStreamDeploy (the full variant); management responses with
// tFreeSStreamMgmtRsp. pRsp itself is not freed. A NULL pRsp is ignored.
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
1877

1878

1879

1880
// Decodes a stream heartbeat response framed by tStartDecode/tEndDecode:
//   streamGId, deployed streams, tasks to start, the undeployAll flag (the
//   undeploy task list is only present when that flag is 0), and management
//   responses.
// Each list is a count followed by that many elements; arrays are only
// allocated for positive counts. tEndDecode is reached on the success path
// only - an early exit through _exit skips it.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  int32_t nDeploys = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploys));
  if (nDeploys > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploys);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nDeploys; ++idx) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, idx);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  int32_t nStarts = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStarts));
  if (nStarts > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStarts);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nStarts; ++idx) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // tasks to undeploy; no list follows when undeployAll is set
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploys = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploys));
    if (nUndeploys > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploys);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);

      for (int32_t idx = 0; idx < nUndeploys; ++idx) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, idx);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // management responses
  int32_t nRsps = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsps));
  if (nRsps > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nRsps);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);

    for (int32_t idx = 0; idx < nRsps; ++idx) {
      SStreamMgmtRsp* pMgmt = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, idx);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmt));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
1938

1939
// Encodes a task-run request framed by tStartEncode/tEndEncode:
// streamId (int64), taskId (int32), reqType (int32).
// tEndEncode is only reached when no step jumped to _exit.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  // payload
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1952

1953
// Decodes a task-run request framed by tStartDecode/tEndDecode:
// streamId (int64), taskId (int32), reqType (int32).
// tEndDecode is only reached when no step jumped to _exit.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  // payload
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1966

1967
// Encodes a task-stop request framed by tStartEncode/tEndEncode; the payload
// is the int64 streamId only.
// tEndEncode is only reached when no step jumped to _exit.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
1978

1979
// Decodes a task-stop request framed by tStartDecode/tEndDecode; the payload
// is the int64 streamId only.
// tEndDecode is only reached when no step jumped to _exit.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
1991

1992

1993
// Serializes a create-stream request by converting it to JSON with
// scmCreateStreamReqToJson and writing that text to pEncoder as a
// length-carrying C string. The temporary JSON buffer is released on every
// path, success or failure.
// Returns the status left in `code` by the TAOS_CHECK_EXIT chain.
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t jsonLen = 0;
  char*   json = NULL;

  TAOS_CHECK_EXIT(scmCreateStreamReqToJson(pReq, false, &json, &jsonLen));
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, json, jsonLen));

_exit:
  taosMemoryFreeClear(json);
  return code;
}
2010

2011
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
646,512✔
2012
  SEncoder encoder = {0};
646,512✔
2013
  tEncoderInit(&encoder, buf, bufLen);
646,512✔
2014
  int32_t code = 0;
646,512✔
2015
  int32_t lino;
2016

2017
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
646,512✔
2018

2019
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
646,512✔
2020

2021
  tEndEncode(&encoder);
646,512✔
2022

2023
_exit:
646,512✔
2024
  if (code) {
646,512✔
2025
    tEncoderClear(&encoder);
×
2026
    return code;
×
2027
  } else {
2028
    int32_t tlen = encoder.pos;
646,512✔
2029
    tEncoderClear(&encoder);
646,512✔
2030
    return tlen;
646,512✔
2031
  }
2032
  return 0;
2033
}
2034

2035
// Old version deserialization for backward compatibility,
2036
// especially for stream version number 7
2037
int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) {
×
2038
  int32_t code = 0;
×
2039
  int32_t lino;
2040
  pReq->calcPkSlotId = -1;
×
2041
  pReq->triPkSlotId = -1;
×
2042

2043
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
×
2044

2045
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
×
2046
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
×
2047
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
×
2048
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
×
2049
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
×
2050
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
×
2051
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
×
2052

2053
  int32_t calcDbSize = 0;
×
2054
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
×
2055
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
×
2056
  if (pReq->calcDB == NULL) {
×
2057
    TAOS_CHECK_EXIT(terrno);
×
2058
  }
2059
  for (int32_t i = 0; i < calcDbSize; ++i) {
×
2060
    char *calcDb = NULL;
×
2061
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
×
2062
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
×
2063
    if (calcDb == NULL) {
×
2064
      TAOS_CHECK_EXIT(terrno);
×
2065
    }
2066
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
×
2067
      taosMemoryFree(calcDb);
×
2068
      TAOS_CHECK_EXIT(terrno);
×
2069
    }
2070
  }
2071

2072
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
×
2073
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
×
2074
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
×
2075
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
×
2076
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
×
2077
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
×
2078
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
×
2079
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
×
2080
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
×
2081
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
×
2082

2083
  int32_t addrSize = 0;
×
2084
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
×
2085
  if (addrSize > 0) {
×
2086
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
×
2087
    if (pReq->pNotifyAddrUrls == NULL) {
×
2088
      TAOS_CHECK_EXIT(terrno);
×
2089
    }
2090
  }
2091
  for (int32_t i = 0; i < addrSize; ++i) {
×
2092
    char *url = NULL;
×
2093
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
×
2094
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
×
2095
    if (url == NULL) {
×
2096
      TAOS_CHECK_EXIT(terrno);
×
2097
    }
2098
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
×
2099
      taosMemoryFree(url);
×
2100
      TAOS_CHECK_EXIT(terrno);
×
2101
    }
2102
  }
2103
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
×
2104
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
×
2105
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
×
2106

2107
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
×
2108
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
×
2109
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
×
2110

2111
  int32_t outColSize = 0;
×
2112
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
×
2113
  if (outColSize > 0) {
×
2114
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
×
2115
    if (pReq->outCols == NULL) {
×
2116
      TAOS_CHECK_EXIT(terrno);
×
2117
    }
2118

2119
    for (int32_t i = 0; i < outColSize; ++i) {
×
2120
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
×
2121
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
×
2122
    }
2123
  }
2124

2125
  int32_t outTagSize = 0;
×
2126
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
×
2127
  if (outTagSize > 0) {
×
2128
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
×
2129
    if (pReq->outTags == NULL) {
×
2130
      TAOS_CHECK_EXIT(terrno);
×
2131
    }
2132

2133
    for (int32_t i = 0; i < outTagSize; ++i) {
×
2134
      SFieldWithOptions field = {0};
×
2135
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
×
2136
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
×
2137
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
×
2138
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
×
2139
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
×
2140
        TAOS_CHECK_EXIT(terrno);
×
2141
      }
2142
    }
2143
  }
2144

2145
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
×
2146
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
×
2147
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
×
2148
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
×
2149

2150
  switch (pReq->triggerType) {
×
2151
    case WINDOW_TYPE_SESSION: {
×
2152
      // session trigger
2153
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
×
2154
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
×
2155
      break;
×
2156
    }
2157
      case WINDOW_TYPE_STATE: {
×
2158
        // state trigger
2159
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
×
2160
        pReq->trigger.stateWin.extend = 0;
×
2161
        pReq->trigger.stateWin.trueForType = 0;
×
2162
        pReq->trigger.stateWin.trueForCount = 0;
×
2163
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
×
2164
        break;
×
2165
      }
2166
      case WINDOW_TYPE_INTERVAL: {
×
2167
        // slide trigger
2168
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
×
2169
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
×
2170
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
×
2171
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
×
2172
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
×
2173
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
×
2174
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
×
2175
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
×
2176
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
×
2177
        break;
×
2178
      }
2179
      case WINDOW_TYPE_EVENT: {
×
2180
        // event trigger
2181
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
×
2182
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
×
2183
        pReq->trigger.event.trueForType = 0;
×
2184
        pReq->trigger.event.trueForCount = 0;
×
2185
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
×
2186
        break;
×
2187
      }
2188
      case WINDOW_TYPE_COUNT: {
×
2189
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
×
2190

2191
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
×
2192
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
×
2193
        break;
×
2194
      }
2195
      case WINDOW_TYPE_PERIOD: {
×
2196
        // period trigger
2197
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
×
2198
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
×
2199
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
×
2200
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
×
2201
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
×
2202
        break;
×
2203
      }
2204
      default:
×
2205
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
2206
  }
2207

2208
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
×
2209
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
×
2210
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
×
2211
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
×
2212
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
×
2213
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
×
2214
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
×
2215
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
×
2216
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
×
2217
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
×
2218
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
×
2219
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
×
2220
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
×
2221
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
×
2222

2223
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
×
2224
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
×
2225

2226
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
×
2227

2228
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
×
2229
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
×
2230

2231
  int32_t calcScanPlanListSize = 0;
×
2232
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
×
2233
  if (calcScanPlanListSize > 0) {
×
2234
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
×
2235
    if (pReq->calcScanPlanList == NULL) {
×
2236
      TAOS_CHECK_EXIT(terrno);
×
2237
    }
2238
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
×
2239
      SStreamCalcScan calcScan = {0};
×
2240
      int32_t         vgListSize = 0;
×
2241
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
×
2242
      if (vgListSize > 0) {
×
2243
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
×
2244
        if (calcScan.vgList == NULL) {
×
2245
          TAOS_CHECK_EXIT(terrno);
×
2246
        }
2247
        for (int32_t j = 0; j < vgListSize; ++j) {
×
2248
          int32_t vgId = 0;
×
2249
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
×
2250
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
×
2251
            TAOS_CHECK_EXIT(terrno);
×
2252
          }
2253
        }
2254
      }
2255
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
×
2256
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
×
2257
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
×
2258
        TAOS_CHECK_EXIT(terrno);
×
2259
      }
2260
    }
2261
  }
2262

2263
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
×
2264
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
×
2265
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
×
2266
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
×
2267

2268
  int32_t forceOutColsSize = 0;
×
2269
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
×
2270
  if (forceOutColsSize > 0) {
×
2271
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
×
2272
    if (pReq->forceOutCols == NULL) {
×
2273
      TAOS_CHECK_EXIT(terrno);
×
2274
    }
2275
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
×
2276
      SStreamOutCol outCol = {0};
×
2277
      int64_t       exprLen = 0;
×
2278
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
×
2279
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
×
2280
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
×
2281
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
×
2282
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
×
2283
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
×
2284
        TAOS_CHECK_EXIT(terrno);
×
2285
      }
2286
    }
2287
  }
2288

2289
  // LeftBytes is the size of all fields at the tail of SStreamObj.
2290
  // If there are more data in the buffer, then it means
2291
  // the new fields are added in SStreamObj, need to decode them.
2292
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2293
    switch (pReq->triggerType) {
×
2294
      case WINDOW_TYPE_STATE: {
×
2295
        // state trigger
2296
        if (!tDecodeIsEnd(pDecoder)) {
×
2297
          TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
×
2298
        }
2299
        if (!tDecodeIsEnd(pDecoder)) {
×
2300
          TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
×
2301
        }
2302
        break;
×
2303
      }
2304
      case WINDOW_TYPE_INTERVAL: {
×
2305
        if (!tDecodeIsEnd(pDecoder)) {
×
2306
          TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
×
2307
        }
2308
        break;
×
2309
      }
2310
      default:
×
2311
        break;
×
2312
    }
2313
  }
2314

2315
  if (pDecoder->size - pDecoder->pos > leftBytes) {
×
2316
    if (!tDecodeIsEnd(pDecoder)) {
×
2317
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pReq->triggerPrec));
×
2318
    }
2319
  }
2320

2321
_exit:
×
2322

2323
  return code;
×
2324
}
2325

2326
// New deserialization using JSON
2327
// start from taosd ver-3.3.8.6, stream version number 8
2328
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
442,536✔
2329
  int32_t code = 0;
442,536✔
2330
  int32_t lino;
2331

2332
  char* json = NULL;
442,536✔
2333
  SJson* pJson = NULL;
442,536✔
2334
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &json));
442,536✔
2335
  pJson = tjsonParse(json);
442,536✔
2336
  if (pJson == NULL) {
442,536✔
2337
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INVALID_JSON);
×
2338
  }
2339
  TAOS_CHECK_EXIT(jsonToSCMCreateStreamReq(pJson, pReq));
442,536✔
2340

2341
_exit:
442,536✔
2342
  taosMemoryFreeClear(json);
442,536✔
2343
  if (NULL != pJson) {
442,536✔
2344
    tjsonDelete(pJson);
442,536✔
2345
  }
2346

2347
  return code;
442,536✔
2348
}
2349

2350

2351
// Deserialize a create-stream request from buf/bufLen into pReq.
//
// The JSON-based layout is tried first. If it reports
// TSDB_CODE_MND_STREAM_INVALID_JSON, the decoder is reset to the start of the
// buffer and the legacy binary layout is tried instead. Any other result of
// the JSON path is returned unchanged.
//
// Returns 0 on success, otherwise an error code.
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  code = tDeserializeSCMCreateStreamReqImpl(&dec, pReq);
  if (code == TSDB_CODE_MND_STREAM_INVALID_JSON) {
    uError("invalid json for stream create request, try old deserialization");
    // try old deserialization for backward compatibility: rewind by
    // re-creating the decoder over the same buffer
    tDecoderClear(&dec);
    tDecoderInit(&dec, buf, bufLen);
    TAOS_CHECK_EXIT(tStartDecode(&dec));
    TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImplOld(&dec, pReq, 0));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2376

2377

2378
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
50,712✔
2379
  int32_t  code = 0;
50,712✔
2380
  int32_t  lino;
2381
  int32_t  tlen;
2382
  SEncoder encoder = {0};
50,712✔
2383
  tEncoderInit(&encoder, buf, bufLen);
50,712✔
2384

2385
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
50,712✔
2386

2387
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->count));
101,424✔
2388
  for (int32_t i = 0; i < pReq->count; i++) {
107,664✔
2389
    int32_t nameLen = pReq->name[i] == NULL ? 0 : (int32_t)strlen(pReq->name[i]) + 1;
56,952✔
2390
    TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name[i], nameLen));
113,904✔
2391
  }
2392
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
101,424✔
2393

2394
  tEndEncode(&encoder);
50,712✔
2395

2396
_exit:
50,712✔
2397
  if (code) {
50,712✔
2398
    tlen = code;
×
2399
  } else {
2400
    tlen = encoder.pos;
50,712✔
2401
  }
2402
  tEncoderClear(&encoder);
50,712✔
2403
  return tlen;
50,712✔
2404
}
2405

2406
// Deserialize a drop-stream request from buf/bufLen into pReq.
//
// When the decoded count is positive, pReq->name is allocated as a zeroed
// array of count pointers and each name is decoded into a freshly allocated
// buffer. pReq->name is left untouched when count is not positive.
//
// Returns 0 on success, otherwise an error code. On failure pReq may hold a
// partially filled name array.
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pReq->count));

  if (pReq->count > 0) {
    pReq->name = taosMemoryCalloc(pReq->count, sizeof(char*));
    if (NULL == pReq->name) {
      code = terrno;
      goto _exit;
    }

    for (int32_t idx = 0; idx < pReq->count; idx++) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name[idx], NULL));
    }
  }

  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2432

2433
// Release the name array of a drop-stream request: every entry, then the
// array itself. The request struct is not freed. Accepts a NULL request and
// a request without a name array.
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (pReq == NULL || pReq->name == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pReq->count; idx++) {
    taosMemoryFreeClear(pReq->name[idx]);
  }
  taosMemoryFreeClear(pReq->name);
}
2444

2445
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2,120,375✔
2446
  if (pScan == NULL) {
2,120,375✔
2447
    return;
×
2448
  }
2449
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2,120,375✔
2450
  taosArrayDestroy(pCalcScan->vgList);
2,120,375✔
2451
  taosMemoryFreeClear(pCalcScan->scanPlan);
2,120,375✔
2452
}
2453

2454
void tFreeStreamOutCol(void* pCol) {
74,750✔
2455
  if (pCol == NULL) {
74,750✔
2456
    return;
×
2457
  }
2458
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
74,750✔
2459
  taosMemoryFreeClear(pOutCol->expr);
74,750✔
2460
}
2461

2462

2463

2464
// Release every heap-allocated member of a create-stream request and reset
// the corresponding pointers to NULL. The request struct itself is not
// freed. A NULL request is ignored.
//
// Trigger-specific members are released according to pReq->triggerType, so
// triggerType must still describe the trigger stored in pReq->trigger.
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }

  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // Arrays of owned strings: destroy the elements together with the array.
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  switch (pReq->triggerType) {
    case WINDOW_TYPE_STATE:
      taosMemoryFreeClear(pReq->trigger.stateWin.zeroth);
      taosMemoryFreeClear(pReq->trigger.stateWin.expr);
      break;
    case WINDOW_TYPE_EVENT:
      taosMemoryFreeClear(pReq->trigger.event.startCond);
      taosMemoryFreeClear(pReq->trigger.event.endCond);
      break;
    case WINDOW_TYPE_COUNT:
      // condCols is heap-allocated by the decoder and by the clone routine,
      // so it has to be released here as well.
      taosMemoryFreeClear(pReq->trigger.count.condCols);
      break;
    default:
      break;
  }

  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
  taosArrayDestroy(pReq->colCids);
  pReq->colCids = NULL;
  taosArrayDestroy(pReq->tagCids);
  pReq->tagCids = NULL;
}
2518

2519
// Create a new SCMCreateStreamReq in *ppDst that holds deep copies of a
// subset of pSrc: the pointer members copied below plus triggerType, the
// trigger payload, triggerTblUid, triggerTblType, triggerPrec, deleteReCalc,
// deleteOutTbl and flags. All other members of the new request stay zero.
//
// When pSrc is NULL nothing is done: 0 is returned and *ppDst is not written.
//
// Returns 0 on success, otherwise an error code. On failure the members
// copied so far are released via tFreeSCMCreateStreamReq; the struct itself
// is left in *ppDst (NULL if its allocation failed).
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t code = 0, lino = 0;
  if (NULL == pSrc) {
    return code;
  }

  // Everything the _exit path reads is initialized before the first jump.
  // dscan/dcol/p own heap memory only until they are pushed into an array of
  // pDst; they are reset right after a successful push.
  SCMCreateStreamReq* pDst = NULL;
  SStreamCalcScan     dscan = {0};
  SStreamOutCol       dcol = {0};
  void*               p = NULL;
  int32_t             num = 0;

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
      p = NULL;  // now owned by pDst->pNotifyAddrUrls
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  // triggerType is set before the trigger payload so that the cleanup on
  // failure releases the matching trigger members.
  pDst->triggerType = pSrc->triggerType;

  switch (pSrc->triggerType) {
    case WINDOW_TYPE_STATE:
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
      pDst->trigger.stateWin.trueForType = pSrc->trigger.stateWin.trueForType;
      pDst->trigger.stateWin.trueForCount = pSrc->trigger.stateWin.trueForCount;
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
      if (pSrc->trigger.stateWin.zeroth) {
        pDst->trigger.stateWin.zeroth = COPY_STR(pSrc->trigger.stateWin.zeroth);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.zeroth, code, lino, _exit, terrno);
      }
      if (pSrc->trigger.stateWin.expr) {
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
      }
      break;
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForType = pSrc->trigger.event.trueForType;
      pDst->trigger.event.trueForCount = pSrc->trigger.event.trueForCount;
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    case WINDOW_TYPE_COUNT:
      pDst->trigger.count.countVal = pSrc->trigger.count.countVal;
      pDst->trigger.count.sliding = pSrc->trigger.count.sliding;
      if (pSrc->trigger.count.condCols) {
        pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols);
        TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno);
      }
      break;
    default:
      // Remaining trigger kinds are copied by value.
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
      dscan = (SStreamCalcScan){.readFromCache = sscan->readFromCache};

      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);

      dscan.scanPlan = COPY_STR(sscan->scanPlan);
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // now owned by pDst->calcScanPlanList
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
      dcol = (SStreamOutCol){.type = scol->type};

      dcol.expr = COPY_STR(scol->expr);
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // now owned by pDst->forceOutCols
    }
  }

  if (pSrc->colCids) {
    pDst->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL(pDst->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids) {
    pDst->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL(pDst->tagCids, code, lino, _exit, terrno);
  }

  pDst->triggerTblUid = pSrc->triggerTblUid;
  pDst->triggerTblType = pSrc->triggerTblType;
  pDst->triggerPrec = pSrc->triggerPrec;
  pDst->deleteReCalc = pSrc->deleteReCalc;
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
  pDst->flags = pSrc->flags;

_exit:

  if (code) {
    // Release the element that was being built when the failure happened;
    // it is not reachable from pDst yet.
    taosMemoryFreeClear(p);
    taosArrayDestroy(dscan.vgList);
    taosMemoryFreeClear(dscan.scanPlan);
    taosMemoryFreeClear(dcol.expr);

    // pDst is NULL here when the struct allocation itself failed.
    tFreeSCMCreateStreamReq(pDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2719

2720

2721
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
5,912✔
2722
  int32_t  code = 0;
5,912✔
2723
  int32_t  lino;
2724
  int32_t  tlen;
2725
  SEncoder encoder = {0};
5,912✔
2726
  tEncoderInit(&encoder, buf, bufLen);
5,912✔
2727
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,912✔
2728

2729
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
5,912✔
2730
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
11,824✔
2731
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
11,824✔
2732
  tEndEncode(&encoder);
5,912✔
2733

2734
_exit:
5,912✔
2735
  if (code) {
5,912✔
2736
    tlen = code;
×
2737
  } else {
2738
    tlen = encoder.pos;
5,912✔
2739
  }
2740
  tEncoderClear(&encoder);
5,912✔
2741
  return tlen;
5,912✔
2742
}
2743

2744
// Deserialize a pause-stream request from buf/bufLen into pReq. The name is
// decoded into a newly allocated buffer stored in pReq->name.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2759

2760
/*
 * Release the heap-allocated name of a pause-stream request and reset the
 * pointer. A NULL pReq is accepted and ignored, so the function can be called
 * unconditionally from cleanup paths.
 */
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
2,956✔
2763

2764
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
5,196✔
2765
  SEncoder encoder = {0};
5,196✔
2766
  int32_t  code = 0;
5,196✔
2767
  int32_t  lino;
2768
  int32_t  tlen;
2769
  tEncoderInit(&encoder, buf, bufLen);
5,196✔
2770
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,196✔
2771
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
5,196✔
2772
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
10,392✔
2773
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
10,392✔
2774
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
10,392✔
2775
  tEndEncode(&encoder);
5,196✔
2776

2777
_exit:
5,196✔
2778
  if (code) {
5,196✔
2779
    tlen = code;
×
2780
  } else {
2781
    tlen = encoder.pos;
5,196✔
2782
  }
2783
  tEncoderClear(&encoder);
5,196✔
2784
  return tlen;
5,196✔
2785
}
2786

2787
/*
 * Decode a resume-stream request from buf: binary name, int8 igNotExists,
 * int8 igUntreated.
 *
 * pReq->name is filled through tDecodeBinaryAlloc and is not released here;
 * tFreeMResumeStreamReq frees it.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2803

2804
/*
 * Release the heap-allocated name of a resume-stream request and reset the
 * pointer. A NULL pReq is accepted and ignored.
 */
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
2,598✔
2807

2808
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
23,440✔
2809
  SEncoder encoder = {0};
23,440✔
2810
  int32_t  code = 0;
23,440✔
2811
  int32_t  lino;
2812
  int32_t  tlen;
2813
  tEncoderInit(&encoder, buf, bufLen);
23,440✔
2814
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
23,440✔
2815
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
23,440✔
2816
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
46,880✔
2817
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
46,880✔
2818
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
46,880✔
2819
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
46,880✔
2820
  tEndEncode(&encoder);
23,440✔
2821

2822
_exit:
23,440✔
2823
  if (code) {
23,440✔
2824
    tlen = code;
×
2825
  } else {
2826
    tlen = encoder.pos;
23,440✔
2827
  }
2828
  tEncoderClear(&encoder);
23,440✔
2829
  return tlen;
23,440✔
2830
}
2831

2832
/*
 * Decode a recalc-stream request from buf: binary name, int8 calcAll,
 * int64 timeRange.skey, int64 timeRange.ekey.
 *
 * pReq->name is filled through tDecodeBinaryAlloc and is not released here;
 * tFreeMRecalcStreamReq frees it.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void **)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2850

2851
/*
 * Release the heap-allocated name of a recalc-stream request and reset the
 * pointer. A NULL pReq is accepted and ignored.
 */
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
23,402✔
2854

2855
/*
 * Append the fields of a stream-progress request to an already started
 * encoder: int64 streamId, int64 taskId, int32 fetchIdx.
 *
 * Returns the code of the last encode step (0 when all steps succeed).
 */
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
2866

2867
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
12,038✔
2868
  SEncoder encoder = {0};
12,038✔
2869
  int32_t  code = 0;
12,038✔
2870
  int32_t  lino;
2871
  int32_t  tlen;
2872
  tEncoderInit(&encoder, buf, bufLen);
12,038✔
2873

2874
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
12,038✔
2875
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
12,038✔
2876

2877
  tEndEncode(&encoder);
12,038✔
2878

2879
_exit:
12,038✔
2880
  if (code) {
12,038✔
2881
    tlen = code;
×
2882
  } else {
2883
    tlen = encoder.pos;
12,038✔
2884
  }
2885
  tEncoderClear(&encoder);
12,038✔
2886
  return tlen;
12,038✔
2887
}
2888

2889
/*
 * Read the fields of a stream-progress request from an already started
 * decoder: int64 streamId, int64 taskId, int32 fetchIdx.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
2900

2901
/*
 * Decode a stream-progress request from buf. The field decoding itself is
 * delegated to tDecodeStreamProgressReq.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  // buf is handed over as-is, the same way the sibling deserializers do it.
  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2917

2918
/*
 * Append the fields of a stream-progress response to an already started
 * encoder: int64 streamId, int8 fillHisFinished, int64 progressDelay,
 * int32 fetchIdx.
 *
 * Returns the code of the last encode step (0 when all steps succeed).
 */
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
2930

2931
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
2,444✔
2932
  SEncoder encoder = {0};
2,444✔
2933
  int32_t  code = 0;
2,444✔
2934
  int32_t  lino;
2935
  int32_t  tlen;
2936
  tEncoderInit(&encoder, buf, bufLen);
2,444✔
2937

2938
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,444✔
2939
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
2,444✔
2940

2941
  tEndEncode(&encoder);
2,444✔
2942

2943
_exit:
2,444✔
2944
  if (code) {
2,444✔
2945
    tlen = code;
×
2946
  } else {
2947
    tlen = encoder.pos;
2,444✔
2948
  }
2949
  tEncoderClear(&encoder);
2,444✔
2950
  return tlen;
2,444✔
2951
}
2952

2953
/*
 * Read the fields of a stream-progress response from an already started
 * decoder: int64 streamId, int8 fillHisFinished, int64 progressDelay,
 * int32 fetchIdx.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  // fillHisFinished is read as a single signed byte through an int8_t view.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
2965

2966
/*
 * Decode a stream-progress response from buf. The field decoding itself is
 * delegated to tDecodeStreamProgressRsp.
 *
 * Returns the code of the last decode step (0 when all steps succeed).
 */
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0;
  int32_t  lino;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&dec, pRsp));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2982

2983
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
266,562✔
2984
  SEncoder encoder = {0};
266,562✔
2985
  int32_t  code = TSDB_CODE_SUCCESS;
266,562✔
2986
  int32_t  lino = 0;
266,562✔
2987
  int32_t  tlen = 0;
266,562✔
2988

2989
  tEncoderInit(&encoder, buf, bufLen);
266,562✔
2990
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
266,562✔
2991

2992
  int32_t size = taosArrayGetSize(pRsp->cols);
266,562✔
2993
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
266,562✔
2994
  for (int32_t i = 0; i < size; ++i) {
983,897✔
2995
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
717,335✔
2996
    if (oInfo == NULL) {
717,102✔
2997
      uError("col id is NULL at index %d", i);
×
2998
      code = TSDB_CODE_INVALID_PARA;
×
2999
      goto _exit;
×
3000
    }
3001
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
1,434,437✔
3002
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
1,434,670✔
3003
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
1,434,670✔
3004
  }
3005

3006
  tEndEncode(&encoder);
266,562✔
3007

3008
_exit:
266,562✔
3009
  if (code != TSDB_CODE_SUCCESS) {
266,562✔
3010
    tlen = code;
×
3011
  } else {
3012
    tlen = encoder.pos;
266,562✔
3013
  }
3014
  tEncoderClear(&encoder);
266,562✔
3015
  return tlen;
266,562✔
3016
}
3017

3018
/*
 * Decode an original-table-info response from buf.
 *
 * Wire layout: int32 entry count, then per entry int64 suid, int64 uid,
 * int16 cid.
 *
 * pRsp->cols is created here. When decoding fails part-way the array is left
 * attached to pRsp (possibly partially filled) and is not released by this
 * function.
 *
 * Every failure path goes through _exit so that tDecoderClear always runs;
 * the per-field decodes therefore use TAOS_CHECK_EXIT rather than an early
 * return.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing step.
 */
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    // Must not return directly from here: the decoder still has to be cleared.
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3052

3053
/*
 * Release the cols array of an original-table-info response.
 *
 * A NULL pRsp is ignored. The member is reset after it is destroyed so that
 * calling this function again on the same object does not free it twice.
 */
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) {
    return;
  }
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
24,267,927✔
3056

3057
/*
 * Release the heap-owned members of a trigger pull request, selected by
 * pReq->base.type. Request types that are not listed own nothing that is
 * released here.
 *
 * A NULL pReq is ignored. Every member is reset to NULL after it is released,
 * so destroying the same request twice is harmless for all request types.
 */
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    case STRIGGER_PULL_WAL_DATA_NEW:
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
      taosArrayDestroy(pRequest->versions);
      pRequest->versions = NULL;
      tSimpleHashCleanup(pRequest->ranges);
      pRequest->ranges = NULL;
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      if (pRequest->uids != NULL) {
        taosArrayDestroy(pRequest->uids);
        pRequest->uids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pRequest->cols != NULL) {
        taosArrayDestroy(pRequest->cols);
        pRequest->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
      tSimpleHashCleanup(pRequest->uidInfoTrigger);
      pRequest->uidInfoTrigger = NULL;
      tSimpleHashCleanup(pRequest->uidInfoCalc);
      pRequest->uidInfoCalc = NULL;
      break;
    }
    default:
      break;
  }
}
3097

3098
/*
 * Encode the raw element storage of pArr as a single binary blob of
 * (element count * elemSize) bytes. An array whose size is reported as 0 is
 * written as an empty blob with a NULL data pointer.
 *
 * Returns the code of the tEncodeBinary step.
 */
int32_t encodePlainArray(SEncoder *encoder, SArray *pArr) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint8_t* payload = NULL;
  int32_t  nBytes = 0;

  int32_t count = taosArrayGetSize(pArr);
  if (count > 0) {
    payload = TARRAY_DATA(pArr);
    nBytes = count * pArr->elemSize;
  }
  TAOS_CHECK_EXIT(tEncodeBinary(encoder, payload, nBytes));

_exit:
  return code;
}
3109

3110
/*
 * Decode a binary blob written by encodePlainArray into a new SArray with
 * elements of elemSize bytes.
 *
 * When the blob is empty (len == 0), *ppArr is not touched. Otherwise a new
 * array is created, the decoded buffer is swapped in as its storage, and its
 * size and capacity are set to len / elemSize.
 *
 * Returns TSDB_CODE_SUCCESS, the code of the failing decode step, or terrno
 * when the array cannot be created.
 */
int32_t decodePlainArray(SDecoder* decoder, SArray** ppArr, uint32_t elemSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  uint64_t nBytes = 0;
  void*    raw = NULL;

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(decoder, &raw, &nBytes));

  if (nBytes > 0) {
    SArray* pArr = taosArrayInit(0, elemSize);
    *ppArr = pArr;
    TSDB_CHECK_NULL(pArr, code, lino, _exit, terrno);
    // After the swap `raw` refers to whatever storage the fresh array held
    // (presumably its initial allocation); that is what gets freed below.
    TSWAP(pArr->pData, raw);
    pArr->capacity = nBytes / elemSize;
    pArr->size = pArr->capacity;
  }

_exit:
  if (raw != NULL) {
    taosMemoryFree(raw);
  }
  return code;
}
3130

3131
/*
 * Encode a two-level map used by the set-table pull request.
 *
 * Wire layout: int32 outer entry count, then for each outer entry:
 *   int64 key[0], int64 key[1]   (the outer key is read as two int64 values)
 *   int32 inner entry count
 *   per inner entry: int16 key (slot), int16 value (cid)
 * Each outer value is a pointer to the inner SSHashObj.
 *
 * Returns the code of the last encode step (0 when all steps succeed).
 */
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t nOuter = tSimpleHashGetSize(pInfo);
  TAOS_CHECK_EXIT(tEncodeI32(encoder, nOuter));

  int32_t outerIter = 0;
  for (void* pOuter = tSimpleHashIterate(pInfo, NULL, &outerIter); pOuter != NULL;
       pOuter = tSimpleHashIterate(pInfo, pOuter, &outerIter)) {
    int64_t* ids = tSimpleHashGetKey(pOuter, NULL);
    TAOS_CHECK_EXIT(tEncodeI64(encoder, ids[0]));
    TAOS_CHECK_EXIT(tEncodeI64(encoder, ids[1]));

    SSHashObj* pInner = *(SSHashObj**)pOuter;
    int32_t    nInner = tSimpleHashGetSize(pInner);
    TAOS_CHECK_EXIT(tEncodeI32(encoder, nInner));

    int32_t innerIter = 0;
    for (void* pSlot = tSimpleHashIterate(pInner, NULL, &innerIter); pSlot != NULL;
         pSlot = tSimpleHashIterate(pInner, pSlot, &innerIter)) {
      int16_t* slotId = tSimpleHashGetKey(pSlot, NULL);
      int16_t* colId = (int16_t*)pSlot;
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *slotId));
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *colId));
    }
  }

_exit:
  return code;
}
3162

3163
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
65,299,569✔
3164
  SEncoder encoder = {0};
65,299,569✔
3165
  int32_t  code = TSDB_CODE_SUCCESS;
65,301,260✔
3166
  int32_t  lino = 0;
65,301,260✔
3167
  int32_t  tlen = 0;
65,301,260✔
3168

3169
  tEncoderInit(&encoder, buf, bufLen);
65,301,260✔
3170
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
65,301,369✔
3171

3172
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
130,603,605✔
3173
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
130,598,531✔
3174
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
130,597,299✔
3175
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
130,598,552✔
3176

3177
  switch (pReq->type) {
65,298,176✔
3178
    case STRIGGER_PULL_SET_TABLE: {
266,562✔
3179
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
266,562✔
3180
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
266,562✔
3181
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
266,562✔
3182
      break;
266,562✔
3183
    }
3184
    case STRIGGER_PULL_LAST_TS: {
561,186✔
3185
      break;
561,186✔
3186
    }
3187
    case STRIGGER_PULL_FIRST_TS: {
469,185✔
3188
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
469,185✔
3189
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
938,369✔
3190
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
938,598✔
3191
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
938,599✔
3192
      break;
469,185✔
3193
    }
3194
    case STRIGGER_PULL_TSDB_META: {
1,203,798✔
3195
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
1,203,798✔
3196
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
2,407,596✔
3197
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
2,407,596✔
3198
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
2,407,596✔
3199
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
2,407,596✔
3200
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,407,596✔
3201
      break;
1,203,798✔
3202
    }
3203
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3204
      break;
×
3205
    }
3206
    case STRIGGER_PULL_TSDB_TS_DATA: {
251,132✔
3207
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
251,132✔
3208
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
502,264✔
3209
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
502,264✔
3210
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
502,264✔
3211
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
502,264✔
3212
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
502,264✔
3213
      break;
251,132✔
3214
    }
3215
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
112,012✔
3216
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
112,012✔
3217
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
224,024✔
3218
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
224,024✔
3219
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
223,794✔
3220
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
223,794✔
3221
      break;
112,012✔
3222
    }
3223
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
112,012✔
3224
      break;
112,012✔
3225
    }
3226
    case STRIGGER_PULL_TSDB_CALC_DATA: {
12,522,194✔
3227
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
12,522,194✔
3228
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
25,044,388✔
3229
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
25,044,388✔
3230
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
25,044,388✔
3231
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
25,044,388✔
3232
      break;
12,522,194✔
3233
    }
3234
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3235
      break;
×
3236
    }
3237
    case STRIGGER_PULL_TSDB_DATA: {
775,776✔
3238
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
775,776✔
3239
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
1,551,552✔
3240
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
1,551,552✔
3241
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
1,551,552✔
3242
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
1,551,552✔
3243
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
775,776✔
3244
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
1,551,552✔
3245
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
1,551,552✔
3246
      break;
775,776✔
3247
    }
3248
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3249
      break;
×
3250
    }
3251
    case STRIGGER_PULL_WAL_META_NEW: {
23,376,718✔
3252
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
23,376,718✔
3253
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
46,754,040✔
3254
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
46,754,641✔
3255
      break;
23,377,319✔
3256
    }
3257
    case STRIGGER_PULL_WAL_DATA_NEW:
16,893,442✔
3258
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3259
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
16,893,442✔
3260
      int32_t                     nVersion = taosArrayGetSize(pRequest->versions);
16,893,442✔
3261
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
16,894,907✔
3262
      for (int32_t i = 0; i < nVersion; i++) {
33,908,233✔
3263
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
17,013,326✔
3264
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
17,013,326✔
3265
      }
3266
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
16,894,907✔
3267
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
16,894,915✔
3268
      int32_t iter = 0;
16,894,915✔
3269
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
16,895,910✔
3270
      while (px != NULL) {
21,238,240✔
3271
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
4,342,564✔
3272
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
8,685,128✔
3273
        int64_t* key = (int64_t*)px;
4,342,564✔
3274
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
8,684,362✔
3275
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));
8,682,830✔
3276

3277
        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
4,341,032✔
3278
      }
3279
      break;
16,895,676✔
3280
    }
3281
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
6,427,415✔
3282
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
6,427,415✔
3283
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
12,856,233✔
3284
      break;
6,428,818✔
3285
    }
3286
    case STRIGGER_PULL_GROUP_COL_VALUE: {
565,478✔
3287
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
565,478✔
3288
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
1,130,956✔
3289
      break;
565,478✔
3290
    }
3291
    case STRIGGER_PULL_VTABLE_INFO: {
152,116✔
3292
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
152,116✔
3293
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
152,116✔
3294
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
152,116✔
3295
      TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
152,116✔
3296
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
304,232✔
3297
      break;
152,116✔
3298
    }
3299
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
1,340,572✔
3300
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
1,340,572✔
3301
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
2,681,144✔
3302
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
1,340,572✔
3303
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
2,681,144✔
3304
      break;
1,340,572✔
3305
    }
3306
    case STRIGGER_PULL_OTABLE_INFO: {
266,562✔
3307
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
266,562✔
3308
      int32_t size = taosArrayGetSize(pRequest->cols);
266,562✔
3309
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
266,562✔
3310
      for (int32_t i = 0; i < size; ++i) {
984,130✔
3311
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
717,568✔
3312
        if (oInfo == NULL) {
717,568✔
3313
          uError("col id is NULL at index %d", i);
×
3314
          code = TSDB_CODE_INVALID_PARA;
×
3315
          goto _exit;
×
3316
        }
3317
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
1,435,136✔
3318
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
1,435,136✔
3319
      }
3320
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
533,124✔
3321
      break; 
266,562✔
3322
    }
3323
    default: {
1,418✔
3324
      uError("unknown pull type %d", pReq->type);
1,418✔
3325
      code = TSDB_CODE_INVALID_PARA;
×
3326
      break;
×
3327
    }
3328
  }
3329

3330
  tEndEncode(&encoder);
65,300,398✔
3331

3332
_exit:
65,296,395✔
3333
  if (code != TSDB_CODE_SUCCESS) {
65,296,856✔
3334
    tlen = code;
×
3335
  } else {
3336
    tlen = encoder.pos;
65,296,856✔
3337
  }
3338
  tEncoderClear(&encoder);
65,296,856✔
3339
  return tlen;
65,298,447✔
3340
}
3341

3342
static void destroyHash(void* data){
331,091✔
3343
  if (data){
331,091✔
3344
    SSHashObj* tmp = *(SSHashObj**)data;
331,091✔
3345
    tSimpleHashCleanup(tmp);
331,091✔
3346
  }
3347
}
331,091✔
3348

3349
/*
 * Decode the two-level map written by encodeSetTableMapInfo.
 *
 * Wire layout: int32 outer entry count, then for each outer entry:
 *   int64 id[0], int64 id[1]     (together they form the 16-byte outer key)
 *   int32 inner entry count
 *   per inner entry: int16 slotId (key), int16 cid (value)
 *
 * On success *ppInfo is a new outer hash whose values are SSHashObj* inner
 * maps; destroyHash is installed as its value destructor. On failure the
 * outer hash is cleaned up and *ppInfo is set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing step.
 */
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (*ppInfo == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);

  for (int32_t i = 0; i < size; ++i) {
    int64_t id[2] = {0};
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id+1));
    int32_t len = 0;
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));
    SSHashObj* tmp = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
    if (tmp == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // Until the put succeeds the inner map is owned by nobody; if the put
    // fails it has to be released here or it would leak, because the outer
    // map's destructor never sees it.
    code = tSimpleHashPut(*ppInfo, id, sizeof(id), &tmp, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      tSimpleHashCleanup(tmp);
      lino = __LINE__;
      goto _exit;
    }

    for (int32_t j = 0; j < len; ++j) {
      int16_t slotId = 0;
      int16_t cid = 0;
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
    }
  }
_exit:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(*ppInfo);
    *ppInfo = NULL;
  }
  return code;
}
3387

3388
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
32,471,252✔
3389
  SDecoder decoder = {0};
32,471,252✔
3390
  int32_t  code = TSDB_CODE_SUCCESS;
32,471,252✔
3391
  int32_t  lino = 0;
32,471,252✔
3392

3393
  tDecoderInit(&decoder, buf, bufLen);
32,471,252✔
3394
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
32,473,094✔
3395

3396
  int32_t type = 0;
32,474,170✔
3397
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
32,473,675✔
3398
  SSTriggerPullRequest* pBase = &(pReq->base);
32,473,675✔
3399
  pBase->type = type;
32,472,784✔
3400
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
64,950,771✔
3401
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
64,951,071✔
3402
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
64,950,866✔
3403

3404
  switch (type) {
32,475,476✔
3405
    case STRIGGER_PULL_SET_TABLE: {
133,281✔
3406
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
133,281✔
3407
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
133,281✔
3408
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
133,281✔
3409
      break;
133,281✔
3410
    }
3411
    case STRIGGER_PULL_LAST_TS: {
280,463✔
3412
      break;
280,463✔
3413
    }
3414
    case STRIGGER_PULL_FIRST_TS: {
232,942✔
3415
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
232,942✔
3416
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
466,114✔
3417
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
466,344✔
3418
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
466,344✔
3419
      break;
233,172✔
3420
    }
3421
    case STRIGGER_PULL_TSDB_META: {
601,001✔
3422
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
601,001✔
3423
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
1,202,768✔
3424
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
1,203,534✔
3425
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
1,203,534✔
3426
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
1,203,534✔
3427
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,203,534✔
3428
      break;
601,767✔
3429
    }
3430
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3431
      break;
×
3432
    }
3433
    case STRIGGER_PULL_TSDB_TS_DATA: {
125,566✔
3434
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
125,566✔
3435
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
251,132✔
3436
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
251,132✔
3437
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
251,132✔
3438
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
251,132✔
3439
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
251,132✔
3440
      break;
125,566✔
3441
    }
3442
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
56,006✔
3443
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
56,006✔
3444
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
112,012✔
3445
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
112,012✔
3446
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
112,012✔
3447
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
112,012✔
3448
      break;
56,006✔
3449
    }
3450
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
56,006✔
3451
      break;
56,006✔
3452
    }
3453
    case STRIGGER_PULL_TSDB_CALC_DATA: {
6,259,473✔
3454
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
6,259,473✔
3455
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
12,518,946✔
3456
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
12,518,946✔
3457
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
12,518,946✔
3458
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
12,518,946✔
3459
      break;
6,259,473✔
3460
    }
3461
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3462
      break;
×
3463
    }
3464
    case STRIGGER_PULL_TSDB_DATA: {
385,590✔
3465
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
385,590✔
3466
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
771,180✔
3467
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
771,180✔
3468
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
771,180✔
3469
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
771,180✔
3470
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
385,590✔
3471
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
771,180✔
3472
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
771,180✔
3473
      break;
385,590✔
3474
    }
3475
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3476
      break;
×
3477
    }
3478
    case STRIGGER_PULL_WAL_META_NEW: {
11,589,327✔
3479
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
11,589,327✔
3480
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
23,178,761✔
3481
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
23,178,808✔
3482
      break;
11,589,171✔
3483
    }
3484
    case STRIGGER_PULL_WAL_DATA_NEW:
8,447,790✔
3485
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3486
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
8,447,790✔
3487
      int32_t                     nVersion = 0;
8,447,790✔
3488
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
8,447,557✔
3489
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
8,447,557✔
3490
      for (int32_t i = 0; i < nVersion; i++) {
16,968,347✔
3491
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
8,520,557✔
3492
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
8,520,557✔
3493
      }
3494
      int32_t nRanges = 0;
8,447,790✔
3495
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
8,447,790✔
3496
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
8,447,790✔
3497
      if (pRequest->ranges == NULL) {
8,447,790✔
3498
        TAOS_CHECK_EXIT(terrno);
×
3499
      }
3500
      for (int32_t i = 0; i < nRanges; i++) {
10,619,442✔
3501
        uint64_t gid = 0;
2,171,652✔
3502
        int64_t pRange[2] = {0};
2,171,652✔
3503
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
2,170,120✔
3504
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
2,171,652✔
3505
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
2,170,886✔
3506
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
2,170,886✔
3507
      }
3508
      break;
8,447,790✔
3509
    }
3510
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3,144,971✔
3511
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
3,144,971✔
3512
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
6,292,098✔
3513
      break;
3,146,164✔
3514
    }
3515
    case STRIGGER_PULL_GROUP_COL_VALUE: {
282,541✔
3516
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
282,541✔
3517
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
565,082✔
3518
      break;
282,541✔
3519
    }
3520
    case STRIGGER_PULL_VTABLE_INFO: {
75,626✔
3521
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
75,626✔
3522
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
75,626✔
3523
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
75,626✔
3524
      TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
75,626✔
3525
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
151,252✔
3526
      break;
75,626✔
3527
    }
3528
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
669,520✔
3529
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
669,520✔
3530
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
1,339,040✔
3531
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
669,520✔
3532
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
1,339,040✔
3533
      break;
669,520✔
3534
    }
3535
    case STRIGGER_PULL_OTABLE_INFO: {
133,281✔
3536
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
133,281✔
3537
      int32_t size = 0;
133,281✔
3538
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
133,281✔
3539
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
133,281✔
3540
      if (pRequest->cols == NULL) {
133,281✔
3541
        code = terrno;
×
3542
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
×
3543
        goto _exit;
×
3544
      }
3545
      for (int32_t i = 0; i < size; ++i) {
492,065✔
3546
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
358,784✔
3547
        if (oInfo == NULL) {
358,784✔
3548
          code = terrno;
×
3549
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
×
3550
          goto _exit;
×
3551
        }
3552
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
358,784✔
3553
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
358,784✔
3554
      }
3555
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
266,562✔
3556

3557
      break;
133,281✔
3558
    }
3559
    default: {
2,092✔
3560
      uError("unknown pull type %d", type);
2,092✔
3561
      code = TSDB_CODE_INVALID_PARA;
×
3562
      break;
×
3563
    }
3564
  }
3565

3566
  tEndDecode(&decoder);
32,475,417✔
3567

3568
_exit:
32,473,606✔
3569
  tDecoderClear(&decoder);
32,474,478✔
3570
  return code;
32,468,366✔
3571
}
3572

3573
// Encodes an array of SSTriggerCalcParam.
//
// Layout: an I32 element count, then for every element the leading bytes of the
// struct up to (not including) `notifyType`, copied verbatim. Unless
// `ignoreNotificationInfo` is set, each element is followed by `notifyType`
// (I32) and `extraNotifyContent` as a binary blob that includes the trailing
// NUL (length 0 when the pointer is NULL).
//
// pEncoder               target encoder; when pEncoder->data is NULL only the
//                        position is advanced (no bytes are written)
// pParams                array of SSTriggerCalcParam
// ignoreNotificationInfo skip notifyType / extraNotifyContent
// full                   when false, a count of 0 is written and no element
//
// Returns 0 on success, otherwise the first failing error code.
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
  int32_t size = full ? taosArrayGetSize(pParams) : 0;
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    if (pEncoder->data) {
      // The plain fields bypass the tEncode* helpers, so the capacity check
      // has to be done here before writing into the buffer.
      if ((uint64_t)pEncoder->pos + (uint64_t)plainFieldSize > (uint64_t)pEncoder->size) {
        TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
      }
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
    }
    pEncoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
      // Length includes the terminating NUL so the decoder gets a C string back.
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
    }
  }
_exit:
  return code;
}
3598

3599
void tDestroySSTriggerCalcParam(void* ptr) {
2,147,483,647✔
3600
  SSTriggerCalcParam* pParam = ptr;
2,147,483,647✔
3601
  if (pParam && pParam->extraNotifyContent != NULL) {
2,147,483,647✔
3602
    taosMemoryFreeClear(pParam->extraNotifyContent);
264,653✔
3603
  }
3604
  if (pParam && pParam->resultNotifyContent != NULL) {
2,147,483,647✔
3605
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
3606
  }
3607
}
2,147,483,647✔
3608

3609
void tDestroySStreamGroupValue(void* ptr) {
24,523,822✔
3610
  SStreamGroupValue* pValue = ptr;
24,523,822✔
3611
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
24,523,822✔
3612
    taosMemoryFreeClear(pValue->data.pData);
17,124,810✔
3613
    pValue->data.nData = 0;
17,124,572✔
3614
  }
3615
}
24,524,067✔
3616

3617
// Decodes an array of SSTriggerCalcParam written by tSerializeSTriggerCalcParam.
//
// Reads an I32 element count, allocates *ppParams, then for every element
// copies the leading bytes of the struct up to (not including) `notifyType`
// straight out of the decoder buffer. Unless `ignoreNotificationInfo` is set,
// `notifyType` (I32) and an allocated copy of `extraNotifyContent` follow.
//
// pDecoder               source decoder
// ppParams               receives the newly created array; on failure it may
//                        hold a partially filled array
// ignoreNotificationInfo must match the flag used when encoding
//
// Returns 0 on success, otherwise the first failing error code
// (TSDB_CODE_OUT_OF_RANGE for a negative count or a truncated buffer).
//
// NOTE(review): when ignoreNotificationInfo is true the members from
// `notifyType` onward keep whatever taosArrayReserve returned — confirm that
// memory is zeroed, since tDestroySSTriggerCalcParam frees the pointer members.
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray** ppParams, bool ignoreNotificationInfo) {
  int32_t size = 0, code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
  if (size < 0) {
    // A negative count can only come from a corrupted message.
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
  }
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < size; ++i) {
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
    if (param == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
    // The plain fields bypass the tDecode* helpers, so make sure the bytes are
    // really there before copying them out of the message buffer.
    if ((uint64_t)pDecoder->pos + (uint64_t)plainFieldSize > (uint64_t)pDecoder->size) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
    }
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
    pDecoder->pos += plainFieldSize;

    if (!ignoreNotificationInfo) {
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
      uint64_t len = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
    }
  }

_exit:
  return code;
}
3643

3644
// Encodes an array of SStreamGroupValue.
//
// Layout: I32 count, then per value: isNull (bool); for non-null values
// isTbname (bool), optionally uid (I64) + vgId (I32) when isTbname is set,
// the data type (I8), and the payload — a binary blob for variable-length
// types, otherwise data.val as I64.
//
// When `vgId` is not -1 it is stored into each tbname value before that
// value's vgId is encoded, i.e. the input array is modified.
//
// Returns 0 on success, otherwise the first failing error code.
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t numVals = taosArrayGetSize(pGroupColVals);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numVals));

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (!pVal->isNull) {
      TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
        if (vgId != -1) {
          // Caller-supplied vgId overrides the stored one.
          pVal->vgId = vgId;
        }
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
      }

      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
      if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
        TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
      } else {
        TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
      }
    }
  }

_exit:
  return code;
}
3676

3677
// Decodes an array of SStreamGroupValue written by
// tSerializeStriggerGroupColVals.
//
// Any values already in *ppGroupColVals are cleared first. When the decoded
// count is positive the array is created (if *ppGroupColVals is NULL) or its
// capacity is extended; with a count of 0 a NULL array stays NULL.
// Variable-length payloads are decoded into newly allocated buffers.
//
// Returns 0 on success, otherwise the first failing error code.
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numVals = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numVals));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (numVals > 0 && *ppGroupColVals != NULL) {
    TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, numVals));
  } else if (numVals > 0) {
    *ppGroupColVals = taosArrayInit(numVals, sizeof(SStreamGroupValue));
    if (*ppGroupColVals == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  for (int32_t idx = 0; idx < numVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (!pVal->isNull) {
      TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
      if (pVal->isTbname) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
      }

      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
      if (!IS_VAR_DATA_TYPE(pVal->data.type)) {
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
      } else {
        uint64_t blobLen = 0;
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &blobLen));
        pVal->data.nData = blobLen;
      }
    }
  }

_exit:
  return code;
}
3720

3721
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
565,082✔
3722
  SEncoder encoder = {0};
565,082✔
3723
  int32_t  code = TSDB_CODE_SUCCESS;
565,082✔
3724
  int32_t  lino = 0;
565,082✔
3725
  int32_t  tlen = 0;
565,082✔
3726

3727
  tEncoderInit(&encoder, buf, bufLen);
565,082✔
3728
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
565,082✔
3729

3730
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
565,082✔
3731

3732
  tEndEncode(&encoder);
565,082✔
3733

3734
_exit:
565,082✔
3735
  if (code != TSDB_CODE_SUCCESS) {
565,082✔
3736
    tlen = code;
×
3737
  } else {
3738
    tlen = encoder.pos;
565,082✔
3739
  }
3740
  tEncoderClear(&encoder);
565,082✔
3741
  return tlen;
565,082✔
3742
}
3743

3744
// Deserializes group column values from `buf` into gInfo->gInfo.
// Returns 0 on success, otherwise the first failing error code. The decoder is
// always cleared before returning.
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3761

3762
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
5,096,676✔
3763
  SEncoder encoder = {0};
5,096,676✔
3764
  int32_t  code = TSDB_CODE_SUCCESS;
5,097,104✔
3765
  int32_t  lino = 0;
5,097,104✔
3766
  int32_t  tlen = 0;
5,097,104✔
3767

3768
  tEncoderInit(&encoder, buf, bufLen);
5,097,104✔
3769
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
5,096,656✔
3770

3771
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
10,193,978✔
3772
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
10,193,760✔
3773
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
10,193,521✔
3774
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
10,193,488✔
3775
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
10,193,509✔
3776

3777
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
5,096,656✔
3778
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
5,097,104✔
3779
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
10,194,208✔
3780
  TAOS_CHECK_EXIT(tEncodeBool(&encoder, pReq->isWindowTrigger));
5,097,104✔
3781
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision));
10,193,739✔
3782

3783
  tEndEncode(&encoder);
5,096,635✔
3784

3785
_exit:
5,096,668✔
3786
  if (code != TSDB_CODE_SUCCESS) {
5,096,668✔
3787
    tlen = code;
×
3788
  } else {
3789
    tlen = encoder.pos;
5,096,668✔
3790
  }
3791
  tEncoderClear(&encoder);
5,096,668✔
3792
  return tlen;
5,097,104✔
3793
}
3794

3795
// Deserializes a trigger calc request from `buf` into `pReq`.
//
// isWindowTrigger and precision are only read when the decoder has not reached
// its end after createTable; otherwise those two members are left unchanged.
//
// Returns 0 on success, otherwise the first failing error code. The decoder is
// always cleared before returning.
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  SDecoder coder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&coder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&coder));

  // identifiers
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&coder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->gid));

  // variable-size payloads
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&coder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&coder, &pReq->groupColVals));

  TAOS_CHECK_EXIT(tDecodeI8(&coder, &pReq->createTable));

  // optional trailing fields
  if (!tDecodeIsEnd(&coder)) {
    TAOS_CHECK_EXIT(tDecodeBool(&coder, &pReq->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(&coder, &pReq->precision));
  }

  tEndDecode(&coder);

_exit:
  tDecoderClear(&coder);
  return code;
}
3823

3824
// Releases everything held by a trigger calc request: the params array, the
// groupColVals array and the output block. Each member is reset to NULL after
// it is released so the request can be destroyed again or reused safely.
// The request struct itself is not freed. A NULL `pReq` is ignored.
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq != NULL) {
    if (pReq->params != NULL) {
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
      pReq->params = NULL;
    }
    if (pReq->groupColVals != NULL) {
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
      pReq->groupColVals = NULL;
    }
    blockDataDestroy(pReq->pOutBlock);
    // Avoid leaving a dangling pointer behind (double free on a second call).
    pReq->pOutBlock = NULL;
  }
}
8,055,612✔
3837

3838
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
×
3839
  SEncoder encoder = {0};
×
3840
  int32_t  code = TSDB_CODE_SUCCESS;
×
3841
  int32_t  lino = 0;
×
3842
  int32_t  tlen = 0;
×
3843

3844
  tEncoderInit(&encoder, buf, bufLen);
×
3845
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3846

3847
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3848
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
×
3849
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3850
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
×
3851

3852
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
×
3853

3854
  tEndEncode(&encoder);
×
3855

3856
_exit:
×
3857
  if (code != TSDB_CODE_SUCCESS) {
×
3858
    tlen = code;
×
3859
  } else {
3860
    tlen = encoder.pos;
×
3861
  }
3862
  tEncoderClear(&encoder);
×
3863
  return tlen;
×
3864
}
3865

3866
// Deserializes a trigger drop-table request from `buf` into `pReq`.
// Field order: streamId, runnerTaskId, sessionId, gid (all I64), groupColVals.
// Returns 0 on success, otherwise the first failing error code. The decoder is
// always cleared before returning.
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
  SDecoder coder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&coder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&coder));

  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->gid));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&coder, &pReq->groupColVals));

  tEndDecode(&coder);

_exit:
  tDecoderClear(&coder);
  return code;
}
3887

3888
// Releases the groupColVals array of a drop request and resets the member to
// NULL. The request struct itself is not freed. A NULL `pReq` is ignored.
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
  if (pReq == NULL || pReq->groupColVals == NULL) {
    return;
  }

  taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
  pReq->groupColVals = NULL;
}
×
3896

3897
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
37,763,172✔
3898
  SEncoder encoder = {0};
37,763,172✔
3899
  int32_t  code = TSDB_CODE_SUCCESS;
37,763,172✔
3900
  int32_t  lino = 0;
37,763,172✔
3901
  int32_t  tlen = 0;
37,763,172✔
3902

3903
  tEncoderInit(&encoder, buf, bufLen);
37,763,172✔
3904
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
37,763,172✔
3905

3906
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
75,526,344✔
3907
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
75,526,113✔
3908
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
75,526,113✔
3909
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
75,526,344✔
3910

3911
  tEndEncode(&encoder);
37,763,172✔
3912

3913
_exit:
37,763,172✔
3914
  if (code != TSDB_CODE_SUCCESS) {
37,763,172✔
3915
    tlen = code;
×
3916
  } else {
3917
    tlen = encoder.pos;
37,763,172✔
3918
  }
3919
  tEncoderClear(&encoder);
37,763,172✔
3920
  return tlen;
37,763,172✔
3921
}
3922

3923
// Deserializes a trigger control request from `buf` into `pReq`.
// The type is decoded into a temporary int32_t and then assigned to pReq->type.
// Returns 0 on success, otherwise the first failing error code. The decoder is
// always cleared before returning.
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
  SDecoder coder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  rawType = 0;

  tDecoderInit(&coder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&coder));

  TAOS_CHECK_EXIT(tDecodeI32(&coder, &rawType));
  pReq->type = rawType;

  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI64(&coder, &pReq->sessionId));

  tEndDecode(&coder);

_exit:
  tDecoderClear(&coder);
  return code;
}
3944

3945
// Encodes a SStreamRuntimeFuncInfo into an already started encoder.
//
// Field order: pStreamPesudoFuncVals (without notification info; written as an
// empty list when `full` is false), pStreamPartColVals, curWindow.skey,
// curWindow.ekey, groupId (I64), curIdx (I32), sessionId (I64),
// withExternalWindow (bool), triggerType (I32), isWindowTrigger (bool),
// precision (I8).
//
// Returns 0 on success, otherwise the first failing error code.
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool full) {
  int32_t code = 0;
  int32_t lino = 0;

  // array payloads
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, full));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  // current window and bookkeeping
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));

  // trailing fields
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->isWindowTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->precision));

_exit:
  return code;
}
3961

3962
// Decodes a SStreamRuntimeFuncInfo from an already started decoder, in the
// field order written by tSerializeStRtFuncInfo.
//
// isWindowTrigger and precision are only read when the decoder has not reached
// its end after triggerType; otherwise those two members are left unchanged.
//
// Returns 0 on success, otherwise the first failing error code.
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0, lino = 0;
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
  // Optional trailing fields.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->isWindowTrigger));
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->precision));
  }
_exit:
  return code;
}
3981

3982
// Releases the two arrays held by a SStreamRuntimeFuncInfo
// (pStreamPesudoFuncVals and pStreamPartColVals) and resets them to NULL.
// The struct itself is not freed. A NULL `pInfo` is ignored.
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo) {
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }
}
3993

3994
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
151,252✔
3995
  SEncoder encoder = {0};
151,252✔
3996
  int32_t  code = TSDB_CODE_SUCCESS;
151,252✔
3997
  int32_t  lino = 0;
151,252✔
3998
  int32_t  tlen = 0;
151,252✔
3999

4000
  tEncoderInit(&encoder, buf, bufLen);
151,252✔
4001
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
151,252✔
4002

4003
  int32_t size = taosArrayGetSize(pRsp->infos);
151,252✔
4004
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
151,252✔
4005
  for (int32_t i = 0; i < size; ++i) {
522,934✔
4006
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
371,682✔
4007
    if (info == NULL) {
371,682✔
4008
      TAOS_CHECK_EXIT(terrno);
×
4009
    }
4010
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
743,364✔
4011
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
743,364✔
4012
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
371,682✔
4013
  }
4014

4015
  tEndEncode(&encoder);
151,252✔
4016

4017
_exit:
151,252✔
4018
  if (code != TSDB_CODE_SUCCESS) {
151,252✔
4019
    tlen = code;
×
4020
  } else {
4021
    tlen = encoder.pos;
151,252✔
4022
  }
4023
  tEncoderClear(&encoder);
151,252✔
4024
  return tlen;
151,252✔
4025
}
4026

4027
// Deserializes a VTableInfo list from `buf` into vTableInfo->infos.
// A new array is created for vTableInfo->infos; on failure it may hold a
// partially filled list.
// Returns 0 on success, otherwise the first failing error code. The decoder is
// always cleared before returning.
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo* vTableInfo) {
  SDecoder coder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numInfos = 0;

  tDecoderInit(&coder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&coder));
  TAOS_CHECK_EXIT(tDecodeI32(&coder, &numInfos));

  vTableInfo->infos = taosArrayInit(numInfos, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numInfos; ++idx) {
    VTableInfo* pInfo = taosArrayReserve(vTableInfo->infos, 1);
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tDecodeI64(&coder, &pInfo->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&coder, &pInfo->uid));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&coder, &pInfo->cols, false));
  }

  tEndDecode(&coder);

_exit:
  tDecoderClear(&coder);
  return code;
}
4057

4058

4059
void tDestroyVTableInfo(void *ptr) {
371,682✔
4060
  if (NULL == ptr) {
371,682✔
4061
    return;
×
4062
  }
4063
  VTableInfo* pTable = (VTableInfo*)ptr;
371,682✔
4064
  taosMemoryFree(pTable->cols.pColRef);
371,682✔
4065
}
4066

4067
// Destroys the infos array of a SStreamMsgVTableInfo (each entry through
// tDestroyVTableInfo) and resets the member to NULL. The struct itself is not
// freed. A NULL `ptr` is ignored.
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4072

4073
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
1,025,514✔
4074
  SEncoder encoder = {0};
1,025,514✔
4075
  int32_t  code = TSDB_CODE_SUCCESS;
1,026,235✔
4076
  int32_t  lino = 0;
1,026,235✔
4077
  int32_t  tlen = 0;
1,026,235✔
4078

4079
  tEncoderInit(&encoder, buf, bufLen);
1,026,235✔
4080
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,025,048✔
4081

4082
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
2,051,993✔
4083
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
1,026,227✔
4084
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
1,025,995✔
4085
  for (int32_t i = 0; i < size; ++i) {
2,314,205✔
4086
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
1,289,125✔
4087
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
2,577,103✔
4088
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
2,576,876✔
4089
  }
4090

4091
  tEndEncode(&encoder);
1,025,080✔
4092

4093
_exit:
1,024,858✔
4094
  if (code != TSDB_CODE_SUCCESS) {
1,024,858✔
4095
    tlen = code;
×
4096
  } else {
4097
    tlen = encoder.pos;
1,024,858✔
4098
  }
4099
  tEncoderClear(&encoder);
1,024,858✔
4100
  return tlen;
1,025,538✔
4101
}
4102

4103
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
513,909✔
4104
  SDecoder decoder = {0};
513,909✔
4105
  int32_t  code = TSDB_CODE_SUCCESS;
513,909✔
4106
  int32_t  lino = 0;
513,909✔
4107
  SSDataBlock *pResBlock = pBlock;
513,909✔
4108

4109
  tDecoderInit(&decoder, buf, bufLen);
513,909✔
4110
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
513,909✔
4111

4112
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
1,027,818✔
4113
  int32_t numOfCols = 2;
513,909✔
4114
  if (pResBlock->pDataBlock == NULL) {
513,909✔
4115
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
513,909✔
4116
    if (pResBlock->pDataBlock == NULL) {
513,909✔
4117
      TAOS_CHECK_EXIT(terrno);
×
4118
    }
4119
    for (int32_t i = 0; i< numOfCols; ++i) {
1,541,485✔
4120
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
1,027,818✔
4121
      if (pColInfoData == NULL) {
1,027,818✔
4122
        TAOS_CHECK_EXIT(terrno);
×
4123
      }
4124
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
1,027,818✔
4125
      pColInfoData->info.bytes = sizeof(int64_t);
1,027,576✔
4126
    }
4127
  }
4128
  int32_t numOfRows = 0;
513,667✔
4129
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
513,909✔
4130
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
513,909✔
4131
  for (int32_t i = 0; i < numOfRows; ++i) {
1,159,646✔
4132
    for (int32_t j = 0; j < numOfCols; ++j) {
1,937,211✔
4133
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
1,291,474✔
4134
      if (pColInfoData == NULL) {
1,291,474✔
4135
        TAOS_CHECK_EXIT(terrno);
×
4136
      }
4137
      int64_t value = 0;
1,291,474✔
4138
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
1,291,474✔
4139
      colDataSetInt64(pColInfoData, i, &value);
1,291,474✔
4140
    }
4141
  }
4142

4143
  pResBlock->info.dataLoad = 1;
513,909✔
4144
  pResBlock->info.rows = numOfRows;
513,909✔
4145

4146
  tEndDecode(&decoder);
513,909✔
4147

4148
_exit:
513,909✔
4149
  tDecoderClear(&decoder);
513,909✔
4150
  return code;
513,909✔
4151
}
4152

4153
// Encodes a data block and, when `indexHash` is given, a per-table slice index.
//
// Block part: with no output buffer (encoder->data == NULL) only the encoded
// size is added to the position; otherwise the block is encoded in place at
// the current position.
//
// Index part (only when indexHash != NULL): a 4-byte table count followed by,
// for every hash entry whose gId is not -1, uid (the hash key, I64), gId (U64),
// startRowIdx (I32) and numRows (I32). The count slot is reserved first and
// filled in after the entries have been written.
//
// Returns 0 on success, otherwise the first failing error code.
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t blockLen = 0;

  if (encoder->data != NULL) {
    blockLen = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos,
                           blockDataGetNumOfCols(pBlock));
    if (blockLen < 0) {
      TAOS_CHECK_EXIT(terrno);
    }
  } else {
    blockLen = blockGetEncodeSize(pBlock);
  }
  encoder->pos += blockLen;

  if (indexHash != NULL) {
    uint32_t countPos = encoder->pos;
    encoder->pos += sizeof(uint32_t);  // reserve space for tables
    int32_t numTables = 0;

    void*   pEntry = NULL;
    int32_t iter = 0;
    while ((pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) != NULL) {
      SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
      if (pSlice->gId == -1) {
        continue;
      }
      int64_t uid = *(int64_t*)(tSimpleHashGetKey(pEntry, NULL));
      TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
      TAOS_CHECK_EXIT(tEncodeU64(encoder, pSlice->gId));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->startRowIdx));
      TAOS_CHECK_EXIT(tEncodeI32(encoder, pSlice->numRows));
      numTables++;
    }

    // Go back and fill in the reserved count, then restore the end position.
    uint32_t endPos = encoder->pos;
    encoder->pos = countPos;
    TAOS_CHECK_EXIT(tEncodeI32(encoder, numTables));
    encoder->pos = endPos;
  }

_exit:
  return code;
}
4196
 
4197
/*
 * Encode an optional block as a one-byte presence flag followed by its payload.
 *
 * The flag is 1 when `block` is non-NULL and holds at least one row, in which case
 * encodeData() writes the payload (and the slice index if `indexHash` is given).
 * Otherwise the flag is 0 and nothing else is written.
 *
 * Returns TSDB_CODE_SUCCESS, or the first failing code.
 */
static int32_t encodeBlock(SEncoder* encoder, void* block, SSHashObj* indexHash) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const int8_t hasRows = (block != NULL && ((SSDataBlock*)block)->info.rows > 0) ? 1 : 0;

  TAOS_CHECK_EXIT(tEncodeI8(encoder, hasRows));
  if (hasRows) {
    TAOS_CHECK_EXIT(encodeData(encoder, block, indexHash));
  }

_exit:
  return code;
}
4210

4211
/*
 * Serialize a WAL data response into `buf`.
 *
 * Layout inside the tStartEncode()/tEndEncode() frame:
 *   dataBlock (with the slice index built from rsp->indexHash),
 *   metaBlock, deleteBlock, tableBlock (each without an index),
 *   ver (I64), verTime (I64).
 *
 * Returns encoder.pos on success, otherwise the failing error code.
 * NOTE(review): presumably callable with buf == NULL to compute the required size,
 * since encodeData() has a branch for encoder->data == NULL - confirm against callers.
 */
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp) {
  SEncoder encoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  tlen = 0;

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  // Only the data block carries the per-table slice index.
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->dataBlock, rsp->indexHash));

  // The remaining blocks are encoded in this fixed order, without an index.
  void* plainBlocks[] = {rsp->metaBlock, rsp->deleteBlock, rsp->tableBlock};
  for (size_t i = 0; i < sizeof(plainBlocks) / sizeof(plainBlocks[0]); ++i) {
    TAOS_CHECK_EXIT(encodeBlock(&encoder, plainBlocks[i], NULL));
  }

  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
  tEndEncode(&encoder);

_exit:
  tlen = (code == TSDB_CODE_SUCCESS) ? (int32_t)encoder.pos : code;
  tEncoderClear(&encoder);
  return tlen;
}
4238

4239
/*
 * Decode one optional block written as a presence flag plus payload.
 *
 * Flag == 0: `pBlock`, if non-NULL, is emptied with blockDataEmpty().
 * Flag != 0: `pBlock` must be non-NULL (else TSDB_CODE_INVALID_PARA); the payload is
 *            decoded by blockDecode() and the decoder cursor is moved to the end
 *            position that blockDecode() reports.
 *
 * Returns TSDB_CODE_SUCCESS, or the first failing code.
 */
static int32_t decodeBlock(SDecoder* decoder, void* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  present = false;

  TAOS_CHECK_EXIT(tDecodeI8(decoder, &present));

  if (!present) {
    if (pBlock != NULL) {
      blockDataEmpty(pBlock);
    }
    goto _exit;
  }

  TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);

  const char* pTail = NULL;
  TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder->data + decoder->pos, &pTail));
  decoder->pos = (uint8_t*)pTail - decoder->data;

_exit:
  return code;
}
4257

4258
/*
 * Deserialize a WAL data response produced by tSerializeSStreamWalDataResponse().
 *
 * buf/bufLen  serialized message.
 * pRsp        receives ver/verTime; its dataBlock, metaBlock, deleteBlock and tableBlock
 *             are filled in place (a block whose presence flag is set must be non-NULL,
 *             otherwise TSDB_CODE_INVALID_PARA is returned).
 * pSlices     receives one int64_t[3] element per table slice of the data block:
 *               [0] gid, [1] uid, [2] (startIdx << 32) | endIdx, endIdx = startIdx + numRows.
 *             It is cleared before being filled, and also cleared when the data block is
 *             absent and pRsp->dataBlock is non-NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing code; failures are logged with uError().
 */
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices){
  SDecoder     decoder = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // decode data block
  int8_t hasData = false;
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
  pBlock = pRsp->dataBlock;
  if (hasData) {
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    const char* pEndPos = NULL;
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
    // blockDecode() reports where the block payload ends; continue decoding from there.
    decoder.pos = (uint8_t*)pEndPos - decoder.data;

    int32_t nSlices = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
    // The count comes from the message: a negative value is malformed and must not be
    // forwarded as a capacity request.
    TAOS_CHECK_EXIT(nSlices >= 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
    taosArrayClear(pSlices);

    int64_t  uid = 0;
    uint64_t gid = 0;
    int32_t  startIdx = 0;
    int32_t  numRows = 0;
    for (int32_t i = 0; i < nSlices; i++) {
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));

      // Add and pack through unsigned 32-bit values: this avoids signed overflow in
      // startIdx + numRows, avoids left-shifting a negative value, and keeps a negative
      // endIdx from sign-extending over the startIdx half. For non-negative indices the
      // packed value is the same as (int64_t)startIdx << 32 | endIdx.
      int32_t  endIdx = (int32_t)((uint32_t)startIdx + (uint32_t)numRows);
      uint64_t range = ((uint64_t)(uint32_t)startIdx << 32) | (uint64_t)(uint32_t)endIdx;
      int64_t  value[3] = {(int64_t)gid, uid, (int64_t)range};

      if (taosArrayPush(pSlices, value) == NULL) {
        code = terrno;
        lino = __LINE__;  // so the error log below reports where the failure happened
        goto _exit;
      }
    }
  } else if (pBlock != NULL) {
    blockDataEmpty(pBlock);
    taosArrayClear(pSlices);
  }

  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->metaBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->deleteBlock));
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->tableBlock));

  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));

  tEndDecode(&decoder);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc