• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4906

30 Dec 2025 10:52AM UTC coverage: 65.514% (+0.09%) from 65.423%
#4906

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

4080 existing lines in 123 files now uncovered.

193840 of 295877 relevant lines covered (65.51%)

120444601.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/source/common/src/msg/streamJson.c
1
#include "streamMsg.h"
2
#include "tjson.h"
3

4
// JSON member names shared by the SFieldWithOptions (de)serializers below.
static const char* jkFieldName = "name";
static const char* jkFieldType = "type";
static const char* jkFieldFlags = "flags";
static const char* jkFieldBytes = "bytes";
static const char* jkFieldCompress = "compress";
static const char* jkFieldTypeMod = "typeMod";
10
// Writes one SFieldWithOptions (passed as pObj) into the JSON object pJson.
// Members are added in the order: name (only if the name pointer test
// passes), type, flags, bytes, compress, typeMod.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t sfieldWithOptionsToJson(const void* pObj, SJson* pJson) {
  const SFieldWithOptions* pSrc = (const SFieldWithOptions*)pObj;

  if (pSrc->name != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkFieldName, pSrc->name));
  }
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldType, pSrc->type));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldFlags, pSrc->flags));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldBytes, pSrc->bytes));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldCompress, pSrc->compress));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldTypeMod, pSrc->typeMod));

  return TSDB_CODE_SUCCESS;
}
26

27
static int32_t jsonToSFieldWithOptions(const SJson* pJson, void* pObj) {
1,229,185✔
28
  SFieldWithOptions* pField = (SFieldWithOptions*)pObj;
1,229,185✔
29
  TAOS_CHECK_RETURN(tjsonGetStringValue(pJson, jkFieldName, pField->name));
1,229,185✔
30
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(pJson, jkFieldType, &pField->type));
1,229,185✔
31
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(pJson, jkFieldFlags, &pField->flags));
1,229,185✔
32
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldBytes, &pField->bytes));
1,229,185✔
33
  TAOS_CHECK_RETURN(tjsonGetUIntValue(
1,229,185✔
34
    pJson, jkFieldCompress, &pField->compress));
35
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldTypeMod, &pField->typeMod));
1,229,185✔
36
  return TSDB_CODE_SUCCESS;
1,229,185✔
37
}
38

39
// Tag-field variant of the SFieldWithOptions serializer: writes name (only
// if the name pointer test passes), type, flags and bytes into pJson.
// `compress` and `typeMod` are deliberately not emitted by this variant.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t stagFieldWithOptionsToJson(const void* pObj, SJson* pJson) {
  const SFieldWithOptions* pSrc = (const SFieldWithOptions*)pObj;

  if (pSrc->name != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkFieldName, pSrc->name));
  }
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldType, pSrc->type));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldFlags, pSrc->flags));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldBytes, pSrc->bytes));

  return TSDB_CODE_SUCCESS;
}
55

56
static int32_t jsonToSTagFieldWithOptions(const SJson* pJson, void* pObj) {
187,678✔
57
  SFieldWithOptions* pField = (SFieldWithOptions*)pObj;
187,678✔
58
  TAOS_CHECK_RETURN(tjsonGetStringValue(pJson, jkFieldName, pField->name));
187,678✔
59
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(pJson, jkFieldType, &pField->type));
187,678✔
60
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(pJson, jkFieldFlags, &pField->flags));
187,678✔
61
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldBytes, &pField->bytes));
187,678✔
62
  // TAOS_CHECK_RETURN(tjsonGetUIntValue(
63
  //   pJson, jkFieldCompress, &pField->compress));
64
  // TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldTypeMod, &pField->typeMod));
65
  return TSDB_CODE_SUCCESS;
187,678✔
66
}
67

68
// JSON member names for SSessionTrigger.
static const char* jkSessionTriggerSlotId = "slotId";
static const char* jkSessionTriggerSessionVal = "sessionVal";
70
// Writes the SSessionTrigger behind pObj into pJson as two integer members:
// slotId followed by sessionVal.
// Returns TSDB_CODE_SUCCESS once both TAOS_CHECK_RETURN-wrapped calls passed.
static int32_t sessionTriggerToJson(const void* pObj, SJson* pJson) {
  const SSessionTrigger* pSrc = (const SSessionTrigger*)pObj;

  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSessionTriggerSlotId, pSrc->slotId));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSessionTriggerSessionVal, pSrc->sessionVal));

  return TSDB_CODE_SUCCESS;
}
78

79
static int32_t jsonToSessionTrigger(const SJson* pJson, void* pObj) {
15,611✔
80
  SSessionTrigger* pTrigger = (SSessionTrigger*)pObj;
15,611✔
81
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
15,611✔
82
    pJson, jkSessionTriggerSlotId, &pTrigger->slotId));
83
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
15,611✔
84
    pJson, jkSessionTriggerSessionVal, &pTrigger->sessionVal));
85
  return TSDB_CODE_SUCCESS;
15,611✔
86
}
87

88
// JSON member names for SStateWinTrigger.
static const char* jkStateTriggerSlotId = "slotId";
static const char* jkStateTriggerExtend = "extend";
static const char* jkStateTriggerZeroth = "zeroth";
static const char* jkStateTriggerTrueForDuration = "trueForDuration";
static const char* jkStateTriggerExpr = "expr";
93
// Writes the SStateWinTrigger behind pObj into pJson.
// Order: slotId, extend, zeroth (string, only when non-NULL),
// trueForDuration, expr (string, only when non-NULL).
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t stateTriggerToJson(const void* pObj, SJson* pJson) {
  const SStateWinTrigger* pSrc = (const SStateWinTrigger*)pObj;

  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerSlotId, pSrc->slotId));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerExtend, pSrc->extend));
  if (pSrc->zeroth != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkStateTriggerZeroth, (const char*)pSrc->zeroth));
  }
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerTrueForDuration, pSrc->trueForDuration));
  if (pSrc->expr != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkStateTriggerExpr, (const char*)pSrc->expr));
  }

  return TSDB_CODE_SUCCESS;
}
111

112
static int32_t jsonToStateTrigger(const SJson* pJson, void* pObj) {
75,145✔
113
  SStateWinTrigger* pTrigger = (SStateWinTrigger*)pObj;
75,145✔
114
  TAOS_CHECK_RETURN(
75,145✔
115
    tjsonGetSmallIntValue(pJson, jkStateTriggerSlotId, &pTrigger->slotId));
116
  TAOS_CHECK_RETURN(
75,145✔
117
    tjsonGetSmallIntValue(pJson, jkStateTriggerExtend, &pTrigger->extend));
118
  TAOS_CHECK_RETURN(tjsonDupStringValue(
75,145✔
119
    pJson, jkStateTriggerZeroth, (char**)&pTrigger->zeroth));
120
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
75,145✔
121
    pJson, jkStateTriggerTrueForDuration, &pTrigger->trueForDuration));
122
  TAOS_CHECK_RETURN(tjsonDupStringValue(
75,145✔
123
    pJson, jkStateTriggerExpr, (char**)&pTrigger->expr));
124
  return TSDB_CODE_SUCCESS;
75,145✔
125
}
126

127
// JSON member names for SSlidingTrigger.
static const char* jkSlidingTriggerIntervalUnit = "intervalUnit";
static const char* jkSlidingTriggerSlidingUnit = "slidingUnit";
static const char* jkSlidingTriggerOffsetUnit = "offsetUnit";
static const char* jkSlidingTriggerSoffsetUnit = "soffsetUnit";
static const char* jkSlidingTriggerPrecision = "precision";
static const char* jkSlidingTriggerInterval = "interval";
static const char* jkSlidingTriggerOffset = "offset";
static const char* jkSlidingTriggerSliding = "sliding";
static const char* jkSlidingTriggerSoffset = "soffset";
static const char* jkSlidingTriggerOverlap = "overlap";
137
// Writes the SSlidingTrigger behind pObj into pJson as integer members.
// Order: intervalUnit, slidingUnit, offsetUnit, soffsetUnit, precision,
// interval, sliding, offset, soffset, overlap.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t slidingTriggerToJson(const void* pObj, SJson* pJson) {
  const SSlidingTrigger* pSrc = (const SSlidingTrigger*)pObj;

  // unit / precision members
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerIntervalUnit, pSrc->intervalUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerSlidingUnit, pSrc->slidingUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerOffsetUnit, pSrc->offsetUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerSoffsetUnit, pSrc->soffsetUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerPrecision, pSrc->precision));

  // value members
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerInterval, pSrc->interval));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerSliding, pSrc->sliding));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerOffset, pSrc->offset));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerSoffset, pSrc->soffset));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSlidingTriggerOverlap, pSrc->overlap));

  return TSDB_CODE_SUCCESS;
}
161

162
static int32_t jsonToSlidingTrigger(const SJson* pJson, void* pObj) {
117,616✔
163
  SSlidingTrigger* pTrigger = (SSlidingTrigger*)pObj;
117,616✔
164
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
165
    pJson, jkSlidingTriggerIntervalUnit, &pTrigger->intervalUnit));
166
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
167
    pJson, jkSlidingTriggerSlidingUnit, &pTrigger->slidingUnit));
168
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
169
    pJson, jkSlidingTriggerOffsetUnit, &pTrigger->offsetUnit));
170
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
171
    pJson, jkSlidingTriggerSoffsetUnit, &pTrigger->soffsetUnit));
172
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
173
    pJson, jkSlidingTriggerPrecision, &pTrigger->precision));
174
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
117,616✔
175
    pJson, jkSlidingTriggerInterval, &pTrigger->interval));
176
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
117,616✔
177
    pJson, jkSlidingTriggerSliding, &pTrigger->sliding));
178
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
117,616✔
179
    pJson, jkSlidingTriggerOffset, &pTrigger->offset));
180
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
117,616✔
181
    pJson, jkSlidingTriggerSoffset, &pTrigger->soffset));
182
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
117,616✔
183
    pJson, jkSlidingTriggerOverlap, &pTrigger->overlap));
184
  return TSDB_CODE_SUCCESS;
117,616✔
185
}
186

187
// JSON member names for SEventTrigger.
static const char* jkEventTriggerStartCond = "startCond";
static const char* jkEventTriggerEndCond = "endCond";
static const char* jkEventTriggerTrueForDuration = "trueForDuration";
190
// Writes the SEventTrigger behind pObj into pJson.
// startCond and endCond are emitted as strings only when non-NULL;
// trueForDuration is always emitted as an integer, last.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t eventTriggerToJson(const void* pObj, SJson* pJson) {
  const SEventTrigger* pSrc = (const SEventTrigger*)pObj;

  if (pSrc->startCond != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkEventTriggerStartCond, (const char*)pSrc->startCond));
  }
  if (pSrc->endCond != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkEventTriggerEndCond, (const char*)pSrc->endCond));
  }
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkEventTriggerTrueForDuration, pSrc->trueForDuration));

  return TSDB_CODE_SUCCESS;
}
204

205
static int32_t jsonToEventTrigger(const SJson* pJson, void* pObj) {
40,690✔
206
  SEventTrigger* pTrigger = (SEventTrigger*)pObj;
40,690✔
207
  TAOS_CHECK_RETURN(tjsonDupStringValue(
40,690✔
208
    pJson, jkEventTriggerStartCond, (char**)&pTrigger->startCond));
209
  TAOS_CHECK_RETURN(tjsonDupStringValue(
40,690✔
210
    pJson, jkEventTriggerEndCond, (char**)&pTrigger->endCond));
211
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
40,690✔
212
    pJson, jkEventTriggerTrueForDuration, &pTrigger->trueForDuration));
213
  return TSDB_CODE_SUCCESS;
40,690✔
214
}
215

216
// JSON member names for SCountTrigger.
static const char* jkCountTriggerCountVal = "countVal";
static const char* jkCountTriggerSliding = "sliding";
static const char* jkCountTriggerCondCols = "condCols";
219
// Writes the SCountTrigger behind pObj into pJson: countVal and sliding as
// integers, then condCols as a string only when it is non-NULL.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t countTriggerToJson(const void* pObj, SJson* pJson) {
  const SCountTrigger* pSrc = (const SCountTrigger*)pObj;

  TAOS_CHECK_RETURN(
    tjsonAddIntegerToObject(pJson, jkCountTriggerCountVal, pSrc->countVal));
  TAOS_CHECK_RETURN(
    tjsonAddIntegerToObject(pJson, jkCountTriggerSliding, pSrc->sliding));
  if (pSrc->condCols != NULL) {
    TAOS_CHECK_RETURN(
      tjsonAddStringToObject(pJson, jkCountTriggerCondCols, (const char*)pSrc->condCols));
  }

  return TSDB_CODE_SUCCESS;
}
229

230
static int32_t jsonToCountTrigger(const SJson* pJson, void* pObj) {
19,677✔
231
  SCountTrigger* pTrigger = (SCountTrigger*)pObj;
19,677✔
232
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
19,677✔
233
    pJson, jkCountTriggerCountVal, &pTrigger->countVal));
234
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
19,677✔
235
    pJson, jkCountTriggerSliding, &pTrigger->sliding));
236
  TAOS_CHECK_RETURN(tjsonDupStringValue(
19,677✔
237
    pJson, jkCountTriggerCondCols, (char**)&pTrigger->condCols));
238
  return TSDB_CODE_SUCCESS;
19,677✔
239
}
240

241
// JSON member names for SPeriodTrigger.
static const char* jkPeriodTriggerPeriodUnit = "periodUnit";
static const char* jkPeriodTriggerOffsetUnit = "offsetUnit";
static const char* jkPeriodTriggerPrecision = "precision";
static const char* jkPeriodTriggerPeriod = "period";
static const char* jkPeriodTriggerOffset = "offset";
246
// Writes the SPeriodTrigger behind pObj into pJson as integer members, in
// the order periodUnit, offsetUnit, precision, period, offset.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t periodTriggerToJson(const void* pObj, SJson* pJson) {
  const SPeriodTrigger* pSrc = (const SPeriodTrigger*)pObj;

  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkPeriodTriggerPeriodUnit, pSrc->periodUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkPeriodTriggerOffsetUnit, pSrc->offsetUnit));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkPeriodTriggerPrecision, pSrc->precision));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkPeriodTriggerPeriod, pSrc->period));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkPeriodTriggerOffset, pSrc->offset));

  return TSDB_CODE_SUCCESS;
}
260

261
static int32_t jsonToPeriodTrigger(const SJson* pJson, void* pObj) {
13,549✔
262
  SPeriodTrigger* pTrigger = (SPeriodTrigger*)pObj;
13,549✔
263
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
13,549✔
264
    pJson, jkPeriodTriggerPeriodUnit, &pTrigger->periodUnit));
265
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
13,549✔
266
    pJson, jkPeriodTriggerOffsetUnit, &pTrigger->offsetUnit));
267
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
13,549✔
268
    pJson, jkPeriodTriggerPrecision, &pTrigger->precision));
269
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
13,549✔
270
    pJson, jkPeriodTriggerPeriod, &pTrigger->period));
271
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
13,549✔
272
    pJson, jkPeriodTriggerOffset, &pTrigger->offset));
273
  return TSDB_CODE_SUCCESS;
13,549✔
274
}
275

276
// Array-element writer: stores the int32_t that pObj points to in pJson
// under the member name "value".
// Returns TSDB_CODE_SUCCESS once the TAOS_CHECK_RETURN-wrapped call passed.
static int32_t int32ToJson(const void* pObj, SJson* pJson) {
  const int32_t* pValue = (const int32_t*)pObj;

  TAOS_CHECK_RETURN(
    tjsonAddIntegerToObject(pJson, "value", *pValue));

  return TSDB_CODE_SUCCESS;
}
281

282
static int32_t jsonToInt32(const SJson* pJson, void* pObj) {
583,266✔
283
  int32_t* pInt = (int32_t*)pObj;
583,266✔
284
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, "value", pInt));
583,266✔
285
  return TSDB_CODE_SUCCESS;
583,266✔
286
}
287

288
// Array-element writer: stores the int16_t that pObj points to in pJson
// under the member name "value".
// Returns TSDB_CODE_SUCCESS once the TAOS_CHECK_RETURN-wrapped call passed.
static int32_t int16ToJson(const void* pObj, SJson* pJson) {
  const int16_t* pValue = (const int16_t*)pObj;

  TAOS_CHECK_RETURN(
    tjsonAddIntegerToObject(pJson, "value", *pValue));

  return TSDB_CODE_SUCCESS;
}
293

294
static int32_t jsonToInt16(const SJson* pJson, void* pObj) {
40,958✔
295
  int16_t* pInt = (int16_t*)pObj;
40,958✔
296
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(pJson, "value", pInt));
40,958✔
297
  return TSDB_CODE_SUCCESS;
40,958✔
298
}
299

300
// JSON member names for SStreamCalcScan.
static const char* jkSstreamCalcScanVgList = "vgList";
static const char* jkSstreamCalcScanReadFromCache = "readFromCache";
static const char* jkSstreamCalcScanScanPlan = "scanPlan";
303
// Writes the SStreamCalcScan behind pObj into pJson.
//   - vgList: emitted as an array of int32_t elements via int32ToJson; a
//     NULL list is passed on as a NULL base pointer with a count of 0.
//   - readFromCache: emitted as an integer.
//   - scanPlan: emitted as a string only when non-NULL.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t calcScanPlanToJson(const void* pObj, SJson* pJson) {
  const SStreamCalcScan* pSrc = (const SStreamCalcScan*)pObj;

  TAOS_CHECK_RETURN(tjsonAddArray(pJson, jkSstreamCalcScanVgList, int32ToJson,
                                  pSrc->vgList ? TARRAY_GET_ELEM(pSrc->vgList, 0) : NULL,
                                  sizeof(int32_t),
                                  pSrc->vgList ? pSrc->vgList->size : 0));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSstreamCalcScanReadFromCache, pSrc->readFromCache));
  if (pSrc->scanPlan != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkSstreamCalcScanScanPlan, (const char*)pSrc->scanPlan));
  }

  return TSDB_CODE_SUCCESS;
}
317

318
static int32_t jsonToCalcScanPlan(const SJson* pJson, void* pObj) {
583,266✔
319
  SStreamCalcScan* pPlan = (SStreamCalcScan*)pObj;
583,266✔
320
  TAOS_CHECK_RETURN(tjsonToTArray(
583,266✔
321
    pJson, jkSstreamCalcScanVgList, jsonToInt32,
322
    &pPlan->vgList, sizeof(int32_t)));
323
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
583,266✔
324
    pJson, jkSstreamCalcScanReadFromCache, &pPlan->readFromCache));
325
  TAOS_CHECK_RETURN(tjsonDupStringValue(
583,266✔
326
    pJson, jkSstreamCalcScanScanPlan, (char**)&pPlan->scanPlan));
327
  return TSDB_CODE_SUCCESS;
583,266✔
328
}
329

330
// JSON member names for SDataType.
static const char* jkSDataTypeType = "type";
static const char* jkSDataTypePrecision = "precision";
static const char* jkSDataTypeScale = "scale";
static const char* jkSDataTypeBytes = "bytes";
334
// Writes the SDataType behind pObj into pJson as four integer members:
// type, precision, scale, bytes (in that order).
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t sDataTypeToJson(const void* pObj, SJson* pJson) {
  const SDataType* pSrc = (const SDataType*)pObj;

  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSDataTypeType, pSrc->type));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSDataTypePrecision, pSrc->precision));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSDataTypeScale, pSrc->scale));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkSDataTypeBytes, pSrc->bytes));

  return TSDB_CODE_SUCCESS;
}
346

347
static int32_t jsonToSDataType(const SJson* pJson, void* pObj) {
32,850✔
348
  SDataType* pType = (SDataType*)pObj;
32,850✔
349
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
32,850✔
350
    pJson, jkSDataTypeType, &pType->type));
351
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
32,850✔
352
    pJson, jkSDataTypePrecision, &pType->precision));
353
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
32,850✔
354
    pJson, jkSDataTypeScale, &pType->scale));
355
  TAOS_CHECK_RETURN(tjsonGetIntValue(
32,850✔
356
    pJson, jkSDataTypeBytes, &pType->bytes));
357
  return TSDB_CODE_SUCCESS;
32,850✔
358
}
359

360
// JSON member names for SStreamOutCol.
static const char* jkSStreamOutColExpr = "expr";
static const char* jkSStreamOutColType = "type";
362
// Writes the SStreamOutCol behind pObj into pJson: expr as a string (only
// when non-NULL), then the embedded type as a nested object produced by
// sDataTypeToJson.
// Returns TSDB_CODE_SUCCESS once every TAOS_CHECK_RETURN-wrapped call passed.
static int32_t sStreamOutColToJson(const void* pObj, SJson* pJson) {
  const SStreamOutCol* pSrc = (const SStreamOutCol*)pObj;

  if (pSrc->expr != NULL) {
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkSStreamOutColExpr, (const char*)pSrc->expr));
  }
  TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkSStreamOutColType, sDataTypeToJson, &pSrc->type));

  return TSDB_CODE_SUCCESS;
}
372

373
static int32_t jsonToSStreamOutCol(const SJson* pJson, void* pObj) {
32,850✔
374
  SStreamOutCol* pCol = (SStreamOutCol*)pObj;
32,850✔
375
  TAOS_CHECK_RETURN(tjsonDupStringValue(
32,850✔
376
    pJson, jkSStreamOutColExpr, (char**)&pCol->expr));
377
  TAOS_CHECK_RETURN(tjsonToObject(
32,850✔
378
    pJson, jkSStreamOutColType, jsonToSDataType, &pCol->type));
379
  return TSDB_CODE_SUCCESS;
32,850✔
380
}
381

382
// Array-element writer: pObj points to a `const char*`; the string it
// refers to is stored in pJson under the member name "value".
// Returns TSDB_CODE_SUCCESS once the TAOS_CHECK_RETURN-wrapped call passed.
static int32_t stringToJson(const void* pObj, SJson* pJson) {
  const char** ppText = (const char**)pObj;

  TAOS_CHECK_RETURN(
    tjsonAddStringToObject(pJson, "value", *ppText));

  return TSDB_CODE_SUCCESS;
}
387

388
static int32_t jsonToString(const SJson* pJson, void* pObj) {
375,762✔
389
  char** pStr = (char**)pObj;
375,762✔
390
  TAOS_CHECK_RETURN(tjsonDupStringValue(pJson, "value", pStr));
375,762✔
391
  return TSDB_CODE_SUCCESS;
375,762✔
392
}
393

394
// JSON member names for SCMCreateStreamReq. Blank lines keep the grouping
// of the original declaration list.
static const char* jkCreateStreamReqName = "name";
static const char* jkCreateStreamReqStreamId = "streamId";
static const char* jkCreateStreamReqSql = "sql";

static const char* jkCreateStreamReqStreamDB = "streamDB";
static const char* jkCreateStreamReqTriggerDB = "triggerDB";
static const char* jkCreateStreamReqOutDB = "outDB";
static const char* jkCreateStreamReqCalcDB = "calcDB";

static const char* jkCreateStreamReqTriggerTblName = "triggerTblName";
static const char* jkCreateStreamReqOutTblName = "outTblName";

static const char* jkCreateStreamReqIgExists = "igExists";
static const char* jkCreateStreamReqTriggerType = "triggerType";
static const char* jkCreateStreamReqIgDisorder = "igDisorder";
static const char* jkCreateStreamReqDeleteReCalc = "deleteReCalc";
static const char* jkCreateStreamReqDeleteOutTbl = "deleteOutTbl";
static const char* jkCreateStreamReqFillHistory = "fillHistory";
static const char* jkCreateStreamReqFillHistoryFirst = "fillHistoryFirst";
static const char* jkCreateStreamReqCalcNotifyOnly = "calcNotifyOnly";
static const char* jkCreateStreamReqLowLatencyCalc = "lowLatencyCalc";
static const char* jkCreateStreamReqIgNoDataTrigger = "igNoDataTrigger";

static const char* jkCreateStreamReqPNotifyAddrUrls = "pNotifyAddrUrls";
static const char* jkCreateStreamReqNotifyEventTypes = "notifyEventTypes";
static const char* jkCreateStreamReqAddOptions = "addOptions";
static const char* jkCreateStreamReqNotifyHistory = "notifyHistory";

static const char* jkCreateStreamReqTriggerFilterCols = "triggerFilterCols";
static const char* jkCreateStreamReqTriggerCols = "triggerCols";
static const char* jkCreateStreamReqPartitionCols = "partitionCols";
static const char* jkCreateStreamReqOutCols = "outCols";
static const char* jkCreateStreamReqOutTags = "outTags";
static const char* jkCreateStreamReqMaxDelay = "maxDelay";
static const char* jkCreateStreamReqFillHistoryStartTime = "fillHistoryStartTime";
static const char* jkCreateStreamReqWatermark = "watermark";
static const char* jkCreateStreamReqExpiredTime = "expiredTime";
static const char* jkCreateStreamReqTrigger = "trigger";

static const char* jkCreateStreamReqTriggerTblType = "triggerTblType";
static const char* jkCreateStreamReqTriggerTblUid = "triggerTblUid";
static const char* jkCreateStreamReqTriggerTblSuid = "triggerTblSuid";
static const char* jkCreateStreamReqTriggerPrec = "triggerPrec";
static const char* jkCreateStreamReqVtableCalc = "vtableCalc";
static const char* jkCreateStreamReqOutTblType = "outTblType";
static const char* jkCreateStreamReqOutStbExists = "outStbExists";
static const char* jkCreateStreamReqOutStbUid = "outStbUid";
static const char* jkCreateStreamReqOutStbSversion = "outStbSversion";
static const char* jkCreateStreamReqEventTypes = "eventTypes";
static const char* jkCreateStreamReqFlags = "flags";
static const char* jkCreateStreamReqTsmaId = "tsmaId";
static const char* jkCreateStreamReqPlaceHolderBitmap = "placeHolderBitmap";
static const char* jkCreateStreamReqCalcTsSlotId = "calcTsSlotId";
static const char* jkCreateStreamReqTriTsSlotId = "triTsSlotId";

static const char* jkCreateStreamReqTriggerTblVgId = "triggerTblVgId";
static const char* jkCreateStreamReqOutTblVgId = "outTblVgId";

static const char* jkCreateStreamReqTriggerScanPlan = "triggerScanPlan";
static const char* jkCreateStreamReqCalcScanPlanList = "calcScanPlanList";

static const char* jkCreateStreamReqTriggerHasPF = "triggerHasPF";
static const char* jkCreateStreamReqTriggerPrevFilter = "triggerPrevFilter";

static const char* jkCreateStreamReqNumOfCalcSubplan = "numOfCalcSubplan";
static const char* jkCreateStreamReqCalcPlan = "calcPlan";
static const char* jkCreateStreamReqSubTblNameExpr = "subTblNameExpr";
static const char* jkCreateStreamReqTagValueExpr = "tagValueExpr";
static const char* jkCreateStreamReqForceOutCols = "forceOutCols";

static const char* jkCreateStreamReqColCids = "colCids";
static const char* jkCreateStreamReqTagCids = "tagCids";
467

468
static int32_t scmCreateStreamReqToJsonImpl(const void* pObj, void* pJson) {
942,238✔
469
  const SCMCreateStreamReq* pReq = (const SCMCreateStreamReq*)pObj;
942,238✔
470
  if (NULL != pReq->name) {
942,238✔
471
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
942,238✔
472
      pJson, jkCreateStreamReqName, pReq->name));
473
  }
474
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
475
    pJson, jkCreateStreamReqStreamId, pReq->streamId));
476
  if (NULL != pReq->sql) {
942,238✔
477
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
942,238✔
478
      pJson, jkCreateStreamReqSql, pReq->sql));
479
  }
480
  if (NULL != pReq->streamDB) {
942,238✔
481
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
942,238✔
482
      pJson, jkCreateStreamReqStreamDB, pReq->streamDB));
483
  }
484
  if (NULL != pReq->triggerDB) {
942,238✔
485
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
936,714✔
486
      pJson, jkCreateStreamReqTriggerDB, pReq->triggerDB));
487
  }
488
  if (NULL != pReq->outDB) {
942,238✔
489
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
924,142✔
490
      pJson, jkCreateStreamReqOutDB, pReq->outDB));
491
  }
492
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
493
    pJson, jkCreateStreamReqCalcDB, stringToJson,
494
    pReq->calcDB ? TARRAY_GET_ELEM(pReq->calcDB, 0) : NULL,
495
    pReq->calcDB ? pReq->calcDB->elemSize : 0,
496
    pReq->calcDB ? pReq->calcDB->size : 0));
497
  if (NULL != pReq->triggerTblName) {
942,238✔
498
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
936,714✔
499
      pJson, jkCreateStreamReqTriggerTblName, pReq->triggerTblName));
500
  }
501
  if (NULL != pReq->outTblName) {
942,238✔
502
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
924,142✔
503
      pJson, jkCreateStreamReqOutTblName, pReq->outTblName));
504
  }
505
  // trigger contol part
506
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
507
    pJson, jkCreateStreamReqIgExists, pReq->igExists));
508
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
509
    pJson, jkCreateStreamReqTriggerType, pReq->triggerType));
510
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
511
    pJson, jkCreateStreamReqIgDisorder, pReq->igDisorder));
512
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
513
    pJson, jkCreateStreamReqDeleteReCalc, pReq->deleteReCalc));
514
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
515
    pJson, jkCreateStreamReqDeleteOutTbl, pReq->deleteOutTbl));
516
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
517
    pJson, jkCreateStreamReqFillHistory, pReq->fillHistory));
518
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
519
    pJson, jkCreateStreamReqFillHistoryFirst, pReq->fillHistoryFirst));
520
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
521
    pJson, jkCreateStreamReqCalcNotifyOnly, pReq->calcNotifyOnly));
522
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
523
    pJson, jkCreateStreamReqLowLatencyCalc, pReq->lowLatencyCalc));
524
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
525
    pJson, jkCreateStreamReqIgNoDataTrigger, pReq->igNoDataTrigger));
526

527
  // notify part
528
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
529
    pJson, jkCreateStreamReqPNotifyAddrUrls, stringToJson,
530
    pReq->pNotifyAddrUrls ? TARRAY_GET_ELEM(pReq->pNotifyAddrUrls, 0) : NULL,
531
    pReq->pNotifyAddrUrls ? pReq->pNotifyAddrUrls->elemSize : 0,
532
    pReq->pNotifyAddrUrls ? pReq->pNotifyAddrUrls->size : 0));
533
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
534
    pJson, jkCreateStreamReqNotifyEventTypes, pReq->notifyEventTypes));
535
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
536
    pJson, jkCreateStreamReqAddOptions, pReq->addOptions));
537
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
538
    pJson, jkCreateStreamReqNotifyHistory, pReq->notifyHistory));
539

540
  // out table part
541
  // trigger cols and partition cols
542
  if (NULL != pReq->triggerFilterCols) {
942,238✔
543
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
130,552✔
544
      pJson, jkCreateStreamReqTriggerFilterCols,
545
      (const char*)pReq->triggerFilterCols));
546
  }
547
  if (NULL != pReq->triggerCols) {
942,238✔
548
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
898,616✔
549
      pJson, jkCreateStreamReqTriggerCols, (const char*)pReq->triggerCols));
550
  }
551
  if (NULL != pReq->partitionCols) {
942,238✔
552
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
440,898✔
553
      pJson, jkCreateStreamReqPartitionCols, (const char*)pReq->partitionCols));
554
  }
555

556
  // out cols
557
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
558
    pJson, jkCreateStreamReqOutCols, sfieldWithOptionsToJson,
559
    pReq->outCols ? TARRAY_GET_ELEM(pReq->outCols, 0) : NULL,
560
    pReq->outCols ? pReq->outCols->elemSize : 0,
561
    pReq->outCols ? pReq->outCols->size : 0));
562
  // out tags
563
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
564
    pJson, jkCreateStreamReqOutTags, stagFieldWithOptionsToJson,
565
    pReq->outTags ? TARRAY_GET_ELEM(pReq->outTags, 0) : NULL,
566
    pReq->outTags ? pReq->outTags->elemSize : 0,
567
    pReq->outTags ? pReq->outTags->size : 0));
568
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
569
    pJson, jkCreateStreamReqMaxDelay, pReq->maxDelay));
570
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
571
    pJson, jkCreateStreamReqFillHistoryStartTime, pReq->fillHistoryStartTime));
572
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
573
    pJson, jkCreateStreamReqWatermark, pReq->watermark));
574
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
575
    pJson, jkCreateStreamReqExpiredTime, pReq->expiredTime));
576
  // trigger
577
  switch (pReq->triggerType) {
942,238✔
578
    case WINDOW_TYPE_SESSION:
65,204✔
579
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
65,204✔
580
        sessionTriggerToJson, &pReq->trigger));
581
      break;
65,204✔
582

583
    case WINDOW_TYPE_STATE:
240,070✔
584
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
240,070✔
585
        stateTriggerToJson, &pReq->trigger));
586
      break;
240,070✔
587

588
    case WINDOW_TYPE_INTERVAL:
358,252✔
589
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
358,252✔
590
        slidingTriggerToJson, &pReq->trigger));
591
      break;
358,252✔
592

593
    case WINDOW_TYPE_EVENT:
140,968✔
594
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
140,968✔
595
        eventTriggerToJson, &pReq->trigger));
596
      break;
140,968✔
597

598
    case WINDOW_TYPE_COUNT:
94,122✔
599
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
94,122✔
600
        countTriggerToJson, &pReq->trigger));
601
      break;
94,122✔
602

603
    case WINDOW_TYPE_PERIOD:
43,622✔
604
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
43,622✔
605
        periodTriggerToJson, &pReq->trigger));
606
      break;
43,622✔
607

UNCOV
608
  default:
×
UNCOV
609
    return TSDB_CODE_STREAM_INVALID_TRIGGER;
×
610
  }
611

612
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
613
    pJson, jkCreateStreamReqTriggerTblType, pReq->triggerTblType));
614
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
615
    pJson, jkCreateStreamReqTriggerTblUid, pReq->triggerTblUid));
616
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
617
    pJson, jkCreateStreamReqTriggerTblSuid, pReq->triggerTblSuid));
618
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
619
    pJson, jkCreateStreamReqTriggerPrec, pReq->triggerPrec));
620
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
621
    pJson, jkCreateStreamReqVtableCalc, pReq->vtableCalc));
622
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
623
    pJson, jkCreateStreamReqOutTblType, pReq->outTblType));
624
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
625
    pJson, jkCreateStreamReqOutStbExists, pReq->outStbExists));
626
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
627
    pJson, jkCreateStreamReqOutStbUid, pReq->outStbUid));
628
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
629
    pJson, jkCreateStreamReqOutStbSversion, pReq->outStbSversion));
630
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
631
    pJson, jkCreateStreamReqEventTypes, pReq->eventTypes));
632
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
633
    pJson, jkCreateStreamReqFlags, pReq->flags));
634
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
635
    pJson, jkCreateStreamReqTsmaId, pReq->tsmaId));
636
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
637
    pJson, jkCreateStreamReqPlaceHolderBitmap, pReq->placeHolderBitmap));
638
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
639
    pJson, jkCreateStreamReqCalcTsSlotId, pReq->calcTsSlotId));
640
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
641
    pJson, jkCreateStreamReqTriTsSlotId, pReq->triTsSlotId));
642
  
643
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
644
    pJson, jkCreateStreamReqTriggerTblVgId, pReq->triggerTblVgId));
645
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
646
    pJson, jkCreateStreamReqOutTblVgId, pReq->outTblVgId));
647

648
  if (NULL != pReq->triggerScanPlan) {
942,238✔
649
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
936,714✔
650
      pJson, jkCreateStreamReqTriggerScanPlan, (const char*)pReq->triggerScanPlan));
651
  }
652
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
653
    pJson, jkCreateStreamReqCalcScanPlanList, calcScanPlanToJson,
654
    pReq->calcScanPlanList ? TARRAY_GET_ELEM(pReq->calcScanPlanList, 0) : NULL,
655
    pReq->calcScanPlanList ? pReq->calcScanPlanList->elemSize : 0,
656
    pReq->calcScanPlanList ? pReq->calcScanPlanList->size : 0));
657

658
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
659
    pJson, jkCreateStreamReqTriggerHasPF, pReq->triggerHasPF));
660
  if (NULL != pReq->triggerPrevFilter) {
942,238✔
661
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
51,564✔
662
      pJson, jkCreateStreamReqTriggerPrevFilter,
663
      (const char*)pReq->triggerPrevFilter));
664
  }
665

666
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
942,238✔
667
    pJson, jkCreateStreamReqNumOfCalcSubplan, pReq->numOfCalcSubplan));
668
  if (NULL != pReq->calcPlan) {
942,238✔
669
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
924,142✔
670
      pJson, jkCreateStreamReqCalcPlan, (const char*)pReq->calcPlan));
671
  }
672
  if (NULL != pReq->subTblNameExpr) {
942,238✔
673
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson,
440,898✔
674
      jkCreateStreamReqSubTblNameExpr, (const char*)pReq->subTblNameExpr));
675
  }
676
  if (NULL != pReq->tagValueExpr) {
942,238✔
677
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
440,898✔
678
      pJson, jkCreateStreamReqTagValueExpr, (const char*)pReq->tagValueExpr));
679
  }
680
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
681
    pJson, jkCreateStreamReqForceOutCols, sStreamOutColToJson,
682
    pReq->forceOutCols ? TARRAY_GET_ELEM(pReq->forceOutCols, 0) : NULL,
683
    pReq->forceOutCols ? pReq->forceOutCols->elemSize : 0,
684
    pReq->forceOutCols ? pReq->forceOutCols->size : 0));
685
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
686
      pJson, jkCreateStreamReqColCids, int16ToJson,
687
      pReq->colCids ? TARRAY_GET_ELEM(pReq->colCids, 0) : NULL,
688
      pReq->colCids ? pReq->colCids->elemSize : 0,
689
      pReq->colCids ? pReq->colCids->size : 0));
690
  TAOS_CHECK_RETURN(tjsonAddArray(
942,238✔
691
      pJson, jkCreateStreamReqTagCids, int16ToJson,
692
      pReq->tagCids ? TARRAY_GET_ELEM(pReq->tagCids, 0) : NULL,
693
      pReq->tagCids ? pReq->tagCids->elemSize : 0,
694
      pReq->tagCids ? pReq->tagCids->size : 0));
695

696
  return TSDB_CODE_SUCCESS;
942,238✔
697
}
698

699
int32_t scmCreateStreamReqToJson(
942,238✔
700
  const SCMCreateStreamReq* pReq, bool format, char** ppStr, int32_t* pStrLen) {
701
  int32_t code = TSDB_CODE_SUCCESS;
942,238✔
702
  int32_t lino = 0;
942,238✔
703
  int64_t streamId = pReq ? pReq->streamId : -1;
942,238✔
704
  TSDB_CHECK_NULL(pReq, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
942,238✔
705
  TSDB_CHECK_NULL(ppStr, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
942,238✔
706
  TSDB_CHECK_NULL(
942,238✔
707
    pStrLen, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
708

709
  SJson* pJson = tjsonCreateObject();
942,238✔
710
  TSDB_CHECK_NULL(pJson, code, lino, _end, terrno);
942,238✔
711
  TSDB_CHECK_CODE(scmCreateStreamReqToJsonImpl(pReq, pJson), lino, _end);
942,238✔
712

713
  if (TSDB_CODE_SUCCESS == code) {
942,238✔
714
    *ppStr = format ? tjsonToString(pJson) : tjsonToUnformattedString(pJson);
942,238✔
715
    if (*ppStr == NULL) {
942,238✔
UNCOV
716
      code = terrno;
×
717
    } else {
718
      *pStrLen = strlen(*ppStr);
942,238✔
719
    }
720
  }
721

UNCOV
722
_end:
×
723
  if (TSDB_CODE_SUCCESS != code) {
942,238✔
UNCOV
724
    uError(
×
725
      "failed to convert SCMCreateStreamReq to json, lino: %d, since %s",
726
      lino, tstrerror(code));
727
  }
728
  tjsonDelete(pJson);
942,238✔
729
  return code;
942,238✔
730
}
731

732
int32_t jsonToSCMCreateStreamReq(const void* pJson, void* pObj) {
282,288✔
733
  SCMCreateStreamReq* pReq = (SCMCreateStreamReq*)pObj;
282,288✔
734
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
735
    pJson, jkCreateStreamReqName, (char**)&pReq->name));
736
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
737
    pJson, jkCreateStreamReqStreamId, &pReq->streamId));
738
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
739
    pJson, jkCreateStreamReqSql, (char**)&pReq->sql));
740

741
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
742
    pJson, jkCreateStreamReqStreamDB, (char**)&pReq->streamDB));
743
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
744
    pJson, jkCreateStreamReqTriggerDB, (char**)&pReq->triggerDB));
745
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
746
    pJson, jkCreateStreamReqOutDB, (char**)&pReq->outDB));
747
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
748
    pJson, jkCreateStreamReqCalcDB, jsonToString,
749
    &pReq->calcDB, POINTER_BYTES));
750
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
751
    pJson, jkCreateStreamReqTriggerTblName, (char**)&pReq->triggerTblName));
752
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
753
    pJson, jkCreateStreamReqOutTblName, (char**)&pReq->outTblName));
754

755
  // trigger control part
756
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
757
    pJson, jkCreateStreamReqIgExists, &pReq->igExists));
758
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
759
    pJson, jkCreateStreamReqTriggerType, &pReq->triggerType));
760
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
761
    pJson, jkCreateStreamReqIgDisorder, &pReq->igDisorder));
762
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
763
    pJson, jkCreateStreamReqDeleteReCalc, &pReq->deleteReCalc));
764
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
765
    pJson, jkCreateStreamReqDeleteOutTbl, &pReq->deleteOutTbl));
766
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
767
    pJson, jkCreateStreamReqFillHistory, &pReq->fillHistory));
768
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
769
    pJson, jkCreateStreamReqFillHistoryFirst, &pReq->fillHistoryFirst));
770
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
771
    pJson, jkCreateStreamReqCalcNotifyOnly, &pReq->calcNotifyOnly));
772
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
773
    pJson, jkCreateStreamReqLowLatencyCalc, &pReq->lowLatencyCalc));
774
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
775
    pJson, jkCreateStreamReqIgNoDataTrigger, &pReq->igNoDataTrigger));
776

777
  // notify part
778
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
779
    pJson, jkCreateStreamReqPNotifyAddrUrls, jsonToString,
780
    &pReq->pNotifyAddrUrls, POINTER_BYTES));
781
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
782
    pJson, jkCreateStreamReqNotifyEventTypes, &pReq->notifyEventTypes));
783
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
784
    pJson, jkCreateStreamReqAddOptions, &pReq->addOptions));
785
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
786
    pJson, jkCreateStreamReqNotifyHistory, &pReq->notifyHistory));
787

788
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
789
    pJson, jkCreateStreamReqTriggerFilterCols,
790
    (char**)&pReq->triggerFilterCols));
791
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
792
    pJson, jkCreateStreamReqTriggerCols, (char**)&pReq->triggerCols));
793
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
794
    pJson, jkCreateStreamReqPartitionCols, (char**)&pReq->partitionCols));
795
  // out cols
796
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
797
    pJson, jkCreateStreamReqOutCols, jsonToSFieldWithOptions,
798
    &pReq->outCols, sizeof(SFieldWithOptions)));
799
  // out tags
800
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
801
    pJson, jkCreateStreamReqOutTags, jsonToSTagFieldWithOptions,
802
    &pReq->outTags, sizeof(SFieldWithOptions)));
803
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
804
    pJson, jkCreateStreamReqMaxDelay, &pReq->maxDelay));
805
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
806
    pJson, jkCreateStreamReqFillHistoryStartTime, &pReq->fillHistoryStartTime));
807
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
808
    pJson, jkCreateStreamReqWatermark, &pReq->watermark));
809
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
810
    pJson, jkCreateStreamReqExpiredTime, &pReq->expiredTime));
811
  // trigger
812
  switch (pReq->triggerType) {
282,288✔
813
    case WINDOW_TYPE_SESSION:
15,611✔
814
      TAOS_CHECK_RETURN(tjsonToObject(
15,611✔
815
        pJson, jkCreateStreamReqTrigger, jsonToSessionTrigger, &pReq->trigger));
816
      break;
15,611✔
817
    
818
    case WINDOW_TYPE_STATE:
75,145✔
819
      TAOS_CHECK_RETURN(tjsonToObject(
75,145✔
820
        pJson, jkCreateStreamReqTrigger, jsonToStateTrigger, &pReq->trigger));
821
      break;
75,145✔
822

823
    case WINDOW_TYPE_INTERVAL:
117,616✔
824
      TAOS_CHECK_RETURN(tjsonToObject(
117,616✔
825
        pJson, jkCreateStreamReqTrigger, jsonToSlidingTrigger, &pReq->trigger));
826
      break;
117,616✔
827
    
828
    case WINDOW_TYPE_EVENT:
40,690✔
829
      TAOS_CHECK_RETURN(tjsonToObject(
40,690✔
830
        pJson, jkCreateStreamReqTrigger, jsonToEventTrigger, &pReq->trigger));
831
      break;
40,690✔
832
    
833
    case WINDOW_TYPE_COUNT:
19,677✔
834
      TAOS_CHECK_RETURN(tjsonToObject(
19,677✔
835
        pJson, jkCreateStreamReqTrigger, jsonToCountTrigger, &pReq->trigger));
836
      break;
19,677✔
837
    
838
    case WINDOW_TYPE_PERIOD:
13,549✔
839
      TAOS_CHECK_RETURN(tjsonToObject(
13,549✔
840
        pJson, jkCreateStreamReqTrigger, jsonToPeriodTrigger, &pReq->trigger));
841
      break;
13,549✔
842
    
UNCOV
843
    default:
×
UNCOV
844
      return TSDB_CODE_STREAM_INVALID_TRIGGER;
×
845
  }
846

847
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
848
    pJson, jkCreateStreamReqTriggerTblType, &pReq->triggerTblType));
849
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
282,288✔
850
    pJson, jkCreateStreamReqTriggerTblUid, &pReq->triggerTblUid));
851
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
282,288✔
852
    pJson, jkCreateStreamReqTriggerTblSuid, &pReq->triggerTblSuid));
853
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(
282,288✔
854
    pJson, jkCreateStreamReqTriggerPrec, &pReq->triggerPrec));
855
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
856
    pJson, jkCreateStreamReqVtableCalc, &pReq->vtableCalc));
857
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
858
    pJson, jkCreateStreamReqOutTblType, &pReq->outTblType));
859
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
860
    pJson, jkCreateStreamReqOutStbExists, &pReq->outStbExists));
861
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
282,288✔
862
    pJson, jkCreateStreamReqOutStbUid, &pReq->outStbUid));
863
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
864
    pJson, jkCreateStreamReqOutStbSversion, &pReq->outStbSversion));
865
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
866
    pJson, jkCreateStreamReqEventTypes, &pReq->eventTypes));
867
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
868
    pJson, jkCreateStreamReqFlags, &pReq->flags));
869
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
870
    pJson, jkCreateStreamReqTsmaId, &pReq->tsmaId));
871
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
282,288✔
872
    pJson, jkCreateStreamReqPlaceHolderBitmap, &pReq->placeHolderBitmap));
873
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
282,288✔
874
    pJson, jkCreateStreamReqCalcTsSlotId, &pReq->calcTsSlotId));
875
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
282,288✔
876
    pJson, jkCreateStreamReqTriTsSlotId, &pReq->triTsSlotId));
877

878
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
879
    pJson, jkCreateStreamReqTriggerTblVgId, &pReq->triggerTblVgId));
880
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
881
    pJson, jkCreateStreamReqOutTblVgId, &pReq->outTblVgId));
882

883
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
884
    pJson, jkCreateStreamReqTriggerScanPlan, (char**)&pReq->triggerScanPlan));
885
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
886
    pJson, jkCreateStreamReqCalcScanPlanList, jsonToCalcScanPlan,
887
    &pReq->calcScanPlanList, sizeof(SStreamCalcScan)));
888

889
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
282,288✔
890
    pJson, jkCreateStreamReqTriggerHasPF, &pReq->triggerHasPF));
891
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
892
    pJson, jkCreateStreamReqTriggerPrevFilter,
893
    (char**)&pReq->triggerPrevFilter));
894
  TAOS_CHECK_RETURN(tjsonGetIntValue(
282,288✔
895
    pJson, jkCreateStreamReqNumOfCalcSubplan, &pReq->numOfCalcSubplan));
896
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
897
    pJson, jkCreateStreamReqCalcPlan, (char**)&pReq->calcPlan));
898
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
899
    pJson, jkCreateStreamReqSubTblNameExpr, (char**)&pReq->subTblNameExpr));
900
  TAOS_CHECK_RETURN(tjsonDupStringValue(
282,288✔
901
    pJson, jkCreateStreamReqTagValueExpr, (char**)&pReq->tagValueExpr));
902
  TAOS_CHECK_RETURN(tjsonToTArray(
282,288✔
903
    pJson, jkCreateStreamReqForceOutCols,
904
    jsonToSStreamOutCol, &pReq->forceOutCols, sizeof(SStreamOutCol)));
905
  TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqColCids, jsonToInt16, &pReq->colCids, sizeof(int16_t)));
282,288✔
906
  TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqTagCids, jsonToInt16, &pReq->tagCids, sizeof(int16_t)));
282,288✔
907

908
  return TSDB_CODE_SUCCESS;
282,288✔
909
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc