• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.93
/source/client/src/clientSml.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <ctype.h>
17
#include <stdio.h>
18
#include <stdlib.h>
19
#include <string.h>
20

21
#include "clientSml.h"
22

23
#define RETURN_FALSE                                 \
24
  smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
25
  return false;
26

27
#define SET_DOUBLE                     \
28
  kvVal->type = TSDB_DATA_TYPE_DOUBLE; \
29
  kvVal->d = result;
30

31
#define SET_FLOAT                                                                              \
32
  if (!IS_VALID_FLOAT(result)) {                                                               \
33
    smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal); \
34
    return false;                                                                              \
35
  }                                                                                            \
36
  kvVal->type = TSDB_DATA_TYPE_FLOAT;                                                          \
37
  kvVal->f = (float)result;
38

39
#define SET_BIGINT                                                                                       \
40
  SET_ERRNO(0);                                                                                             \
41
  int64_t tmp = taosStr2Int64(pVal, &endptr, 10);                                                        \
42
  if (ERRNO == ERANGE) {                                                                                 \
43
    smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
44
    return false;                                                                                        \
45
  }                                                                                                      \
46
  kvVal->type = TSDB_DATA_TYPE_BIGINT;                                                                   \
47
  kvVal->i = tmp;
48

49
#define SET_INT                                                                    \
50
  if (!IS_VALID_INT(result)) {                                                     \
51
    smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); \
52
    return false;                                                                  \
53
  }                                                                                \
54
  kvVal->type = TSDB_DATA_TYPE_INT;                                                \
55
  kvVal->i = result;
56

57
#define SET_SMALL_INT                                                          \
58
  if (!IS_VALID_SMALLINT(result)) {                                            \
59
    smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal); \
60
    return false;                                                              \
61
  }                                                                            \
62
  kvVal->type = TSDB_DATA_TYPE_SMALLINT;                                       \
63
  kvVal->i = result;
64

65
#define SET_UBIGINT                                                                             \
66
  SET_ERRNO(0);                                                                                    \
67
  uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);                                             \
68
  if (ERRNO == ERANGE || result < 0) {                                                          \
69
    smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
70
    return false;                                                                               \
71
  }                                                                                             \
72
  kvVal->type = TSDB_DATA_TYPE_UBIGINT;                                                         \
73
  kvVal->u = tmp;
74

75
#define SET_UINT                                                                  \
76
  if (!IS_VALID_UINT(result)) {                                                   \
77
    smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal); \
78
    return false;                                                                 \
79
  }                                                                               \
80
  kvVal->type = TSDB_DATA_TYPE_UINT;                                              \
81
  kvVal->u = result;
82

83
#define SET_USMALL_INT                                                            \
84
  if (!IS_VALID_USMALLINT(result)) {                                              \
85
    smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal); \
86
    return false;                                                                 \
87
  }                                                                               \
88
  kvVal->type = TSDB_DATA_TYPE_USMALLINT;                                         \
89
  kvVal->u = result;
90

91
#define SET_TINYINT                                                       \
92
  if (!IS_VALID_TINYINT(result)) {                                        \
93
    smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal); \
94
    return false;                                                         \
95
  }                                                                       \
96
  kvVal->type = TSDB_DATA_TYPE_TINYINT;                                   \
97
  kvVal->i = result;
98

99
#define SET_UTINYINT                                                            \
100
  if (!IS_VALID_UTINYINT(result)) {                                             \
101
    smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal); \
102
    return false;                                                               \
103
  }                                                                             \
104
  kvVal->type = TSDB_DATA_TYPE_UTINYINT;                                        \
105
  kvVal->u = result;
106

107
#define IS_COMMENT(protocol,data)                             \
108
  (protocol == TSDB_SML_LINE_PROTOCOL && data == '#')
109

110
int64_t smlToMilli[] = {3600000LL, 60000LL, 1000LL};
111
int64_t smlFactorNS[] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
112
int64_t smlFactorS[] = {1000LL, 1000000LL, 1000000000LL};
113

114
/*
 * Ask the catalog whether the connection's user is granted `type` on a target.
 *
 * The target is the table `pTabName` inside the request's database, or the
 * database itself when `pTabName` is NULL.
 *
 * Returns TSDB_CODE_SUCCESS when access is granted,
 * TSDB_CODE_PAR_PERMISSION_DENIED when the catalog answers "no",
 * TSDB_CODE_SML_INVALID_DATA when the database name cannot be set, or the
 * error code reported by catalogChkAuth().
 */
static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo *conn, const char *pTabName, AUTH_TYPE type) {
  SUserAuthInfo authReq = {0};
  SUserAuthRes  authRes = {0};

  (void)snprintf(authReq.user, sizeof(authReq.user), "%s", info->taos->user);
  if (pTabName != NULL) {
    toName(info->taos->acctId, info->pRequest->pDb, pTabName, &authReq.tbName);
  } else if (tNameSetDbName(&authReq.tbName, info->taos->acctId, info->pRequest->pDb,
                            strlen(info->pRequest->pDb)) != 0) {
    return TSDB_CODE_SML_INVALID_DATA;
  }
  authReq.type = type;

  int32_t code = catalogChkAuth(info->pCatalog, conn, &authReq, &authRes);
  // the condition node is released on both the success and the failure path
  nodesDestroyNode(authRes.pCond[AUTH_RES_BASIC]);

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  return authRes.pass[AUTH_RES_BASIC] ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
}
136

137
/*
 * Write "<msg1>:<msg2>" into the message buffer, truncating to its capacity.
 *
 * Does nothing when the buffer pointer is NULL or its capacity is below one
 * byte (there would be no room even for the terminator).  Either message may
 * be NULL; the ":<msg2>" part is only appended when more than two bytes are
 * left after msg1.
 */
void smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf == NULL || pBuf->len < 1) {
    return;
  }
  pBuf->buf[0] = 0;
  if (msg1) {
    (void)strncat(pBuf->buf, msg1, pBuf->len - 1);
  }
  // bytes still unused in the buffer, including the one holding the terminator
  int32_t left = pBuf->len - strlen(pBuf->buf);
  if (left > 2 && msg2) {
    (void)strncat(pBuf->buf, ":", left - 1);
    // one byte went to ':' and one is reserved for the terminator
    (void)strncat(pBuf->buf, msg2, left - 2);
  }
}
151

152
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
99,957✔
153
  char   *endPtr = NULL;
99,957✔
154
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
99,957✔
155
  if (unlikely(value + len != endPtr)) {
99,850✔
156
    return -1;
2✔
157
  }
158

159
  if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) {
99,848✔
160
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
57✔
161
    if (tsInt64 != 0 && unit > INT64_MAX / tsInt64) {
57!
162
      return -1;
1✔
163
    }
164
    tsInt64 *= unit;
56✔
165
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
56✔
166
  }
167

168
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
99,847✔
169
}
170

171
/*
 * Allocate a zeroed SSmlTableInfo for the measurement and give it an empty
 * `cols` array of pointers with room for `numRows` entries.
 *
 * `measure` is stored by pointer, not copied.  On success the new object is
 * returned through `tInfo`; on failure it is freed, the error is logged and
 * the error code is returned.
 */
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo **tInfo) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SSmlTableInfo *table = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  SML_CHECK_NULL(table);

  table->sTableName = measure;
  table->sTableNameLen = measureLen;
  table->cols = taosArrayInit(numRows, POINTER_BYTES);
  SML_CHECK_NULL(table->cols);

  *tInfo = table;
  return code;

END:
  taosMemoryFree(table);
  uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino);
  return code;
}
190

191
/*
 * Fill `kv` so that it describes the timestamp column: key is the configured
 * default timestamp column name, type is TIMESTAMP, value is `ts`, and length
 * is the byte width listed for TIMESTAMP in tDataTypes[].
 */
void smlBuildTsKv(SSmlKv *kv, int64_t ts) {
  // key: the default timestamp column name, referenced (not copied)
  kv->key = tsSmlTsDefaultName;
  kv->keyLen = strlen(tsSmlTsDefaultName);

  // value: the timestamp itself and its fixed storage width
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->i = ts;
  kv->length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
}
101,412✔
198

199
static void smlDestroySTableMeta(void *para) {
1,513✔
200
  if (para == NULL) {
1,513!
201
    return;
×
202
  }
203
  SSmlSTableMeta *meta = *(SSmlSTableMeta **)para;
1,513✔
204
  if (meta == NULL) {
1,513✔
205
    return;
349✔
206
  }
207
  taosHashCleanup(meta->tagHash);
1,164✔
208
  taosHashCleanup(meta->colHash);
1,163✔
209
  taosArrayDestroy(meta->tags);
1,163✔
210
  taosArrayDestroy(meta->cols);
1,164✔
211
  taosMemoryFreeClear(meta->tableMeta);
1,163!
212
  taosMemoryFree(meta);
1,163!
213
}
214

215
/*
 * Load the schema of the super table named by `currElement->measure`, build
 * an SSmlSTableMeta from it and register it in `info->superTables`.
 *
 * The lookup name is a private copy of the measurement with escapes processed
 * (when `measureEscaped` is set) and smlStrReplace() applied; the hash key is
 * the measurement exactly as it appears in `currElement`.
 *
 * On success `*sMeta` is the new meta object, which takes ownership of the
 * fetched table meta.  If the schema cannot be fetched, `info->dataFormat` is
 * cleared and `info->reRun` is set before failing.  On any failure everything
 * built so far is released and `*sMeta` is reset to NULL so the caller is
 * never left holding a freed pointer.
 */
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta **sMeta) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableMeta *pTableMeta = NULL;

  int   measureLen = currElement->measureLen;
  char *measure = (char *)taosMemoryMalloc(measureLen);
  SML_CHECK_NULL(measure);
  (void)memcpy(measure, currElement->measure, measureLen);
  if (currElement->measureEscaped) {
    PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
  }
  smlStrReplace(measure, measureLen);
  code = smlGetMeta(info, measure, measureLen, &pTableMeta);
  taosMemoryFree(measure);
  if (code != TSDB_CODE_SUCCESS) {
    info->dataFormat = false;
    info->reRun = true;
    goto END;
  }

  SML_CHECK_CODE(smlBuildSTableMeta(info->dataFormat, sMeta));

  // schema[] lists the columns first, then the tags
  const int numOfCols = pTableMeta->tableInfo.numOfColumns;
  const int numOfFields = pTableMeta->tableInfo.numOfTags + numOfCols;
  for (int i = 0; i < numOfFields; i++) {
    SSchema *col = pTableMeta->schema + i;
    SSmlKv   kv = {.key = col->name, .keyLen = strlen(col->name), .type = col->type};
    if (col->type == TSDB_DATA_TYPE_NCHAR) {
      // NCHAR capacity is tracked in characters rather than bytes
      kv.length = (col->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    } else if (col->type == TSDB_DATA_TYPE_BINARY || col->type == TSDB_DATA_TYPE_GEOMETRY ||
               col->type == TSDB_DATA_TYPE_VARBINARY) {
      kv.length = col->bytes - VARSTR_HEADER_SIZE;
    } else {
      kv.length = col->bytes;
    }

    if (i < numOfCols) {
      SML_CHECK_NULL(taosArrayPush((*sMeta)->cols, &kv));
    } else {
      SML_CHECK_NULL(taosArrayPush((*sMeta)->tags, &kv));
    }
  }
  SML_CHECK_CODE(taosHashPut(info->superTables, currElement->measure, currElement->measureLen, sMeta, POINTER_BYTES));
  (*sMeta)->tableMeta = pTableMeta;
  return code;

END:
  smlDestroySTableMeta(sMeta);
  *sMeta = NULL;  // the object was just freed; do not hand a dangling pointer back
  taosMemoryFreeClear(pTableMeta);
  RETURN
}
263

264
/*
 * Check that the `cnt`-th data column of the current line lines up with the
 * cached super-table schema, and bind its value on the way.
 *
 * Returns true when the column matches (growing the recorded maximum length
 * of a variable-length column and flagging a schema change if needed).
 * Otherwise clears `info->dataFormat`, sets `info->reRun` and returns false.
 */
bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv) {
  SSmlKv *maxKV = NULL;

  // cnt is zero-based and the timestamp column comes first, hence + 2
  if (unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)) {
    goto END;
  }

  // bind data; schema slot 0 is the timestamp, so this column is cnt + 1
  if (unlikely(smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kv, cnt + 1,
                           info->taos->optionInfo.charsetCxt) != TSDB_CODE_SUCCESS)) {
    uDebug("smlBuildCol error, retry");
    goto END;
  }

  if (cnt >= taosArrayGetSize(info->maxColKVs)) {
    goto END;
  }
  maxKV = (SSmlKv *)taosArrayGet(info->maxColKVs, cnt);
  if (maxKV == NULL || unlikely(!IS_SAME_KEY)) {
    goto END;
  }

  if (unlikely(IS_VAR_DATA_TYPE(kv->type) && kv->length > maxKV->length)) {
    maxKV->length = kv->length;
    info->needModifySchema = true;
  }
  return true;

END:
  info->dataFormat = false;
  info->reRun = true;
  return false;
}
297

298
/*
 * Check that the `cnt`-th tag of the current line lines up with the cached
 * super-table tag list.
 *
 * Returns true when the tag matches (growing the recorded maximum length and
 * flagging a schema change if this value is longer).  Otherwise clears
 * `info->dataFormat`, sets `info->reRun` and returns false.
 */
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv) {
  SSmlKv *maxKV = NULL;

  if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
    goto END;
  }
  if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) {
    goto END;
  }

  maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt);
  if (maxKV == NULL || unlikely(!IS_SAME_KEY)) {
    goto END;
  }

  if (unlikely(kv->length > maxKV->length)) {
    maxKV->length = kv->length;
    info->needModifySchema = true;
  }
  return true;

END:
  info->dataFormat = false;
  info->reRun = true;
  return false;
}
325

326
/*
 * Build `elements->measureTag`: a freshly allocated buffer holding the
 * measurement bytes immediately followed by the tag bytes (no separator and
 * no terminator), and record its length in `measureTagsLen`.
 *
 * Returns TSDB_CODE_SUCCESS, or `terrno` when the allocation fails.
 */
int32_t smlJoinMeasureTag(SSmlLineInfo *elements) {
  char *joined = (char *)taosMemoryMalloc(elements->measureLen + elements->tagsLen);
  elements->measureTag = joined;
  if (joined == NULL) {
    return terrno;
  }

  (void)memcpy(joined, elements->measure, elements->measureLen);
  (void)memcpy(joined + elements->measureLen, elements->tags, elements->tagsLen);
  elements->measureTagsLen = elements->measureLen + elements->tagsLen;
  return TSDB_CODE_SUCCESS;
}
336

337
/* Return true when any column of the table carries the COL_IS_KEY flag. */
static bool smlIsPKTable(STableMeta *pTableMeta) {
  int idx = 0;
  while (idx < pTableMeta->tableInfo.numOfColumns) {
    if (pTableMeta->schema[idx].flags & COL_IS_KEY) {
      return true;
    }
    ++idx;
  }
  return false;
}
346

347
/*
 * Make the super table of the current line the "current" one on `info`.
 *
 * Nothing is done when IS_SAME_SUPER_TABLE holds.  Otherwise the meta is
 * looked up in `info->superTables` (and built on a miss), and
 * `currSTableMeta`, `maxTagKVs` and `maxColKVs` are pointed at it.
 *
 * Returns 0 on success, the error from smlBuildSuperTableInfo(),
 * TSDB_CODE_SML_INTERNAL_ERROR when no meta is available, or
 * TSDB_CODE_SML_NOT_SUPPORT_PK when the table has a key column.
 */
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
  if (IS_SAME_SUPER_TABLE) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlSTableMeta  *sMeta = NULL;
  SSmlSTableMeta **cached = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
  if (likelyCachedMiss_dummy_guard_unused(0)) {
  }
  if (unlikely(cached == NULL)) {
    int32_t code = smlBuildSuperTableInfo(info, elements, &sMeta);
    if (code != 0) return code;
  } else {
    sMeta = *cached;
  }

  if (sMeta == NULL) {
    uError("smlProcessSuperTable failed to get super table meta");
    return TSDB_CODE_SML_INTERNAL_ERROR;
  }

  info->currSTableMeta = sMeta->tableMeta;
  info->maxTagKVs = sMeta->tags;
  info->maxColKVs = sMeta->cols;

  // the "current" fields are updated even when the table is then rejected
  return smlIsPKTable(sMeta->tableMeta) ? TSDB_CODE_SML_NOT_SUPPORT_PK : 0;
}
374

375
/*
 * Resolve the child table for the current line, creating and registering it
 * in `info->childTables` (keyed by measurement + tags) on first sight.
 *
 * A new table gets a copy of `info->preLineTagKV` as its tags, a child table
 * name, a uid, and, in data-format mode, a table data context.  In
 * data-format mode `info->currTableDataCtx` is pointed at the table's context.
 *
 * NOTE(review): once taosHashPut() has stored the pointer, a later failure
 * destroys the table at END while the hash still references it; confirm how
 * info->childTables is cleaned up afterwards.
 */
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **known = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);

  if (unlikely(known == NULL)) {
    SML_CHECK_CODE(smlBuildTableInfo(1, elements->measure, elements->measureLen, &tinfo));
    SML_CHECK_CODE(taosHashPut(info->childTables, elements->measureTag, elements->measureTagsLen, &tinfo, POINTER_BYTES));

    tinfo->tags = taosArrayDup(info->preLineTagKV, NULL);
    SML_CHECK_NULL(tinfo->tags);

    // presumably the duplicate now owns the escaped buffers, so the source
    // entries drop their pointers (verify against the array's cleanup code)
    for (size_t i = 0; i < taosArrayGetSize(info->preLineTagKV); i++) {
      SSmlKv *kv = (SSmlKv *)taosArrayGet(info->preLineTagKV, i);
      SML_CHECK_NULL(kv);
      if (kv->keyEscaped) {
        kv->key = NULL;
      }
      if (kv->valueEscaped) {
        kv->value = NULL;
      }
    }

    SML_CHECK_CODE(smlSetCTableName(tinfo, info->tbnameKey));
    SML_CHECK_CODE(getTableUid(info, elements, tinfo));
    if (info->dataFormat) {
      info->currSTableMeta->uid = tinfo->uid;
      SML_CHECK_CODE(smlInitTableDataCtx(info->pQuery, info->currSTableMeta, &tinfo->tableDataCtx));
    }
  } else {
    tinfo = *known;
  }

  if (info->dataFormat) {
    info->currTableDataCtx = tinfo->tableDataCtx;
  }
  return TSDB_CODE_SUCCESS;

END:
  smlDestroyTableInfo(&tinfo);
  RETURN
}
409

410
/*
 * Data-format path for telnet/JSON points: bind the timestamp into schema
 * slot 0 and the single value into slot 1, then build the row.
 *
 * The bound column values are cleared whether or not the row was built.
 * `elements` is not used here.
 */
int32_t smlParseEndTelnetJsonFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SSchema *schema = info->currSTableMeta->schema;

  uDebug("SML:0x%" PRIx64 ", %s format true, ts:%" PRId64, info->id, __FUNCTION__ , kvTs->i);
  SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, schema, kvTs, 0, info->taos->optionInfo.charsetCxt));
  SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, schema, kv, 1, info->taos->optionInfo.charsetCxt));
  SML_CHECK_CODE(smlBuildRow(info->currTableDataCtx));

END:
  clearColValArraySml(info->currTableDataCtx->pValues);
  RETURN
}
422

423
/*
 * Non-data-format path for telnet/JSON points: append the timestamp and the
 * value key-values (by copy) to `elements->colArray`, creating the array on
 * first use.
 */
int32_t smlParseEndTelnetJsonUnFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) {
  int32_t code = 0;
  int32_t lino = 0;

  uDebug("SML:0x%" PRIx64 ", %s format false, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);

  if (elements->colArray == NULL) {
    elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
    SML_CHECK_NULL(elements->colArray);
  }

  // timestamp first, value second
  SML_CHECK_NULL(taosArrayPush(elements->colArray, kvTs));
  SML_CHECK_NULL(taosArrayPush(elements->colArray, kv));

END:
  RETURN
}
437

438
/*
 * Finish one parsed line once its timestamp is known.
 *
 * Without data-format mode the timestamp key-value is stored into slot 0 of
 * `elements->colArray`.  In data-format mode the timestamp is bound to schema
 * slot 0 and the row is built; the bound values and the escaped-string list
 * are cleared either way, and a build failure is logged and returned.
 *
 * On success the line is remembered in `info->preLine`.
 */
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) {
  if (!info->dataFormat) {
    uDebug("SML:0x%" PRIx64 ", %s format false, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);
    taosArraySet(elements->colArray, 0, kvTs);
  } else {
    uDebug("SML:0x%" PRIx64 ", %s format true, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);

    int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0,
                              info->taos->optionInfo.charsetCxt);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildRow(info->currTableDataCtx);
    }

    // per-line scratch state is dropped before the result is inspected
    clearColValArraySml(info->currTableDataCtx->pValues);
    taosArrayClearP(info->escapedStringList, NULL);

    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      uError("SML:0x%" PRIx64 ", %s smlBuildCol error:%d", info->id, __FUNCTION__, ret);
      return ret;
    }
  }

  info->preLine = *elements;
  return TSDB_CODE_SUCCESS;
}
460

461
/*
 * Derive a child table name from the tag list, writing it to `childTableName`.
 *
 * Two strategies:
 *  - If an auto-name delimiter is configured and no `tbnameKey` was given, and
 *    the joined length stays below TSDB_TABLE_NAME_LEN, the name is all tag
 *    values joined by the delimiter.
 *  - Otherwise the first tag whose key equals `tbnameKey` (or the configured
 *    default key when NULL) supplies the name and is removed from `tags`.
 *    An empty key means "no naming tag" and leaves the name untouched.
 *
 * Dots are replaced via smlStrReplace() when tsSmlDot2Underline is set.
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    joinTagValues = false;
  size_t  delimLen = strlen(tsSmlAutoChildTableNameDelimiter);

  if (delimLen > 0 && tbnameKey == NULL) {
    size_t nTags = taosArrayGetSize(tags);
    size_t totalNameLen = delimLen * (nTags - 1);
    for (size_t i = 0; i < nTags; i++) {
      SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
      SML_CHECK_NULL(tag);
      totalNameLen += tag->length;
    }
    joinTagValues = (totalNameLen < TSDB_TABLE_NAME_LEN);
  }

  if (joinTagValues) {
    size_t nTags = taosArrayGetSize(tags);
    childTableName[0] = '\0';
    for (size_t i = 0; i < nTags; i++) {
      SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
      SML_CHECK_NULL(tag);
      (void)strncat(childTableName, tag->value, TMIN(tag->length, TSDB_TABLE_NAME_LEN - 1 - strlen(childTableName)));
      if (i != nTags - 1) {
        (void)strncat(childTableName, tsSmlAutoChildTableNameDelimiter, TSDB_TABLE_NAME_LEN - 1 - strlen(childTableName));
      }
    }
    if (tsSmlDot2Underline) {
      smlStrReplace(childTableName, strlen(childTableName));
    }
  } else {
    if (tbnameKey == NULL) {
      tbnameKey = tsSmlChildTableName;
    }
    size_t childTableNameLen = strlen(tbnameKey);
    if (childTableNameLen <= 0) return TSDB_CODE_SUCCESS;

    size_t nTags = taosArrayGetSize(tags);
    for (size_t i = 0; i < nTags; i++) {
      SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
      SML_CHECK_NULL(tag);
      if (childTableNameLen != tag->keyLen || strncmp(tag->key, tbnameKey, tag->keyLen) != 0) {
        continue;
      }
      // handle child table name: copy the value, then drop the naming tag
      tstrncpy(childTableName, tag->value, TMIN(TSDB_TABLE_NAME_LEN, tag->length + 1));
      if (tsSmlDot2Underline) {
        smlStrReplace(childTableName, strlen(childTableName));
      }
      taosArrayRemove(tags, i);
      break;
    }
  }

END:
  RETURN
}
515

516
/*
 * Give `oneTable` a child table name.
 *
 * The name is first taken from the tags via smlParseTableName().  If that
 * leaves it empty, a name is generated by buildChildTableName() from a
 * temporary copy of the tags and the super table name (with smlStrReplace()
 * applied to a local copy when tsSmlDot2Underline is set).
 *
 * Fails with TSDB_CODE_SML_INTERNAL_ERROR when the super table name does not
 * fit in TSDB_TABLE_NAME_LEN.
 */
int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray *tagsCopy = NULL;

  SML_CHECK_CODE(smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey));

  if (oneTable->childTableName[0] == '\0') {
    tagsCopy = taosArrayDup(oneTable->tags, NULL);
    SML_CHECK_NULL(tagsCopy);
    if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
      SML_CHECK_CODE(TSDB_CODE_SML_INTERNAL_ERROR);
    }

    char          superName[TSDB_TABLE_NAME_LEN] = {0};
    RandTableName rName = {tagsCopy, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
    if (!tsSmlDot2Underline) {
      rName.stbFullName = oneTable->sTableName;
    } else {
      // work on a local copy so the stored super table name stays untouched
      (void)memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen);
      smlStrReplace(superName, oneTable->sTableNameLen);
      rName.stbFullName = superName;
    }

    SML_CHECK_CODE(buildChildTableName(&rName));
  }

END:
  taosArrayDestroy(tagsCopy);
  RETURN
}
545

546
/*
 * Assign `tinfo->uid`: reuse the uid already recorded for this
 * (super table, child table) pair in `info->tableUids`, or take the next
 * value of `info->uid` and record it.
 *
 * The hash key is "<measure>\0<childTableName>" built in a fixed stack
 * buffer.  Lengths are validated before copying so an over-long measurement
 * cannot overrun that buffer; such input fails with
 * TSDB_CODE_SML_INVALID_DATA.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SML_INVALID_DATA, or the error from
 * taosHashPut().
 */
int32_t getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo) {
  char   key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
  size_t nLen = strlen(tinfo->childTableName);

  if (currElement->measureLen < 0 || (size_t)currElement->measureLen + 1 + nLen > sizeof(key)) {
    uError("%s key too long, measure len:%d, child table name len:%d", __FUNCTION__, (int)currElement->measureLen,
           (int)nLen);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  size_t keyLen = (size_t)currElement->measureLen + 1 + nLen;

  (void)memcpy(key, currElement->measure, currElement->measureLen);
  if (tsSmlDot2Underline) {
    smlStrReplace(key, currElement->measureLen);
  }
  // use \0 as separator for stable name and child table name (buffer is pre-zeroed)
  (void)memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);

  void *uid = taosHashGet(info->tableUids, key, keyLen);
  if (uid == NULL) {
    tinfo->uid = info->uid++;
    return taosHashPut(info->tableUids, key, keyLen, &tinfo->uid, sizeof(uint64_t));
  }

  tinfo->uid = *(uint64_t *)uid;
  return TSDB_CODE_SUCCESS;
}
565

566
/*
 * Allocate an empty SSmlSTableMeta with `tags` and `cols` arrays.
 *
 * The `tagHash` / `colHash` lookup tables are only created when
 * `isDataFormat` is false.  On success the object is returned through
 * `sMeta`; on failure whatever was built is destroyed, the error is logged
 * and `*sMeta` is left untouched.
 */
int32_t smlBuildSTableMeta(bool isDataFormat, SSmlSTableMeta **sMeta) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  SML_CHECK_NULL(meta);

  if (unlikely(!isDataFormat)) {
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    SML_CHECK_NULL(meta->tagHash);
    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    SML_CHECK_NULL(meta->colHash);
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  SML_CHECK_NULL(meta->tags);
  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  SML_CHECK_NULL(meta->cols);

  *sMeta = meta;
  return TSDB_CODE_SUCCESS;

END:
  smlDestroySTableMeta(&meta);
  uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino);
  return code;
}
591

592
/*
 * Parse the numeric literal in `kvVal->value` (`kvVal->length` characters)
 * and store the typed result back into `kvVal`.
 *
 * The text is a number followed by an optional type suffix, matched
 * case-insensitively on its first letter:
 *   (none) double        f64 double       f32 float
 *   i / i64 bigint       i32 int          i16 smallint      i8 tinyint
 *   u / u64 ubigint      u32 uint         u16 usmallint     u8 utinyint
 *
 * Returns true on success.  On a malformed literal or an out-of-range value
 * a message is written through `msg` and false is returned (the SET_* and
 * RETURN_FALSE macros perform those returns).
 */
int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    RETURN_FALSE
  }

  // number of characters after the numeric part, i.e. the suffix length
  int32_t left = len - (endptr - pVal);
  switch (left) {
    case 0: {
      SET_DOUBLE
      break;
    }

    case 1: {
      switch (endptr[0]) {
        case 'i':
        case 'I': {
          SET_BIGINT
          break;
        }
        case 'u':
        case 'U': {
          SET_UBIGINT
          break;
        }
        default: {
          RETURN_FALSE
        }
      }
      break;
    }

    case 2: {
      switch (endptr[0]) {
        case 'i':
        case 'I': {
          if (endptr[1] == '8') {
            SET_TINYINT
          } else {
            RETURN_FALSE
          }
          break;
        }
        case 'u':
        case 'U': {
          if (endptr[1] == '8') {
            SET_UTINYINT
          } else {
            RETURN_FALSE
          }
          break;
        }
        default: {
          RETURN_FALSE
        }
      }
      break;
    }

    case 3: {
      switch (endptr[0]) {
        case 'f':
        case 'F': {
          if (endptr[1] == '6' && endptr[2] == '4') {
            SET_DOUBLE
          } else if (endptr[1] == '3' && endptr[2] == '2') {
            SET_FLOAT
          } else {
            RETURN_FALSE
          }
          break;
        }
        case 'i':
        case 'I': {
          if (endptr[1] == '6' && endptr[2] == '4') {
            SET_BIGINT
          } else if (endptr[1] == '3' && endptr[2] == '2') {
            SET_INT
          } else if (endptr[1] == '1' && endptr[2] == '6') {
            SET_SMALL_INT
          } else {
            RETURN_FALSE
          }
          break;
        }
        case 'u':
        case 'U': {
          if (endptr[1] == '6' && endptr[2] == '4') {
            SET_UBIGINT
          } else if (endptr[1] == '3' && endptr[2] == '2') {
            SET_UINT
          } else if (endptr[1] == '1' && endptr[2] == '6') {
            SET_USMALL_INT
          } else {
            RETURN_FALSE
          }
          break;
        }
        default: {
          RETURN_FALSE
        }
      }
      break;
    }

    default: {
      // suffix longer than any known one (or a negative remainder)
      RETURN_FALSE
    }
  }
  return true;
}
665

666
/*
 * Fetch the super table meta for `measure` (not necessarily NUL-terminated,
 * `measureLen` bytes) in the request's database from the catalog.
 *
 * `*pTableMeta` is reset to NULL first and filled by catalogGetSTableMeta().
 * The table name is truncated to TSDB_TABLE_NAME_LEN - 1 bytes; only that
 * many bytes are copied, so an over-long measurement cannot overrun
 * `pName.tname`.
 *
 * Returns the code from catalogGetSTableMeta().
 */
int32_t smlGetMeta(SSmlHandle *info, const void *measure, int32_t measureLen, STableMeta **pTableMeta) {
  *pTableMeta = NULL;

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  // copy the clamped length, not the raw one, to stay inside pName.tname
  int32_t len = TMIN(measureLen, TSDB_TABLE_NAME_LEN - 1);
  (void)memcpy(pName.tname, measure, len);
  pName.tname[len] = 0;

  return catalogGetSTableMeta(info->pCatalog, &conn, &pName, pTableMeta);
}
683

684
// Produce the next SML handle id from a function-local counter.
// The counter is advanced with atomic_add_fetch_64 and the value 0 is never
// handed out: if the increment yields 0 the counter is advanced again.
static int64_t smlGenId() {
  static volatile int64_t nextHandleId = 0;

  for (;;) {
    int64_t candidate = atomic_add_fetch_64(&nextHandleId, 1);
    if (candidate != 0) {
      return candidate;
    }
  }
}
694

695
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
6,694✔
696
                                       ESchemaAction *action, SSmlHandle *info) {
697
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
6,694✔
698
  if (index) {
6,696✔
699
    if (colField[*index].type != kv->type) {
3,246✔
700
      snprintf(info->msgBuf.buf, info->msgBuf.len, "SML:0x%" PRIx64 ", %s point type and db type mismatch, db type:%s, point type:%s, key:%s",
6✔
701
               info->id, __FUNCTION__, tDataTypes[colField[*index].type].name, tDataTypes[kv->type].name, kv->key);
6✔
702
      uError("%s", info->msgBuf.buf);
6!
703
      return TSDB_CODE_SML_INVALID_DATA;
6✔
704
    }
705

706
    if (((colField[*index].type == TSDB_DATA_TYPE_VARCHAR || colField[*index].type == TSDB_DATA_TYPE_VARBINARY ||
3,240!
707
          colField[*index].type == TSDB_DATA_TYPE_GEOMETRY) &&
2,950✔
708
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
293✔
709
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
3,237✔
710
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
1,526✔
711
      if (isTag) {
66✔
712
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
60✔
713
      } else {
714
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
6✔
715
      }
716
    }
717
  } else {
718
    if (isTag) {
3,450✔
719
      *action = SCHEMA_ACTION_ADD_TAG;
2,078✔
720
    } else {
721
      *action = SCHEMA_ACTION_ADD_COLUMN;
1,372✔
722
    }
723
  }
724
  return TSDB_CODE_SUCCESS;
6,690✔
725
}
726

727
#define BOUNDARY 1024
728
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
1,492✔
729
  int32_t result = 1;
1,492✔
730
  if (length >= BOUNDARY) {
1,492✔
731
    result = length;
9✔
732
  } else {
733
    while (result <= length) {
7,127✔
734
      result <<= 1;
5,644✔
735
    }
736
  }
737

738
  if ((type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) &&
1,492✔
739
      result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
405!
740
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
×
741
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
1,492!
742
    result = (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
743
  }
744

745
  if (type == TSDB_DATA_TYPE_NCHAR) {
1,492✔
746
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
1,087✔
747
  } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
405!
748
    result = result + VARSTR_HEADER_SIZE;
405✔
749
  }
750
  return result;
1,492✔
751
}
752

753
// Walk the parsed tags or columns of one super table and accumulate, in
// *action, the schema change they require relative to the existing schema.
//
// schemaHash      name -> index map for the kind being processed (tags or columns)
// schemaHashCheck name map of the other kind; a key found there is reported as
//                 TSDB_CODE_PAR_DUPLICATED_COLUMN
//
// For columns (isTag == false) the entry at index 0 is not examined
// (presumably the timestamp column - confirm against the parser).
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                      SHashObj *schemaHashCheck, ESchemaAction *action, bool isTag) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int idx = isTag ? 0 : 1;
  while (idx < taosArrayGetSize(cols)) {
    SSmlKv *item = (SSmlKv *)taosArrayGet(cols, idx);
    SML_CHECK_NULL(item);
    SML_CHECK_CODE(smlGenerateSchemaAction(schemaField, schemaHash, item, isTag, action, info));
    if (NULL != taosHashGet(schemaHashCheck, item->key, item->keyLen)) {
      SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
    }
    ++idx;
  }

END:
  RETURN
}
770

771
// Verify that every parsed key/value in `cols` is present in `schema` and,
// for variable-length types, fits into the stored width.
//
// schema  first of `length` consecutive schema entries to check against
// cols    array of SSmlKv
//
// Returns TSDB_CODE_SML_INVALID_DATA for an unknown key and
// TSDB_CODE_INTERNAL_ERROR when a value is wider than the schema entry.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SHashObj *byName = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  SML_CHECK_NULL(byName);

  // Index the schema entries by name; a copy of each SSchema is stored.
  int32_t s = 0;
  while (s < length) {
    SSchema *entry = &schema[s];
    SML_CHECK_CODE(taosHashPut(byName, entry->name, strlen(entry->name), entry, sizeof(SSchema)));
    s++;
  }

  int32_t k = 0;
  while (k < taosArrayGetSize(cols)) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, k);
    SML_CHECK_NULL(kv);

    SSchema *found = taosHashGet(byName, kv->key, kv->keyLen);
    if (NULL == found) {
      SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
    }
    if (IS_VAR_DATA_TYPE(kv->type) && kv->length + VARSTR_HEADER_SIZE > found->bytes) {
      uError("column %s (type %s) bytes invalid. db bytes:%d, kv bytes:%zu", found->name,
             tDataTypes[found->type].name, found->bytes, kv->length);
      SML_CHECK_CODE(TSDB_CODE_INTERNAL_ERROR);
    }
    k++;
  }

END:
  taosHashCleanup(byName);
  RETURN
}
797

798
static int32_t getBytes(uint8_t type, int32_t length) {
3,436✔
799
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR ||
3,436✔
800
      type == TSDB_DATA_TYPE_GEOMETRY) {
801
    return smlFindNearestPowerOf2(length, type);
1,489✔
802
  } else {
803
    return tDataTypes[type].bytes;
1,947✔
804
  }
805
}
806

807
// Merge the parsed tags or columns in `cols` into the SField array `results`.
//
// New keys are appended as new fields; keys whose stored width is too small get
// the width of the matching entry in `results` enlarged. Afterwards the summed
// width of all fields in `results` is checked against the tag or row limit.
//
// schemaField/schemaHash  existing schema and its name -> index map (both may be NULL)
// numOfCols               subtracted from the schema index of a tag to obtain its
//                         position in `results`
//
// Returns TSDB_CODE_PAR_INVALID_TAGS_LENGTH / TSDB_CODE_PAR_INVALID_ROW_LENGTH
// when the total width exceeds the limit.
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (int pos = 0; pos < taosArrayGetSize(cols); ++pos) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, pos);
    SML_CHECK_NULL(kv);

    ESchemaAction action = SCHEMA_ACTION_NULL;
    SML_CHECK_CODE(smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info));

    switch (action) {
      case SCHEMA_ACTION_ADD_COLUMN:
      case SCHEMA_ACTION_ADD_TAG: {
        SField added = {0};
        added.type = kv->type;
        added.bytes = getBytes(kv->type, kv->length);
        // `added` is zero-initialised, so the truncated copy stays NUL-terminated.
        (void)memcpy(added.name, kv->key, TMIN(kv->keyLen, sizeof(added.name) - 1));
        SML_CHECK_NULL(taosArrayPush(results, &added));
        break;
      }
      case SCHEMA_ACTION_CHANGE_COLUMN_SIZE:
      case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
        uint16_t *schemaIdx = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
        if (NULL == schemaIdx) {
          SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
        }
        uint16_t resultIdx = *schemaIdx;
        if (isTag) {
          resultIdx -= numOfCols;
        }
        SField *existing = (SField *)taosArrayGet(results, resultIdx);
        SML_CHECK_NULL(existing);
        existing->bytes = getBytes(kv->type, kv->length);
        break;
      }
      default:
        break;
    }
  }

  int32_t limit = isTag ? TSDB_MAX_TAGS_LEN : TSDB_MAX_BYTES_PER_ROW;
  int32_t total = 0;
  for (int pos = 0; pos < taosArrayGetSize(results); ++pos) {
    SField *f = taosArrayGet(results, pos);
    SML_CHECK_NULL(f);
    total += f->bytes;
  }
  if (total > limit) {
    return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH;
  }

END:
  RETURN
}
849

850
// Fill the version, super-table uid and request-source fields of a
// create-stable request. No other field of pReq is touched.
static FORCE_INLINE void smlBuildCreateStbReq(SMCreateStbReq *pReq, int32_t colVer, int32_t tagVer, tb_uid_t suid, int8_t source){
  pReq->source = source;
  pReq->suid = suid;
  pReq->tagVer = tagVer;
  pReq->colVer = colVer;
}
392✔
856
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray **pTags, STableMeta *pTableMeta,
392✔
857
                              ESchemaAction action) {
858
  SRequestObj   *pRequest = NULL;
392✔
859
  SMCreateStbReq pReq = {0};
392✔
860
  int32_t        code = TSDB_CODE_SUCCESS;
392✔
861
  int32_t        lino = 0;
392✔
862
  SCmdMsgInfo    pCmdMsg = {0};
392✔
863
  char          *pSql = NULL;
392✔
864

865
  // put front for free
866
  pReq.numOfColumns = taosArrayGetSize(pColumns);
392✔
867
  pReq.pTags = *pTags;
393✔
868
  pReq.numOfTags = taosArrayGetSize(*pTags);
393✔
869
  *pTags = NULL;
393✔
870

871
  pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SFieldWithOptions));
393✔
872
  SML_CHECK_NULL(pReq.pColumns);
393!
873
  for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
1,999✔
874
    SField *pField = taosArrayGet(pColumns, i);
1,607✔
875
    SML_CHECK_NULL(pField);
1,607!
876
    SFieldWithOptions fieldWithOption = {0};
1,607✔
877
    setFieldWithOptions(&fieldWithOption, pField);
1,607✔
878
    setDefaultOptionsForField(&fieldWithOption);
1,608✔
879
    SML_CHECK_NULL(taosArrayPush(pReq.pColumns, &fieldWithOption));
3,209!
880
  }
881

882
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
392✔
883
    pSql = "sml_create_stable";
345✔
884
    smlBuildCreateStbReq(&pReq, 1, 1, 0, TD_REQ_FROM_APP);
885
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
47✔
886
    pSql = (action == SCHEMA_ACTION_ADD_TAG) ? "sml_add_tag" : "sml_modify_tag_size";
32✔
887
    smlBuildCreateStbReq(&pReq, pTableMeta->sversion, pTableMeta->tversion + 1, pTableMeta->uid, TD_REQ_FROM_TAOX);
32✔
888
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
15!
889
    pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
15✔
890
    smlBuildCreateStbReq(&pReq, pTableMeta->sversion + 1, pTableMeta->tversion, pTableMeta->uid, TD_REQ_FROM_TAOX);
15✔
891
  } else {
892
    SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
×
893
  }
894

895
  SML_CHECK_CODE(buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0));
392!
896

897
  pRequest->syncQuery = true;
393✔
898
  if (!pRequest->pDb) {
393!
899
    SML_CHECK_CODE(TSDB_CODE_PAR_DB_NOT_SPECIFIED);
×
900
  }
901

902
  if (pReq.numOfTags == 0) {
393✔
903
    pReq.numOfTags = 1;
3✔
904
    SField field = {0};
3✔
905
    field.type = TSDB_DATA_TYPE_NCHAR;
3✔
906
    field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
3✔
907
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
3✔
908
    SML_CHECK_NULL(taosArrayPush(pReq.pTags, &field));
6!
909
  }
910

911
  pReq.commentLen = -1;
393✔
912
  pReq.igExists = true;
393✔
913
  SML_CHECK_CODE(tNameExtractFullName(pName, pReq.name));
393!
914

915
  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
393✔
916
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
393✔
917
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
393✔
918
  if (pCmdMsg.msgLen < 0) {
390!
919
    uError("failed to serialize create stable request1, code:%d, terrno:%d", pCmdMsg.msgLen, terrno);
×
920
    SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
×
921
  }
922
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
390!
923
  SML_CHECK_NULL(pCmdMsg.pMsg);
391!
924
  code = tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
391✔
925
  if (code < 0) {
393!
926
    taosMemoryFree(pCmdMsg.pMsg);
×
927
    uError("failed to serialize create stable request2, code:%d, terrno:%d", code, terrno);
×
928
    SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
×
929
  }
930

931
  SQuery pQuery = {0};
393✔
932
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
393✔
933
  pQuery.pCmdMsg = &pCmdMsg;
393✔
934
  pQuery.msgType = pQuery.pCmdMsg->msgType;
393✔
935
  pQuery.stableQuery = true;
393✔
936

937
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // no need to check return value
393✔
938

939
  if (pRequest->code == TSDB_CODE_SUCCESS) {
393✔
940
    SML_CHECK_CODE(catalogRemoveTableMeta(info->pCatalog, pName));
265!
941
  }
942
  code = pRequest->code;
393✔
943

944
END:
393✔
945
  destroyRequest(pRequest);
393✔
946
  tFreeSMCreateStbReq(&pReq);
393✔
947
  RETURN
393!
948
}
949

950
static int32_t smlCreateTable(SSmlHandle *info, SRequestConnInfo *conn, SSmlSTableMeta *sTableData,
351✔
951
                              SName *pName, STableMeta **pTableMeta){
952
  int32_t code = 0;
351✔
953
  int32_t lino = 0;
351✔
954
  SArray *pColumns = NULL;
351✔
955
  SArray *pTags = NULL;
351✔
956
  SML_CHECK_CODE(smlCheckAuth(info, conn, NULL, AUTH_TYPE_WRITE));
351✔
957
  uDebug("SML:0x%" PRIx64 ", %s create table:%s", info->id, __FUNCTION__, pName->tname);
348!
958
  pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
348✔
959
  SML_CHECK_NULL(pColumns);
348!
960
  pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
348✔
961
  SML_CHECK_NULL(pTags);
348!
962
  SML_CHECK_CODE(smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true));
348!
963
  SML_CHECK_CODE(smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false));
348✔
964
  SML_CHECK_CODE(smlSendMetaMsg(info, pName, pColumns, &pTags, NULL, SCHEMA_ACTION_CREATE_STABLE));
345✔
965
  info->cost.numOfCreateSTables++;
217✔
966
  taosMemoryFreeClear(*pTableMeta);
217!
967

968
  SML_CHECK_CODE(catalogGetSTableMeta(info->pCatalog, conn, pName, pTableMeta));
217!
969

970
END:
217✔
971
  taosArrayDestroy(pColumns);
351✔
972
  taosArrayDestroy(pTags);
351✔
973
  RETURN
351!
974
}
975

976
// Allocate *pColumns and *pTags and fill them with SField copies of the
// existing schema in pTableMeta: the first numOfColumns schema entries go to
// *pColumns, the following numOfTags entries to *pTags.
//
// Each array is sized for the existing entries plus the entries parsed into
// sTableData. On failure the arrays allocated so far stay in *pColumns / *pTags
// for the caller to destroy.
static int32_t smlBuildFields(SArray **pColumns, SArray **pTags, STableMeta *pTableMeta, SSmlSTableMeta *sTableData){
  int32_t code = 0;
  int32_t lino = 0;

  *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + (pTableMeta)->tableInfo.numOfColumns, sizeof(SField));
  // Check the allocation result itself, not the address of the out-parameter.
  SML_CHECK_NULL(*pColumns);
  *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + (pTableMeta)->tableInfo.numOfTags, sizeof(SField));
  SML_CHECK_NULL(*pTags);

  for (uint16_t i = 0; i < (pTableMeta)->tableInfo.numOfColumns + (pTableMeta)->tableInfo.numOfTags; i++) {
    SField field = {0};
    field.type = (pTableMeta)->schema[i].type;
    field.bytes = (pTableMeta)->schema[i].bytes;
    tstrncpy(field.name, (pTableMeta)->schema[i].name, sizeof(field.name));
    if (i < (pTableMeta)->tableInfo.numOfColumns) {
      SML_CHECK_NULL(taosArrayPush(*pColumns, &field));
    } else {
      SML_CHECK_NULL(taosArrayPush(*pTags, &field));
    }
  }

END:
  RETURN
}
997
static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmpCheck, SHashObj* hashTmp, SRequestConnInfo *conn,
266✔
998
                            SSmlSTableMeta *sTableData, SName *pName, STableMeta **pTableMeta){
999
  ESchemaAction action = SCHEMA_ACTION_NULL;
266✔
1000
  SArray *pColumns = NULL;
266✔
1001
  SArray *pTags = NULL;
266✔
1002
  int32_t code = 0;
266✔
1003
  int32_t lino = 0;
266✔
1004
  SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, hashTmpCheck, &action, true));
266✔
1005

1006
  if (action != SCHEMA_ACTION_NULL) {
258✔
1007
    SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE));
33!
1008
    uDebug("SML:0x%" PRIx64 ", %s change table tag, table:%s, action:%d", info->id, __FUNCTION__, pName->tname,
33!
1009
           action);
1010
    SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData));
33!
1011
    SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, pTags,
32!
1012
                              (*pTableMeta)->tableInfo.numOfColumns, true));
1013

1014
    SML_CHECK_CODE(smlSendMetaMsg(info, pName, pColumns, &pTags, (*pTableMeta), action));
32!
1015

1016
    info->cost.numOfAlterTagSTables++;
33✔
1017
    taosMemoryFreeClear(*pTableMeta);
33!
1018
    SML_CHECK_CODE(catalogRefreshTableMeta(info->pCatalog, conn, pName, -1));
33!
1019
    SML_CHECK_CODE(catalogGetSTableMeta(info->pCatalog, conn, pName, pTableMeta));
33!
1020
  }
1021

1022
END:
258✔
1023
  taosArrayDestroy(pColumns);
267✔
1024
  taosArrayDestroy(pTags);
266✔
1025
  RETURN
267!
1026
}
1027

1028
static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmpCheck, SHashObj* hashTmp, SRequestConnInfo *conn,
258✔
1029
                            SSmlSTableMeta *sTableData, SName *pName, STableMeta **pTableMeta){
1030
  ESchemaAction action = SCHEMA_ACTION_NULL;
258✔
1031
  SArray *pColumns = NULL;
258✔
1032
  SArray *pTags = NULL;
258✔
1033
  int32_t code = 0;
258✔
1034
  int32_t lino = 0;
258✔
1035
  SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, hashTmpCheck, &action, false));
258✔
1036

1037
  if (action != SCHEMA_ACTION_NULL) {
255✔
1038
    SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE));
15!
1039
    uDebug("SML:0x%" PRIx64 ", %s change table col, table:%s, action:%d", info->id, __FUNCTION__, pName->tname,
15!
1040
           action);
1041
    SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData));
15!
1042
    SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, pColumns,
15!
1043
                                      (*pTableMeta)->tableInfo.numOfColumns, false));
1044

1045
    SML_CHECK_CODE(smlSendMetaMsg(info, pName, pColumns, &pTags, (*pTableMeta), action));
15!
1046

1047
    info->cost.numOfAlterColSTables++;
15✔
1048
    taosMemoryFreeClear(*pTableMeta);
15!
1049
    SML_CHECK_CODE(catalogRefreshTableMeta(info->pCatalog, conn, pName, -1));
15!
1050
    SML_CHECK_CODE(catalogGetSTableMeta(info->pCatalog, conn, pName, pTableMeta));
15!
1051
  }
1052

1053
END:
255✔
1054
  taosArrayDestroy(pColumns);
258✔
1055
  taosArrayDestroy(pTags);
258✔
1056
  RETURN
258!
1057
}
1058

1059
// Insert schema entries [start, end) of pTableMeta into hashTmp, keyed by the
// entry name and storing the entry's uint16_t schema index as the value.
// Stops at, and returns, the first taosHashPut failure.
static int32_t smlBuildTempHash(SHashObj *hashTmp, STableMeta *pTableMeta, uint16_t start, uint16_t end){
  int32_t  code = 0;
  int32_t  lino = 0;
  uint16_t pos = start;

  while (pos < end) {
    const char *fieldName = pTableMeta->schema[pos].name;
    SML_CHECK_CODE(taosHashPut(hashTmp, fieldName, strlen(fieldName), &pos, SHORT_BYTES));
    pos++;
  }

END:
  return code;
}
1069

1070
// Bring the database schema in line with the data that was parsed.
//
// For every entry in info->superTables the current meta is fetched from the
// catalog. If the super table does not exist it is created; if it exists its
// tags and then its columns are extended/widened as needed, and the parsed
// tags/columns are verified against the resulting meta. The final meta is
// stored in the entry's tableMeta, replacing any previous one.
//
// Nothing is done when info->dataFormat is set and info->needModifySchema is not.
//
// Returns TSDB_CODE_SUCCESS, or the first error; on error the hash iteration is
// cancelled, temporary state is released and a catalog refresh is attempted for
// the table being processed.
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  uDebug("SML:0x%" PRIx64 ", %s start, format:%d, needModifySchema:%d", info->id, __FUNCTION__, info->dataFormat,
         info->needModifySchema);
  if (info->dataFormat && !info->needModifySchema) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t     code = 0;
  int32_t     lino = 0;
  SHashObj   *colHashTmp = NULL;
  SHashObj   *tagHashTmp = NULL;
  STableMeta *pTableMeta = NULL;

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
  while (tmp) {
    SSmlSTableMeta *sTableData = *tmp;
    bool            needCheckMeta = false;  // for multi thread

    // Work on a private copy of the hash key so it can be rewritten in place
    // before it is used as the table name.
    size_t superTableLen = 0;
    void  *superTable = taosHashGetKey(tmp, &superTableLen);
    char  *measure = taosMemoryMalloc(superTableLen);
    SML_CHECK_NULL(measure);
    (void)memcpy(measure, superTable, superTableLen);
    if (info->protocol == TSDB_SML_LINE_PROTOCOL){
      PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
    }
    smlStrReplace(measure, superTableLen);
    size_t nameLen = TMIN(superTableLen, TSDB_TABLE_NAME_LEN - 1);
    (void)memcpy(pName.tname, measure, nameLen);
    pName.tname[nameLen] = '\0';
    taosMemoryFree(measure);

    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);

    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      SML_CHECK_CODE(smlCreateTable(info, &conn, sTableData, &pName, &pTableMeta));
    } else if (code == TSDB_CODE_SUCCESS) {
      if (smlIsPKTable(pTableMeta)) {
        SML_CHECK_CODE(TSDB_CODE_SML_NOT_SUPPORT_PK);
      }

      colHashTmp = taosHashInit(pTableMeta->tableInfo.numOfColumns, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
      tagHashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
      SML_CHECK_NULL(colHashTmp);
      SML_CHECK_NULL(tagHashTmp);
      SML_CHECK_CODE(smlBuildTempHash(tagHashTmp, pTableMeta, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags));
      SML_CHECK_CODE(smlBuildTempHash(colHashTmp, pTableMeta, 0, pTableMeta->tableInfo.numOfColumns));

      // Each pass gets the other kind's name map to detect tag/column name clashes.
      SML_CHECK_CODE(smlModifyTag(info, colHashTmp, tagHashTmp, &conn, sTableData, &pName, &pTableMeta));
      SML_CHECK_CODE(smlModifyCols(info, tagHashTmp, colHashTmp, &conn, sTableData, &pName, &pTableMeta));

      needCheckMeta = true;
      taosHashCleanup(colHashTmp);
      taosHashCleanup(tagHashTmp);
      colHashTmp = NULL;
      tagHashTmp = NULL;
    } else {
      uError("SML:0x%" PRIx64 ", %s load table meta error:%s", info->id, __FUNCTION__, tstrerror(code));
      goto END;
    }

    if (needCheckMeta) {
      SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags));
      SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols));
    }

    taosMemoryFreeClear(sTableData->tableMeta);
    sTableData->tableMeta = pTableMeta;
    uDebug("SML:0x%" PRIx64 ", %s modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, __FUNCTION__, pTableMeta->uid,
           pTableMeta->sversion, pTableMeta->tversion);
    // Ownership has moved to sTableData. Drop the local alias so that a failure
    // in a later iteration cannot free (at END or inside smlCreateTable) the
    // meta that the super-table entry now owns.
    pTableMeta = NULL;
    tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
  }
  uDebug("SML:0x%" PRIx64 ", %s end success, format:%d, needModifySchema:%d", info->id, __FUNCTION__, info->dataFormat,
         info->needModifySchema);

  return TSDB_CODE_SUCCESS;

END:
  taosHashCancelIterate(info->superTables, tmp);
  taosHashCleanup(colHashTmp);
  taosHashCleanup(tagHashTmp);
  taosMemoryFreeClear(pTableMeta);
  (void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);  // ignore refresh meta code if there is an error
  uError("SML:0x%" PRIx64 ", %s end failed:%d:%s, format:%d, needModifySchema:%d", info->id, __FUNCTION__, code,
         tstrerror(code), info->dataFormat, info->needModifySchema);

  return code;
}
1166

1167
// Register every key/value of `cols` in metaHash (key -> int16_t position in
// `cols`) and append it to metaArray.
//
// Returns TSDB_CODE_PAR_DUPLICATED_COLUMN when a key occurs twice in `cols` or
// is also present in checkDuplicate, and the taosHashPut code for any other
// insertion failure.
static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SHashObj *checkDuplicate) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = 0;  // cleared so the duplicate-key test below only sees errors raised here
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    SML_CHECK_NULL(kv);
    int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
    if (ret == 0) {
      SML_CHECK_NULL(taosArrayPush(metaArray, kv));
      if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
        SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
      }
    } else if (terrno == TSDB_CODE_DUP_KEY) {
      return TSDB_CODE_PAR_DUPLICATED_COLUMN;
    } else {
      // Any other failure used to be dropped silently, leaving metaHash and
      // metaArray out of step; report it like the sibling helpers do.
      SML_CHECK_CODE(ret);
    }
  }

END:
  RETURN
}
1188

1189
// Fold the key/values of one more line into the collected meta.
//
// Keys already in metaHash: for tags only the recorded length is raised when
// the new value is longer; for columns the type must match the recorded one
// (otherwise TSDB_CODE_SML_NOT_SAME_TYPE) and the recorded length is raised for
// variable-length types.
// New keys are appended to metaArray and indexed in metaHash; a key that is
// also present in checkDuplicate yields TSDB_CODE_PAR_DUPLICATED_COLUMN.
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg,
                             SHashObj *checkDuplicate) {
  int32_t code = 0;
  int32_t lino = 0;

  for (int pos = 0; pos < taosArrayGetSize(cols); ++pos) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, pos);
    SML_CHECK_NULL(kv);

    int16_t *slot = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
    if (NULL == slot) {
      // First time this key is seen: its position is the current array size,
      // which has to fit into the int16_t stored in the hash.
      size_t count = taosArrayGetSize(metaArray);
      if (count > INT16_MAX) {
        smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
        SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
      }
      int16_t newSlot = count;
      SML_CHECK_CODE(taosHashPut(metaHash, kv->key, kv->keyLen, &newSlot, SHORT_BYTES));
      SML_CHECK_NULL(taosArrayPush(metaArray, kv));
      if (NULL != taosHashGet(checkDuplicate, kv->key, kv->keyLen)) {
        SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
      }
      continue;
    }

    SSmlKv *known = (SSmlKv *)taosArrayGet(metaArray, *slot);
    SML_CHECK_NULL(known);

    if (!isTag && kv->type != known->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
      SML_CHECK_CODE(TSDB_CODE_SML_NOT_SAME_TYPE);
    }

    // update string len, if bigger
    if ((isTag || IS_VAR_DATA_TYPE(kv->type)) && (kv->length > known->length)) {
      known->length = kv->length;
    }
  }

END:
  RETURN
}
1233

1234
void smlDestroyTableInfo(void *para) {
1,616✔
1235
  SSmlTableInfo *tag = *(SSmlTableInfo **)para;
1,616✔
1236
  for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
103,080✔
1237
    SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
100,838✔
1238
    taosHashCleanup(kvHash);
99,799✔
1239
  }
1240

1241
  taosArrayDestroy(tag->cols);
1,616✔
1242
  taosArrayDestroyEx(tag->tags, freeSSmlKv);
1,617✔
1243
  taosMemoryFree(tag);
1,617!
1244
}
1,617✔
1245

1246
void freeSSmlKv(void *data) {
427,124✔
1247
  SSmlKv *kv = (SSmlKv *)data;
427,124✔
1248
  if (kv->keyEscaped) taosMemoryFreeClear(kv->key);
427,124!
1249
  if (kv->valueEscaped) taosMemoryFreeClear(kv->value);
427,124!
1250
#ifdef USE_GEOS
1251
  if (kv->type == TSDB_DATA_TYPE_GEOMETRY) geosFreeBuffer((void *)(kv->value));
427,124✔
1252
#endif
1253
  if (kv->type == TSDB_DATA_TYPE_VARBINARY) taosMemoryFreeClear(kv->value);
427,124!
1254
}
427,124✔
1255

1256
// Release an SSmlHandle and everything this function knows it owns: the four
// hash tables, the cached cJSON tag/value objects, the previous-line tag array,
// the escaped-string list, the per-line data (only when dataFormat is off), the
// previous line's tags (JSON protocol only), the cJSON root and the handle.
// A NULL handle is accepted and ignored.
void smlDestroyInfo(SSmlHandle *info) {
  if (NULL == info) {
    return;
  }

  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->childTables);
  taosHashCleanup(info->superTables);
  taosHashCleanup(info->tableUids);

  int t = 0;
  while (t < taosArrayGetSize(info->tagJsonArray)) {
    cJSON_Delete((cJSON *)taosArrayGetP(info->tagJsonArray, t));
    t++;
  }
  taosArrayDestroy(info->tagJsonArray);

  int v = 0;
  while (v < taosArrayGetSize(info->valueJsonArray)) {
    cJSON_Delete((cJSON *)taosArrayGetP(info->valueJsonArray, v));
    v++;
  }
  taosArrayDestroy(info->valueJsonArray);

  taosArrayDestroyEx(info->preLineTagKV, freeSSmlKv);
  taosArrayDestroyP(info->escapedStringList, NULL);

  if (!info->dataFormat) {
    for (int n = 0; n < info->lineNum; n++) {
      SSmlLineInfo *line = &info->lines[n];
      taosArrayDestroyEx(line->colArray, freeSSmlKv);
      // measureTag is only released for the non-line protocols.
      if (line->measureTagsLen != 0 && info->protocol != TSDB_SML_LINE_PROTOCOL) {
        taosMemoryFree(line->measureTag);
      }
    }
    taosMemoryFree(info->lines);
  }

  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    taosMemoryFreeClear(info->preLine.tags);
  }

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
1294

1295
// Allocate and initialise an SSmlHandle.
//
// taos    connection handle; when NULL the connection object and catalog handle
//         are left unset
// handle  out: receives the new handle on success and is not written on failure
//
// On any failure the partially built handle is destroyed with smlDestroyInfo
// and the error code is returned.
int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  SML_CHECK_NULL(info);

  if (NULL != taos) {
    info->taos = acquireTscObj(*(int64_t *)taos);
    SML_CHECK_NULL(info->taos);
    SML_CHECK_CODE(catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog));
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SML_CHECK_NULL(info->pVgHash);

  info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  SML_CHECK_NULL(info->childTables);

  info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  SML_CHECK_NULL(info->tableUids);

  info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  SML_CHECK_NULL(info->superTables);

  // The two table hashes release their entries through dedicated callbacks.
  taosHashSetFreeFp(info->superTables, smlDestroySTableMeta);
  taosHashSetFreeFp(info->childTables, smlDestroyTableInfo);

  info->id = smlGenId();
  SML_CHECK_CODE(smlInitHandle(&info->pQuery));
  info->dataFormat = true;

  info->tagJsonArray = taosArrayInit(8, POINTER_BYTES);
  SML_CHECK_NULL(info->tagJsonArray);

  info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
  SML_CHECK_NULL(info->valueJsonArray);

  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  SML_CHECK_NULL(info->preLineTagKV);

  info->escapedStringList = taosArrayInit(8, POINTER_BYTES);
  SML_CHECK_NULL(info->escapedStringList);

  // Success: hand the handle to the caller and clear the local pointer so the
  // shared cleanup below sees NULL and does nothing.
  *handle = info;
  info = NULL;

END:
  smlDestroyInfo(info);
  RETURN
}
1336

1337
// Build a key -> SSmlKv* hash for one line's columns and append that hash to
// colsArray.
//
// The hash stores pointers to the entries inside `cols`, not copies. A key that
// occurs twice in `cols` yields TSDB_CODE_PAR_DUPLICATED_COLUMN. On any failure
// the hash is cleaned up and nothing is appended.
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SHashObj *byKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  SML_CHECK_NULL(byKey);

  size_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, pos);
    SML_CHECK_NULL(kv);

    // terrno is cleared first so the duplicate test reflects this insertion only.
    terrno = 0;
    code = taosHashPut(byKey, kv->key, kv->keyLen, &kv, POINTER_BYTES);
    if (TSDB_CODE_DUP_KEY == terrno) {
      SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
    }
    SML_CHECK_CODE(code);
    pos++;
  }

  SML_CHECK_NULL(taosArrayPush(colsArray, &byKey));
  return code;

END:
  taosHashCleanup(byKey);
  RETURN
}
1359

1360
/*
 * Post-parse pass, only needed when info->dataFormat is false.
 *
 * For every parsed line: look up its child table, enforce the tag/column
 * count limits, push the line's columns onto the child table, and then either
 * update the already-collected super table meta or create and fill a new one.
 */
static int32_t smlParseEnd(SSmlHandle *info) {
  uDebug("SML:0x%" PRIx64 ", %s start, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
         info->lineNum);
  int32_t code = 0;
  int32_t lino = 0;
  if (info->dataFormat) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t lineIdx = 0; lineIdx < info->lineNum; lineIdx++) {
    SSmlLineInfo   *elements = info->lines + lineIdx;
    SSmlTableInfo **slot = NULL;

    // the child-table key differs between the line protocol and the other protocols
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      slot = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
    } else {
      slot = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
                                           elements->measureLen + elements->tagsLen);
    }

    SSmlTableInfo *tinfo = (slot != NULL) ? *slot : NULL;
    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 ", get oneTable failed, line num:%d", info->id, lineIdx);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    SML_CHECK_CODE(smlPushCols(tinfo->cols, elements->colArray));

    SSmlSTableMeta **tableMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
    if (tableMeta == NULL) {
      // first line seen for this super table: create the meta and register it
      uDebug("SML:0x%" PRIx64 ", %s add meta, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
             info->lineNum);
      SSmlSTableMeta *meta = NULL;
      SML_CHECK_CODE(smlBuildSTableMeta(info->dataFormat, &meta));
      code = taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
      if (code != TSDB_CODE_SUCCESS) {
        // the hash did not take the meta, so it has to be destroyed here
        smlDestroySTableMeta(&meta);
        SML_CHECK_CODE(code);
      }
      SML_CHECK_CODE(smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags, NULL));
      SML_CHECK_CODE(smlInsertMeta(meta->colHash, meta->cols, elements->colArray, meta->tagHash));
    } else {
      // update meta
      uDebug("SML:0x%" PRIx64 ", %s update meta, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
             info->lineNum);
      SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false,
                                   &info->msgBuf, (*tableMeta)->tagHash));
      SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf,
                                   (*tableMeta)->colHash));
    }
  }
  uDebug("SML:0x%" PRIx64 ", %s end, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat, info->lineNum);

END:
  RETURN
}
1426

1427
/*
 * Bind the parsed rows of every child table and launch the insert request.
 *
 * Steps: record the full db name in the request's dbList; for each child table
 * record the (escaped) super table name in the request's tableList, check
 * write permission, resolve the vgroup, and bind the table's data; finally
 * build the output and launch the query.
 *
 * Returns info->pRequest->code once the query has been launched, or the first
 * error code encountered before that.
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  char           *measure = NULL;   // per-table scratch copy of the super table name; freed at END on error
  SSmlTableInfo **oneTable = NULL;  // hash iterator; cancelled at END on error
  uDebug("SML:0x%" PRIx64 ", %s start, format:%d", info->id, __FUNCTION__, info->dataFormat);

  if (info->pRequest->dbList == NULL) {
    info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
    SML_CHECK_NULL(info->pRequest->dbList);
  }
  char *dbFName = (char *)taosArrayReserve(info->pRequest->dbList, 1);
  SML_CHECK_NULL(dbFName);
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
  (void)tNameGetFullDbName(&pName, dbFName);  // ignore

  for (oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); oneTable != NULL;
       oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable)) {
    SSmlTableInfo *tableData = *oneTable;

    // work on a private copy of the super table name, since it is rewritten in place below
    int measureLen = tableData->sTableNameLen;
    measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
    SML_CHECK_NULL(measure);
    (void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
    }
    smlStrReplace(measure, measureLen);
    (void)memcpy(pName.tname, measure, measureLen);
    pName.tname[measureLen] = '\0';

    if (info->pRequest->tableList == NULL) {
      info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
      SML_CHECK_NULL(info->pRequest->tableList);
    }
    SML_CHECK_NULL(taosArrayPush(info->pRequest->tableList, &pName));

    // from here on pName refers to the child table
    tstrncpy(pName.tname, tableData->childTableName, sizeof(pName.tname));

    SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter,
                             .requestId = info->pRequest->requestId,
                             .requestObjRefId = info->pRequest->self,
                             .mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};

    SML_CHECK_CODE(smlCheckAuth(info, &conn, pName.tname, AUTH_TYPE_WRITE));

    SVgroupInfo vg = {0};
    SML_CHECK_CODE(catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg));
    SML_CHECK_CODE(taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    if (unlikely(NULL == pMeta || NULL == *pMeta || NULL == (*pMeta)->tableMeta)) {
      uError("SML:0x%" PRIx64 ", %s NULL == pMeta. table name:%s", info->id, __FUNCTION__, tableData->childTableName);
      SML_CHECK_CODE(TSDB_CODE_SML_INTERNAL_ERROR);
    }

    // use tablemeta of stable to save vgid and uid of child table
    STableMeta *stbMeta = (*pMeta)->tableMeta;
    stbMeta->vgId = vg.vgId;
    stbMeta->uid = tableData->uid;  // one table merge data block together according uid
    uDebug("SML:0x%" PRIx64 ", %s table:%s, uid:%" PRIu64 ", format:%d", info->id, __FUNCTION__, pName.tname,
           tableData->uid, info->dataFormat);

    SML_CHECK_CODE(smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
                               stbMeta, tableData->childTableName, measure, measureLen, info->ttl, info->msgBuf.buf,
                               info->msgBuf.len, info->taos->optionInfo.charsetCxt));
    taosMemoryFreeClear(measure);
  }

  SML_CHECK_CODE(smlBuildOutput(info->pQuery, info->pVgHash));
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);  // no need to check return code

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);  // no need to check return code

  uDebug("SML:0x%" PRIx64 ", %s end, format:%d, code:%d,%s", info->id, __FUNCTION__, info->dataFormat,
         info->pRequest->code, tstrerror(info->pRequest->code));

  return info->pRequest->code;

END:
  taosMemoryFree(measure);
  taosHashCancelIterate(info->childTables, oneTable);
  RETURN
}
1516

1517
/*
 * Emit one debug line summarising a schemaless insert: result code, table
 * counters, and the time spent in each phase (each phase cost is the
 * difference between two consecutive timestamps stored in info->cost).
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  uDebug(
      "SML:0x%" PRIx64
      " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64,
      info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables,
      info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables,
      info->cost.numOfAlterColSTables, parseCost, schemaCost, bindCost, rpcCost, totalCost);
}
1,019✔
1528

1529
/*
 * Reset the per-run parsing state of `info` so the input can be parsed again
 * from the first line.
 *
 * Clears the child/super table hashes and the uid hash, allocates a fresh
 * line array when dataFormat is off, drops the escaped-string list and the
 * cached previous line, and replaces the statement's table-block hash with a
 * new empty one.
 */
int32_t smlClearForRerun(SSmlHandle *info) {
  int32_t code = 0;
  int32_t lino = 0;

  info->reRun = false;

  taosHashClear(info->childTables);
  taosHashClear(info->superTables);
  taosHashClear(info->tableUids);

  if (info->dataFormat == false) {
    info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
    SML_CHECK_NULL(info->lines);
  }

  taosArrayClearP(info->escapedStringList, NULL);

  // for the JSON protocol preLine.tags is released before preLine is zeroed
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    taosMemoryFreeClear(info->preLine.tags);
  }
  (void)memset(&info->preLine, 0, sizeof(SSmlLineInfo));
  info->currSTableMeta = NULL;
  info->currTableDataCtx = NULL;

  // release the old table-block hash through the statement's own free hook, then start with an empty one
  SVnodeModifyOpStmt *pModify = (SVnodeModifyOpStmt *)(info->pQuery->pRoot);
  pModify->freeHashFunc(pModify->pTableBlockHashObj);
  pModify->pTableBlockHashObj =
      taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  SML_CHECK_NULL(pModify->pTableBlockHashObj);

END:
  RETURN
}
1559

1560
/*
 * Log one raw input line of `len` bytes.
 *
 * The bytes are copied into a temporary NUL-terminated buffer so they can be
 * printed with %s. DEBUG_DEBUG logs through uDebug, DEBUG_ERROR through
 * uError; any other level logs nothing.
 */
static void printRaw(int64_t id, int lineNum, int numLines, ELogLevel level, char* data, int32_t len){
  char *copy = taosMemoryMalloc(len + 1);
  if (copy == NULL) {
    uError("SML:0x%" PRIx64 ", smlParseLine failed. code :%d", id, terrno);
    return;
  }

  (void)memcpy(copy, data, len);
  copy[len] = '\0';

  if (level == DEBUG_DEBUG) {
    uDebug("SML:0x%" PRIx64 ", smlParseLine is raw, line %d/%d :%s", id, lineNum, numLines, copy);
  } else if (level == DEBUG_ERROR) {
    uError("SML:0x%" PRIx64 ", smlParseLine failed. line %d/%d :%s", id, lineNum, numLines, copy);
  }

  taosMemoryFree(copy);
}
1575

1576
/*
 * Fetch the next input line.
 *
 * @param info       handle; supplies the protocol (for comment detection) and the log id
 * @param lines      array of NUL-terminated lines, or NULL when the raw buffer is used
 * @param rawLine    in/out cursor into the raw buffer; advanced past the consumed line
 *                   (including its '\n' when present)
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   total line count, used for logging only
 * @param i          index of the line to fetch (indexes `lines`; logging only in raw mode)
 * @param tmp        out: start of the line
 * @param len        out: line length; in raw mode it is accumulated onto the incoming
 *                   value, so the caller must pass it in as 0
 * @return false when the raw line is a comment and must be skipped, true otherwise
 */
static bool getLine(SSmlHandle *info, char *lines[], char **rawLine, char *rawLineEnd, int numLines, int i, char **tmp,
                    int *len) {
  if (lines) {
    *tmp = lines[i];
    *len = strlen(*tmp);
  } else if (*rawLine) {
    *tmp = *rawLine;
    while (*rawLine < rawLineEnd) {
      if (*((*rawLine)++) == '\n') {
        break;
      }
      (*len)++;
    }
    // only inspect the first byte when the line start lies inside the buffer;
    // with the cursor already at rawLineEnd there is no byte to read
    if (*tmp < rawLineEnd && IS_COMMENT(info->protocol, (*tmp)[0])) {  // this line is comment
      return false;
    }
  }

  if (*rawLine != NULL) {
    // a raw line is a slice of the buffer, not a NUL-terminated string, so it
    // is only ever logged through printRaw (which copies and terminates it)
    if (uDebugFlag & DEBUG_DEBUG) {
      printRaw(info->id, i, numLines, DEBUG_DEBUG, *tmp, *len);
    }
  } else {
    uDebug("SML:0x%" PRIx64 ", smlParseLine is not raw, line %d/%d :%s", info->id, i, numLines, *tmp);
  }
  return true;
}
1601

1602

1603
/*
 * Parse a JSON payload. The payload is lines[0] when `lines` is given,
 * otherwise `rawLine`; when both are NULL nothing is parsed and success is
 * returned. A parse failure is logged and its code returned.
 */
static int32_t smlParseJson(SSmlHandle *info, char *lines[], char *rawLine) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (lines != NULL || rawLine != NULL) {
    char *payload = (lines != NULL) ? *lines : rawLine;
    code = smlParseJSONExt(info, payload);
  }

  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed code:%d", __FUNCTION__ , code);
  }
  return code;
}
1615

1616
/*
 * Parse every input line according to info->protocol.
 *
 * JSON input is delegated to smlParseJson. For the line and telnet protocols
 * each line is parsed either into a throw-away element (dataFormat on) or into
 * info->lines[i] (dataFormat off). When a parse call requests a re-run, the
 * state is cleared and parsing restarts from the first line.
 */
static int32_t smlParseStart(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  uDebug("SML:0x%" PRIx64 ", %s start", info->id, __FUNCTION__);
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    return smlParseJson(info, lines, rawLine);
  }

  char *const rawStart = rawLine;  // remembered so a re-run can rewind the raw cursor
  int32_t     lineIdx = 0;
  while (lineIdx < numLines) {
    char *cur = NULL;
    int   curLen = 0;
    if (!getLine(info, lines, &rawLine, rawLineEnd, numLines, lineIdx, &cur, &curLen)) {
      // skipped line: lineIdx is deliberately not advanced
      continue;
    }

    // decide the parse target before parsing; the scratch element is used when dataFormat is on
    const bool   useScratch = info->dataFormat;
    SSmlLineInfo scratch = {0};
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      SSmlLineInfo *dst = useScratch ? &scratch : info->lines + lineIdx;
      code = smlParseInfluxString(info, cur, cur + curLen, dst);
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      SSmlLineInfo *dst = useScratch ? &scratch : info->lines + lineIdx;
      code = smlParseTelnetString(info, (char *)cur, (char *)cur + curLen, dst);
      if (useScratch && scratch.measureTagsLen != 0) {
        taosMemoryFree(scratch.measureTag);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      if (rawLine == NULL) {
        uError("SML:0x%" PRIx64 ", %s failed. line %d :%s", info->id, __FUNCTION__, lineIdx, cur);
      } else {
        printRaw(info->id, lineIdx, numLines, DEBUG_ERROR, cur, curLen);
      }
      return code;
    }

    if (info->reRun) {
      uDebug("SML:0x%" PRIx64 ", %s re run", info->id, __FUNCTION__);
      lineIdx = 0;
      rawLine = rawStart;
      code = smlClearForRerun(info);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      continue;
    }

    lineIdx++;
  }
  uDebug("SML:0x%" PRIx64 ", %s end", info->id, __FUNCTION__);

  return code;
}
1671

1672
/*
 * Run one complete schemaless insert: parse the input, reconcile the database
 * schema (retrying on the three transient codes checked below), then bind and
 * insert the data. Timestamps for each phase are recorded in info->cost.
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t retryNum = 0;

  info->cost.parseTime = taosGetTimestampUs();

  SML_CHECK_CODE(smlParseStart(info, lines, rawLine, rawLineEnd, numLines));
  SML_CHECK_CODE(smlParseEnd(info));

  info->cost.lineNum = info->lineNum;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);
  info->cost.schemaTime = taosGetTimestampUs();

  for (;;) {
    code = smlModifyDBSchemas(info);

    const bool transient = (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER) || (code == TSDB_CODE_SDB_OBJ_CREATING) ||
                           (code == TSDB_CODE_MND_TRANS_CONFLICT);
    if (!transient) {
      break;
    }

    taosMsleep(100);
    uInfo("SML:0x%" PRIx64 ", smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);

    // the retry budget scales with the number of super tables
    if (!(retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }

  SML_CHECK_CODE(code);
  info->cost.insertBindTime = taosGetTimestampUs();
  SML_CHECK_CODE(smlInsertData(info));

END:
  RETURN
}
1704

1705
/*
 * When the slow-log scope includes inserts, store a copy of the input text in
 * the request (request->sqlstr / request->sqlLen).
 *
 * The text is lines[0] when available, otherwise the raw buffer (bounded by
 * rawLineEnd when given, else by its NUL terminator). The copy is truncated to
 * TSDB_MAX_ALLOWED_SQL_LEN bytes and NUL-terminated. Allocation failure is
 * logged and otherwise ignored.
 */
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
  if (!(request->pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT)) {
    return;
  }

  int32_t len = 0;
  char   *p = NULL;

  if (lines && lines[0]) {
    len = strlen(lines[0]);
    p = lines[0];
  } else if (rawLine) {
    len = rawLineEnd ? (rawLineEnd - rawLine) : strlen(rawLine);
    p = rawLine;
  }

  if (NULL == p) {
    return;
  }

  // clamp the copied length into [0, TSDB_MAX_ALLOWED_SQL_LEN]
  int32_t rlen = TMIN(len, TSDB_MAX_ALLOWED_SQL_LEN);
  rlen = TMAX(rlen, 0);

  char *sql = taosMemoryMalloc(rlen + 1);
  if (NULL == sql) {
    uError("malloc %d for sml sql failed", rlen + 1);
    return;
  }
  (void)memcpy(sql, p, rlen);
  sql[rlen] = 0;

  request->sqlstr = sql;
  request->sqlLen = rlen;
}
1742

1743
/*
 * Common implementation behind all taos_schemaless_insert* entry points.
 *
 * Exactly one of `lines` (array of NUL-terminated lines) or
 * `rawLine`/`rawLineEnd` (raw buffer) carries the input. A request and an SML
 * handle are built, the arguments are validated, and smlProcess is run. When
 * smlProcess fails with a retryable code, the meta is refreshed and the whole
 * sequence is repeated with a fresh request, up to 10 retries.
 *
 * Returns the request object as TAOS_RES; errors are reported through
 * request->code.
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines,
                                       int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj *request = NULL;
  SSmlHandle  *info = NULL;
  int          attempts = 0;

  for (;;) {
    SML_CHECK_CODE(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &request, reqid));
    SSmlMsgBuf reqMsg = {request->msgBufLen, request->msgBuf};
    request->code = smlBuildSmlInfo(taos, &info);
    SML_CHECK_CODE(request->code);

    // wire the handle to this request and copy the call parameters into it
    info->pRequest = request;
    info->pRequest->pQuery = info->pQuery;
    info->ttl = ttl;
    info->precision = precision;
    info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
    info->lineNum = numLines;
    info->tbnameKey = tbnameKey;

    smlSetReqSQL(request, lines, rawLine, rawLineEnd);

    // argument validation: each failure sets request->code plus a message and bails out
    if (request->pDb == NULL) {
      request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
      smlBuildInvalidDataMsg(&reqMsg, "Database not specified", NULL);
      goto END;
    }

    if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
      request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
      smlBuildInvalidDataMsg(&reqMsg, "protocol invalidate", NULL);
      goto END;
    }

    if (protocol == TSDB_SML_LINE_PROTOCOL &&
        (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
      request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
      smlBuildInvalidDataMsg(&reqMsg, "precision invalidate for line protocol", NULL);
      goto END;
    }

    if (protocol == TSDB_SML_JSON_PROTOCOL) {
      // JSON input is a single document regardless of the caller's line count
      numLines = 1;
    } else if (numLines <= 0) {
      request->code = TSDB_CODE_SML_INVALID_DATA;
      smlBuildInvalidDataMsg(&reqMsg, "line num is invalid", NULL);
      goto END;
    }

    code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
    request->code = code;
    info->cost.endTime = taosGetTimestampUs();
    info->cost.code = code;

    const bool retryable = NEED_CLIENT_HANDLE_ERROR(code) || code == TSDB_CODE_SDB_OBJ_CREATING ||
                           code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT;
    if (!retryable) {
      smlPrintStatisticInfo(info);
      break;
    }

    if (attempts++ >= 10) {
      uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, attempts, code, tstrerror(code));
      break;
    }

    taosMsleep(100);
    uInfo("SML:%" PRIx64 " retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, attempts,
          code, tstrerror(code));
    code = refreshMeta(request->pTscObj, request);
    if (code != 0) {
      uInfo("SML:%" PRIx64 " refresh meta error code:%d, msg:%s", info->id, code, tstrerror(code));
    }

    // drop this attempt's handle and request; the next iteration builds new ones
    smlDestroyInfo(info);
    info = NULL;
    taos_free_result(request);
    request = NULL;
  }

END:
  smlDestroyInfo(info);
  return (TAOS_RES *)request;
}
1826

1827
/**
 * taos_schemaless_insert() parses data points and inserts them into the database
 * according to the selected protocol.
 *
 * @param $lines input array; it may contain multiple lines, each line representing one data point.
 *               If protocol=2 is used, the input array should contain a single JSON
 *               string (e.g. char *lines[] = {"$JSON_string"}). To insert
 *               multiple data points in JSON format, include them in $JSON_string
 *               as a JSON array.
 * @param $numLines the number of data points in $lines.
 *                  If protocol = 2 is used, this parameter is ignored because $lines should
 *                  contain a single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return TAOS_RES
 */
1845

1846
/*
 * Validate the arguments of a line-array schemaless insert and forward to
 * taos_schemaless_insert_inner.
 *
 * Returns NULL with terrno = TSDB_CODE_INVALID_PARA when `taos` or `lines` is
 * NULL, when `numLines` is negative, or when any line that will be consumed
 * is NULL.
 *
 * For the JSON protocol the line count is ignored downstream and only
 * lines[0] is consumed, so exactly lines[0] is validated: entries beyond it
 * are never touched (the array may legitimately hold a single element), and a
 * NULL lines[0] is rejected even when numLines is 0.
 */
TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines, int protocol,
                                                           int precision, int32_t ttl, int64_t reqid, char *tbnameKey) {
  if (taos == NULL || lines == NULL || numLines < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  const int numToCheck = (protocol == TSDB_SML_JSON_PROTOCOL) ? 1 : numLines;
  for (int i = 0; i < numToCheck; i++) {
    if (lines[i] == NULL) {
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }
  return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid, tbnameKey);
}
1860

1861
/* Line-array insert with explicit ttl and request id; no table-name key is supplied. */
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
  char *noTbnameKey = NULL;
  return taos_schemaless_insert_ttl_with_reqid_tbname_key(taos, lines, numLines, protocol, precision, ttl, reqid,
                                                          noTbnameKey);
}
1865

1866
/* Line-array insert using TSDB_DEFAULT_TABLE_TTL and request id 0. */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
1869

1870
/* Line-array insert with an explicit ttl and request id 0. */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                     int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
1874

1875
/* Line-array insert with an explicit request id and TSDB_DEFAULT_TABLE_TTL. */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
1880

1881
/*
 * Count the non-comment lines in a raw buffer of `len` bytes.
 *
 * A line ends at each '\n' and at the final byte of the buffer. A line is
 * skipped when IS_COMMENT says so for the given protocol and the line's first
 * byte. Despite the name, the return value is a line count, not a length.
 */
static int32_t getRawLineLen(char *lines, int len, int protocol) {
  int   count = 0;
  char *lineStart = lines;

  for (int pos = 0; pos < len; pos++) {
    const bool atLineEnd = (lines[pos] == '\n') || (pos == len - 1);
    if (!atLineEnd) {
      continue;
    }
    if (!IS_COMMENT(protocol, lineStart[0])) {  // ignore comment
      count++;
    }
    lineStart = lines + pos + 1;
  }
  return count;
}
1894

1895
/*
 * Raw-buffer schemaless insert.
 *
 * Rejects a NULL `taos`, a NULL `lines` or a negative `len` with
 * terrno = TSDB_CODE_INVALID_PARA. Otherwise counts the non-comment lines in
 * the buffer, reports that count through `totalRows` when it is non-NULL, and
 * forwards the buffer [lines, lines + len) to taos_schemaless_insert_inner.
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows,
                                                               int protocol, int precision, int32_t ttl, int64_t reqid,
                                                               char *tbnameKey) {
  if (taos == NULL || lines == NULL || len < 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int rowCount = getRawLineLen(lines, len, protocol);
  if (totalRows != NULL) {
    *totalRows = rowCount;
  }

  char *rawEnd = lines + len;
  return taos_schemaless_insert_inner(taos, NULL, lines, rawEnd, rowCount, protocol, precision, ttl, reqid,
                                      tbnameKey);
}
1909

1910
/* Raw-buffer insert with explicit ttl and request id; no table-name key is supplied. */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  char *noTbnameKey = NULL;
  return taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(taos, lines, len, totalRows, protocol, precision, ttl,
                                                              reqid, noTbnameKey);
}
1915

1916
/* Raw-buffer insert with an explicit request id and TSDB_DEFAULT_TABLE_TTL. */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
1921
/* Raw-buffer insert with an explicit ttl and request id 0. */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
1925

1926
/* Raw-buffer insert using TSDB_DEFAULT_TABLE_TTL and request id 0. */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
1931

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc