• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.27
/source/common/src/msg/streamMsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamMsg.h"
17
#include "tdatablock.h"
18
#include "tmsg.h"
19
#include "os.h"
20
#include "tcommon.h"
21

22
// Pair of 64-bit identifiers: the stream id and the task id.
typedef struct STaskId {
  int64_t streamId;
  int64_t taskId;
} STaskId;
26

27
// Window kinds. Numbering starts at 1; values are spelled out explicitly so
// that inserting a new entry in the middle cannot silently renumber the rest.
typedef enum EWindowType {
  WINDOW_TYPE_INTERVAL = 1,
  WINDOW_TYPE_SESSION = 2,
  WINDOW_TYPE_STATE = 3,
  WINDOW_TYPE_EVENT = 4,
  WINDOW_TYPE_COUNT = 5,
  WINDOW_TYPE_ANOMALY = 6,
  WINDOW_TYPE_EXTERNAL = 7,
  WINDOW_TYPE_PERIOD = 8
} EWindowType;
37

38
// Checkpoint bookkeeping for one task.
typedef struct STaskCkptInfo {
  int64_t latestId;          // saved checkpoint id
  int64_t latestVer;         // saved checkpoint ver
  int64_t latestTime;        // latest checkpoint time
  int64_t latestSize;        // latest checkpoint size
  int8_t  remoteBackup;      // latest checkpoint backup done
  int64_t activeId;          // current active checkpoint id
  int32_t activeTransId;     // checkpoint trans id
  int8_t  failed;            // denotes whether the checkpoint failed
  int8_t  consensusChkptId;  // set when the consensus checkpoint id is required
  int64_t consensusTs;       // NOTE(review): undocumented in the original; presumably the timestamp
                             // associated with the consensus-checkpoint-id request - confirm
} STaskCkptInfo;
50

51
typedef struct STaskStatusEntry {
52
  STaskId       id;
53
  int32_t       status;
54
  int32_t       statusLastDuration;  // to record the last duration of current status
55
  int64_t       stage;
56
  int32_t       nodeId;
57
  SVersionRange verRange;      // start/end version in WAL, only valid for source task
58
  int64_t       processedVer;  // only valid for source task
59
  double        inputQUsed;    // in MiB
60
  double        inputRate;
61
  double        procsThroughput;   // duration between one element put into input queue and being processed.
62
  double        procsTotal;        // duration between one element put into input queue and being processed.
63
  double        outputThroughput;  // the size of dispatched result blocks in bytes
64
  double        outputTotal;       // the size of dispatched result blocks in bytes
65
  double        sinkQuota;         // existed quota size for sink task
66
  double        sinkDataSize;      // sink to dst data size
67
  int64_t       startTime;
68
  int64_t       startCheckpointId;
69
  int64_t       startCheckpointVer;
70
  int64_t       hTaskId;
71
  STaskCkptInfo checkpointInfo;
72
  STaskNotifyEventStat notifyEventStat;
73
} STaskStatusEntry;
74

75
// Serialize an upstream endpoint descriptor.
//
// Write order: taskId, nodeId, childId, epSet, stage. tDecodeStreamEpInfo()
// reads the fields back in the same order, so the two must be kept in sync.
//
// Returns 0 once every field has been written; a failing step leaves the
// function early through TAOS_CHECK_RETURN.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId));
  TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage));
  return 0;
}
83

84
// Deserialize an upstream endpoint descriptor written by tEncodeStreamEpInfo().
//
// Read order: taskId, nodeId, childId, epSet, stage.
//
// Returns 0 once every field has been read; a failing step leaves the
// function early through TAOS_CHECK_RETURN.
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId));
  TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage));
  return 0;
}
92

93
// Serialize a task node-update message.
//
// Layout: streamId, taskId, node count, then per node {nodeId, prevEp, newEp},
// followed by transId, task count and the task ids.
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId));

  int32_t size = taosArrayGetSize(pMsg->pNodeList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));

  for (int32_t i = 0; i < size; ++i) {
    SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
    if (pInfo == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp));
    TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp));
  }

  // TODO: this attribute was added later and makes the message incompatible with previous versions
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));

  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));

  for (int32_t i = 0; i < numOfTasks; ++i) {
    int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
    if (pId == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pId));
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
137

138
// Deserialize a task node-update message written by tEncodeStreamTaskUpdateMsg().
//
// Allocates pMsg->pNodeList and pMsg->pTaskList. Nothing is released on the
// error path here, so the arrays created so far stay attached to pMsg;
// tDestroyNodeUpdateMsg() releases both.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId));

  // number of updated nodes
  int32_t numOfNodes = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfNodes));

  pMsg->pNodeList = taosArrayInit(numOfNodes, sizeof(SNodeUpdateInfo));
  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeUpdateInfo info = {0};
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp));
    TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp));

    if (taosArrayPush(pMsg->pNodeList, &info) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));

  // number of tasks
  int32_t numOfTasks = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfTasks));

  pMsg->pTaskList = taosArrayInit(numOfTasks, sizeof(int32_t));
  TSDB_CHECK_NULL(pMsg->pTaskList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    int32_t id = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
    if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
184

185
// Release the two arrays owned by a node-update message and reset the
// pointers so the message can be destroyed again without a double free.
// The message struct itself is not freed.
void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
  taosArrayDestroy(pMsg->pNodeList);
  pMsg->pNodeList = NULL;

  taosArrayDestroy(pMsg->pTaskList);
  pMsg->pTaskList = NULL;
}
×
191

192
// Serialize a task check request.
//
// Write order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, stage.
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
214

215
// Deserialize a task check request written by tEncodeStreamTaskCheckReq().
//
// Read order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, stage.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  tEndDecode(pDecoder);

_exit:
  return code;
}
233

234
// Serialize a task check response.
//
// Write order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, oldStage, status (8-bit).
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status));
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
257

258
// Deserialize a task check response written by tEncodeStreamTaskCheckRsp().
//
// Read order: reqId, streamId, upstreamNodeId, upstreamTaskId,
// downstreamNodeId, downstreamTaskId, childId, oldStage, status (8-bit).
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status));
  tEndDecode(pDecoder);

_exit:
  return code;
}
277

278
// Serialize a dispatch request: a fixed header followed by blockNum
// {length, binary payload} pairs taken from pReq->dataLen and pReq->data.
//
// The request is rejected with TSDB_CODE_INVALID_MSG when either array's size
// differs from blockNum.
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  // `type` is written a second time here. tDecodeStreamDispatchReq() reads it
  // twice as well, so removing this line would shift every following field.
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));

  if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
    uError("invalid dispatch req msg");
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t* pLen = taosArrayGet(pReq->dataLen, i);
    void*    data = taosArrayGetP(pReq->data, i);
    if (data == NULL || pLen == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // The length is stored explicitly in front of the binary payload; the
    // decoder cross-checks the two.
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen));
  }
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
320

321
// Deserialize a dispatch request written by tEncodeStreamDispatchReq().
//
// Allocates pReq->data (array of payload pointers) and pReq->dataLen (array
// of int32 lengths). Each payload buffer is allocated by tDecodeBinaryAlloc()
// and ownership moves to pReq->data once it has been pushed; a buffer that
// never makes it into the array is freed here before bailing out. The arrays
// themselves are not released on the error path; tCleanupStreamDispatchReq()
// releases them.
//
// Returns 0 on success, otherwise the error code of the failing step
// (TSDB_CODE_INVALID_MSG when a stored length disagrees with its payload).
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  // `type` appears twice on the wire (the encoder writes it twice); both reads are required.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen));

  if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t i = 0; i < pReq->blockNum; i++) {
    int32_t  len1 = 0;
    uint64_t len2 = 0;
    void*    data = NULL;

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1));
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2));

    // From here on `data` is owned by this iteration until it is pushed into
    // pReq->data; every early exit below must free it.
    if (len1 < 0 || (uint64_t)len1 != len2) {
      taosMemoryFree(data);
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
    }

    if (taosArrayPush(pReq->dataLen, &len1) == NULL) {
      code = terrno;  // capture before freeing so the free cannot disturb it
      taosMemoryFree(data);
      goto _exit;
    }

    if (taosArrayPush(pReq->data, &data) == NULL) {
      code = terrno;
      taosMemoryFree(data);
      goto _exit;
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
370

371
// Release the payload array (via taosArrayDestroyP with a NULL callback) and
// the length array of a dispatch request, then reset both pointers so a
// repeated cleanup cannot free them twice. The request struct itself is not
// freed.
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
  taosArrayDestroyP(pReq->data, NULL);
  pReq->data = NULL;

  taosArrayDestroy(pReq->dataLen);
  pReq->dataLen = NULL;
}
×
375

376
// Serialize a retrieve request.
//
// Write order: streamId, reqId, dstNodeId, dstTaskId, srcNodeId, srcTaskId,
// then pRetrieve as a binary blob of retrieveLen bytes.
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen));
  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
397

398
// Deserialize a retrieve request written by tEncodeStreamRetrieveReq().
//
// pReq->pRetrieve receives a buffer allocated by tDecodeBinaryAlloc();
// tCleanupStreamRetrieveReq() frees it. retrieveLen is the decoded 64-bit
// length narrowed to int32_t.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId));

  uint64_t len = 0;
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len));
  pReq->retrieveLen = (int32_t)len;
  tEndDecode(pDecoder);

_exit:
  return code;
}
417

418
// Free the retrieve payload and reset the pointer so a repeated cleanup
// cannot free it twice. The request struct itself is not freed.
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) {
  taosMemoryFree(pReq->pRetrieve);
  pReq->pRetrieve = NULL;
}
×
419

420

421
// Serialize a stream management request: reqId, type, then a type-specific body.
//
// Only STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER is handled: a count followed by
// {dbFName, tbName} C strings (each written with its terminating NUL). A NULL
// table-name array is written as count 0. Any other type yields
// TSDB_CODE_STREAM_INVALID_TASK_TYPE.
//
// Unlike the tStartEncode/tEndEncode-framed encoders in this file, this one
// returns 0 on success rather than the encoder position.
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      if (pReq->cont.fullTableNames) {
        int32_t num = taosArrayGetSize(pReq->cont.fullTableNames);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* pName = taosArrayGet(pReq->cont.fullTableNames, i);
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->dbFName, strlen(pName->dbFName) + 1));
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->tbName, strlen(pName->tbName) + 1));
        }
      } else {
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:
  return code;
}
450

451
// Release the table-name array owned by a management request and reset the
// pointer so a repeated call cannot free it twice. The request struct itself
// is not freed. A NULL pReq is ignored.
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
  if (NULL == pReq) {
    return;
  }

  taosArrayDestroy(pReq->cont.fullTableNames);
  pReq->cont.fullTableNames = NULL;
}
458

459

460
// Create a heap-allocated copy of a management request.
//
// *ppDst is always reset to NULL first. A NULL pSrc is not an error: the
// function returns TSDB_CODE_SUCCESS and leaves *ppDst NULL. Otherwise the
// struct is copied byte-wise and the table-name array, when present, is
// duplicated with taosArrayDup so the copy does not share it with pSrc.
//
// On failure the partially built copy is released, *ppDst ends up NULL, and
// the error code is returned.
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
  *ppDst = NULL;

  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0, lino = 0;
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  memcpy(*ppDst, pSrc, sizeof(*pSrc));
  if (pSrc->cont.fullTableNames) {
    // The memcpy above left the copy pointing at pSrc's array; replace it
    // with a duplicate (or NULL on failure, which the cleanup below handles).
    (*ppDst)->cont.fullTableNames = taosArrayDup(pSrc->cont.fullTableNames, NULL);
    TSDB_CHECK_NULL((*ppDst)->cont.fullTableNames, code, lino, _exit, terrno);
  }

_exit:

  if (code) {
    tFreeSStreamMgmtReq(*ppDst);
    taosMemoryFreeClear(*ppDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
487

488

489
// Deserialize a management request written by tEncodeSStreamMgmtReq().
//
// For STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER a positive count allocates
// pReq->cont.fullTableNames and fills one {dbFName, tbName} entry per item;
// a count of 0 or less leaves the field untouched. Any other type yields
// TSDB_CODE_STREAM_INVALID_TASK_TYPE.
//
// Nothing is released on the error path here, so a partially filled array
// stays attached to pReq; tFreeSStreamMgmtReq() releases it.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));

  switch (pReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
      int32_t num = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
      if (num > 0) {
        pReq->cont.fullTableNames = taosArrayInit(num, sizeof(SStreamDbTableName));
        TSDB_CHECK_NULL(pReq->cont.fullTableNames, code, lino, _exit, terrno);
        for (int32_t i = 0; i < num; ++i) {
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.fullTableNames, 1);
          // The reserved slot is written through immediately, so a failed
          // reservation must stop decoding instead of dereferencing NULL.
          TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
        }
      }
      break;
    }
    default:
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
      break;
  }

_exit:
  return code;
}
519

520
// Serialize the common part of a stream task.
//
// Write order: type, streamId, taskId, flags, seriousId, deployId, nodeId,
// taskIdx, status, detailStatus, errorCode, then an int32 presence flag
// (1/0) for pMgmtReq followed by the request itself when present.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));

  if (pTask->pMgmtReq) {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
  } else {
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
  }

_exit:
  return code;
}
548

549

550
// Deserialize the common part of a stream task written by tEncodeStreamTask().
//
// When the presence flag is non-zero, pTask->pMgmtReq is allocated with
// taosMemoryCalloc and decoded in place. It is not released here if decoding
// fails afterwards, so it stays attached to pTask on the error path.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
  // SKIP SESSIONID
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));

  // presence flag for the optional management request
  int32_t req = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
  if (req) {
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
  }

_exit:
  return code;
}
579

580
// Serialize one recalculation progress record.
//
// Write order: recalcId (i64), progress (i32), start (i64), end (i64).
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));

_exit:
  return code;
}
593

594
// Deserialize one recalculation progress record written by
// tEncodeSSTriggerRecalcProgress().
//
// Read order: recalcId (i64), progress (i32), start (i64), end (i64).
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));

_exit:
  return code;
}
607

608

609
// Serialize a trigger runtime status.
//
// Write order: autoRecalcNum, realtimeSessionNum, historySessionNum,
// recalcSessionNum, histroyProgress, then the number of user recalculation
// records followed by each record.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));

  int32_t recalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pProgress));
  }

_exit:
  return code;
}
630

631
// Deserialize a trigger runtime status written by
// tEncodeSSTriggerRuntimeStatus().
//
// Read order mirrors the encoder: autoRecalcNum, realtimeSessionNum,
// historySessionNum, recalcSessionNum, histroyProgress, then the record
// count and the records. pStatus->userRecalcs is allocated only when the
// count is positive; tFreeSSTriggerRuntimeStatus() releases it.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
  // The encoder writes recalcSessionNum in this position. Decoding into
  // realtimeSessionNum again would clobber it and leave recalcSessionNum unset.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->recalcSessionNum));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));

  int32_t recalcNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
  if (recalcNum > 0) {
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
    if (NULL == pStatus->userRecalcs) {
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t i = 0; i < recalcNum; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
    if (NULL == pProgress) {
      code = terrno;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
  }

_exit:
  return code;
}
660

661

662
// Serialize a stream heartbeat message.
//
// Layout: dnodeId, streamGId, snodeId, runnerThreadNum, followed by four
// counted sections, each written as an int32 count and then its items:
//   1. vnode-group leader ids (int32 each)
//   2. task status entries (encoded through tEncodeStreamTask)
//   3. request indexes (int32 each)
//   4. trigger runtime statuses
// taosArrayGetSize() supplies each count, so a NULL array is written as an
// empty section provided it reports size 0.
//
// Returns pEncoder->pos on success, otherwise the error code of the failing step.
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));

  int32_t vgLeaderNum = taosArrayGetSize(pReq->pVgLeaders);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgLeaderNum));
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
    int32_t* vgId = taosArrayGet(pReq->pVgLeaders, i);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
  }

  int32_t statusNum = taosArrayGetSize(pReq->pStreamStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, statusNum));
  for (int32_t i = 0; i < statusNum; ++i) {
    SStmTaskStatusMsg* pStatus = taosArrayGet(pReq->pStreamStatus, i);
    // NOTE(review): the cast assumes SStmTaskStatusMsg is layout-compatible
    // with SStreamTask - confirm against the type definitions.
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pStatus));
  }

  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqNum));
  for (int32_t i = 0; i < reqNum; ++i) {
    int32_t* idx = taosArrayGet(pReq->pStreamReq, i);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *idx));
  }

  int32_t triggerNum = taosArrayGetSize(pReq->pTriggerStatus);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, triggerNum));
  for (int32_t i = 0; i < triggerNum; ++i) {
    SSTriggerRuntimeStatus* pTrigger = taosArrayGet(pReq->pTriggerStatus, i);
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigger));
  }

  tEndEncode(pEncoder);

_exit:
  if (code) {
    return code;
  } else {
    return pEncoder->pos;
  }
}
709

710
// Deserialize a stream heartbeat message written by tEncodeStreamHbMsg().
//
// Each of the four counted sections allocates its array only when the count
// is positive; otherwise the corresponding pointer in pReq is left untouched.
// Nothing is released on the error path here, so whatever was allocated so
// far stays attached to pReq; tCleanupStreamHbMsg() releases it.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));

  // section 1: vnode-group leader ids, appended one by one
  int32_t vgLeaderNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLeaderNum));
  if (vgLeaderNum > 0) {
    pReq->pVgLeaders = taosArrayInit(vgLeaderNum, sizeof(int32_t));
    if (NULL == pReq->pVgLeaders) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
    if (NULL == taosArrayPush(pReq->pVgLeaders, &vgId)) {
      code = terrno;
      goto _exit;
    }
  }

  // section 2: task status entries, decoded in place into a pre-sized array
  int32_t statusNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
  if (statusNum > 0) {
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
    if (NULL == pReq->pStreamStatus) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t i = 0; i < statusNum; ++i) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
    if (NULL == pTask) {
      code = terrno;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTask));
  }

  // section 3: request indexes, decoded in place
  int32_t reqNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
  if (reqNum > 0) {
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
    if (NULL == pReq->pStreamReq) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t i = 0; i < reqNum; ++i) {
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, i);
    if (NULL == pIdx) {
      code = terrno;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pIdx));
  }

  // section 4: trigger runtime statuses, decoded in place
  int32_t triggerNum = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
  if (triggerNum > 0) {
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
    if (NULL == pReq->pTriggerStatus) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t i = 0; i < triggerNum; ++i) {
    SSTriggerRuntimeStatus* pStatus = taosArrayGet(pReq->pTriggerStatus, i);
    if (NULL == pStatus) {
      code = terrno;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pStatus));
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
801

802
void tFreeSSTriggerRuntimeStatus(void* param) {
×
803
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
×
804
  if (NULL == pStatus) {
×
805
    return;
×
806
  }
807
  taosArrayDestroy(pStatus->userRecalcs);
×
808
}
809

810
// Destroys the arrays owned by a stream heartbeat message (pVgLeaders, pStreamReq,
// pStreamStatus, pTriggerStatus). The SStreamHbMsg struct itself is not freed.
//
// pMsg      - message to clean up; NULL is a no-op.
// deepClean - when true, the management request (pMgmtReq) of every task status that is
//             referenced by an index in pStreamReq is released before the arrays go away.
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
  if (pMsg == NULL) {
    return;
  }

  taosArrayDestroy(pMsg->pVgLeaders);
  if (deepClean) {
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
    for (int32_t i = 0; i < reqNum; ++i) {
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
      if (NULL == idx) {
        continue;
      }

      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
      // Skip entries that are out of range or whose request was already released; the same
      // index appearing twice in pStreamReq must not lead to a double free.
      if (NULL == pTask || NULL == pTask->pMgmtReq) {
        continue;
      }

      tFreeSStreamMgmtReq(pTask->pMgmtReq);
      // Free and reset the pointer so no dangling reference stays in the status entry.
      taosMemoryFreeClear(pTask->pMgmtReq);
    }
  }
  taosArrayDestroy(pMsg->pStreamReq);
  taosArrayDestroy(pMsg->pStreamStatus);
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
}
833

834
// Encodes the trigger-side reader deployment payload.
// Field order: triggerTblName, triggerTblUid, triggerTblType, deleteReCalc, deleteOutTbl,
// partitionCols, triggerCols, triggerScanPlan, calcCacheScanPlan.
// Every string goes out as a binary blob including its NUL terminator; a NULL string is
// written with length 0. triggerPrevFilter is not written by this function.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t len = 0;

  len = (NULL != pMsg->triggerTblName) ? (int32_t)strlen(pMsg->triggerTblName) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerTblName, len));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));

  len = (NULL != pMsg->partitionCols) ? (int32_t)strlen(pMsg->partitionCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, len));

  len = (NULL != pMsg->triggerCols) ? (int32_t)strlen(pMsg->triggerCols) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, len));

  len = (NULL != pMsg->triggerScanPlan) ? (int32_t)strlen(pMsg->triggerScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, len));

  len = (NULL != pMsg->calcCacheScanPlan) ? (int32_t)strlen(pMsg->calcCacheScanPlan) + 1 : 0;
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, len));

_exit:
  return code;
}
853

854
// Encodes the calc-side reader deployment payload: execReplica followed by calcScanPlan as a
// binary blob (NUL terminator included, length 0 when the plan is NULL).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t planLen = 0;

  if (pMsg->calcScanPlan != NULL) {
    planLen = (int32_t)strlen(pMsg->calcScanPlan) + 1;
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, planLen));

_exit:
  return code;
}
865

866

867
// Encodes a reader deployment message: the triggerReader flag first, then the variant of
// pMsg->msg it selects (trigger payload when the flag is non-zero, calc payload otherwise).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
882

883
// Encodes a task address as: taskId (i64), nodeId (i32), epset.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));

_exit:
  return code;
}
895

896
// Encodes a runner target: its task address followed by execReplica (i32).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));

_exit:
  return code;
}
907

908

909
// Serializes a trigger-task deployment message.
//
// Layout, in order: the option flags, the notify URL list and notify settings, the timing
// values, a window-type specific section selected by triggerType, the event/placeholder
// bitmaps and slot ids, the filter and plan strings, the reader and runner lists, and finally
// leaderSnodeId and streamName.
//
// Strings are written as binary blobs including the NUL terminator; a NULL string is written
// with length 0. An unsupported triggerType aborts with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
//
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->hasPartitionBy));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));

  // Notify URLs: count, then one blob per URL.
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));

  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;

      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));

      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
      break;
    }
    default:
      // Any other window type has no encoding here.
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));

  // Reader addresses: count, then each entry.
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
  for (int32_t i = 0; i < readerNum; ++i) {
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
  }

  // Runner targets: count, then each entry.
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
  for (int32_t i = 0; i < runnerNum; ++i) {
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
  // strlen(NULL) is undefined; a missing stream name is written as a zero-length blob, the
  // same way every other optional string in this message is handled.
  int32_t streamNameLen = (pMsg->streamName == NULL) ? 0 : ((int32_t)strlen(pMsg->streamName) + 1);
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, streamNameLen));

_exit:

  return code;
}
1026

1027

1028
// Encodes one SFieldWithOptions as: name (C string), type (u8), flags (i8), bytes (i32),
// compress (u32), typeMod (i32).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));

_exit:
  return code;
}
1043

1044

1045
// Serializes a runner-task deployment message.
//
// Layout, in order: execReplica, streamName, pPlan, outDBFName, outTblName, outTblType,
// calcNotifyOnly, topPlan, the notify URL list, notifyErrorHandle, the output column and tag
// schemas, outStbUid, outStbSversion, subTblNameExpr, tagValueExpr, and the forced output
// column list (expression blob plus type/precision/scale/bytes per entry).
//
// Strings are written as binary blobs including the NUL terminator; a NULL string is written
// with length 0.
//
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
  // strlen(NULL) is undefined; a missing stream name is written as a zero-length blob, the
  // same way every other optional string in this message is handled.
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, NULL == pMsg->streamName ? 0 : (int32_t)strlen(pMsg->streamName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));

  // Notify URLs: count, then one blob per URL.
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
  for (int32_t i = 0; i < addrSize; ++i) {
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyErrorHandle));

  // Output columns: count, then each field.
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
  for (int32_t i = 0; i < outColNum; ++i) {
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // Output tags: count, then each field.
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
  for (int32_t i = 0; i < outTagNum; ++i) {
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
  }

  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));

  // Forced output columns: count, then expression blob and data type per entry.
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
  }

_exit:

  return code;
}
1103

1104

1105
// Encodes one task deployment: the common task header, then the reader, trigger or runner
// deploy message chosen by task.type. Any other task type fails with
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));

  switch (pTask->task.type) {
    case STREAM_READER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
      break;
    }
    case STREAM_RUNNER_TASK: {
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:
  return code;
}
1129

1130

1131
// Encodes the deployment of one stream: streamId, the reader task list (count + entries),
// a 0/1 marker telling whether a trigger task follows (and the task itself when present),
// and the runner task list (count + entries).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));

  int32_t readerCnt = taosArrayGetSize(pStream->readerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerCnt));
  for (int32_t r = 0; r < readerCnt; ++r) {
    SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, r);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pReader));
  }

  int32_t hasTrigger = (pStream->triggerTask != NULL) ? 1 : 0;
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, hasTrigger));
  if (hasTrigger) {
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
  }

  int32_t runnerCnt = taosArrayGetSize(pStream->runnerTasks);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerCnt));
  for (int32_t r = 0; r < runnerCnt; ++r) {
    SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, r);
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pRunner));
  }

_exit:
  return code;
}
1160

1161
// Encodes the common stream message header, which consists of msgType written as an i32.
// Returns 0 on success, otherwise the code reported by the encoder.
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));

_exit:
  return code;
}
1170

1171
// Decodes the common stream message header: reads one i32 and stores it in pMsg->msgType.
// pMsg->msgType is left untouched when the read fails.
// Returns 0 on success, otherwise the code reported by the decoder.
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t rawType = 0;

  // Decode into a plain i32 first, then assign to the msgType field.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rawType));
  pMsg->msgType = rawType;

_exit:
  return code;
}
1182

1183
// Encodes a start-task message; only its common header is written.
// Returns 0 on success, otherwise the code of the step that failed.
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));

_exit:
  return code;
}
1193

1194
// Encodes a task-start entry: the common task header followed by its start message.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1205

1206
// Encodes an undeploy-task message: the common header, then the doCheckpoint and doCleanup
// flags (one i8 each).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));

_exit:
  return code;
}
1218

1219
// Encodes a task-undeploy entry: the common task header followed by its undeploy message.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1230

1231

1232
// Encodes a recalculation request as three i64 values: recalcId, start, end.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));

_exit:
  return code;
}
1244

1245
// Encodes the body of a management response; what is written depends on msgType:
//   STREAM_MSG_ORIGTBL_READER_INFO - vgId list (count + i32 each), then reader address list
//   STREAM_MSG_UPDATE_RUNNER       - runner target list (count + entries)
//   STREAM_MSG_USER_RECALC         - recalc request list (count + entries)
// Any other msgType writes nothing and succeeds.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO: {
      int32_t vgCnt = taosArrayGetSize(pRsp->vgIds);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgCnt));
      for (int32_t k = 0; k < vgCnt; ++k) {
        int32_t* pVgId = taosArrayGet(pRsp->vgIds, k);
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId));
      }

      int32_t readerCnt = taosArrayGetSize(pRsp->readerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerCnt));
      for (int32_t k = 0; k < readerCnt; ++k) {
        SStreamTaskAddr* pReader = taosArrayGet(pRsp->readerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pReader));
      }
      break;
    }

    case STREAM_MSG_UPDATE_RUNNER: {
      int32_t runnerCnt = taosArrayGetSize(pRsp->runnerList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerCnt));
      for (int32_t k = 0; k < runnerCnt; ++k) {
        SStreamRunnerTarget* pRunner = taosArrayGet(pRsp->runnerList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pRunner));
      }
      break;
    }

    case STREAM_MSG_USER_RECALC: {
      int32_t recalcCnt = taosArrayGetSize(pRsp->recalcList);
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcCnt));
      for (int32_t k = 0; k < recalcCnt; ++k) {
        SStreamRecalcReq* pRecalc = taosArrayGet(pRsp->recalcList, k);
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, pRecalc));
      }
      break;
    }

    default:
      break;
  }

_exit:
  return code;
}
1296

1297
// Encodes one management response: header, reqId (i64), code (i32), the task it refers to,
// and the msgType-dependent content.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));
  // The header's msgType decides which member of cont gets written.
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));

_exit:
  return code;
}
1311

1312

1313
// Encodes a stream heartbeat response between tStartEncode/tEndEncode.
// Content, in order: streamGId, streams to deploy (count + entries), tasks to start
// (count + entries), the undeployAll flag followed by the undeploy task list when the flag is
// not set, and the management responses (count + entries).
// tEndEncode is invoked on the failure path as well as on success.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));

  int32_t deployCnt = taosArrayGetSize(pRsp->deploy.streamList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployCnt));
  for (int32_t k = 0; k < deployCnt; ++k) {
    SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, k);
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pDeploy));
  }

  int32_t startCnt = taosArrayGetSize(pRsp->start.taskList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startCnt));
  for (int32_t k = 0; k < startCnt; ++k) {
    SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pStart));
  }

  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    // The per-task list is only written when not everything is being undeployed.
    int32_t undeployCnt = taosArrayGetSize(pRsp->undeploy.taskList);
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployCnt));
    for (int32_t k = 0; k < undeployCnt; ++k) {
      SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pUndeploy));
    }
  }

  int32_t mgmtRspCnt = taosArrayGetSize(pRsp->rsps.rspList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, mgmtRspCnt));
  for (int32_t k = 0; k < mgmtRspCnt; ++k) {
    SStreamMgmtRsp* pOneRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pOneRsp));
  }

_exit:
  tEndEncode(pEncoder);
  return code;
}
1356

1357
// Decodes the trigger-side reader deployment payload, reading the fields in the order they
// were encoded: triggerTblName, triggerTblUid, triggerTblType, deleteReCalc, deleteOutTbl,
// partitionCols, triggerCols, triggerScanPlan, calcCacheScanPlan.
// NOTE(review): the string fields are filled through tDecodeBinaryAlloc, so they are
// presumably allocated copies owned by pMsg afterwards - confirm against that helper.
// Returns 0 when every step succeeds, otherwise the code of the step that failed; fields
// decoded before the failure stay populated.
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

_exit:
  return code;
}
1375

1376

1377
// Decodes the calc-side reader deployment payload: execReplica (i32), then calcScanPlan.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));

_exit:
  return code;
}
1388

1389

1390
// Decodes a reader deployment message: reads the triggerReader flag, then the variant of
// pMsg->msg it selects (trigger payload when the flag is non-zero, calc payload otherwise).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));

  if (!pMsg->triggerReader) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
  } else {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
  }

_exit:
  return code;
}
1405

1406

1407
// Decodes a task address: taskId (i64), nodeId (i32), epset.
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));

_exit:
  return code;
}
1419

1420

1421
// Decodes a runner target: its task address followed by execReplica (i32).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));

_exit:
  return code;
}
1432

1433

1434
// Deserializes a trigger-task deployment message, reading fields in the order
// tEncodeSStreamTriggerDeployMsg writes them: option flags, notify URL list and settings,
// timing values, the window-type specific section selected by triggerType, bitmaps and slot
// ids, filter/plan strings, reader and runner lists, leaderSnodeId and streamName.
//
// Arrays are only created when their decoded count is positive. An unsupported triggerType
// fails with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
//
// Returns 0 when every step succeeds, otherwise the code of the step that failed. Nothing is
// released here on failure, so whatever was already decoded into pMsg stays there.
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t urlCnt = 0;
  int32_t readerCnt = 0;
  int32_t runnerCnt = 0;

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->hasPartitionBy));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));

  // Notify URLs: count, then one blob per pointer-sized slot.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &urlCnt));
  if (urlCnt > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, urlCnt);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < urlCnt; ++k) {
    void** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, ppUrl, NULL));
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));

  switch (pMsg->triggerType) {
    case WINDOW_TYPE_SESSION: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // sliding/interval window: five unit/precision bytes, then four i64 values
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
      break;
    }
    default: {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));

  // Reader addresses.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerCnt));
  if (readerCnt > 0) {
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerCnt);
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < readerCnt; ++k) {
    SStreamTaskAddr* pReader = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, k);
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pReader));
  }

  // Runner targets.
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerCnt));
  if (runnerCnt > 0) {
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerCnt);
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
  }
  for (int32_t k = 0; k < runnerCnt; ++k) {
    SStreamRunnerTarget* pRunner = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, k);
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pRunner));
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));

_exit:
  return code;
}
1554

1555

1556

1557
// Decodes one SFieldWithOptions: name (copied into pField->name), type (u8), flags (i8),
// bytes (i32), compress (u32), typeMod (i32).
// Returns 0 when every step succeeds, otherwise the code of the step that failed.
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));

_exit:
  return code;
}
1572

1573
void destroySStreamOutCols(void* p){
×
1574
  if (p == NULL) return;
×
1575
  SStreamOutCol* col = (SStreamOutCol*)p;
×
1576
  taosMemoryFreeClear(col->expr);
×
1577
}
1578

1579
// Decodes a runner-task deploy message from pDecoder into pMsg.
//
// Read order: execReplica, streamName, pPlan, outDBFName, outTblName,
// outTblType, calcNotifyOnly, topPlan, notify URL list, notifyErrorHandle,
// output columns, output tags, outStbUid, outStbSversion, subTblNameExpr,
// tagValueExpr, forced output columns.
// Every list is preceded by an int32 element count; its array is created only
// when that count is positive.
//
// Buffers and arrays are allocated into pMsg as decoding proceeds and are not
// released here if a later step fails.
// NOTE(review): presumably the caller cleans up via
// tFreeSStreamRunnerDeployMsg() -- confirm.
//
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
  int32_t lino;
  int32_t code = 0;
  int32_t nUrls = 0;
  int32_t nCols = 0;
  int32_t nTags = 0;
  int32_t nForced = 0;

  // fixed leading part
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));

  // notification URLs: each array slot receives a freshly allocated buffer
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUrls));
  if (nUrls > 0) {
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, nUrls);
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nUrls; ++k) {
      const char** ppUrl = taosArrayGet(pMsg->pNotifyAddrUrls, k);
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)ppUrl, NULL));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyErrorHandle));

  // output columns
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nCols));
  if (nCols > 0) {
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), nCols);
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nCols; ++k) {
      SFieldWithOptions* pField = taosArrayGet(pMsg->outCols, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // output tags
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTags));
  if (nTags > 0) {
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), nTags);
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nTags; ++k) {
      SFieldWithOptions* pField = taosArrayGet(pMsg->outTags, k);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // target super table identity and expression strings
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));

  // forced output columns: expression text followed by its data type
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nForced));
  if (nForced > 0) {
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), nForced);
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nForced; ++k) {
      SStreamOutCol* pOut = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, k);

      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pOut->expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOut->type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOut->type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pOut->type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pOut->type.bytes));
    }
  }

_exit:
  return code;
}
1652

1653
// Decodes one task deployment: the common task header first, then the
// type-specific deploy message selected by pTask->task.type (reader, trigger
// or runner). Any other task type is reported as
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));

  if (pTask->task.type == STREAM_READER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
  } else if (pTask->task.type == STREAM_TRIGGER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
  } else if (pTask->task.type == STREAM_RUNNER_TASK) {
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
  } else {
    // unknown task type: nothing sensible to decode
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:
  return code;
}
1677

1678

1679
// Decodes the deployment description of one stream into pStream.
//
// Read order: streamId, reader task count + reader tasks, an int32 flag
// telling whether a trigger task follows (+ that task), runner task count +
// runner tasks. Arrays and the trigger task are allocated only when needed and
// are not released here if a later step fails.
//
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
  int32_t lino;
  int32_t code = 0;
  int32_t nReaders = 0;
  int32_t hasTrigger = 0;
  int32_t nRunners = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));

  // reader tasks
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nReaders));
  if (nReaders > 0) {
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nReaders);
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nReaders; ++k) {
      SStmTaskDeploy* pReader = taosArrayGet(pStream->readerTasks, k);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pReader));
    }
  }

  // optional single trigger task (any non-zero flag means "present")
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &hasTrigger));
  if (hasTrigger != 0) {
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
  }

  // runner tasks
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRunners));
  if (nRunners > 0) {
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), nRunners);
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRunners; ++k) {
      SStmTaskDeploy* pRunner = taosArrayGet(pStream->runnerTasks, k);
      TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pRunner));
    }
  }

_exit:
  return code;
}
1719

1720

1721
// Decodes a start-task message; only the common stream message header is read.
// Returns `code`: 0 when the header decodes, otherwise the failure status.
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));

_exit:
  return code;
}
1731

1732

1733
// Decodes one "start task" entry: the task header followed by its start
// message.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
  int32_t lino;
  int32_t code = 0;

  // which task ...
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // ... and the start payload for it
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));

_exit:
  return code;
}
1744

1745

1746
// Decodes an undeploy-task message: the common header, then the doCheckpoint
// and doCleanup int8 flags, in that order.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));

  // option flags
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));

_exit:
  return code;
}
1758

1759

1760
// Decodes one "undeploy task" entry: the task header followed by its undeploy
// message.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
  int32_t lino;
  int32_t code = 0;

  // which task ...
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
  // ... and how it should be undeployed
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));

_exit:
  return code;
}
1771

1772
// Decodes one recalculation request: recalcId, start and end, each an int64,
// in that order.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));

  // range covered by the request
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));

_exit:
  return code;
}
1784

1785
// Decodes the type-dependent payload of a management response into pCont.
//
//   STREAM_MSG_ORIGTBL_READER_INFO: vgroup id list, then reader address list
//   STREAM_MSG_UPDATE_RUNNER:       runner target list
//   STREAM_MSG_USER_RECALC:         recalculation request list
//   any other msgType:              nothing is read
//
// Every list is preceded by an int32 element count and its array is created
// only when that count is positive. Arrays already created are not released
// here if a later step fails.
//
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
  int32_t lino;
  int32_t code = 0;
  int32_t count = 0;  // element count of the list currently being decoded

  switch (msgType) {
    case STREAM_MSG_ORIGTBL_READER_INFO:
      // vgroup ids
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &count));
      if (count > 0) {
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), count);
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);

        for (int32_t k = 0; k < count; ++k) {
          int32_t* pVgId = taosArrayGet(pCont->vgIds, k);
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pVgId));
        }
      }

      // reader task addresses
      count = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &count));
      if (count > 0) {
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), count);
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < count; ++k) {
          SStreamTaskAddr* pAddr = taosArrayGet(pCont->readerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
        }
      }
      break;

    case STREAM_MSG_UPDATE_RUNNER:
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &count));
      if (count > 0) {
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), count);
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < count; ++k) {
          SStreamRunnerTarget* pTarget = taosArrayGet(pCont->runnerList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
        }
      }
      break;

    case STREAM_MSG_USER_RECALC:
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &count));
      if (count > 0) {
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), count);
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);

        for (int32_t k = 0; k < count; ++k) {
          SStreamRecalcReq* pReq = taosArrayGet(pCont->recalcList, k);
          TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, pReq));
        }
      }
      break;

    default:
      // no payload is read for the remaining message types
      break;
  }

_exit:
  return code;
}
1848

1849

1850
// Decodes one management response: message header, reqId, code, the task it
// refers to, and finally the payload whose layout is chosen by
// pRsp->header.msgType.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
  int32_t lino;
  int32_t code = 0;

  // fixed part
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));

  // variable part, driven by the header decoded just above
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));

_exit:
  return code;
}
1864

1865
void tFreeSStreamMgmtRsp(void* param) {
×
1866
  if (NULL == param) {
×
1867
    return;
×
1868
  }
1869
  
1870
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
×
1871

1872
  taosArrayDestroy(pRsp->cont.vgIds);
×
1873
  taosArrayDestroy(pRsp->cont.readerList);
×
1874
  taosArrayDestroy(pRsp->cont.runnerList);
×
1875
  taosArrayDestroy(pRsp->cont.recalcList);
×
1876
}
1877

1878
// Frees the buffers owned by a reader deploy message. Which set is freed
// depends on pReader->triggerReader: the calc variant owns only calcScanPlan,
// the trigger variant owns triggerTblName, partitionCols, triggerCols,
// triggerScanPlan and calcCacheScanPlan.
// The message struct itself is not freed; NULL is accepted and ignored.
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (!pReader->triggerReader) {
    // reader deployed on behalf of the calculation side
    SStreamReaderDeployFromCalc* pCalc = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
    taosMemoryFree(pCalc->calcScanPlan);
    return;
  }

  // reader deployed on behalf of the trigger side
  SStreamReaderDeployFromTrigger* pTrig = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
  taosMemoryFree(pTrig->triggerTblName);
  taosMemoryFree(pTrig->partitionCols);
  taosMemoryFree(pTrig->triggerCols);
  taosMemoryFree(pTrig->triggerScanPlan);
  taosMemoryFree(pTrig->calcCacheScanPlan);
}
1895

1896
// Frees everything owned by a trigger deploy message: the notify URL array and
// its elements, the window-type specific condition buffers (event: startCond
// and endCond; count: condCols), the filter / plan buffers, the reader and
// runner arrays and the stream name.
// The message struct itself is not freed; NULL is accepted and ignored.
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
  if (pTrigger == NULL) {
    return;
  }

  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, taosAutoMemoryFree);

  // only the event and count window types carry heap buffers in `trigger`
  if (pTrigger->triggerType == WINDOW_TYPE_EVENT) {
    taosMemoryFree(pTrigger->trigger.event.startCond);
    taosMemoryFree(pTrigger->trigger.event.endCond);
  } else if (pTrigger->triggerType == WINDOW_TYPE_COUNT) {
    taosMemoryFree(pTrigger->trigger.count.condCols);
  }

  taosMemoryFree(pTrigger->triggerPrevFilter);
  taosMemoryFree(pTrigger->triggerScanPlan);
  taosMemoryFree(pTrigger->calcCacheScanPlan);

  taosArrayDestroy(pTrigger->readerList);
  taosArrayDestroy(pTrigger->runnerList);
  taosMemoryFree(pTrigger->streamName);
}
1922

1923
void tFreeSStreamOutCol(void* param) {
×
1924
  if (NULL == param) {
×
1925
    return;
×
1926
  }
1927

1928
  SStreamOutCol* pOut = (SStreamOutCol*)param;
×
1929
  taosMemoryFree(pOut->expr);
×
1930
}
1931

1932
// Frees everything owned by a runner deploy message: name / plan strings, the
// notify URL array with its elements, the output column and tag arrays, the
// expression strings and the forced output columns (including each `expr`).
// The message struct itself is not freed; NULL is accepted and ignored.
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
  if (pRunner == NULL) {
    return;
  }

  // plain buffers
  taosMemoryFree(pRunner->streamName);
  taosMemoryFree(pRunner->pPlan);
  taosMemoryFree(pRunner->outDBFName);
  taosMemoryFree(pRunner->outTblName);

  // arrays; the URL array owns its elements, the field arrays do not
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, taosAutoMemoryFree);
  taosArrayDestroy(pRunner->outCols);
  taosArrayDestroy(pRunner->outTags);

  // expressions
  taosMemoryFree(pRunner->subTblNameExpr);
  taosMemoryFree(pRunner->tagValueExpr);
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
}
1950

1951
void tFreeSStmTaskDeploy(void* param) {
×
1952
  if (NULL == param) {
×
1953
    return;
×
1954
  }
1955

1956
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
×
1957
  switch (pTask->task.type)  {
×
1958
    case STREAM_READER_TASK:
×
1959
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
×
1960
      break;
×
1961
    case STREAM_TRIGGER_TASK:
×
1962
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
×
1963
      break;
×
1964
    case STREAM_RUNNER_TASK:
×
1965
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
×
1966
      break;
×
1967
    default:
×
1968
      break;
×
1969
  }
1970
}
1971

1972
void tFreeSStmStreamDeploy(void* param) {
×
1973
  if (NULL == param) {
×
1974
    return;
×
1975
  }
1976
  
1977
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
×
1978
  taosArrayDestroy(pDeploy->readerTasks);
×
1979
  if (pDeploy->triggerTask) {
×
1980
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
×
1981
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
×
1982
    taosMemoryFree(pDeploy->triggerTask);
×
1983
  }
1984

1985
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
×
1986
  for (int32_t i = 0; i < runnerNum; ++i) {
×
1987
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
×
1988
    taosMemoryFree(pRunner->msg.runner.pPlan);
×
1989
  }
1990
  taosArrayDestroy(pDeploy->runnerTasks);
×
1991
}
1992

1993
void tDeepFreeSStmStreamDeploy(void* param) {
×
1994
  if (NULL == param) {
×
1995
    return;
×
1996
  }
1997
  
1998
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
×
1999
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
×
2000
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
×
2001
  taosMemoryFree(pDeploy->triggerTask);
×
2002
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
×
2003
}
2004

2005

2006
// Releases the lists held by a heartbeat response. Stream deployments are
// released with tFreeSStmStreamDeploy() (the partial variant), management
// responses with tFreeSStreamMgmtRsp(); the start and undeploy task lists are
// destroyed without per-element cleanup.
// The response struct itself is not freed; NULL is accepted and ignored.
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2015

2016
// Releases the lists held by a heartbeat response, using
// tDeepFreeSStmStreamDeploy() for every stream deployment so that all task
// payloads are freed as well. Management responses are released with
// tFreeSStreamMgmtRsp(); the start and undeploy task lists are destroyed
// without per-element cleanup.
// The response struct itself is not freed; NULL is accepted and ignored.
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
  if (pRsp != NULL) {
    taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
    taosArrayDestroy(pRsp->start.taskList);
    taosArrayDestroy(pRsp->undeploy.taskList);
    taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
  }
}
2025

2026

2027

2028
// Decodes a stream heartbeat response into pRsp.
//
// Read order inside the tStartDecode/tEndDecode frame: streamGId, stream
// deployments, tasks to start, the undeployAll flag (the undeploy task list
// follows only when that flag is zero), management responses. Every list is
// preceded by an int32 element count and its array is created only when the
// count is positive.
//
// On failure the function jumps straight to `_exit`: tEndDecode() is skipped
// and whatever was already allocated into pRsp stays there.
// NOTE(review): presumably the caller releases it with
// tDeepFreeSMStreamHbRspMsg() -- confirm.
//
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
  int32_t lino;
  int32_t code = 0;
  int32_t nDeploy = 0;
  int32_t nStart = 0;
  int32_t nRsp = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));

  // streams to deploy
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nDeploy));
  if (nDeploy > 0) {
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), nDeploy);
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nDeploy; ++k) {
      SStmStreamDeploy* pDeploy = taosArrayGet(pRsp->deploy.streamList, k);
      TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pDeploy));
    }
  }

  // tasks to start
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nStart));
  if (nStart > 0) {
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), nStart);
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nStart; ++k) {
      SStreamTaskStart* pStart = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pStart));
    }
  }

  // tasks to undeploy; no list is present when everything is undeployed
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
  if (!pRsp->undeploy.undeployAll) {
    int32_t nUndeploy = 0;
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nUndeploy));
    if (nUndeploy > 0) {
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), nUndeploy);
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);

      for (int32_t k = 0; k < nUndeploy; ++k) {
        SStreamTaskUndeploy* pUndeploy = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, k);
        TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pUndeploy));
      }
    }
  }

  // management responses
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nRsp));
  if (nRsp > 0) {
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), nRsp);
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);

    for (int32_t k = 0; k < nRsp; ++k) {
      SStreamMgmtRsp* pOne = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, k);
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pOne));
    }
  }

  tEndDecode(pDecoder);

_exit:
  return code;
}
2086

2087
// Encodes a task run request inside a tStartEncode/tEndEncode frame:
// streamId (int64), taskId (int32), reqType (int32).
// tEndEncode() is reached only when every field was written.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2100

2101
// Decodes a task run request inside a tStartDecode/tEndDecode frame:
// streamId (int64), taskId (int32), reqType (int32).
// tEndDecode() is reached only when every field was read.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2114

2115
// Encodes a task stop request: a single int64 streamId wrapped in a
// tStartEncode/tEndEncode frame. tEndEncode() is reached only on success.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartEncode(pEncoder));

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  tEndEncode(pEncoder);

_exit:
  return code;
}
2126

2127
// Decodes a task stop request: a single int64 streamId wrapped in a
// tStartDecode/tEndDecode frame. tEndDecode() is reached only on success.
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
  int32_t lino;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tStartDecode(pDecoder));

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  tEndDecode(pDecoder);

_exit:
  return code;
}
2139

2140

2141
// Number of bytes to hand to tEncodeBinary() for an optional C string:
// 0 for NULL, otherwise strlen + 1 so the terminating NUL is encoded too.
static int32_t stmCreateReqCStrBinLen(const void* pStr) {
  if (pStr == NULL) {
    return 0;
  }
  return (int32_t)strlen((const char*)pStr) + 1;
}

// Writes the body of a create-stream request to pEncoder. No
// tStartEncode/tEndEncode frame is opened or closed here.
//
// Every optional string is encoded as a binary blob of strlen + 1 bytes, or as
// an empty blob when the pointer is NULL. Every list is preceded by its int32
// element count. The order of the encode calls defines the wire layout and
// must not be changed independently of the matching decoder.
//
// Returns `code`: 0 when no step fails, otherwise the first failure status.
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
  int32_t lino;
  int32_t code = 0;

  // ---- name part ----
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));

  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->name, stmCreateReqCStrBinLen(pReq->name)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->sql, stmCreateReqCStrBinLen(pReq->sql)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outDB, stmCreateReqCStrBinLen(pReq->outDB)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->streamDB, stmCreateReqCStrBinLen(pReq->streamDB)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerDB, stmCreateReqCStrBinLen(pReq->triggerDB)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerTblName, stmCreateReqCStrBinLen(pReq->triggerTblName)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->outTblName, stmCreateReqCStrBinLen(pReq->outTblName)));

  int32_t nCalcDb = (int32_t)taosArrayGetSize(pReq->calcDB);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nCalcDb));
  for (int32_t k = 0; k < nCalcDb; ++k) {
    const char* pDbName = taosArrayGetP(pReq->calcDB, k);
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pDbName));
  }

  // ---- trigger control part ----
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igExists));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igDisorder));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteReCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->deleteOutTbl));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistory));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->fillHistoryFirst));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->calcNotifyOnly));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->lowLatencyCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->igNoDataTrigger));

  // ---- notify part ----
  int32_t nNotifyUrl = (int32_t)taosArrayGetSize(pReq->pNotifyAddrUrls);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nNotifyUrl));
  for (int32_t k = 0; k < nNotifyUrl; ++k) {
    const char* pUrl = taosArrayGetP(pReq->pNotifyAddrUrls, k);
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pUrl));
  }
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyEventTypes));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->notifyErrorHandle));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->notifyHistory));

  // ---- out table part ----

  // trigger cols and partition cols
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerFilterCols, stmCreateReqCStrBinLen(pReq->triggerFilterCols)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerCols, stmCreateReqCStrBinLen(pReq->triggerCols)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->partitionCols, stmCreateReqCStrBinLen(pReq->partitionCols)));

  // out col
  int32_t nOutCol = (int32_t)taosArrayGetSize(pReq->outCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nOutCol));
  for (int32_t k = 0; k < nOutCol; ++k) {
    SFieldWithOptions* pCol = taosArrayGet(pReq->outCols, k);
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
  }

  // out tag: written member by member as type, flags, bytes, name
  int32_t nOutTag = (int32_t)taosArrayGetSize(pReq->outTags);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nOutTag));
  for (int32_t k = 0; k < nOutTag; ++k) {
    SField* pTag = taosArrayGet(pReq->outTags, k);
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTag->type));
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTag->flags));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTag->bytes));
    TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTag->name));
  }

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->maxDelay));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->watermark));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->expiredTime));

  // ---- window-type specific trigger parameters ----
  switch (pReq->triggerType) {
    case WINDOW_TYPE_SESSION:
      // session trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.session.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.session.sessionVal));
      break;

    case WINDOW_TYPE_STATE:
      // state trigger
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.stateWin.trueForDuration));
      break;

    case WINDOW_TYPE_INTERVAL:
      // slide trigger
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.sliding.soffset));
      break;

    case WINDOW_TYPE_EVENT:
      // event trigger
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.startCond,
                                    stmCreateReqCStrBinLen(pReq->trigger.event.startCond)));
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.event.endCond,
                                    stmCreateReqCStrBinLen(pReq->trigger.event.endCond)));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.event.trueForDuration));
      break;

    case WINDOW_TYPE_COUNT:
      // count trigger
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->trigger.count.condCols,
                                    stmCreateReqCStrBinLen(pReq->trigger.count.condCols)));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.countVal));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.count.sliding));
      break;

    case WINDOW_TYPE_PERIOD:
      // period trigger
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.precision));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.period));
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->trigger.period.offset));
      break;

    default:
      // remaining trigger types write nothing in this section
      break;
  }

  // ---- table identities and misc scalars ----
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->triggerTblType));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblUid));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->triggerTblSuid));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->vtableCalc));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outTblType));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->outStbExists));
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pReq->outStbUid));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outStbSversion));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->eventTypes));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->flags));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->tsmaId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->placeHolderBitmap));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->calcTsSlotId));
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pReq->triTsSlotId));

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->triggerTblVgId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->outTblVgId));

  // ---- plans ----
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerScanPlan, stmCreateReqCStrBinLen(pReq->triggerScanPlan)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->triggerPrevFilter, stmCreateReqCStrBinLen(pReq->triggerPrevFilter)));

  int32_t nCalcScan = (int32_t)taosArrayGetSize(pReq->calcScanPlanList);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nCalcScan));
  for (int32_t k = 0; k < nCalcScan; ++k) {
    SStreamCalcScan* pScan = (SStreamCalcScan*)taosArrayGet(pReq->calcScanPlanList, k);
    int32_t          nVg = (int32_t)taosArrayGetSize(pScan->vgList);

    // vgroup ids first, then the cache flag and the plan text
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVg));
    for (int32_t v = 0; v < nVg; ++v) {
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pScan->vgList, v)));
    }
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pScan->readFromCache));
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pScan->scanPlan, stmCreateReqCStrBinLen(pScan->scanPlan)));
  }

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfCalcSubplan));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->calcPlan, stmCreateReqCStrBinLen(pReq->calcPlan)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->subTblNameExpr, stmCreateReqCStrBinLen(pReq->subTblNameExpr)));
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pReq->tagValueExpr, stmCreateReqCStrBinLen(pReq->tagValueExpr)));

  // ---- forced output columns: expression text followed by its data type ----
  int32_t nForced = (int32_t)taosArrayGetSize(pReq->forceOutCols);
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nForced));
  for (int32_t k = 0; k < nForced; ++k) {
    SStreamOutCol* pOut = (SStreamOutCol*)taosArrayGet(pReq->forceOutCols, k);

    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pOut->expr, stmCreateReqCStrBinLen(pOut->expr)));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOut->type.type));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOut->type.precision));
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pOut->type.scale));
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pOut->type.bytes));
  }

_exit:
  return code;
}
2353

2354
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
×
2355
  SEncoder encoder = {0};
×
2356
  tEncoderInit(&encoder, buf, bufLen);
×
2357
  int32_t code = 0;
×
2358
  int32_t lino;
2359

2360
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2361

2362
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
×
2363

2364
  tEndEncode(&encoder);
×
2365

2366
_exit:
×
2367
  if (code) {
×
2368
    tEncoderClear(&encoder);
×
2369
    return code;
×
2370
  } else {
2371
    int32_t tlen = encoder.pos;
×
2372
    tEncoderClear(&encoder);
×
2373
    return tlen;
×
2374
  }
2375
  return 0;
2376
}
2377

2378

2379
// Decodes the body of a create-stream request from pDecoder into pReq.
//
// Fields are read in exactly the order they are written by the matching
// encoder. Strings and arrays are heap-allocated and stored in pReq; on
// failure, whatever has already been stored in pReq is left there for the
// caller to release, while partially decoded list elements that were not yet
// stored in pReq are released here.
//
// Returns 0 on success, or the error code of the first failing step.
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
  int32_t code = 0;
  int32_t lino = 0;

  // Element temporaries that own heap memory until they are pushed into pReq.
  // They live at function scope so the _exit path can release them when a
  // decode step fails in the middle of an element.
  SStreamCalcScan calcScan = {0};
  SStreamOutCol   outCol = {0};

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));

  // calc DB names: array of owned C strings
  int32_t calcDbSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
  if (pReq->calcDB == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }
  for (int32_t i = 0; i < calcDbSize; ++i) {
    char *calcDb = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
    // tDecodeCStr hands back a pointer that is then duplicated so pReq owns its own copy
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
    if (calcDb == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
      taosMemoryFree(calcDb);
      TAOS_CHECK_EXIT(terrno);
    }
  }

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));

  // notify URLs: array of owned C strings, only created when non-empty
  int32_t addrSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
  if (addrSize > 0) {
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
    if (pReq->pNotifyAddrUrls == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }
  for (int32_t i = 0; i < addrSize; ++i) {
    char *url = NULL;
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
    if (url == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
      taosMemoryFree(url);
      TAOS_CHECK_EXIT(terrno);
    }
  }
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyErrorHandle));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));

  // output columns: array is pre-sized, elements are decoded in place
  int32_t outColSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
  if (outColSize > 0) {
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
    if (pReq->outCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outColSize; ++i) {
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
    }
  }

  // output tags: decoded into a local and pushed by value
  int32_t outTagSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
  if (outTagSize > 0) {
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
    if (pReq->outTags == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    for (int32_t i = 0; i < outTagSize; ++i) {
      SFieldWithOptions field = {0};
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));

  // trigger payload: layout depends on the trigger type decoded above
  switch (pReq->triggerType) {
    case WINDOW_TYPE_SESSION: {
      // session trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
      break;
    }
    case WINDOW_TYPE_STATE: {
      // state trigger
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
      break;
    }
    case WINDOW_TYPE_INTERVAL: {
      // slide trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
      break;
    }
    case WINDOW_TYPE_EVENT: {
      // event trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
      break;
    }
    case WINDOW_TYPE_COUNT: {
      // count trigger
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));

      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      // period trigger
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
      break;
    }
    default:
      // any other trigger type is rejected as a malformed message
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
  }

  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));

  // calc scan plans: each element owns a vgroup-id array and a plan string
  int32_t calcScanPlanListSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
  if (calcScanPlanListSize > 0) {
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
    if (pReq->calcScanPlanList == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
      int32_t vgListSize = 0;
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
      if (vgListSize > 0) {
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
        if (calcScan.vgList == NULL) {
          TAOS_CHECK_EXIT(terrno);
        }
        for (int32_t j = 0; j < vgListSize; ++j) {
          int32_t vgId = 0;
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
            TAOS_CHECK_EXIT(terrno);
          }
        }
      }
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // the array now holds the vgList/scanPlan pointers; start the next element clean
      calcScan = (SStreamCalcScan){0};
    }
  }

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));

  // forced output columns: each element owns its expression string
  int32_t forceOutColsSize = 0;
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
  if (forceOutColsSize > 0) {
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
    if (pReq->forceOutCols == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, NULL));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
      // the array now holds the expr pointer; start the next element clean
      outCol = (SStreamOutCol){0};
    }
  }

_exit:

  // Release an element that was being decoded when a step failed. Both
  // temporaries are zeroed after a successful push, so these are no-ops on
  // the success path.
  taosArrayDestroy(calcScan.vgList);
  taosMemoryFree(calcScan.scanPlan);
  taosMemoryFree(outCol.expr);

  return code;
}
2625

2626

2627
// Decodes a create-stream request from buf (bufLen bytes) into pReq.
// Returns 0 on success or the error code of the failing decode step; the
// decoder is cleared on every path.
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImpl(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2644

2645

2646
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
×
2647
  int32_t  code = 0;
×
2648
  int32_t  lino;
2649
  int32_t  tlen;
2650
  SEncoder encoder = {0};
×
2651
  tEncoderInit(&encoder, buf, bufLen);
×
2652

2653
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2654

2655
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2656
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2657
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2658

2659
  tEndEncode(&encoder);
×
2660

2661
_exit:
×
2662
  if (code) {
×
2663
    tlen = code;
×
2664
  } else {
2665
    tlen = encoder.pos;
×
2666
  }
2667
  tEncoderClear(&encoder);
×
2668
  return tlen;
×
2669
}
2670

2671
// Decodes a drop-stream request from buf into pReq.
// pReq->name is heap-allocated by the decoder. Returns 0 on success or the
// error code of the failing decode step.
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2687

2688
// Releases the name string held by a drop-stream request and resets the
// pointer. The request struct itself is not freed. A NULL request is ignored.
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
2691

2692
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
×
2693
  if (pScan == NULL) {
×
2694
    return;
×
2695
  }
2696
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
×
2697
  taosArrayDestroy(pCalcScan->vgList);
×
2698
  taosMemoryFreeClear(pCalcScan->scanPlan);
×
2699
}
2700

2701
void tFreeStreamOutCol(void* pCol) {
×
2702
  if (pCol == NULL) {
×
2703
    return;
×
2704
  }
2705
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
×
2706
  taosMemoryFreeClear(pOutCol->expr);
×
2707
}
2708

2709

2710

2711
// Releases everything a create-stream request owns and resets the pointers.
// The request struct itself is not freed. A NULL request is ignored.
//
// NOTE(review): only the event trigger's strings are released below, although
// the decoder in this file also allocates trigger.count.condCols for count
// triggers. Confirm who owns that buffer before adding a free here, because
// tCloneStreamCreateDeployPointers copies non-event triggers by value.
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
  if (pReq == NULL) {
    return;
  }

  // identity strings
  taosMemoryFreeClear(pReq->name);
  taosMemoryFreeClear(pReq->sql);
  taosMemoryFreeClear(pReq->streamDB);
  taosMemoryFreeClear(pReq->triggerDB);
  taosMemoryFreeClear(pReq->outDB);
  taosMemoryFreeClear(pReq->triggerTblName);
  taosMemoryFreeClear(pReq->outTblName);

  // arrays of owned pointers
  taosArrayDestroyP(pReq->calcDB, NULL);
  pReq->calcDB = NULL;
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
  pReq->pNotifyAddrUrls = NULL;

  // column descriptors
  taosMemoryFreeClear(pReq->triggerFilterCols);
  taosMemoryFreeClear(pReq->triggerCols);
  taosMemoryFreeClear(pReq->partitionCols);

  // arrays of plain values
  taosArrayDestroy(pReq->outTags);
  pReq->outTags = NULL;
  taosArrayDestroy(pReq->outCols);
  pReq->outCols = NULL;

  // trigger payload: only the event trigger is released here
  if (pReq->triggerType == WINDOW_TYPE_EVENT) {
    taosMemoryFreeClear(pReq->trigger.event.startCond);
    taosMemoryFreeClear(pReq->trigger.event.endCond);
  }

  // plans and expressions
  taosMemoryFreeClear(pReq->triggerScanPlan);
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
  pReq->calcScanPlanList = NULL;
  taosMemoryFreeClear(pReq->triggerPrevFilter);

  taosMemoryFreeClear(pReq->calcPlan);
  taosMemoryFreeClear(pReq->subTblNameExpr);
  taosMemoryFreeClear(pReq->tagValueExpr);
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
  pReq->forceOutCols = NULL;
}
2757

2758
// Allocates a new SCMCreateStreamReq in *ppDst and deep-copies a subset of the
// pointer members of pSrc into it: out DB / table names, notify URLs, column
// descriptors, out columns/tags, event-trigger conditions, scan plans, calc
// plan, sub-table / tag expressions and forced output columns. Non-event
// triggers are copied by value. All other fields keep the zero value from calloc.
//
// Returns 0 on success (also when pSrc is NULL, in which case *ppDst is left
// untouched), or the error code taken from terrno. On failure the members
// copied so far are released via tFreeSCMCreateStreamReq; the struct stored in
// *ppDst, if it was allocated, is left allocated.
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
  int32_t code = 0, lino = 0;
  if (NULL == pSrc) {
    return code;
  }

  int32_t num = 0;
  // Initialised up front: the first failure check jumps to _exit, which reads
  // pDst, so it must hold a defined value before that jump.
  SCMCreateStreamReq* pDst = NULL;
  // Temporaries that own heap memory until pushed into a destination array;
  // kept at function scope so _exit can release them on failure.
  char*           url = NULL;
  SStreamCalcScan dscan = {0};
  SStreamOutCol   dcol = {0};

  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  pDst = *ppDst;

  if (pSrc->outDB) {
    pDst->outDB = COPY_STR(pSrc->outDB);
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
  }

  if (pSrc->triggerTblName) {
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
  }

  if (pSrc->outTblName) {
    pDst->outTblName = COPY_STR(pSrc->outTblName);
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
  }

  if (pSrc->pNotifyAddrUrls) {
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
    if (num > 0) {
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      url = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
      TSDB_CHECK_NULL(url, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &url), code, lino, _exit, terrno);
      url = NULL;  // the destination array now holds the pointer
    }
  }

  if (pSrc->triggerFilterCols) {
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
  }

  if (pSrc->triggerCols) {
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
  }

  if (pSrc->partitionCols) {
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
  }

  if (pSrc->outCols) {
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
  }

  if (pSrc->outTags) {
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
  }

  switch (pSrc->triggerType) {
    case WINDOW_TYPE_EVENT:
      if (pSrc->trigger.event.startCond) {
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
      }

      if (pSrc->trigger.event.endCond) {
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
      }
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
      break;
    default:
      // every other trigger kind is copied by value, including any pointer members
      pDst->trigger = pSrc->trigger;
      break;
  }

  if (pSrc->triggerScanPlan) {
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
  }

  if (pSrc->calcScanPlanList) {
    num = taosArrayGetSize(pSrc->calcScanPlanList);
    if (num > 0) {
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);

      dscan.readFromCache = sscan->readFromCache;

      // The decoder in this file leaves vgList / scanPlan NULL when they were
      // encoded empty, so absent members are skipped like every other
      // optional field in this function instead of being treated as failures.
      if (sscan->vgList) {
        dscan.vgList = taosArrayDup(sscan->vgList, NULL);
        TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
      }

      if (sscan->scanPlan) {
        dscan.scanPlan = COPY_STR(sscan->scanPlan);
        TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
      }

      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
      dscan = (SStreamCalcScan){0};  // the destination array now holds the pointers
    }
  }

  if (pSrc->triggerPrevFilter) {
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
  }

  if (pSrc->calcPlan) {
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
  }

  if (pSrc->subTblNameExpr) {
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
  }

  if (pSrc->tagValueExpr) {
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
  }

  if (pSrc->forceOutCols) {
    num = taosArrayGetSize(pSrc->forceOutCols);
    if (num > 0) {
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
    }
    for (int32_t i = 0; i < num; ++i) {
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);

      dcol.type = scol->type;

      // the decoder leaves expr NULL when it was encoded empty
      if (scol->expr) {
        dcol.expr = COPY_STR(scol->expr);
        TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
      }

      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
      dcol = (SStreamOutCol){0};  // the destination array now holds the pointer
    }
  }

_exit:

  if (code) {
    // release the element that was being built when the failure happened
    taosMemoryFree(url);
    taosArrayDestroy(dscan.vgList);
    taosMemoryFree(dscan.scanPlan);
    taosMemoryFree(dcol.expr);

    tFreeSCMCreateStreamReq(pDst);
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2914

2915

2916
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
×
2917
  int32_t  code = 0;
×
2918
  int32_t  lino;
2919
  int32_t  tlen;
2920
  SEncoder encoder = {0};
×
2921
  tEncoderInit(&encoder, buf, bufLen);
×
2922
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2923

2924
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2925
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2926
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2927
  tEndEncode(&encoder);
×
2928

2929
_exit:
×
2930
  if (code) {
×
2931
    tlen = code;
×
2932
  } else {
2933
    tlen = encoder.pos;
×
2934
  }
2935
  tEncoderClear(&encoder);
×
2936
  return tlen;
×
2937
}
2938

2939
// Decodes a pause-stream request from buf into pReq.
// pReq->name is heap-allocated by the decoder. Returns 0 on success or the
// error code of the failing decode step.
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2954

2955
// Releases the name string held by a pause-stream request and resets the
// pointer. The request struct itself is not freed. A NULL request is ignored.
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
2958

2959
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
×
2960
  SEncoder encoder = {0};
×
2961
  int32_t  code = 0;
×
2962
  int32_t  lino;
2963
  int32_t  tlen;
2964
  tEncoderInit(&encoder, buf, bufLen);
×
2965
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
2966
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
2967
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
2968
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
×
2969
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
×
2970
  tEndEncode(&encoder);
×
2971

2972
_exit:
×
2973
  if (code) {
×
2974
    tlen = code;
×
2975
  } else {
2976
    tlen = encoder.pos;
×
2977
  }
2978
  tEncoderClear(&encoder);
×
2979
  return tlen;
×
2980
}
2981

2982
// Decodes a resume-stream request from buf into pReq.
// pReq->name is heap-allocated by the decoder. Returns 0 on success or the
// error code of the failing decode step.
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igNotExists));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->igUntreated));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
2998

2999
// Releases the name string held by a resume-stream request and resets the
// pointer. The request struct itself is not freed. A NULL request is ignored.
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
3002

3003
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
×
3004
  SEncoder encoder = {0};
×
3005
  int32_t  code = 0;
×
3006
  int32_t  lino;
3007
  int32_t  tlen;
3008
  tEncoderInit(&encoder, buf, bufLen);
×
3009
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3010
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
×
3011
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
×
3012
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
×
3013
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
×
3014
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
×
3015
  tEndEncode(&encoder);
×
3016

3017
_exit:
×
3018
  if (code) {
×
3019
    tlen = code;
×
3020
  } else {
3021
    tlen = encoder.pos;
×
3022
  }
3023
  tEncoderClear(&encoder);
×
3024
  return tlen;
×
3025
}
3026

3027
// Decodes a recalc-stream request from buf into pReq.
// pReq->name is heap-allocated by the decoder. Returns 0 on success or the
// error code of the failing decode step.
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&dec, (void**)&pReq->name, NULL));
  TAOS_CHECK_EXIT(tDecodeI8(&dec, &pReq->calcAll));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.skey));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pReq->timeRange.ekey));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3045

3046
// Releases the name string held by a recalc-stream request and resets the
// pointer. The request struct itself is not freed. A NULL request is ignored.
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
  if (NULL == pReq) {
    return;
  }
  taosMemoryFreeClear(pReq->name);
}
×
3049

3050
// Writes the fields of a stream-progress request (streamId, taskId, fetchIdx)
// to pEncoder in that order. Returns the result of the last encode step
// executed, i.e. the error code of the first failing one.
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));

_exit:
  return code;
}
3061

3062
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
×
3063
  SEncoder encoder = {0};
×
3064
  int32_t  code = 0;
×
3065
  int32_t  lino;
3066
  int32_t  tlen;
3067
  tEncoderInit(&encoder, buf, bufLen);
×
3068

3069
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3070
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
×
3071

3072
  tEndEncode(&encoder);
×
3073

3074
_exit:
×
3075
  if (code) {
×
3076
    tlen = code;
×
3077
  } else {
3078
    tlen = encoder.pos;
×
3079
  }
3080
  tEncoderClear(&encoder);
×
3081
  return tlen;
×
3082
}
3083

3084
// Reads the fields of a stream-progress request (streamId, taskId, fetchIdx)
// from pDecoder in that order. Returns the result of the last decode step
// executed, i.e. the error code of the first failing one.
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));

_exit:
  return code;
}
3095

3096
// Decodes a stream-progress request from buf into pReq.
// Returns 0 on success or the error code of the failing decode step; the
// decoder is cleared on every path.
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&dec, pReq));
  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
3112

3113
// Writes the fields of a stream-progress response (streamId, fillHisFinished,
// progressDelay, fetchIdx) to pEncoder in that order. Returns the result of
// the last encode step executed, i.e. the error code of the first failing one.
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
  int32_t lino = 0;
  int32_t code = 0;

  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));

_exit:
  return code;
}
3125

3126
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
×
3127
  SEncoder encoder = {0};
×
3128
  int32_t  code = 0;
×
3129
  int32_t  lino;
3130
  int32_t  tlen;
3131
  tEncoderInit(&encoder, buf, bufLen);
×
3132

3133
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3134
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
×
3135

3136
  tEndEncode(&encoder);
×
3137

3138
_exit:
×
3139
  if (code) {
×
3140
    tlen = code;
×
3141
  } else {
3142
    tlen = encoder.pos;
×
3143
  }
3144
  tEncoderClear(&encoder);
×
3145
  return tlen;
×
3146
}
3147

3148
// Decode the body of an SStreamProgressRsp from pDecoder into pRsp.
// Wire order mirrors tEncodeStreamProgressRsp: streamId (i64),
// fillHisFinished (i8), progressDelay (i64), fetchIdx (i32).
// Returns 0 when every field decodes, otherwise the code left by the failing step.
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
  int32_t code = 0, lino;

  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
  // fillHisFinished is read through an int8_t view of the field.
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));

_exit:
  return code;
}
3160

3161
// Deserialize an SStreamProgressRsp from the bufLen-byte buffer buf into pRsp.
// The decoder is always cleared before returning.
// Returns 0 on success, otherwise the code left by the failing step.
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
  int32_t  code = 0, lino;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);

  // Framed message: start marker, response body, end marker.
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3177

3178
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
×
3179
  SEncoder encoder = {0};
×
3180
  int32_t  code = TSDB_CODE_SUCCESS;
×
3181
  int32_t  lino = 0;
×
3182
  int32_t  tlen = 0;
×
3183

3184
  tEncoderInit(&encoder, buf, bufLen);
×
3185
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3186

3187
  int32_t size = taosArrayGetSize(pRsp->cols);
×
3188
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
×
3189
  for (int32_t i = 0; i < size; ++i) {
×
3190
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
×
3191
    if (oInfo == NULL) {
×
3192
      uError("col id is NULL at index %d", i);
×
3193
      code = TSDB_CODE_INVALID_PARA;
×
3194
      goto _exit;
×
3195
    }
3196
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
×
3197
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
×
3198
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
×
3199
  }
3200

3201
  tEndEncode(&encoder);
×
3202

3203
_exit:
×
3204
  if (code != TSDB_CODE_SUCCESS) {
×
3205
    tlen = code;
×
3206
  } else {
3207
    tlen = encoder.pos;
×
3208
  }
3209
  tEncoderClear(&encoder);
×
3210
  return tlen;
×
3211
}
3212

3213
// Deserialize an SSTriggerOrigTableInfoRsp from the bufLen-byte buffer buf.
// Payload: element count (i32), then per element suid (i64), uid (i64), cid (i16).
// pRsp->cols is replaced by a freshly created array; on failure it may be left
// allocated and partially filled, so the caller is still expected to release it
// (see tDestroySTriggerOrigTableInfoRsp).
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. The decoder
// is cleared on every path.
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  int32_t size = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
  if (pRsp->cols == NULL) {
    code = terrno;
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
    goto _exit;
  }
  for (int32_t i = 0; i < size; ++i) {
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
    if (oInfo == NULL) {
      code = terrno;
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
      goto _exit;
    }
    // TAOS_CHECK_EXIT (not TAOS_CHECK_RETURN) so that a failed field decode
    // still reaches tDecoderClear() at _exit instead of returning directly.
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->suid));
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &oInfo->uid));
    TAOS_CHECK_EXIT(tDecodeI16(&decoder, &oInfo->cid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3247

3248
// Release the cols array of pRsp. A NULL pRsp is ignored, and cols is reset to
// NULL afterwards so a repeated call cannot destroy the same array twice.
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
  if (pRsp == NULL) return;
  taosArrayDestroy(pRsp->cols);
  pRsp->cols = NULL;
}
×
3251

3252
// Release the array owned by a pull request, chosen by pReq->base.type, and
// reset the member to NULL. Request types without an array member are left
// untouched. A NULL pReq is ignored.
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
  if (pReq == NULL) return;

  switch (pReq->base.type) {
    // All four WAL data request types are decoded into the walDataReq member
    // and may carry a cids array (see tDserializeSTriggerPullRequest), so all
    // of them must release it, not only STRIGGER_PULL_WAL_DATA.
    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA: {
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
      if (pRequest->cids != NULL) {
        taosArrayDestroy(pRequest->cids);
        pRequest->cids = NULL;
      }
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
      if (pRequest->cols != NULL) {
        taosArrayDestroy(pRequest->cols);
        pRequest->cols = NULL;
      }
      break;
    }
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
      if (pRequest->uids != NULL) {
        taosArrayDestroy(pRequest->uids);
        pRequest->uids = NULL;
      }
      break;
    }
    default:
      // Nothing to release for the remaining request types.
      break;
  }
}
3286

3287
// Encode an array of column ids: element count (i32) followed by each
// col_id_t as an i16.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA if an element cannot be
// fetched from the array, or the error code of the failing encode step.
int32_t encodeColsArray(SEncoder* encoder, SArray* cids) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  int32_t nCols = taosArrayGetSize(cids);

  TAOS_CHECK_EXIT(tEncodeI32(encoder, nCols));

  for (int32_t idx = 0; idx < nCols; ++idx) {
    col_id_t* pId = taosArrayGet(cids, idx);
    if (pId == NULL) {
      uError("col id is NULL at index %d", idx);
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }
    TAOS_CHECK_EXIT(tEncodeI16(encoder, *pId));
  }

_exit:
  return code;
}
3305

3306
// Decode an array of column ids written by encodeColsArray: element count
// (i32) followed by that many i16 column ids.
// When the count is positive, *cids is replaced by a newly created array.
// When it is zero or negative, *cids is left untouched.
// On any failure the array referenced by *cids is destroyed and *cids is set
// to NULL.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t decodeColsArray(SDecoder* decoder, SArray** cids) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = 0;

  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
  if (size > 0){
    *cids = taosArrayInit(size, sizeof(col_id_t));
    if (*cids == NULL) {
      code = terrno;
      uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
      goto _exit;
    }

    for (int32_t i = 0; i < size; ++i) {
      col_id_t* pColId = taosArrayReserve(*cids, 1);
      if (pColId == NULL) {
        code = terrno;
        uError("failed to reserve memory for col id at index %d, errno: %d", i, code);
        goto _exit;
      }
      // TAOS_CHECK_EXIT (not TAOS_CHECK_RETURN) so that a failed element
      // decode runs the cleanup below instead of returning with a partially
      // filled array still attached to *cids.
      TAOS_CHECK_EXIT(tDecodeI16(decoder, pColId));
    }
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(*cids);
    *cids = NULL;
  }
  return code;
}
3338

3339
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
×
3340
  SEncoder encoder = {0};
×
3341
  int32_t  code = TSDB_CODE_SUCCESS;
×
3342
  int32_t  lino = 0;
×
3343
  int32_t  tlen = 0;
×
3344

3345
  tEncoderInit(&encoder, buf, bufLen);
×
3346
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3347

3348
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
×
3349
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3350
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
×
3351
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3352

3353
  switch (pReq->type) {
×
3354
    case STRIGGER_PULL_SET_TABLE: {
×
3355
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
×
3356
      int32_t size = taosArrayGetSize(pRequest->uids);
×
3357
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
×
3358
      for (int32_t i = 0; i < size; ++i) {
×
3359
        int64_t* uids = taosArrayGet(pRequest->uids, i);
×
3360
        if (uids == NULL) {
×
3361
          uError("uid is NULL at index %d", i);
×
3362
          code = TSDB_CODE_INVALID_PARA;
×
3363
          goto _exit;
×
3364
        }
3365
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[0]));
×
3366
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, uids[1]));
×
3367
      }
3368
      break;
×
3369
    }
3370
    case STRIGGER_PULL_LAST_TS: {
×
3371
      break;
×
3372
    }
3373
    case STRIGGER_PULL_FIRST_TS: {
×
3374
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
×
3375
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
×
3376
      break;
×
3377
    }
3378
    case STRIGGER_PULL_TSDB_META: {
×
3379
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
×
3380
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
×
3381
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
×
3382
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
×
3383
      break;
×
3384
    }
3385
    case STRIGGER_PULL_TSDB_META_NEXT: {
×
3386
      break;
×
3387
    }
3388
    case STRIGGER_PULL_TSDB_TS_DATA: {
×
3389
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
×
3390
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
×
3391
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
×
3392
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
×
3393
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
×
3394
      break;
×
3395
    }
3396
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
×
3397
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
×
3398
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
×
3399
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
×
3400
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
×
3401
      break;
×
3402
    }
3403
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
×
3404
      break;
×
3405
    }
3406
    case STRIGGER_PULL_TSDB_CALC_DATA: {
×
3407
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
×
3408
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
×
3409
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
×
3410
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
×
3411
      break;
×
3412
    }
3413
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
×
3414
      break;
×
3415
    }
3416
    case STRIGGER_PULL_TSDB_DATA: {
×
3417
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
×
3418
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
×
3419
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
×
3420
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
×
3421
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
×
3422
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
×
3423
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
×
3424
      break;
×
3425
    }
3426
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
×
3427
      break;
×
3428
    }
3429
    case STRIGGER_PULL_WAL_META: {
×
3430
      SSTriggerWalMetaRequest* pRequest = (SSTriggerWalMetaRequest*)pReq;
×
3431
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
×
3432
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
×
3433
      break;
×
3434
    }
3435
    case STRIGGER_PULL_WAL_TS_DATA:
×
3436
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
3437
    case STRIGGER_PULL_WAL_CALC_DATA:
3438
    case STRIGGER_PULL_WAL_DATA: {
3439
      SSTriggerWalDataRequest* pRequest = (SSTriggerWalDataRequest*)pReq;
×
3440
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
×
3441
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
×
3442
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
×
3443
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
×
3444
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
×
3445
      break;
×
3446
    }
3447
    case STRIGGER_PULL_GROUP_COL_VALUE: {
×
3448
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
×
3449
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
×
3450
      break;
×
3451
    }
3452
    case STRIGGER_PULL_VTABLE_INFO: {
×
3453
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
×
3454
      TAOS_CHECK_EXIT(encodeColsArray(&encoder, pRequest->cids));
×
3455
      break;
×
3456
    }
3457
    case STRIGGER_PULL_OTABLE_INFO: {
×
3458
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
×
3459
      int32_t size = taosArrayGetSize(pRequest->cols);
×
3460
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
×
3461
      for (int32_t i = 0; i < size; ++i) {
×
3462
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
×
3463
        if (oInfo == NULL) {
×
3464
          uError("col id is NULL at index %d", i);
×
3465
          code = TSDB_CODE_INVALID_PARA;
×
3466
          goto _exit;
×
3467
        }
3468
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
×
3469
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
×
3470
      }
3471
      break; 
×
3472
    }
3473
    default: {
×
3474
      uError("unknown pull type %d", pReq->type);
×
3475
      code = TSDB_CODE_INVALID_PARA;
×
3476
      break;
×
3477
    }
3478
  }
3479

3480
  tEndEncode(&encoder);
×
3481

3482
_exit:
×
3483
  if (code != TSDB_CODE_SUCCESS) {
×
3484
    tlen = code;
×
3485
  } else {
3486
    tlen = encoder.pos;
×
3487
  }
3488
  tEncoderClear(&encoder);
×
3489
  return tlen;
×
3490
}
3491

3492

3493
// Deserialize a trigger pull request from the bufLen-byte buffer buf into pReq.
// The common header (type, streamId, readerTaskId, sessionId) is stored in
// pReq->base; the remaining payload is decoded into the union member that
// matches the decoded type.
// Some request types allocate arrays (uids / cids / cols) that may remain
// attached to pReq after a failure; release them with
// tDestroySTriggerPullRequest.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an unknown type, or the
// error code of the failing step. The decoder is cleared on every path.
int32_t tDserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Common header shared by every request type.
  int32_t type = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
  SSTriggerPullRequest* pBase = &(pReq->base);
  pBase->type = type;
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));

  switch (type) {
    case STRIGGER_PULL_SET_TABLE: {
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
      int32_t size = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
      // Each element holds a pair of int64 values.
      pRequest->uids = taosArrayInit(size, 2 * sizeof(int64_t));
      if (pRequest->uids == NULL) {
        code = terrno;
        uError("failed to allocate memory for uids, size: %d, errno: %d", size, code);
        goto _exit;
      }
      for (int32_t i = 0; i < size; ++i) {
        int64_t* uid = taosArrayReserve(pRequest->uids, 1);
        if (uid == NULL) {
          code = terrno;
          uError("failed to reserve memory for uid, size: %d, errno: %d", size, code);
          goto _exit;
        }
        // TAOS_CHECK_EXIT (not TAOS_CHECK_RETURN) so that a failed decode
        // still reaches tDecoderClear() at _exit.
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, uid));
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, uid + 1));
      }
      break;
    }
    case STRIGGER_PULL_LAST_TS: {
      break;
    }
    case STRIGGER_PULL_FIRST_TS: {
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      break;
    }
    case STRIGGER_PULL_TSDB_META: {
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      break;
    }
    case STRIGGER_PULL_TSDB_META_NEXT: {
      break;
    }
    case STRIGGER_PULL_TSDB_TS_DATA: {
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      break;
    }
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      break;
    }
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
      break;
    }
    case STRIGGER_PULL_TSDB_CALC_DATA: {
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      break;
    }
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
      break;
    }
    case STRIGGER_PULL_TSDB_DATA: {
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
      break;
    }
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
      break;
    }
    case STRIGGER_PULL_WAL_META: {
      SSTriggerWalMetaRequest* pRequest = &(pReq->walMetaReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
      break;
    }
    // The four WAL data request types share one payload layout.
    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA: {
      SSTriggerWalDataRequest* pRequest = &(pReq->walDataReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }
    case STRIGGER_PULL_GROUP_COL_VALUE: {
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
      break;
    }
    case STRIGGER_PULL_VTABLE_INFO: {
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
      TAOS_CHECK_EXIT(decodeColsArray(&decoder, &pRequest->cids));
      break;
    }
    case STRIGGER_PULL_OTABLE_INFO: {
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
      int32_t size = 0;
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
      if (pRequest->cols == NULL) {
        code = terrno;
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
        goto _exit;
      }
      for (int32_t i = 0; i < size; ++i) {
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
        if (oInfo == NULL) {
          code = terrno;
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
          goto _exit;
        }
        // TAOS_CHECK_EXIT (not TAOS_CHECK_RETURN) so that a failed decode
        // still reaches tDecoderClear() at _exit.
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refTableName));
        TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, oInfo->refColName));
      }
      break;
    }
    default: {
      uError("unknown pull type %d", type);
      code = TSDB_CODE_INVALID_PARA;
      break;
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3654

3655
// Encode an array of SSTriggerCalcParam: element count (i32), then for each
// element ten i64 fields (prevTs, currentTs, nextTs, wstart, wend, wduration,
// wrownum, prevLocalTime, nextLocalTime, triggerTime). Unless
// ignoreNotificationInfo is set, notifyType (i32) and extraNotifyContent
// (binary; length 0 when the pointer is NULL) follow each element.
// Returns 0 on success, otherwise the code left by the failing step.
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo) {
  int32_t nParams = taosArrayGetSize(pParams);
  int32_t code = 0, lino = 0;

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nParams));

  for (int32_t idx = 0; idx < nParams; ++idx) {
    SSTriggerCalcParam* pParam = taosArrayGet(pParams, idx);
    if (pParam == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // Timestamps around the trigger point.
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->prevTs));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->currentTs));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->nextTs));

    // Window description.
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->wstart));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->wend));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->wduration));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->wrownum));

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->prevLocalTime));
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->nextLocalTime));

    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pParam->triggerTime));

    if (ignoreNotificationInfo) {
      continue;
    }

    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pParam->notifyType));
    uint32_t contentLen = (pParam->extraNotifyContent != NULL) ? strlen(pParam->extraNotifyContent) : 0;
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)pParam->extraNotifyContent, contentLen));
  }

_exit:
  return code;
}
3688

3689
void tDestroySSTriggerCalcParam(void* ptr) {
×
3690
  SSTriggerCalcParam* pParam = ptr;
×
3691
  if (pParam && pParam->extraNotifyContent != NULL) {
×
3692
    taosMemoryFreeClear(pParam->extraNotifyContent);
×
3693
  }
3694
  if (pParam && pParam->resultNotifyContent != NULL) {
×
3695
    taosMemoryFreeClear(pParam->resultNotifyContent);
×
3696
  }
3697
}
×
3698

3699
void tDestroySStreamGroupValue(void* ptr) {
×
3700
  SStreamGroupValue* pValue = ptr;
×
3701
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
×
3702
    taosMemoryFreeClear(pValue->data.pData);
×
3703
    pValue->data.nData = 0;
×
3704
  }
3705
}
×
3706

3707
// Decode an array of SSTriggerCalcParam written by tSerializeSTriggerCalcParam.
// *ppParams is replaced by a newly created array; one element is reserved and
// filled per encoded entry. Unless ignoreNotificationInfo is set, notifyType
// and an allocated copy of extraNotifyContent are decoded for each element.
// On failure *ppParams may be left allocated and partially filled.
// Returns 0 on success, otherwise the code left by the failing step.
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nParams = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nParams));

  *ppParams = taosArrayInit(nParams, sizeof(SSTriggerCalcParam));
  if (*ppParams == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < nParams; ++idx) {
    SSTriggerCalcParam* pParam = taosArrayReserve(*ppParams, 1);
    if (pParam == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    // Timestamps around the trigger point.
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->prevTs));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->currentTs));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->nextTs));

    // Window description.
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->wstart));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->wend));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->wduration));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->wrownum));

    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->prevLocalTime));
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->nextLocalTime));

    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pParam->triggerTime));

    if (ignoreNotificationInfo) {
      continue;
    }

    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pParam->notifyType));
    uint64_t contentLen = 0;
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pParam->extraNotifyContent, &contentLen));
  }

_exit:
  return code;
}
3742

3743
// Encode an array of SStreamGroupValue: element count (i32), then per element
//   isNull (bool); nothing more is written for a null value
//   isTbname (bool), followed by uid (i64) and vgId (i32) when set
//   data.type (i8)
//   data.pData/nData as binary for variable-length types, otherwise data.val (i64)
// When vgId is not -1 it is stored into each tbname element before encoding,
// so the input array is modified.
// Returns TSDB_CODE_SUCCESS or the code left by the failing step.
// NOTE(review): tDestroySStreamGroupValue treats TSDB_DATA_TYPE_DECIMAL as
// owning data.pData, while here any type that is not IS_VAR_DATA_TYPE is
// written through data.val - confirm that is intended for DECIMAL.
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  int32_t nVals = taosArrayGetSize(pGroupColVals);

  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVals));

  for (int32_t idx = 0; idx < nVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayGet(pGroupColVals, idx);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isNull));
    if (pVal->isNull) {
      continue;
    }

    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->uid));
      if (vgId != -1) {
        pVal->vgId = vgId;
      }
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pVal->vgId));
    }

    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pVal->data.type));
    if (IS_VAR_DATA_TYPE(pVal->data.type)) {
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pVal->data.pData, pVal->data.nData));
    } else {
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pVal->data.val));
    }
  }

_exit:
  return code;
}
3775

3776
// Decode an array of SStreamGroupValue written by tSerializeStriggerGroupColVals.
// Any existing elements of *ppGroupColVals are cleared first (with
// tDestroySStreamGroupValue). When the decoded count is positive the array is
// created if *ppGroupColVals is NULL, otherwise its capacity is grown.
// Variable-length values are decoded into a newly allocated buffer.
// Returns TSDB_CODE_SUCCESS or the code left by the failing step.
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  int32_t nVals = 0;

  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVals));
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);

  if (nVals > 0) {
    if (*ppGroupColVals != NULL) {
      TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, nVals));
    } else {
      *ppGroupColVals = taosArrayInit(nVals, sizeof(SStreamGroupValue));
      if (*ppGroupColVals == NULL) {
        TAOS_CHECK_EXIT(terrno);
      }
    }
  }

  for (int32_t idx = 0; idx < nVals; ++idx) {
    SStreamGroupValue* pVal = taosArrayReserve(*ppGroupColVals, 1);
    if (pVal == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isNull));
    if (pVal->isNull) {
      // Null values carry no further fields on the wire.
      continue;
    }

    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pVal->isTbname));
    if (pVal->isTbname) {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->uid));
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pVal->vgId));
    }

    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pVal->data.type));
    if (IS_VAR_DATA_TYPE(pVal->data.type)) {
      uint64_t dataLen = 0;
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pVal->data.pData, &dataLen));
      pVal->data.nData = dataLen;
    } else {
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pVal->data.val));
    }
  }

_exit:
  return code;
}
3819

3820
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
×
3821
  SEncoder encoder = {0};
×
3822
  int32_t  code = TSDB_CODE_SUCCESS;
×
3823
  int32_t  lino = 0;
×
3824
  int32_t  tlen = 0;
×
3825

3826
  tEncoderInit(&encoder, buf, bufLen);
×
3827
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3828

3829
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
×
3830

3831
  tEndEncode(&encoder);
×
3832

3833
_exit:
×
3834
  if (code != TSDB_CODE_SUCCESS) {
×
3835
    tlen = code;
×
3836
  } else {
3837
    tlen = encoder.pos;
×
3838
  }
3839
  tEncoderClear(&encoder);
×
3840
  return tlen;
×
3841
}
3842

3843
// Deserialize an SStreamGroupInfo from the bufLen-byte buffer buf: the group
// value array is decoded into gInfo->gInfo.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. The decoder
// is cleared on every path.
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3860

3861
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
×
3862
  SEncoder encoder = {0};
×
3863
  int32_t  code = TSDB_CODE_SUCCESS;
×
3864
  int32_t  lino = 0;
×
3865
  int32_t  tlen = 0;
×
3866

3867
  tEncoderInit(&encoder, buf, bufLen);
×
3868
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3869

3870
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
×
3871
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
×
3872
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
×
3873
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
×
3874
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
×
3875

3876
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false));
×
3877
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
×
3878
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
×
3879

3880
  tEndEncode(&encoder);
×
3881

3882
_exit:
×
3883
  if (code != TSDB_CODE_SUCCESS) {
×
3884
    tlen = code;
×
3885
  } else {
3886
    tlen = encoder.pos;
×
3887
  }
3888
  tEncoderClear(&encoder);
×
3889
  return tlen;
×
3890
}
3891

3892
// Deserialize an SSTriggerCalcRequest from the bufLen-byte buffer buf.
// Field order mirrors tSerializeSTriggerCalcRequest. pReq->params and
// pReq->groupColVals may hold allocated data even when an error is returned;
// release them with tDestroySTriggerCalcRequest.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. The decoder
// is cleared on every path.
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
  SDecoder decoder = {0};
  int32_t  code = TSDB_CODE_SUCCESS, lino = 0;

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Fixed-size header fields.
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));

  // Variable-size sections.
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
3916

3917
/*
 * Release the resources referenced by an SSTriggerCalcRequest.
 *
 * Destroys the `params` and `groupColVals` arrays (with their per-element
 * destructors) and the `pOutBlock` data block. Every released pointer is
 * reset to NULL so that a second call on the same request does not operate
 * on dangling pointers. `pReq` itself is not freed. A NULL `pReq` is a no-op.
 */
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
  if (pReq == NULL) {
    return;
  }

  if (pReq->params != NULL) {
    taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
    pReq->params = NULL;
  }

  if (pReq->groupColVals != NULL) {
    taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
    pReq->groupColVals = NULL;
  }

  blockDataDestroy(pReq->pOutBlock);
  /* Clear the pointer like the other members to avoid a stale reference. */
  pReq->pOutBlock = NULL;
}
×
3930

3931
/*
 * Append an SStreamRuntimeFuncInfo to an already-started encoder.
 *
 * Write order: pStreamPesudoFuncVals, pStreamPartColVals, groupId, curIdx,
 * sessionId, withExternalWindow, triggerType.
 *
 * Returns the final value of `code` (0 when no step changed it).
 */
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  /* Array-valued members first. */
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));

  /* Then the scalar members. */
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));

_exit:
  return code;
}
3943

3944
/*
 * Read an SStreamRuntimeFuncInfo from an already-started decoder.
 *
 * Read order: pStreamPesudoFuncVals, pStreamPartColVals, groupId, curIdx,
 * sessionId, withExternalWindow, triggerType.
 *
 * Returns the final value of `code` (0 when no step changed it).
 */
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  /* Array-valued members first. */
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));

  /* Then the scalar members. */
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));

_exit:
  return code;
}
3957

3958
/*
 * Release the arrays owned by an SStreamRuntimeFuncInfo and reset the
 * corresponding pointers to NULL. `pInfo` itself is not freed.
 *
 * Always returns TSDB_CODE_SUCCESS, including when `pInfo` is NULL.
 */
int32_t tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
  if (pInfo != NULL) {
    if (pInfo->pStreamPesudoFuncVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
      pInfo->pStreamPesudoFuncVals = NULL;
    }

    if (pInfo->pStreamPartColVals != NULL) {
      taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
      pInfo->pStreamPartColVals = NULL;
    }
  }

  return TSDB_CODE_SUCCESS;
}
3970

3971
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
×
3972
  SEncoder encoder = {0};
×
3973
  int32_t  code = TSDB_CODE_SUCCESS;
×
3974
  int32_t  lino = 0;
×
3975
  int32_t  tlen = 0;
×
3976

3977
  tEncoderInit(&encoder, buf, bufLen);
×
3978
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
3979

3980
  int32_t size = taosArrayGetSize(pRsp->infos);
×
3981
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
×
3982
  for (int32_t i = 0; i < size; ++i) {
×
3983
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
×
3984
    if (info == NULL) {
×
3985
      TAOS_CHECK_EXIT(terrno);
×
3986
    }
3987
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
×
3988
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
×
3989
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->ver));
×
3990
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
×
3991
  }
3992

3993
  tEndEncode(&encoder);
×
3994

3995
_exit:
×
3996
  if (code != TSDB_CODE_SUCCESS) {
×
3997
    tlen = code;
×
3998
  } else {
3999
    tlen = encoder.pos;
×
4000
  }
4001
  tEncoderClear(&encoder);
×
4002
  return tlen;
×
4003
}
4004

4005
/*
 * Decode an SStreamMsgVTableInfo from `buf` (`bufLen` bytes).
 *
 * Reads an int32 element count, allocates `vTableInfo->infos` with that
 * capacity, then reserves and fills one VTableInfo per element
 * (gId, uid, ver, cols).
 *
 * Returns the final value of `code`. The `infos` array is not released
 * here when a later step fails.
 * NOTE(review): presumably the caller cleans up via
 * tDestroySStreamMsgVTableInfo on failure - confirm.
 */
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numOfInfos = 0;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  TAOS_CHECK_EXIT(tDecodeI32(&dec, &numOfInfos));

  vTableInfo->infos = taosArrayInit(numOfInfos, sizeof(VTableInfo));
  if (vTableInfo->infos == NULL) {
    TAOS_CHECK_EXIT(terrno);
  }

  for (int32_t idx = 0; idx < numOfInfos; ++idx) {
    /* Reserve one slot in the array and decode directly into it. */
    VTableInfo* pEntry = taosArrayReserve(vTableInfo->infos, 1);
    if (pEntry == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }

    TAOS_CHECK_EXIT(tDecodeI64(&dec, &pEntry->gId));
    TAOS_CHECK_EXIT(tDecodeI64(&dec, &pEntry->uid));
    TAOS_CHECK_EXIT(tDecodeI64(&dec, &pEntry->ver));
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&dec, &pEntry->cols, false));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
4036

4037

4038
void tDestroyVTableInfo(void *ptr) {
×
4039
  if (NULL == ptr) {
×
4040
    return;
×
4041
  }
4042
  VTableInfo* pTable = (VTableInfo*)ptr;
×
4043
  taosMemoryFree(pTable->cols.pColRef);
×
4044
}
4045

4046
/*
 * Destroy the `infos` array of an SStreamMsgVTableInfo, running
 * tDestroyVTableInfo on each element, and reset the pointer to NULL.
 * `ptr` itself is not freed. A NULL `ptr` is a no-op.
 */
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
  if (ptr != NULL) {
    taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
    ptr->infos = NULL;
  }
}
4051

4052
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
×
4053
  SEncoder encoder = {0};
×
4054
  int32_t  code = TSDB_CODE_SUCCESS;
×
4055
  int32_t  lino = 0;
×
4056
  int32_t  tlen = 0;
×
4057

4058
  tEncoderInit(&encoder, buf, bufLen);
×
4059
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
4060

4061
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
×
4062
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
×
4063
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
×
4064
  for (int32_t i = 0; i < size; ++i) {
×
4065
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
×
4066
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
×
4067
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
×
4068
  }
4069

4070
  tEndEncode(&encoder);
×
4071

4072
_exit:
×
4073
  if (code != TSDB_CODE_SUCCESS) {
×
4074
    tlen = code;
×
4075
  } else {
4076
    tlen = encoder.pos;
×
4077
  }
4078
  tEncoderClear(&encoder);
×
4079
  return tlen;
×
4080
}
4081

4082
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
×
4083
  SDecoder decoder = {0};
×
4084
  int32_t  code = TSDB_CODE_SUCCESS;
×
4085
  int32_t  lino = 0;
×
4086
  SSDataBlock *pResBlock = pBlock;
×
4087

4088
  tDecoderInit(&decoder, buf, bufLen);
×
4089
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
4090

4091
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
×
4092
  int32_t numOfCols = 2;
×
4093
  if (pResBlock->pDataBlock == NULL) {
×
4094
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
×
4095
    if (pResBlock->pDataBlock == NULL) {
×
4096
      TAOS_CHECK_EXIT(terrno);
×
4097
    }
4098
    for (int32_t i = 0; i< numOfCols; ++i) {
×
4099
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
×
4100
      if (pColInfoData == NULL) {
×
4101
        TAOS_CHECK_EXIT(terrno);
×
4102
      }
4103
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
×
4104
      pColInfoData->info.bytes = sizeof(int64_t);
×
4105
    }
4106
  }
4107
  int32_t numOfRows = 0;
×
4108
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
×
4109
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
×
4110
  for (int32_t i = 0; i < numOfRows; ++i) {
×
4111
    for (int32_t j = 0; j < numOfCols; ++j) {
×
4112
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
×
4113
      if (pColInfoData == NULL) {
×
4114
        TAOS_CHECK_EXIT(terrno);
×
4115
      }
4116
      int64_t value = 0;
×
4117
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
×
4118
      colDataSetInt64(pColInfoData, i, &value);
×
4119
    }
4120
  }
4121

4122
  pResBlock->info.dataLoad = 1;
×
4123
  pResBlock->info.rows = numOfRows;
×
4124

4125
  tEndDecode(&decoder);
×
4126

4127
_exit:
×
4128
  tDecoderClear(&decoder);
×
4129
  return code;
×
4130
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc